Skip to main content
fi-fhir docs

Overview

Event Sourcing / CQRS Patterns

Design document for implementing event sourcing and CQRS (Command Query Responsibility Segregation) patterns in fi-fhir.

Overview

Event sourcing stores the full history of changes as a sequence of events rather than just current state. Combined with CQRS, this enables:

  • Complete audit trail - Every change is recorded (critical for healthcare compliance)
  • Temporal queries - Query state at any point in time ("What was the patient's diagnosis list on 2024-01-15?")
  • Event replay - Rebuild projections from events, fix bugs in projection logic
  • Decoupled read models - Multiple optimized views from the same event stream

Architecture

                    Commands                      Queries
                        │                            │
                        ▼                            ▼
              ┌─────────────────┐          ┌─────────────────┐
              │  Command Handler │          │   Query Handler  │
              └────────┬────────┘          └────────┬────────┘
                       │                            │
                       ▼                            │
              ┌─────────────────┐                   │
              │   Event Store   │                   │
              │  (append-only)  │                   │
              └────────┬────────┘                   │
                       │                            │
                       ▼                            │
              ┌─────────────────┐          ┌───────▼─────────┐
              │   Projector     │─────────▶│    Read Model   │
              │  (event handler)│          │   (optimized)   │
              └─────────────────┘          └─────────────────┘

Event Store Interface

// EventStore is the core event sourcing interface for append-only event storage.
type EventStore interface {
    // Append adds events to a stream. Returns the new stream version.
    // Uses optimistic concurrency: fails if expectedVersion doesn't match.
    Append(ctx context.Context, streamID string, expectedVersion int64, events []StoredEvent) (int64, error)

    // ReadStream reads events from a stream starting at fromVersion.
    ReadStream(ctx context.Context, streamID string, fromVersion int64, maxCount int) ([]StoredEvent, error)

    // ReadAll reads events across all streams for projections.
    ReadAll(ctx context.Context, fromPosition int64, maxCount int) ([]StoredEvent, error)

    // Subscribe returns a channel of new events for real-time projections.
    Subscribe(ctx context.Context, fromPosition int64) (<-chan StoredEvent, error)

    // GetStreamVersion returns the current version of a stream (-1 if not exists).
    GetStreamVersion(ctx context.Context, streamID string) (int64, error)
}

// StoredEvent represents an event as stored in the event store.
type StoredEvent struct {
    // Position is the global ordering position across all streams
    Position int64

    // StreamID identifies the aggregate/entity this event belongs to
    StreamID string

    // StreamVersion is the version within the stream (starts at 0)
    StreamVersion int64

    // EventType is the event type (e.g., "patient_admit", "lab_result")
    EventType string

    // Data is the serialized event payload
    Data []byte

    // Metadata contains correlation IDs, user info, etc.
    Metadata map[string]string

    // Timestamp when the event was stored
    Timestamp time.Time
}

Stream Naming Conventions

Healthcare events naturally map to streams:

Stream PatternDescriptionExample
patient:{mrn}Patient-centric eventspatient:MRN001
encounter:{id}Encounter-specific eventsencounter:ENC-2024-001
claim:{id}Claim lifecycle eventsclaim:CLM-837-001
source:{id}Events from a specific sourcesource:epic_adt

Projection Framework

// Projection builds a read model from events.
type Projection interface {
    // Name returns the projection name (used for checkpointing).
    Name() string

    // Handle processes an event and updates the read model.
    Handle(ctx context.Context, event StoredEvent) error

    // GetCheckpoint returns the last processed position.
    GetCheckpoint(ctx context.Context) (int64, error)

    // SetCheckpoint saves the last processed position.
    SetCheckpoint(ctx context.Context, position int64) error
}

// ProjectionRunner manages projection lifecycle.
type ProjectionRunner struct {
    store       EventStore
    projections []Projection
    batchSize   int
    pollInterval time.Duration
}

// Run starts the projection runner (blocking).
func (r *ProjectionRunner) Run(ctx context.Context) error

Example Projections

1. Patient Timeline Projection

Builds a chronological view of all events for a patient:

type PatientTimeline struct {
    MRN    string
    Events []TimelineEvent
}

type TimelineEvent struct {
    Timestamp   time.Time
    EventType   string
    Summary     string
    SourceEvent StoredEvent
}

type PatientTimelineProjection struct {
    db *sql.DB
}

func (p *PatientTimelineProjection) Handle(ctx context.Context, event StoredEvent) error {
    // Extract patient MRN from event
    mrn := extractMRN(event)
    if mrn == "" {
        return nil // Skip non-patient events
    }

    // Build timeline entry
    summary := buildSummary(event)

    // Upsert into read model
    _, err := p.db.ExecContext(ctx, `
        INSERT INTO patient_timelines (mrn, position, timestamp, event_type, summary, event_data)
        VALUES ($1, $2, $3, $4, $5, $6)
        ON CONFLICT (mrn, position) DO NOTHING
    `, mrn, event.Position, event.Timestamp, event.EventType, summary, event.Data)

    return err
}

2. Event Statistics Projection

Aggregates event counts by type, source, and time window:

type EventStatisticsProjection struct {
    counts map[string]map[string]int64 // [eventType][source] -> count
    mu     sync.RWMutex
}

func (p *EventStatisticsProjection) Handle(ctx context.Context, event StoredEvent) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    source := event.Metadata["source"]
    if p.counts[event.EventType] == nil {
        p.counts[event.EventType] = make(map[string]int64)
    }
    p.counts[event.EventType][source]++

    return nil
}

func (p *EventStatisticsProjection) GetStats() map[string]map[string]int64 {
    p.mu.RLock()
    defer p.mu.RUnlock()

    // Return deep copy
    result := make(map[string]map[string]int64)
    for eventType, sources := range p.counts {
        result[eventType] = make(map[string]int64)
        for source, count := range sources {
            result[eventType][source] = count
        }
    }
    return result
}

3. Active Encounters Projection

Maintains a list of currently active encounters:

type ActiveEncountersProjection struct {
    encounters map[string]*ActiveEncounter // encounterID -> encounter
    mu         sync.RWMutex
}

type ActiveEncounter struct {
    ID        string
    PatientMRN string
    Class     string
    Location  string
    AdmitTime time.Time
    Provider  string
}

func (p *ActiveEncountersProjection) Handle(ctx context.Context, event StoredEvent) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    switch event.EventType {
    case "patient_admit":
        enc := parseAdmitEvent(event)
        p.encounters[enc.ID] = enc

    case "patient_discharge":
        encID := parseDischargeEvent(event)
        delete(p.encounters, encID)

    case "patient_transfer":
        encID, newLocation := parseTransferEvent(event)
        if enc, ok := p.encounters[encID]; ok {
            enc.Location = newLocation
        }
    }

    return nil
}

Snapshot Support

For projections with large state, snapshots avoid replaying all events:

// SnapshotStore manages projection snapshots.
type SnapshotStore interface {
    // Save stores a snapshot of projection state.
    Save(ctx context.Context, projectionName string, position int64, state []byte) error

    // Load retrieves the latest snapshot.
    Load(ctx context.Context, projectionName string) (position int64, state []byte, err error)
}

// SnapshotProjection extends Projection with snapshot support.
type SnapshotProjection interface {
    Projection

    // Serialize returns the current state as bytes.
    Serialize() ([]byte, error)

    // Deserialize restores state from bytes.
    Deserialize(data []byte) error

    // ShouldSnapshot returns true if a snapshot should be taken.
    ShouldSnapshot(eventsProcessed int64) bool
}

PostgreSQL Event Store Implementation

-- Event store schema
CREATE TABLE events (
    position BIGSERIAL PRIMARY KEY,
    stream_id TEXT NOT NULL,
    stream_version BIGINT NOT NULL,
    event_type TEXT NOT NULL,
    data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    timestamp TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (stream_id, stream_version)
);

CREATE INDEX idx_events_stream ON events (stream_id, stream_version);
CREATE INDEX idx_events_type ON events (event_type);
CREATE INDEX idx_events_timestamp ON events (timestamp);

-- Projection checkpoints
CREATE TABLE projection_checkpoints (
    projection_name TEXT PRIMARY KEY,
    position BIGINT NOT NULL,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Snapshots
CREATE TABLE projection_snapshots (
    projection_name TEXT PRIMARY KEY,
    position BIGINT NOT NULL,
    state BYTEA NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

Integration with Existing fi-fhir

Workflow Integration

Events from the workflow engine can be stored in the event store:

# workflow.yaml
routes:
  - name: store_all_events
    filter:
      types: ['*']
    actions:
      - type: event_store
        config:
          stream_template: 'patient:{{.Patient.MRN}}'

GraphQL Integration

The GraphQL API can query projections:

type Query {
  # Existing queries...

  # Event sourcing queries
  patientTimeline(mrn: String!, from: DateTime, to: DateTime): [TimelineEvent!]!
  eventStatistics(eventTypes: [EventType!], sources: [String!]): EventStats!
  activeEncounters(location: String): [ActiveEncounter!]!

  # Temporal queries
  patientStateAt(mrn: String!, timestamp: DateTime!): PatientState
}

type TimelineEvent {
  timestamp: DateTime!
  eventType: EventType!
  summary: String!
  rawEvent: Event
}

type EventStats {
  byType: [TypeCount!]!
  bySource: [SourceCount!]!
  total: Int!
}

CLI Commands

# Event store management
fi-fhir eventstore init          # Initialize database schema
fi-fhir eventstore stats         # Show event store statistics
fi-fhir eventstore streams       # List streams
fi-fhir eventstore read <stream> # Read events from a stream

# Projection management
fi-fhir projection list          # List registered projections
fi-fhir projection status        # Show projection positions
fi-fhir projection rebuild <name> # Rebuild projection from scratch
fi-fhir projection run           # Run projection runner

Implementation Plan

Phase 1: Core Event Store ✅

  • EventStore interface in pkg/eventsourcing/store.go
  • StoredEvent type with serialization
  • In-memory implementation for testing (pkg/eventsourcing/memory_store.go)
  • PostgreSQL implementation (pkg/eventsourcing/postgres_store.go)

Phase 2: Projection Framework ✅

  • Projection interface (pkg/eventsourcing/projection.go)
  • ProjectionRunner with checkpointing
  • Snapshot support (pkg/eventsourcing/snapshot.go)
  • Concurrent projection execution

Phase 3: Built-in Projections ✅

  • Patient timeline projection (pkg/eventsourcing/projections/patient_timeline.go)
  • Event statistics projection (pkg/eventsourcing/projections/event_statistics.go)
  • Active encounters projection (pkg/eventsourcing/projections/active_encounters.go)

Phase 4: Integration ✅

  • Workflow action for event store (internal/workflow/event_store.go)
  • GraphQL queries for projections (internal/api/graphql/schema.graphql)
  • CLI commands (cmd/fi-fhir/eventstore.go)

Healthcare Compliance Considerations

  • Immutability: Events are never modified or deleted (append-only)
  • Audit Trail: Complete history of all data changes
  • Temporal Queries: Query state at any point in time for audits
  • Data Retention: Configurable retention policies per stream type
  • Access Logging: All reads can be logged for HIPAA compliance

See Also

References