Workflow DSL Planning

This document describes the Workflow DSL for event routing, transformation, and action execution.

Overview

The Workflow DSL provides a declarative way to define event processing pipelines:

Input Formats          Semantic Layer           Workflow Engine         Output/Actions
─────────────         ────────────────         ───────────────         ──────────────
HL7v2    ──┐          ┌─────────────┐          ┌─────────────┐          ┌─> FHIR API
Flatfile ──┼──────────┤ Canonical   ├──────────┤  Workflow   ├──────────┼─> REST Webhook
CSV      ──┤          │ Event Model │          │  Execution  │          ├─> Database
EDI X12  ──┘          └─────────────┘          └─────────────┘          └─> Message Queue

YAML Configuration

Basic Structure

workflow:
  name: patient_adt_routing
  version: '1.0'

  routes:
    - name: patient_admits_to_fhir
      filter:
        event_type: patient_admit
        source: ['epic_adt', 'cerner_adt']
      transform:
        - set_field: patient.status = "active"
        - map_terminology: patient.race
      actions:
        - type: fhir
          endpoint: https://fhir.example.com/r4
          resource: Patient

    - name: critical_labs_alert
      filter:
        event_type: lab_result
        condition: event.result.interpretation == "critical"
      actions:
        - type: webhook
          url: https://alerts.example.com/critical
          method: POST
        - type: log
          level: warn
          message: 'Critical lab result: {{.Test.Code}} = {{.Result.Value}}'

Filter Expressions

Filters determine which events a route handles. Each key below is shown in both its single-value and list forms; use one form per filter:

| Filter Type | Status | Description |
| --- | --- | --- |
| event_type | ✅ Implemented | Match by event type(s) |
| source | ✅ Implemented | Match by source system(s) |
| condition | ✅ Implemented | CEL expressions for complex conditions |

filter:
  # Match by event type(s)
  event_type: patient_admit
  event_type: [patient_admit, patient_update, patient_discharge]

  # Match by source system
  source: epic_adt
  source: [epic_adt, cerner_adt]

  # CEL expressions for complex conditions
  condition: event.patient.age >= 65
  condition: event.result.interpretation in ["H", "HH", "critical"]
  condition: has(event.patient.identifiers.mrn) && event.source == "lab_system"

CEL Quick Reference

CEL (Common Expression Language) provides safe, sandboxed expression evaluation. The event variable contains the full event data.

| Expression Type | Example | Description |
| --- | --- | --- |
| Equality | event.type == "patient_admit" | Exact string match |
| String methods | event.patient.mrn.startsWith("TEST") | String prefix check |
| Comparison | event.encounter.length > 3 | Numeric comparison |
| Membership | event.source in ["epic", "cerner"] | Check if value is in a list |
| Logical AND | event.type == "lab_result" && event.source == "lab_system" | Both conditions must match |
| Logical OR | event.source == "epic" \|\| event.source == "cerner" | Either condition matches |
| Nested access | event.patient.identifiers.mrn | Dot notation for nested fields |
| Has check | has(event.patient.ssn) | Check if field exists |
| Negation | !(event.source == "test") | Negate a condition |

Implementation: internal/workflow/cel.go - compiled expressions are cached for performance.
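
The project's evaluator lives in internal/workflow/cel.go; the standalone sketch below shows the same compile-then-evaluate flow using the cel-go library directly (a production engine would cache the compiled program, as noted above):

package main

import (
	"fmt"

	"github.com/google/cel-go/cel"
)

func main() {
	// Declare the single `event` variable the expressions reference.
	env, err := cel.NewEnv(cel.Variable("event", cel.DynType))
	if err != nil {
		panic(err)
	}

	// Compile once; cache the resulting program keyed by expression text.
	ast, iss := env.Compile(`event.patient.age >= 65`)
	if iss.Err() != nil {
		panic(iss.Err())
	}
	prg, err := env.Program(ast)
	if err != nil {
		panic(err)
	}

	// Evaluate against a concrete event.
	out, _, err := prg.Eval(map[string]interface{}{
		"event": map[string]interface{}{
			"patient": map[string]interface{}{"age": 72},
		},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Value()) // true
}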

Transform Operations

Transforms modify events before action execution. The original event is not modified; a copy is returned.

| Transform Type | Status | Description |
| --- | --- | --- |
| set_field | ✅ Implemented | Set field values with path notation |
| map_terminology | ✅ Implemented | Map codes between terminology systems |
| redact | ✅ Implemented | Remove sensitive fields |

Transforms are applied sequentially, and each receives the output of the previous transform:

transform:
  # Set a field value
  - set_field: patient.status = "active"

  # Map using terminology service
  - map_terminology:
      field: test.code
      from: LOCAL
      to: LOINC

  # Enrich with external lookup
  - enrich:
      field: patient.insurance
      source: eligibility_api
      key: patient.mrn

  # Remove sensitive data
  - redact:
      fields: [patient.ssn, patient.dob]

  # Format transformation
  - format:
      field: patient.phone
      pattern: '+1 (###) ###-####'
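
To make the set_field semantics concrete, here is a minimal sketch assuming events are nested map[string]interface{} values; setField is a hypothetical helper illustrating dot-notation paths and the copy-on-write behavior described above, not the library's implementation:

package main

import (
	"fmt"
	"strings"
)

// setField returns a copy of event with the dotted path set to value.
// Every map touched along the path is copied, so the original is untouched.
func setField(event map[string]interface{}, path string, value interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(event))
	for k, v := range event {
		out[k] = v
	}
	parts := strings.Split(path, ".")
	cur := out
	for _, p := range parts[:len(parts)-1] {
		child, _ := cur[p].(map[string]interface{})
		copied := make(map[string]interface{}, len(child))
		for k, v := range child {
			copied[k] = v
		}
		cur[p] = copied
		cur = copied
	}
	cur[parts[len(parts)-1]] = value
	return out
}

func main() {
	original := map[string]interface{}{
		"patient": map[string]interface{}{"status": "unknown", "mrn": "12345"},
	}
	updated := setField(original, "patient.status", "active")
	fmt.Println(original["patient"]) // status still "unknown" — unchanged
	fmt.Println(updated["patient"])  // status now "active"
}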

Action Types

| Action Type | Status | Description |
| --- | --- | --- |
| log | ✅ Implemented | Log events with Go template messages |
| webhook | ✅ Implemented | POST to REST endpoints with auth |
| fhir | ✅ Implemented | POST to FHIR R4 servers with US Core mapping |
| email | ✅ Implemented | Send email notifications (SMTP) |
| file | ✅ Implemented | Write events to local files (JSON/NDJSON) |
| exec | ✅ Implemented | Run allowlisted commands (custom integrations) |
| database | ✅ Implemented | Insert/upsert to PostgreSQL/MySQL/SQLite |
| queue | ✅ Implemented | Publish to Kafka/RabbitMQ/NATS/SQS |
| event_store | ✅ Implemented | Append to event sourcing store |

FHIR Action

Send to FHIR server:

- type: fhir
  endpoint: https://fhir.example.com/r4
  operation: create # create, update, upsert
  bundle: 'true' # Send as transaction bundle
  timeout: 30s
  # Static token auth
  token: ${FHIR_TOKEN}
  # OR OAuth2 client credentials (takes priority over token)
  token_url: https://auth.example.com/token
  client_id: ${FHIR_CLIENT_ID}
  client_secret: ${FHIR_CLIENT_SECRET}
  scopes: patient/*.read system/*.write

Config Options:

| Option | Required | Description |
| --- | --- | --- |
| endpoint | Yes | FHIR server base URL |
| operation | No | create (default), update, or upsert |
| bundle | No | "true" to send as transaction bundle |
| timeout | No | Request timeout (default: 30s) |
| token | No | Static Bearer token |
| token_url | No | OAuth2 token endpoint (enables OAuth) |
| client_id | For OAuth | OAuth2 client ID |
| client_secret | For OAuth | OAuth2 client secret |
| scopes | No | OAuth2 scopes (space or comma separated) |
| authorization | No | Custom Authorization header (highest priority) |

OAuth2 Token Refresh:

  • Tokens are cached and reused until 60 seconds before expiry
  • If a 401 Unauthorized is received, the cached token is invalidated and a fresh token is fetched
  • Token fetch requests use retry with exponential backoff (3 attempts)
  • Supports proactive refresh (fetches new token before expiry)
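
fi-fhir's OAuth handling lives in internal/workflow/oauth.go. For embedding applications that want the same client-credentials caching behavior outside the workflow engine, the stock golang.org/x/oauth2 library provides an equivalent pattern (a sketch; the endpoint values are examples):

package fhirclient

import (
	"context"
	"net/http"
	"os"

	"golang.org/x/oauth2/clientcredentials"
)

// newFHIRClient returns an http.Client that injects Bearer tokens and
// transparently refreshes them as they approach expiry.
func newFHIRClient(ctx context.Context) *http.Client {
	cfg := clientcredentials.Config{
		ClientID:     os.Getenv("FHIR_CLIENT_ID"),
		ClientSecret: os.Getenv("FHIR_CLIENT_SECRET"),
		TokenURL:     "https://auth.example.com/token",
		Scopes:       []string{"patient/*.read", "system/*.write"},
	}
	return cfg.Client(ctx)
}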

Implementation: internal/workflow/actions.go (fhirAction), internal/workflow/oauth.go (OAuth2)

Email Action

Send email notifications via SMTP:

- type: email
  smtp_host: smtp.example.com
  smtp_port: 587
  starttls: 'true'
  # username/password are optional (PLAIN auth)
  username: ${SMTP_USER}
  password: ${SMTP_PASS}
  from: [email protected]
  to: [email protected], [email protected]
  subject: 'fi-fhir {{.Type}} {{.Patient.MRN}}'
  body: |
    Event={{.Type}}
    Source={{.Source}}
    MRN={{.Patient.MRN}}
  timeout: 30s

Config Options:

| Option | Required | Description |
| --- | --- | --- |
| smtp_host | Yes | SMTP hostname |
| smtp_port | Yes | SMTP port |
| starttls | No | "true" to use STARTTLS |
| tls_insecure | No | "true" to skip TLS verification (avoid in prod) |
| username | No | SMTP username (PLAIN) |
| password | With username | SMTP password |
| from | Yes | From address (supports templates) |
| to | Yes | Comma-separated recipients (supports templates) |
| subject | Yes | Subject (supports templates) |
| body | No | Body (supports templates) |
| content_type | No | MIME type (default text/plain; charset=utf-8) |
| timeout | No | Dial/send timeout (default 30s) |

Implementation: internal/workflow/actions.go (emailAction)

Exec Action

Run an allowlisted external command (no shell). Useful for tightly controlled custom integrations.

- type: exec
  command: /opt/fi-fhir/bin/handler
  allowlist: /opt/fi-fhir/bin/handler,/opt/fi-fhir/bin/other_handler
  args: '["--mode","notify"]'
  stdin: json # json (default), none, template
  timeout: 30s
  env_ENV: prod

Config Options:

| Option | Required | Description |
| --- | --- | --- |
| command | Yes | Absolute path to executable |
| allowlist | Yes | Comma-separated absolute paths allowed to run |
| args | No | JSON array (["a","b"]) or whitespace-separated string |
| stdin | No | json (default), none, or template |
| stdin_template | With stdin=template | Template for stdin |
| timeout | No | Execution timeout (default 30s) |
| env_* | No | Environment variables passed to the process |

Implementation: internal/workflow/actions.go (execAction)

File Action

Write event payloads to disk:

- type: file
  base_dir: /var/lib/fi-fhir/out
  path: '{{.Type}}/{{.Patient.MRN}}.json'
  format: pretty # json, pretty, ndjson
  perm: '0600'

Config Options:

| Option | Required | Description |
| --- | --- | --- |
| path | Yes | Output path (supports Go templates) |
| base_dir | No | If set, output is forced under this directory |
| format | No | json (default), pretty, or ndjson |
| perm | No | File permissions (octal string, default 0600) |

Implementation: internal/workflow/actions.go (fileAction)

Webhook Action

HTTP callback:

- type: webhook
  url: https://api.example.com/events/{{.type}} # Supports templates
  method: POST
  timeout: 30s
  user_agent: fi-fhir/1.0
  token: ${WEBHOOK_TOKEN} # Bearer token
  # OR custom authorization header
  authorization: Basic ${BASIC_AUTH}

Config Options:

| Option | Required | Description |
| --- | --- | --- |
| url | Yes | Webhook URL (supports Go templates) |
| method | No | HTTP method (default: POST) |
| timeout | No | Request timeout (default: 30s) |
| token | No | Bearer token |
| authorization | No | Custom Authorization header |
| user_agent | No | Custom User-Agent header |

Implementation: internal/workflow/actions.go (webhookAction)

Database Action

Store to database:

- type: database
  connection: ${DATABASE_URL}
  table: events
  operation: insert # insert, upsert
  conflict_on: event_id # For upsert: conflict columns
  mapping_event_id: id # Column mappings use mapping_<column> prefix
  mapping_event_type: type
  mapping_patient_mrn: patient.mrn # Dot notation for nested fields
  mapping_payload: __raw__ # Special value for full JSON

Config Options:

| Option | Required | Description |
| --- | --- | --- |
| connection | Yes | Database DSN (postgres://, mysql://, sqlite://) |
| table | Yes | Target table name |
| operation | No | insert (default) or upsert |
| conflict_on | For upsert | Comma-separated conflict columns |
| mapping_<col> | Yes (1+) | Map column to event field path |

Implementation: internal/workflow/database.go - Uses Go's database/sql interface. Users must register their own drivers.
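
For example, drivers can be registered with blank imports in the embedding application's main package. The packages below are common community drivers (a suggestion, not a project requirement — any database/sql-compatible driver works):

import (
	_ "github.com/go-sql-driver/mysql" // registers the "mysql" driver
	_ "github.com/lib/pq"              // registers the "postgres" driver
	_ "modernc.org/sqlite"             // registers the "sqlite" driver (CGO-free)
)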

Queue Action

Publish to message queue:

- type: queue
  driver: kafka # kafka, rabbitmq, nats, sqs, log
  topic: healthcare.events.{{.type}} # Supports Go templates
  key: patient.mrn # Message key for partitioning
  header_source: fi-fhir # Headers use header_<name> prefix
  header_env: production
  brokers: localhost:9092 # Driver-specific config passed through

Config Options:

| Option | Required | Description |
| --- | --- | --- |
| driver | Yes | Queue driver name (kafka, rabbitmq, nats, sqs, log) |
| topic | Yes | Topic/queue name (supports Go templates) |
| key | No | Event field path for message key |
| header_<name> | No | Static headers to add to messages |
| Other | No | Driver-specific options passed to factory |

Implementation: internal/workflow/queue.go - Uses driver registry pattern. Users register drivers via RegisterQueueDriver(). Built-in log driver for testing.
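
The concrete publisher interface and factory signature live in internal/workflow/queue.go and are not reproduced in this document, so the registration sketch below is hypothetical throughout (QueuePublisher, the Publish signature, and the factory shape are all assumptions); it only illustrates where a custom driver plugs into the registry:

package myqueue

import (
	"fmt"

	"gitlab.flexinfer.ai/libs/fi-fhir/internal/workflow"
)

// stdoutPublisher is a toy driver that prints instead of publishing.
type stdoutPublisher struct{}

func (p *stdoutPublisher) Publish(topic, key string, payload []byte) error {
	fmt.Printf("topic=%s key=%s bytes=%d\n", topic, key, len(payload))
	return nil
}

func init() {
	// Assumed factory shape: driver name plus the action's config map —
	// check queue.go for the real signature before copying.
	workflow.RegisterQueueDriver("stdout", func(config map[string]string) (workflow.QueuePublisher, error) {
		return &stdoutPublisher{}, nil
	})
}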

Log Action

Log for debugging/audit:

- type: log
  level: info # debug, info, warn, error
  message: 'Processed {{.type}} for {{.patient.mrn}}' # Go template

Config Options:

| Option | Required | Description |
| --- | --- | --- |
| level | No | Log level: debug, info (default), warn, error |
| message | No | Message template (default: "Event processed") |

Note: debug level includes full event JSON in output. warn and error write to stderr.

Implementation: internal/workflow/actions.go (logAction)

Event Store Action

Append to event sourcing store:

- type: event_store
  connection: ${DATABASE_URL} # PostgreSQL connection string
  table: events # Events table name (default: events)
  stream: patient:{{.patient.mrn}} # Stream ID template (required)
  event_type: '{{.type}}' # Event type override (uses event.type by default)
  metadata_source: adt_interface # Additional metadata (metadata_<key>)
  metadata_environment: production

Config Options:

| Option | Required | Description |
| --- | --- | --- |
| connection | Yes | PostgreSQL DSN (also accepts dsn or db) |
| table | No | Events table name (default: events) |
| stream | Yes | Stream ID template (e.g., patient:{{.patient.mrn}}) |
| event_type | No | Event type override (default: uses event's type field) |
| metadata_<key> | No | Additional metadata to store with event |

Key Features:

  • Append-only storage with automatic versioning
  • Optimistic concurrency (uses VersionAny for conflict-free appends)
  • Full event JSON payload preserved
  • Template support for stream IDs and event types
  • Metadata enrichment from config and event

Use Cases:

  • Capture all events for audit trail
  • Build CQRS read models via projections
  • Enable event replay for debugging/recovery
  • Feed downstream analytics pipelines

Implementation: internal/workflow/event_store.go

Retry Configuration

All HTTP-based actions (webhook, fhir) support retry with exponential backoff. Retry is enabled by default with sensible defaults:

- type: webhook
  url: https://api.example.com/events
  # Retry configuration (all optional)
  retry_max: 3 # Max retry attempts (0 to disable)
  retry_delay: 1s # Initial delay before first retry
  retry_max_delay: 30s # Maximum delay cap
  retry_multiplier: 2.0 # Exponential backoff multiplier
  retry_jitter: 0.1 # Jitter factor (0.0-1.0) to prevent thundering herd
  retry_on_status: '429,500,502,503,504' # Status codes that trigger retry

Retry Config Options:

| Option | Default | Description |
| --- | --- | --- |
| retry_max | 3 | Maximum retry attempts (0 disables retries) |
| retry_delay | 1s | Initial delay before first retry |
| retry_max_delay | 30s | Maximum delay cap for exponential backoff |
| retry_multiplier | 2.0 | Multiplier for exponential backoff |
| retry_jitter | 0.1 | Jitter factor (0.0-1.0) for delay randomization |
| retry_on_status | 429,500,502,503,504 | Comma-separated HTTP status codes to retry |
| retry | true | Set to "false" to disable retries entirely |

Behavior:

  • Network errors (connection refused, timeout) are always retried
  • HTTP status codes in retry_on_status trigger retry
  • Non-retryable status codes (400, 401, 404, etc.) fail immediately
  • Delay grows exponentially: initial * multiplier^(attempt-1), capped at retry_max_delay (see the sketch after this list)
  • Jitter randomizes delay: delay * (1 ± jitter) to prevent synchronized retries
  • Context cancellation stops retry loop immediately
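
A minimal sketch of the delay formula above; backoffDelay is a hypothetical helper for illustration, not the code in internal/workflow/retry.go:

package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// backoffDelay mirrors the documented formula: delay = initial *
// multiplier^(attempt-1), capped at maxDelay, then randomized by ±jitter.
func backoffDelay(attempt int, initial, maxDelay time.Duration, multiplier, jitter float64) time.Duration {
	d := float64(initial) * math.Pow(multiplier, float64(attempt-1))
	d = math.Min(d, float64(maxDelay))
	d *= 1 + jitter*(2*rand.Float64()-1) // scale into [1-jitter, 1+jitter]
	return time.Duration(d)
}

func main() {
	// With the defaults (1s initial, 30s cap, 2.0 multiplier) and jitter
	// disabled, the three default attempts wait 1s, 2s, and 4s.
	for attempt := 1; attempt <= 3; attempt++ {
		fmt.Printf("attempt %d: %v\n", attempt, backoffDelay(attempt, time.Second, 30*time.Second, 2.0, 0))
	}
}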

Example with aggressive retries:

- type: fhir
  endpoint: https://fhir.hospital.org/r4
  token: ${FHIR_TOKEN}
  retry_max: 5
  retry_delay: 500ms
  retry_max_delay: 10s
  retry_multiplier: 1.5
  retry_on_status: '408,429,500,502,503,504' # Include 408 Request Timeout

Example with retries disabled:

- type: webhook
  url: https://fast-api.example.com/events
  retry_max: 0 # or retry: "false"

Implementation: internal/workflow/retry.go

Circuit Breaker

HTTP-based actions (webhook, fhir) support the circuit breaker pattern to prevent cascading failures when external services are unavailable. The circuit breaker is disabled by default:

- type: fhir
  endpoint: https://fhir.hospital.org/r4
  token: ${FHIR_TOKEN}
  # Circuit breaker configuration (all optional)
  circuit_breaker: 'true' # Enable circuit breaker (default: disabled)
  circuit_failure_threshold: 5 # Failures before opening (default: 5)
  circuit_success_threshold: 2 # Successes in half-open before closing (default: 2)
  circuit_timeout: 30s # Time in open state before half-open (default: 30s)

Circuit Breaker Config Options:

| Option | Default | Description |
| --- | --- | --- |
| circuit_breaker | disabled | Set to "true" to enable circuit breaker |
| circuit_failure_threshold | 5 | Consecutive failures before circuit opens |
| circuit_success_threshold | 2 | Consecutive successes in half-open before closing |
| circuit_timeout | 30s | Time circuit stays open before transitioning to half-open |

State Machine:

               failure >= threshold
┌──────────────┐ ────────────────────▶ ┌──────────────┐
│    CLOSED    │                       │     OPEN     │
│   (normal)   │                       │ (fast-fail)  │
└──────────────┘                       └──────────────┘
        ▲                                  │       ▲
        │ success >= threshold     timeout │       │ failure
        │                                  ▼       │
        │                            ┌──────────────┐
        └────────────────────────────│  HALF-OPEN   │
                                     │  (probing)   │
                                     └──────────────┘

Behavior:

  • Closed (normal): Requests pass through. Failures increment counter; successes reset it
  • Open (fast-fail): Requests fail immediately with a "circuit breaker is open" error
  • Half-Open (probing): Limited requests allowed to test if service recovered

Key Features:

  • Per-endpoint circuit breakers via global registry
  • HTTP status codes 500, 502, 503, 504 count as failures
  • Network errors (connection refused, timeout) count as failures
  • Circuit breaker wraps retry logic (outermost layer) to prevent retry storms
  • Thread-safe with concurrent access support

Example with aggressive circuit breaker:

- type: webhook
  url: https://fragile-api.example.com/events
  circuit_breaker: 'true'
  circuit_failure_threshold: 3 # Open after just 3 failures
  circuit_success_threshold: 1 # Close after 1 success
  circuit_timeout: 10s # Try again after 10s
  retry_max: 2 # Fewer retries to fail faster

Example monitoring circuit state: The circuit breaker tracks statistics including total failures, successes, and rejected requests. These can be accessed programmatically via GetCircuitBreaker(endpoint).Stats().
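
For instance, a monitoring goroutine might periodically log the snapshot (a minimal sketch; the concrete fields of the stats value are defined in internal/workflow/circuit_breaker.go, so this prints the whole struct rather than naming them):

import "log"

func logCircuitStats() {
	cb := workflow.GetCircuitBreaker("https://fhir.hospital.org/r4")
	log.Printf("circuit stats: %+v", cb.Stats())
}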

Implementation: internal/workflow/circuit_breaker.go

Rate Limiting

HTTP-based actions (webhook, fhir) support rate limiting to prevent overwhelming external services during high-volume event streams. Rate limiting is disabled by default:

- type: webhook
  url: https://api.example.com/events
  # Rate limiting configuration (all optional)
  rate_limit: 'true' # Enable rate limiting (default: disabled)
  rate_limit_rate: 10 # Requests per second (default: 10)
  rate_limit_burst: 20 # Maximum burst size (default: 20)
  rate_limit_wait: 'true' # Wait when limited vs fail fast (default: true)

Rate Limit Config Options:

| Option | Default | Description |
| --- | --- | --- |
| rate_limit | disabled | Set to "true" to enable rate limiting |
| rate_limit_rate | 10 | Requests per second |
| rate_limit_burst | 20 | Maximum burst capacity |
| rate_limit_wait | true | "true" to wait for a token, "false" to fail immediately |

Token Bucket Algorithm: Rate limiting uses the token bucket algorithm:

  • Tokens are added at rate_limit_rate tokens per second
  • Bucket can hold up to rate_limit_burst tokens (also initial capacity)
  • Each request consumes one token
  • If no tokens available: wait for refill (rate_limit_wait: true) or fail with ErrRateLimited
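
For example, with rate_limit_rate: 10 and rate_limit_burst: 20, an idle limiter lets a burst of 20 requests through immediately; sustained traffic beyond the burst is then smoothed to one request every 100 ms (10 per second).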

Behavior:

  • Per-endpoint limiters: Each unique URL/endpoint gets its own rate limiter
  • Limiters are reused across requests to the same endpoint
  • Rate limiting is applied before circuit breaker and retry logic
  • With rate_limit_wait: true, requests queue up and wait their turn
  • With rate_limit_wait: false, requests fail immediately when rate limited

Example with conservative rate limiting:

- type: fhir
  endpoint: https://fhir.external-payer.com/r4
  token: ${PAYER_TOKEN}
  rate_limit: 'true'
  rate_limit_rate: 5 # Only 5 requests per second
  rate_limit_burst: 10 # Allow small bursts
  rate_limit_wait: 'true' # Wait rather than fail

Example with fail-fast rate limiting:

- type: webhook
  url: https://rate-limited-api.example.com/events
  rate_limit: 'true'
  rate_limit_rate: 100
  rate_limit_burst: 50
  rate_limit_wait: 'false' # Return error immediately if rate limited

Programmatic Access:

// Get rate limiter for an endpoint
limiter := workflow.GetRateLimiter("https://api.example.com")

// Check current token count
tokens := limiter.Tokens()

// Wait for a token (with context for cancellation)
err := limiter.Wait(ctx)

Implementation: internal/workflow/ratelimit.go

Dead Letter Queue

Failed events can be stored in a dead letter queue (DLQ) for later inspection and reprocessing. The DLQ captures failed events with metadata about why they failed.

Setup:

// Create workflow engine
engine, _ := workflow.NewEngine(workflowConfig)

// Configure DLQ with in-memory storage (for development/testing)
dlq := workflow.NewMemoryDLQ()
engine.SetDLQ(dlq)

// Or use file-based storage (for persistence)
dlq, _ := workflow.NewFileDLQ("/var/lib/fi-fhir/dlq")
engine.SetDLQ(dlq)

// Optional: Configure with callback for alerting
config := workflow.DLQConfig{
    MaxAttempts: 5,  // Stop retrying after 5 attempts
    OnDeadLetter: func(event *workflow.FailedEvent) {
        log.Printf("Event %s failed: %s", event.ID, event.Error)
        // Send alert to monitoring system
    },
}
engine.SetDLQ(dlq, config)

Failed Event Structure:

type FailedEvent struct {
    ID           string                 // Unique identifier
    Event        interface{}            // Original event data
    RouteName    string                 // Route that failed
    ActionType   string                 // Action type (fhir, webhook, etc.)
    Error        string                 // Error message
    ErrorType    string                 // Classified error type
    Attempts     int                    // Number of attempts
    FirstFailure time.Time              // When first failed
    LastFailure  time.Time              // Most recent failure
    Metadata     map[string]string      // Additional context
}

Error Types:

| Error Type | Description |
| --- | --- |
| circuit_open | Circuit breaker is open |
| timeout | Request timed out |
| connection_error | Network connection failed |
| auth_error | Authentication/authorization failed |
| server_error | HTTP 5xx response |
| client_error | HTTP 4xx response |
| unknown | Unclassified error |

Reprocessing:

// Reprocess all events in DLQ (up to limit)
result := engine.ReprocessDLQ(100)
fmt.Printf("Processed: %d, Failed: %d, Skipped: %d\n",
    result.Processed, result.Failed, result.Skipped)

// Reprocess a specific event
result, err := engine.ReprocessDLQEvent("event-id-123")
if err != nil {
    log.Printf("Reprocess failed: %v", err)
}

// List failed events for inspection
events, _ := dlq.List(50)
for _, e := range events {
    fmt.Printf("Event %s failed %d times: %s\n", e.ID, e.Attempts, e.Error)
}

DLQ Backends:

| Backend | Use Case | Persistence |
| --- | --- | --- |
| MemoryDLQ | Development, testing | None (lost on restart) |
| FileDLQ | Simple persistence | JSON files in directory |
| Custom | Production | Implement DeadLetterQueue interface |

Custom DLQ Implementation:

type DeadLetterQueue interface {
    Push(event *FailedEvent) error
    Pop() (*FailedEvent, error)
    Peek() (*FailedEvent, error)
    List(limit int) ([]*FailedEvent, error)
    Get(id string) (*FailedEvent, error)
    Remove(id string) error
    Len() int
    Clear() error
}

Implementation: internal/workflow/dlq.go, internal/workflow/engine.go

Metrics / Observability

The workflow engine provides pluggable metrics collection for monitoring and observability. Metrics are disabled by default (no-op) and can be enabled by setting a metrics backend.

Setup:

// Create workflow engine
engine, _ := workflow.NewEngine(workflowConfig)

// Use in-memory metrics for testing
metrics := workflow.NewInMemoryMetrics()
engine.SetMetrics(metrics)

// Or use global metrics for action-level instrumentation
workflow.SetGlobalMetrics(metrics)

Available Metrics:

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| EventProcessed | Counter + Histogram | eventType, source, success | Events processed with duration |
| EventRouted | Counter | eventType, routeName | Events matched to routes |
| ActionExecuted | Counter + Histogram | actionType, routeName, success | Actions executed with duration |
| ActionRetried | Counter | actionType, routeName, attempt | Retry attempts per action |
| CircuitBreakerStateChanged | Counter | endpoint, fromState, toState | Circuit state transitions |
| CircuitBreakerRejected | Counter | endpoint | Requests rejected by open circuit |
| RateLimitWaited | Counter + Histogram | endpoint | Requests that waited for rate limit |
| RateLimitRejected | Counter | endpoint | Requests rejected by rate limiter |
| DLQPushed | Counter | routeName, actionType, errorType | Events pushed to DLQ |
| DLQPopped | Counter | routeName, success | Events reprocessed from DLQ |
| HTTPRequestCompleted | Counter + Histogram | endpoint, method, statusCode | HTTP requests with duration |

Metrics Interface:

type Metrics interface {
    // Event processing
    EventProcessed(eventType, source string, success bool, duration time.Duration)
    EventRouted(eventType, routeName string)

    // Action execution
    ActionExecuted(actionType, routeName string, success bool, duration time.Duration)
    ActionRetried(actionType, routeName string, attempt int)

    // Circuit breaker
    CircuitBreakerStateChanged(endpoint string, fromState, toState CircuitState)
    CircuitBreakerRejected(endpoint string)

    // Rate limiting
    RateLimitWaited(endpoint string, waitDuration time.Duration)
    RateLimitRejected(endpoint string)

    // Dead letter queue
    DLQPushed(routeName, actionType, errorType string)
    DLQPopped(routeName string, success bool)
    DLQDepth() int64

    // HTTP requests (for webhook, FHIR actions)
    HTTPRequestCompleted(endpoint, method string, statusCode int, duration time.Duration)
}

Built-in Implementations:

| Implementation | Use Case |
| --- | --- |
| NoOpMetrics | Default, discards all metrics |
| InMemoryMetrics | Testing, inspectable counters |
| Custom | Production (Prometheus, StatsD, etc.) |

Prometheus Adapter (Reference Implementation):

A ready-to-use Prometheus adapter is included:

import "gitlab.flexinfer.ai/libs/fi-fhir/internal/workflow"

// Create Prometheus metrics with defaults
promMetrics, err := workflow.NewPrometheusMetrics(nil)
if err != nil {
    log.Fatal(err)
}

// Configure workflow engine
engine.SetMetrics(promMetrics)
workflow.SetGlobalMetrics(promMetrics) // For action-level metrics

// Expose /metrics endpoint
http.Handle("/metrics", promMetrics.Handler())
http.ListenAndServe(":9090", nil)

Custom Configuration:

config := &workflow.PrometheusConfig{
    Namespace:   "myapp",           // Metric prefix (default: fi_fhir)
    Subsystem:   "events",          // Subsystem name (default: workflow)
    ConstLabels: prometheus.Labels{ // Labels added to all metrics
        "service": "healthcare-integration",
        "env":     "production",
    },
    DurationBuckets: []float64{     // Custom histogram buckets
        0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5,
    },
    Registry: customRegistry,        // Use custom registry (optional)
}

promMetrics, err := workflow.NewPrometheusMetrics(config)

Exported Metrics:

| Metric | Type | Labels |
| --- | --- | --- |
| fi_fhir_workflow_events_processed_total | Counter | event_type, source, success |
| fi_fhir_workflow_events_processed_duration_seconds | Histogram | event_type, source |
| fi_fhir_workflow_events_routed_total | Counter | event_type, route_name |
| fi_fhir_workflow_actions_executed_total | Counter | action_type, route_name, success |
| fi_fhir_workflow_actions_executed_duration_seconds | Histogram | action_type, route_name |
| fi_fhir_workflow_action_retries_total | Counter | action_type, route_name |
| fi_fhir_workflow_circuit_breaker_state_changes_total | Counter | endpoint, from_state, to_state |
| fi_fhir_workflow_circuit_breaker_rejections_total | Counter | endpoint |
| fi_fhir_workflow_rate_limit_waits_total | Counter | endpoint |
| fi_fhir_workflow_rate_limit_wait_duration_seconds | Histogram | endpoint |
| fi_fhir_workflow_rate_limit_rejections_total | Counter | endpoint |
| fi_fhir_workflow_dlq_pushed_total | Counter | route_name, action_type, error_type |
| fi_fhir_workflow_dlq_popped_total | Counter | route_name, success |
| fi_fhir_workflow_dlq_depth | Gauge | - |
| fi_fhir_workflow_http_requests_total | Counter | endpoint, method, status_code |
| fi_fhir_workflow_http_requests_duration_seconds | Histogram | endpoint, method |

Implementation: internal/workflow/metrics_prometheus.go

Inspecting Metrics (Testing):

metrics := workflow.NewInMemoryMetrics()
engine.SetMetrics(metrics)

// Process events...
engine.Process(event)

// Check metrics
snapshot := metrics.Snapshot()
fmt.Printf("Events processed: %d\n", snapshot.EventsProcessed)
fmt.Printf("Successful: %d, Failed: %d\n", snapshot.EventsSucceeded, snapshot.EventsFailed)
fmt.Printf("HTTP requests: %d\n", snapshot.HTTPRequests)

Implementation: internal/workflow/metrics.go

Distributed Tracing

The workflow engine supports distributed tracing via a pluggable tracer interface. Tracing is disabled by default (no-op) and can be enabled with an OpenTelemetry adapter or custom implementation.

Setup:

import "gitlab.flexinfer.ai/libs/fi-fhir/internal/workflow"

// Create workflow engine
engine, _ := workflow.NewEngine(workflowConfig)

// Create OpenTelemetry tracer
tracer, err := workflow.NewOTelTracer(&workflow.OTelConfig{
    ServiceName:    "my-healthcare-service",
    ServiceVersion: "1.0.0",
    Environment:    "production",
})
if err != nil {
    log.Fatal(err)
}
defer tracer.Shutdown(context.Background())

// Configure workflow engine
engine.SetTracer(tracer)

// Or set global tracer for action-level spans
workflow.SetGlobalTracer(tracer)

Span Hierarchy:

workflow.process (root span)
├── workflow.route (one per matched route)
│   ├── workflow.transform (one per transform)
│   ├── workflow.action (one per action)
│   │   └── http.request (for webhook/FHIR actions)

Span Names and Attributes:

| Span Name | Attributes | Description |
| --- | --- | --- |
| workflow.process | event.type, event.source | Root span for event processing |
| workflow.route | route.name, route.matched | Route evaluation and execution |
| workflow.transform | transform.type, transform.index | Transform application |
| workflow.action | action.type, route.name, action.success | Action execution |
| http.request | http.method, http.url, http.status_code | HTTP client calls |

Tracer Interface:

type Tracer interface {
    StartSpan(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
}

type Span interface {
    End()
    SetAttribute(key string, value interface{})
    SetStatus(code SpanStatus, message string)
    RecordError(err error)
    AddEvent(name string, attrs ...SpanAttribute)
}

Built-in Implementations:

| Implementation | Use Case |
| --- | --- |
| NoOpTracer | Default, creates no-op spans |
| OTelTracer | OpenTelemetry integration (OTLP, Jaeger, Zipkin, etc.) |
| Custom | Integrate with any tracing backend |

OpenTelemetry Configuration:

import (
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// Create OTLP exporter for Jaeger/Tempo/etc
exporter, err := otlptracegrpc.New(ctx,
    otlptracegrpc.WithEndpoint("localhost:4317"),
    otlptracegrpc.WithInsecure(),
)

config := &workflow.OTelConfig{
    ServiceName:    "healthcare-integration",
    ServiceVersion: "2.0.0",
    Environment:    "staging",
    Exporter:       exporter,
    Sampler:        sdktrace.AlwaysSample(), // or sdktrace.TraceIDRatioBased(0.1)
}

tracer, _ := workflow.NewOTelTracer(config)

Context Propagation:

// Process with existing trace context
ctx := extractTraceContext(incomingRequest) // e.g., from HTTP headers
result := engine.ProcessWithContext(ctx, event)

// Extract span from context (for adding attributes later)
span := workflow.SpanFromContext(ctx)
span.SetAttribute("custom.key", "value")

Context-Aware Actions:

// Custom action with tracing support
type MyAction struct{}

func (a *MyAction) ExecuteWithContext(ctx context.Context, event interface{}, config map[string]string) error {
    // Child spans will be linked to parent action span
    _, span := workflow.GetGlobalTracer().StartSpan(ctx, "my.operation")
    defer span.End()

    // ... operation logic
    return nil
}

// Register context-aware action
engine.RegisterAction("my_action", &MyAction{})

Implementation Files:

  • internal/workflow/tracing.go - Interface and no-op implementation
  • internal/workflow/tracing_otel.go - OpenTelemetry adapter

Go Implementation

Core Types

// Workflow represents a complete workflow configuration
type Workflow struct {
    Name    string   `yaml:"name"`
    Version string   `yaml:"version"`
    Routes  []Route  `yaml:"routes"`
}

// Route defines a single event processing route
type Route struct {
    Name       string      `yaml:"name"`
    Filter     Filter      `yaml:"filter"`
    Transforms []Transform `yaml:"transform"`
    Actions    []Action    `yaml:"actions"`
}

// Filter matches events for routing
type Filter struct {
    EventType  StringOrSlice `yaml:"event_type"`
    Source     StringOrSlice `yaml:"source"`
    Condition  string        `yaml:"condition"`  // CEL expression
}

// Transform modifies events
type Transform struct {
    SetField        string            `yaml:"set_field,omitempty"`
    MapTerminology  *TerminologyMap   `yaml:"map_terminology,omitempty"`
    Enrich          *EnrichConfig     `yaml:"enrich,omitempty"`
    Redact          *RedactConfig     `yaml:"redact,omitempty"`
}

// Action executes on matched events
type Action struct {
    Type     string                 `yaml:"type"`
    Config   map[string]interface{} `yaml:",inline"`
}

Engine Interface

// Engine processes events through workflow routes
type Engine struct {
    workflow     *Workflow
    actions      map[string]ActionHandler
    celEvaluator *CELEvaluator  // For condition expression evaluation
}

// CELEvaluator evaluates CEL expressions against events (internal/workflow/cel.go)
type CELEvaluator struct {
    env   *cel.Env
    cache map[string]cel.Program  // Compiled expressions cached for performance
    mu    sync.RWMutex
}

// ActionHandler executes a specific action type
type ActionHandler interface {
    Execute(event interface{}, config map[string]interface{}) error
}

// Process routes an event through the workflow
func (e *Engine) Process(event interface{}) []error {
    var errors []error

    for _, route := range e.workflow.Routes {
        if !e.matches(event, route.Filter) {
            continue
        }

        // Apply transforms
        transformed := event
        for _, t := range route.Transforms {
            transformed = e.transform(transformed, t)
        }

        // Execute actions
        for _, action := range route.Actions {
            handler, ok := e.actions[action.Type]
            if !ok {
                errors = append(errors, fmt.Errorf("unknown action type: %q", action.Type))
                continue
            }
            if err := handler.Execute(transformed, action.Config); err != nil {
                errors = append(errors, err)
            }
        }
    }

    return errors
}
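
A minimal end-to-end usage sketch, assuming NewEngine accepts the loaded workflow (LoadWorkflowFromFile appears later in this document under Configuration Validation):

wf, err := workflow.LoadWorkflowFromFile("workflow.yaml")
if err != nil {
	log.Fatal(err)
}
engine, err := workflow.NewEngine(wf)
if err != nil {
	log.Fatal(err)
}
// Process returns one error per failed action; an empty slice means success.
for _, procErr := range engine.Process(event) {
	log.Printf("action error: %v", procErr)
}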

Built-in Actions

Phase 1 (MVP) ✅

  • log - Structured logging - see internal/workflow/actions.go
  • webhook - HTTP callbacks - see internal/workflow/actions.go
  • fhir - FHIR R4 server integration - see internal/workflow/fhir.go

Phase 2 ✅

  • database - SQL/NoSQL storage - see internal/workflow/database.go
  • queue - Message queue publishing - see internal/workflow/queue.go

Phase 3 ✅

  • email - Email notifications (SMTP) - see internal/workflow/actions.go
  • file - File output (JSON/NDJSON) - see internal/workflow/actions.go
  • exec - Allowlisted command execution - see internal/workflow/actions.go
  • event_store - Event sourcing append - see internal/workflow/event_store.go
  • custom - User-defined actions via engine.RegisterAction()

Example Workflows

ADT to FHIR with OAuth

workflow:
  name: adt_to_fhir
  version: '1.0'

  routes:
    - name: admits
      filter:
        event_type: patient_admit
      transform:
        - map_terminology:
            field: patient.race
            from: LOCAL
            to: CDC
      actions:
        - type: fhir
          endpoint: https://fhir.hospital.org/r4
          operation: upsert
          token_url: https://auth.hospital.org/token
          client_id: ${FHIR_CLIENT_ID}
          client_secret: ${FHIR_CLIENT_SECRET}
          scopes: patient/*.write

Lab Results with Alerts

workflow:
  name: lab_processing
  version: '1.0'

  routes:
    - name: all_labs
      filter:
        event_type: lab_result
      actions:
        - type: fhir
          endpoint: https://fhir.hospital.org/r4
          token: ${FHIR_TOKEN}

    - name: critical_labs
      filter:
        event_type: lab_result
        condition: event.result.interpretation in ["critical", "panic"]
      actions:
        - type: webhook
          url: https://alerts.hospital.org/critical
          method: POST
          token: ${ALERT_TOKEN}
        - type: log
          level: warn
          message: 'CRITICAL LAB: {{.test.code}} = {{.result.value}}'

Multi-Source Aggregation to Kafka

workflow:
  name: patient_aggregation
  version: '1.0'

  routes:
    - name: epic_patients
      filter:
        source: epic_adt
      transform:
        - set_field: patient.source_system = "EPIC"
      actions:
        - type: queue
          driver: kafka
          topic: patients.unified
          key: patient.mrn
          brokers: kafka.hospital.org:9092

    - name: cerner_patients
      filter:
        source: cerner_adt
      transform:
        - set_field: patient.source_system = "CERNER"
      actions:
        - type: queue
          driver: kafka
          topic: patients.unified
          key: patient.mrn
          brokers: kafka.hospital.org:9092

Event Audit Log to Database

workflow:
  name: event_audit
  version: '1.0'

  routes:
    - name: audit_all_events
      filter:
        event_type: [patient_admit, patient_discharge, lab_result]
      actions:
        - type: database
          connection: ${DATABASE_URL}
          table: audit_log
          operation: insert
          mapping_event_id: id
          mapping_event_type: type
          mapping_source: source
          mapping_patient_mrn: patient.mrn
          mapping_timestamp: timestamp
          mapping_payload: __raw__

Eligibility Verification Processing

workflow:
  name: eligibility_processing
  version: '1.0'

  routes:
    # Track all eligibility inquiries
    - name: log_inquiries
      filter:
        event_type: eligibility_inquiry
      actions:
        - type: database
          connection: ${DATABASE_URL}
          table: eligibility_inquiries
          operation: insert
          mapping_id: id
          mapping_trace_number: trace_number
          mapping_subscriber_id: subscriber.identifiers.member_id
          mapping_payer_id: information_source.identifiers.payer_id
          mapping_service_types: inquiry.service_types
          mapping_timestamp: timestamp
          mapping_payload: __raw__

    # Handle successful eligibility responses
    - name: active_coverage
      filter:
        event_type: eligibility_response
        condition: event.status.eligible == true
      actions:
        - type: database
          connection: ${DATABASE_URL}
          table: eligibility_cache
          operation: upsert
          conflict_on: subscriber_id,payer_id
          mapping_subscriber_id: subscriber.identifiers.member_id
          mapping_payer_id: information_source.identifiers.payer_id
          mapping_status: status.status
          mapping_plan_name: status.plan_name
          mapping_plan_begin: plan_begin_date
          mapping_plan_end: plan_end_date
          mapping_benefits: __raw__
        - type: log
          level: info
          message: 'Eligibility confirmed: {{.subscriber.name.family}}, {{.subscriber.name.given}} - {{.status.plan_name}}'

    # Alert on eligibility rejections
    - name: eligibility_errors
      filter:
        event_type: eligibility_response
        condition: size(event.errors) > 0
      actions:
        - type: webhook
          url: https://notifications.hospital.org/eligibility-error
          method: POST
          token: ${NOTIFICATION_TOKEN}
        - type: log
          level: warn
          message: 'Eligibility rejected: {{.errors}}'

Claim Status Tracking (276/277)

workflow:
  name: claim_status_processing
  version: '1.0'

  routes:
    # Track all claim status requests
    - name: log_status_requests
      filter:
        event_type: claim_status_request
      actions:
        - type: database
          connection: ${DATABASE_URL}
          table: claim_status_requests
          operation: insert
          mapping_id: id
          mapping_trace_number: trace_number
          mapping_claim_id: inquiry.claim_submitter_id
          mapping_payer_claim_id: inquiry.payer_claim_id
          mapping_subscriber_id: subscriber.identifiers.member_id
          mapping_payer_id: payer.identifiers.payer_id
          mapping_timestamp: timestamp

    # Handle finalized claims (A2 status)
    - name: finalized_claims
      filter:
        event_type: claim_status_response
        condition: event.statuses.exists(s, s.status_category_code == "A2")
      actions:
        - type: database
          connection: ${DATABASE_URL}
          table: claim_status_cache
          operation: upsert
          conflict_on: claim_submitter_id,payer_id
          mapping_claim_id: claim_submitter_id
          mapping_payer_claim_id: payer_claim_id
          mapping_status: statuses[0].status_category_code
          mapping_status_description: statuses[0].status_category_description
          mapping_updated_at: timestamp
        - type: log
          level: info
          message: 'Claim finalized: {{.claim_submitter_id}} - {{.statuses[0].status_category_description}}'

    # Alert on denied/rejected claims (A5, A8 status)
    - name: denied_claims
      filter:
        event_type: claim_status_response
        condition: event.statuses.exists(s, s.status_category_code in ["A5", "A8"])
      actions:
        - type: webhook
          url: https://notifications.hospital.org/claim-denied
          method: POST
          token: ${NOTIFICATION_TOKEN}
        - type: log
          level: warn
          message: 'Claim denied/rejected: {{.claim_submitter_id}} - {{.statuses[0].status_code_description}}'

    # Track pending claims for follow-up (A1 status)
    - name: pending_claims
      filter:
        event_type: claim_status_response
        condition: event.statuses.exists(s, s.status_category_code == "A1")
      actions:
        - type: database
          connection: ${DATABASE_URL}
          table: pending_claims_followup
          operation: upsert
          conflict_on: claim_submitter_id
          mapping_claim_id: claim_submitter_id
          mapping_status: statuses[0].status_code
          mapping_next_check: __now_plus_24h__

CLI Integration

# Run workflow on parsed events
fi-fhir workflow run --config workflow.yaml events.json

# Validate workflow configuration
fi-fhir workflow validate workflow.yaml

# Dry-run to see what would happen
fi-fhir workflow dry-run --config workflow.yaml message.hl7

# Combined parse + workflow
fi-fhir parse -f hl7v2 message.hl7 | fi-fhir workflow run --config workflow.yaml -

# Record events for replay testing
fi-fhir workflow record -c workflow.yaml -o recordings.json events.json

# Replay recordings against modified workflow
fi-fhir workflow replay -c workflow_v2.yaml -d recordings.json

# Simulate without side effects
fi-fhir workflow simulate -c workflow.yaml -v events.json

CLI Testing Commands

| Command | Description |
| --- | --- |
| workflow record | Process events and save results for later replay |
| workflow replay | Replay recordings and compare with original results |
| workflow simulate | Process with mock actions (no HTTP, DB, queue calls) |

Record → Replay Workflow:

# 1. Record production-like events with current workflow
fi-fhir workflow record -c workflow_v1.yaml -o baseline.json events.json

# 2. After making changes, replay to detect differences
fi-fhir workflow replay -c workflow_v2.yaml -d baseline.json

# 3. Use --output for CI integration
fi-fhir workflow replay -c workflow_v2.yaml -o results.json baseline.json
if [ $? -ne 0 ]; then
  echo "Workflow regression detected!"
  cat results.json
  exit 1
fi

Implementation Plan

Phase 1: Core Engine ✅

  • Define workflow YAML schema
  • Implement Workflow, Route, Filter types
  • Basic filter matching (event_type, source)
  • Log action (with template rendering)
  • Webhook action (with auth, timeout, templates)
  • Engine orchestration
  • DryRun mode for testing routes without execution

Phase 2: Transforms & FHIR ✅

  • CEL expression evaluation for conditions (internal/workflow/cel.go)
  • Transform pipeline (internal/workflow/transforms.go)
    • set_field - Set field values with path notation
    • map_terminology - Map codes between terminology systems
    • redact - Remove sensitive fields
  • FHIR action with US Core mapping (Patient, Encounter, Observation, DiagnosticReport)
  • FHIR transaction bundle support for multi-resource events
  • Bearer token authentication for FHIR
  • OAuth2 client credentials flow (internal/workflow/oauth.go)

Phase 3: Advanced Actions ✅

  • Database action (internal/workflow/database.go)
    • PostgreSQL, MySQL, SQLite support via database/sql
    • Insert and upsert operations
    • Field mapping with dot notation
    • __raw__ special value for full JSON payload
    • Connection pooling with caching
  • Queue action (internal/workflow/queue.go)
    • Driver registry pattern (bring your own Kafka/RabbitMQ/NATS/SQS)
    • Topic templates with Go text/template support
    • Message key for partitioning
    • Custom headers
    • Built-in log driver for testing/debugging
  • Retry/error handling with exponential backoff (internal/workflow/retry.go)
    • Configurable max retries, delays, multiplier, jitter
    • Customizable retryable status codes
    • Context-aware cancellation support
    • Applied to webhook and FHIR actions
  • Circuit breaker pattern (internal/workflow/circuit_breaker.go)
    • Three-state machine: Closed → Open → Half-Open
    • Per-endpoint breakers via global registry
    • Configurable failure/success thresholds and timeout
    • Applied to webhook and FHIR actions
  • Dead letter queue for failed events (internal/workflow/dlq.go)
    • MemoryDLQ and FileDLQ backends
    • Error classification by type
    • Reprocessing with attempt tracking
    • Callback notifications for alerting
  • Rate limiting for high-volume event streams (internal/workflow/ratelimit.go)
    • Token bucket algorithm implementation
    • Per-endpoint rate limiters via registry
    • Configurable rate, burst, and wait behavior
    • Applied to webhook and FHIR actions

Phase 4: CLI & Tooling ✅

  • workflow run command
  • workflow validate command
  • workflow dry-run command

Phase 5: Observability ✅

  • Metrics interface (internal/workflow/metrics.go)
    • Pluggable backend design (NoOp, InMemory, Custom)
    • Event processing metrics (processed, routed, success/failure)
    • Action execution metrics with timing
    • HTTP request metrics (endpoint, method, status code, duration)
    • Circuit breaker state change metrics
    • Rate limit wait/rejection metrics
    • DLQ push/pop metrics
  • Engine instrumentation
    • Automatic metrics collection in Process()
    • Action timing and success tracking
    • DLQ reprocessing metrics
  • Action instrumentation
    • HTTP metrics for webhook action
    • HTTP metrics for FHIR actions (single resource and bundle)
  • InMemoryMetrics for testing with snapshot support
  • Prometheus adapter (internal/workflow/metrics_prometheus.go)
    • Full Metrics interface implementation
    • Configurable namespace, subsystem, and const labels
    • Custom histogram buckets for duration metrics
    • HTTP handler for /metrics endpoint
    • Endpoint URL sanitization to prevent cardinality explosion
  • Distributed tracing (internal/workflow/tracing.go, tracing_otel.go)
    • Pluggable Tracer interface (NoOp, OTel, Custom)
    • OpenTelemetry adapter with configurable exporter and sampler
    • Span hierarchy: process → route → transform/action → http
    • Context propagation via ProcessWithContext()
    • ContextActionHandler interface for action-level spans
    • HTTP spans for webhook and FHIR actions
  • Grafana dashboard templates (dashboards/grafana/)
    • Workflow overview dashboard with all metric categories
    • Import instructions and provisioning examples
    • Sample alerting rules
  • Log correlation with trace IDs (internal/workflow/logging.go)
    • Structured logger interface with trace context extraction
    • JSON and text output formats
    • Automatic trace_id and span_id inclusion in logs
    • Engine integration with debug/info/error logging
    • Global and per-engine logger configuration
    • Helper functions for manual trace context extraction
  • Alerting rule templates (dashboards/alerting/)
    • Prometheus alerting rules (standalone and Kubernetes CRD)
    • 18 alerts across 5 categories (availability, latency, resilience, DLQ, HTTP)
    • Alertmanager routing examples
    • Customization guidance for different traffic patterns
  • Health check endpoints (internal/workflow/health.go)
    • Liveness probe (/health) for Kubernetes
    • Readiness probe (/ready) for Kubernetes
    • Pluggable HealthChecker interface
    • Built-in checkers: Engine, DLQ, HTTP, CircuitBreaker
    • Concurrent checks with timeout and caching
  • Configuration validation (internal/workflow/validate.go)
    • Workflow structure validation (name, routes, actions)
    • CEL expression syntax validation
    • Action-specific validation (webhook, FHIR, database, queue)
    • Transform validation (set_field, map_terminology, redact)
    • Error/warning/info severity levels with codes

Log Correlation

The workflow engine supports structured logging with automatic trace correlation. When configured with a logger and tracer, all log entries include trace_id and span_id for correlation with distributed traces.

Setup:

import "gitlab.flexinfer.ai/libs/fi-fhir/internal/workflow"

// Create workflow engine
engine, _ := workflow.NewEngine(workflowConfig)

// Create structured logger with trace correlation
logger := workflow.NewStructuredLogger(&workflow.LoggerConfig{
    Level:          workflow.LevelInfo,
    Format:         workflow.FormatJSON,
    Output:         os.Stdout,
    IncludeTraceID: true,
    IncludeSpanID:  true,
    ServiceName:    "my-healthcare-service",
})

// Configure workflow engine
engine.SetLogger(logger)

// Or set global logger for all engines
workflow.SetGlobalLogger(logger)

Logger Configuration:

| Option | Default | Description |
| --- | --- | --- |
| Level | LevelInfo | Minimum log level (Debug, Info, Warn, Error) |
| Format | FormatText | Output format (FormatText, FormatJSON) |
| Output | os.Stdout | io.Writer for log output |
| IncludeTraceID | true | Include trace_id from span context |
| IncludeSpanID | true | Include span_id from span context |
| ServiceName | "" | Optional service name added to all entries |

Log Output Formats:

JSON format (for log aggregators like Loki, ELK):

{
  "timestamp": "2024-01-15T10:30:00Z",
  "level": "INFO",
  "service": "healthcare",
  "trace_id": "abc123...",
  "span_id": "def456...",
  "msg": "event processed",
  "event_type": "patient_admit",
  "duration_ms": 45
}

Text format (human-readable):

[2024-01-15T10:30:00Z] INFO: event processed [trace_id=abc123... span_id=def456...] event_type=patient_admit duration_ms=45

Logger Interface:

type Logger interface {
    Debug(ctx context.Context, msg string, fields ...Field)
    Info(ctx context.Context, msg string, fields ...Field)
    Warn(ctx context.Context, msg string, fields ...Field)
    Error(ctx context.Context, msg string, fields ...Field)
    WithFields(fields ...Field) Logger
}

Creating Child Loggers:

// Create logger with base fields (added to all log entries)
routeLogger := logger.WithFields(
    workflow.F("route", "patient_admits"),
    workflow.F("source", "epic_adt"),
)

// All subsequent log calls include these fields
routeLogger.Info(ctx, "processing event", workflow.F("mrn", "12345"))

Manual Trace Context Extraction:

// Extract trace/span IDs for external use
traceID := workflow.TraceIDFromContext(ctx)
spanID := workflow.SpanIDFromContext(ctx)

// Get fields for adding to existing loggers
fields := workflow.TraceContextFields(ctx)
// Returns: []Field{{"trace_id", "abc..."}, {"span_id", "def..."}}

Built-in Implementations:

| Implementation | Use Case |
| --- | --- |
| StructuredLogger | Production logging with trace correlation |
| NoOpLogger | Default, discards all logs |
| Custom | Integrate with existing logging infrastructure |

Engine Logging: When a logger is configured, the engine automatically logs:

  • DEBUG: Event processing start with type and source
  • INFO: Event processing completion with duration and routes matched
  • ERROR: Action failures with error details and duration

Correlating Logs with Traces:

// Set up both tracer and logger
tracer, _ := workflow.NewOTelTracer(&workflow.OTelConfig{
    ServiceName: "healthcare-integration",
})
engine.SetTracer(tracer)

logger := workflow.NewStructuredLogger(&workflow.LoggerConfig{
    Level:          workflow.LevelDebug,
    Format:         workflow.FormatJSON,
    IncludeTraceID: true,
    IncludeSpanID:  true,
})
engine.SetLogger(logger)

// Process event - logs will include trace context
result := engine.ProcessWithContext(ctx, event)

// Sample log output:
// {"timestamp":"...","level":"INFO","trace_id":"abc123","span_id":"def456",
//  "msg":"event processed","event_type":"patient_admit","duration_ms":45}

Querying Correlated Logs (Grafana/Loki Example):

{service="healthcare-integration"} | json | trace_id="abc123"

Implementation: internal/workflow/logging.go, internal/workflow/logging_test.go

Alerting Rules

Pre-built Prometheus alerting rules are provided for monitoring workflow health:

Installation:

# Standalone Prometheus
cp dashboards/alerting/workflow-alerts.yaml /etc/prometheus/rules/

# Kubernetes with Prometheus Operator
kubectl apply -f dashboards/alerting/workflow-alerts-k8s.yaml -n monitoring

Alert Categories:

| Category | Alerts | Description |
| --- | --- | --- |
| Availability | 3 | Error rates, action failures, no-event detection |
| Latency | 3 | p99 processing time for events, actions, HTTP |
| Resilience | 5 | Circuit breaker, retry rate, rate limiting |
| DLQ | 4 | Queue depth, push rate, reprocessing failures |
| HTTP | 3 | 5xx/4xx error rates, authentication failures |

Key Alerts:

| Alert | Severity | Threshold |
| --- | --- | --- |
| WorkflowHighErrorRate | Critical | > 5% error rate for 5 minutes |
| WorkflowCircuitBreakerOpen | Critical | Circuit opens for any endpoint |
| WorkflowDLQCritical | Critical | > 1000 events in DLQ |
| WorkflowHighEventLatency | Warning | p99 > 5 seconds for 10 minutes |
| WorkflowHighRetryRate | Warning | > 30% actions retried |

Alertmanager Routing Example:

route:
  receiver: 'default'
  routes:
    - match:
        service: fi-fhir
        severity: critical
      receiver: 'pagerduty'
    - match:
        service: fi-fhir
        severity: warning
      receiver: 'slack'

Customization:

Adjust thresholds for your environment:

# High-traffic: tighter thresholds, faster alerts
- alert: WorkflowHighErrorRate
  expr: |
    sum(rate(fi_fhir_workflow_events_processed_total{success="false"}[1m]))
    / sum(rate(fi_fhir_workflow_events_processed_total[1m]))
    > 0.01  # 1% instead of 5%
  for: 2m # 2 minutes instead of 5

# Low-traffic: longer windows to avoid false positives
- alert: WorkflowNoEventsProcessed
  expr: sum(rate(fi_fhir_workflow_events_processed_total[1h])) == 0
  for: 2h # 2 hours instead of 30 minutes

Implementation: dashboards/alerting/workflow-alerts.yaml, dashboards/alerting/workflow-alerts-k8s.yaml

Health Check Endpoints

The workflow package provides Kubernetes-compatible health check endpoints for liveness and readiness probes.

Setup:

import "gitlab.flexinfer.ai/libs/fi-fhir/internal/workflow"

// Create health service
health := workflow.NewHealthService(&workflow.HealthConfig{
    Version:        "1.0.0",
    LivenessPath:   "/health",
    ReadinessPath:  "/ready",
    Timeout:        5 * time.Second,
    IncludeDetails: true,
})

// Register liveness checks (lightweight - is the process alive?)
health.RegisterLivenessCheck("app", workflow.AlwaysHealthy())

// Register readiness checks (can the service handle traffic?)
health.RegisterReadinessCheck("engine", workflow.EngineHealthChecker(engine))
health.RegisterReadinessCheck("dlq", workflow.DLQHealthChecker(dlq, 100, 1000))
health.RegisterReadinessCheck("fhir", workflow.HTTPHealthChecker(fhirURL, 2*time.Second))
health.RegisterReadinessCheck("circuits", workflow.CircuitBreakerHealthChecker(
    "https://fhir.hospital.org",
    "https://api.payer.com",
))

// Mount on HTTP server
http.Handle("/", health.Handler())
http.ListenAndServe(":8080", nil)

Response Format:

{
  "status": "healthy",
  "version": "1.0.0",
  "components": [
    {
      "name": "engine",
      "status": "healthy",
      "message": "Engine operational",
      "checked_at": "..."
    },
    {
      "name": "dlq",
      "status": "healthy",
      "message": "DLQ depth: 5 events",
      "checked_at": "..."
    },
    {
      "name": "circuits",
      "status": "degraded",
      "message": "1 circuit breaker(s) open",
      "checked_at": "..."
    }
  ],
  "checked_at": "2024-01-15T10:30:00Z"
}

Health Status Levels:

| Status | HTTP Code | Description |
| --- | --- | --- |
| healthy | 200 | All checks passing |
| degraded | 200 | Some issues but can handle traffic |
| unhealthy | 503 | Cannot handle traffic |

Built-in Health Checkers:

| Checker | Description |
| --- | --- |
| AlwaysHealthy() | Always returns healthy (basic liveness) |
| EngineHealthChecker(engine) | Checks engine has workflow loaded |
| DLQHealthChecker(dlq, warn, crit) | Checks DLQ depth against thresholds |
| HTTPHealthChecker(url, timeout) | Checks HTTP endpoint reachability |
| CircuitBreakerHealthChecker(endpoints...) | Checks circuit breaker states |
| CustomHealthChecker(name, fn) | Custom check from simple function |
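
The exact CustomHealthChecker signature is defined in internal/workflow/health.go and is not reproduced here; the sketch below assumes it wraps a simple error-returning function (an assumption, as is the license example) to show where an application-specific readiness check plugs in:

// Hypothetical: assumes CustomHealthChecker(name, func(ctx) error) — check
// the real signature in internal/workflow/health.go before copying.
health.RegisterReadinessCheck("license", workflow.CustomHealthChecker("license",
	func(ctx context.Context) error {
		if time.Now().After(licenseExpiry) { // licenseExpiry: hypothetical app state
			return fmt.Errorf("license expired")
		}
		return nil
	},
))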

Kubernetes Integration:

livenessProbe:
  httpGet:
    path: /health
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 10

readinessProbe:
  httpGet:
    path: /ready
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 5

Features:

  • Concurrent check execution with timeout
  • Readiness response caching (1 second) to avoid hammering dependencies
  • Degraded status allows traffic while indicating issues
  • Component-level details for debugging

Implementation: internal/workflow/health.go, internal/workflow/health_test.go

Configuration Validation

The workflow package provides comprehensive validation of workflow configurations before execution.

Programmatic Usage:

import "gitlab.flexinfer.ai/libs/fi-fhir/internal/workflow"

// Load workflow from YAML
wf, err := workflow.LoadWorkflowFromFile("workflow.yaml")
if err != nil {
    log.Fatal(err)
}

// Validate workflow
result, err := workflow.ValidateWorkflow(wf)
if err != nil {
    log.Fatal(err)
}

if !result.Valid {
    for _, e := range result.Errors {
        fmt.Printf("ERROR [%s]: %s - %s\n", e.Code, e.Path, e.Message)
    }
    os.Exit(1)
}

for _, w := range result.Warnings {
    fmt.Printf("WARN [%s]: %s - %s\n", w.Code, w.Path, w.Message)
}

fmt.Println(result.Summary())

Validation Categories:

Category     Checks
Structure    Workflow name, version, route names, duplicate detection
Filters      CEL expression syntax, filter criteria presence
Transforms   set_field format, map_terminology fields, redact fields
Actions      Type-specific required fields, URL/template validation

Severity Levels:

Level     Behavior                            Example
error     Prevents workflow execution         Missing required field
warning   Workflow runs but may have issues   No authentication configured
info      Informational only                  Using default values

Error Codes:

Code                      Description
MISSING_NAME              Workflow name not specified
DUPLICATE_ROUTE_NAME      Two routes have the same name
INVALID_CEL               CEL expression cannot be compiled
MISSING_WEBHOOK_URL       Webhook action missing URL
MISSING_FHIR_ENDPOINT     FHIR action missing endpoint
MISSING_OAUTH_CLIENT_ID   OAuth configured without client_id
MISSING_DB_CONNECTION     Database action missing connection
MISSING_QUEUE_DRIVER      Queue action missing driver
INVALID_SET_FIELD         set_field not in "path = value" format
NO_ROUTES                 Workflow has no routes defined
NO_FILTER                 Route has no filter criteria
NO_ACTIONS                Route has no actions
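
Because the codes are machine-readable, callers can branch on them, for example to print targeted remediation hints (using the ValidationError fields shown under Validation Result below):

for _, e := range result.Errors {
    switch e.Code {
    case "MISSING_FHIR_ENDPOINT", "MISSING_WEBHOOK_URL":
        fmt.Printf("%s: %s (set the action's target URL)\n", e.Path, e.Message)
    case "INVALID_CEL":
        fmt.Printf("%s: %s (check the expression against the CEL quick reference)\n", e.Path, e.Message)
    default:
        fmt.Printf("[%s] %s: %s\n", e.Code, e.Path, e.Message)
    }
}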

Validation Result:

type ValidationResult struct {
    Valid    bool              // false if any errors
    Errors   []ValidationError // Fatal issues
    Warnings []ValidationError // Non-fatal issues
    Info     []ValidationError // Informational
}

type ValidationError struct {
    Path     string            // e.g., "routes[0].filter.condition"
    Message  string            // Human-readable message
    Severity ValidationSeverity
    Code     string            // Machine-readable code
}

CLI Integration Example:

# Validate workflow before running
fi-fhir workflow validate workflow.yaml

# Output:
# ERROR [MISSING_FHIR_ENDPOINT]: routes[0].actions[0].endpoint - FHIR action requires endpoint
# WARN [NO_FHIR_AUTH]: routes[0].actions[0] - No authentication configured for FHIR action
# Workflow configuration invalid: 1 error(s), 1 warning(s)

Implementation: internal/workflow/validate.go, internal/workflow/validate_test.go

Event Replay / Simulation

The workflow package provides event recording, replay, and simulation capabilities for testing and debugging workflows.

Event Recording

Record events as they're processed for later replay:

import "gitlab.flexinfer.ai/libs/fi-fhir/internal/workflow"

// Option 1: Wrap engine with automatic recording
recorder := workflow.NewMemoryRecorder()
engine, _ := workflow.NewRecordingEngine(workflowConfig, recorder)

// Process events - they're automatically recorded
engine.Process(event1)
engine.Process(event2)

// Export to file for later analysis
recorder.Export("/path/to/events.json")

// Option 2: Manual recording after processing
engine, _ := workflow.NewEngine(workflowConfig)
recorder := workflow.NewMemoryRecorder()

result := engine.Process(event)
recorder.Record(event, result) // Record with processing result

Recorder Implementations:

Implementation   Use Case                                  Persistence
MemoryRecorder   Testing, short-term analysis              None (in-memory)
FileRecorder     Long-term storage, production debugging   JSON files
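
A file-backed recorder might be wired in as follows. NewFileRecorder and its path argument are assumptions for illustration; check internal/workflow/replay.go for the actual constructor:

// NewFileRecorder is hypothetical here; verify the real signature before use
recorder, err := workflow.NewFileRecorder("/var/lib/fi-fhir/events.json")
if err != nil {
    log.Fatal(err)
}
engine, _ := workflow.NewRecordingEngine(workflowConfig, recorder)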

Recorder Options:

// Limit recorder size (oldest events dropped)
recorder := workflow.NewMemoryRecorder(workflow.WithMaxSize(1000))

// Custom ID prefix for event identifiers
recorder := workflow.NewMemoryRecorder(workflow.WithIDPrefix("prod"))

Event Replay

Replay recorded events to test workflow changes:

// Load events from file
events, _ := workflow.LoadRecordedEvents("/path/to/events.json")

// Create replayer with new/modified workflow
replayer := workflow.NewEventReplayer(engine, recorder)

// Replay all events and compare results
summary, err := replayer.Replay(ctx, &workflow.ReplayConfig{
    EventTypes: []string{"patient_admit"}, // Filter by type (optional)
    Sources:    []string{"epic_adt"},      // Filter by source (optional)
    Limit:      100,                        // Max events to replay
    Callback: func(recorded *workflow.RecordedEvent, result *workflow.Result, diff *workflow.ReplayDiff) {
        if !diff.RoutingMatch {
            fmt.Printf("Event %s: routing changed\n", recorded.ID)
            fmt.Printf("  Added routes: %v\n", diff.AddedRoutes)
            fmt.Printf("  Removed routes: %v\n", diff.RemovedRoutes)
        }
    },
})

// Check summary
fmt.Printf("Replayed: %d events\n", summary.TotalEvents)
fmt.Printf("Routing changes: %d\n", summary.DifferentRouting)
fmt.Printf("Error state changes: %d\n", summary.DifferentErrors)

// Replay single event for debugging
result, diff, _ := replayer.ReplayOne(ctx, "evt-123")

Replay Summary:

type ReplaySummary struct {
    TotalEvents      int           // Events replayed
    MatchedRouting   int           // Same routing behavior
    DifferentRouting int           // Different routing
    MatchedErrors    int           // Same error state
    DifferentErrors  int           // Different error state
    TotalDuration    time.Duration // Time taken
    Diffs            []*ReplayDiff // Detailed differences
}

Use Cases:

  • Regression Testing: Verify workflow changes don't break existing routing (see the sketch after this list)
  • Debug Production Issues: Replay events that caused problems
  • Load Testing: Replay production traffic against staging
  • Workflow Migration: Compare old vs new workflow versions
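
Regression testing, for instance, fits naturally into go test: replay a recorded corpus against the candidate workflow and fail on routing drift. A sketch using the replay APIs above; it assumes recorder already holds the corpus, as in Event Recording:

import (
    "context"
    "testing"

    "gitlab.flexinfer.ai/libs/fi-fhir/internal/workflow"
)

func TestWorkflowRoutingRegression(t *testing.T) {
    wf, err := workflow.LoadWorkflowFromFile("workflow.yaml")
    if err != nil {
        t.Fatal(err)
    }
    engine, err := workflow.NewEngine(wf)
    if err != nil {
        t.Fatal(err)
    }

    // recorder holds previously recorded events (see Event Recording above)
    replayer := workflow.NewEventReplayer(engine, recorder)
    summary, err := replayer.Replay(context.Background(), &workflow.ReplayConfig{})
    if err != nil {
        t.Fatal(err)
    }
    if summary.DifferentRouting > 0 {
        t.Errorf("routing changed for %d of %d events", summary.DifferentRouting, summary.TotalEvents)
    }
}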

Simulation Engine

Test workflows without side effects using mock actions:

// Create simulation engine (all actions mocked)
sim, _ := workflow.NewSimulationEngine(workflowConfig)

// Process events
sim.Process(event1)
sim.Process(event2)

// Verify behavior with assertions (each returns a non-nil error on failure)
if err := sim.Assert().ActionCalledTimes("webhook", 2); err != nil {
    log.Fatal(err)
}
if err := sim.Assert().ActionNotCalled("fhir"); err != nil {
    log.Fatal(err)
}
if err := sim.Assert().NoErrors(); err != nil {
    log.Fatal(err)
}

// Inspect invocations
for _, inv := range sim.InvocationsOf("webhook") {
    fmt.Printf("Webhook called with config: %v\n", inv.Config)
}

// Generate report
report := sim.Report()
fmt.Printf("Total actions: %d\n", report.TotalActions)
fmt.Printf("Errors: %v\n", report.Errors)

Configuring Mock Behavior:

// Configure specific action to fail
sim.SetMock("webhook", workflow.NewMockAction().WithError(errors.New("network error")))

// Fail after N successful calls
sim.SetMock("fhir", workflow.NewMockAction().FailAfter(3, errors.New("rate limited")))

// Add simulated latency
sim.SetMock("database", workflow.NewMockAction().WithDelay(100 * time.Millisecond))

// Custom response data
sim.SetMock("queue", workflow.NewMockAction().WithResponse(map[string]string{"id": "msg-123"}))

Assertion Methods:

Method                                 Description
ActionCalled(type)                     Assert action was called at least once
ActionNotCalled(type)                  Assert action was never called
ActionCalledTimes(type, n)             Assert action called exactly N times
TotalActionCalls(n)                    Assert total action invocations
ActionCalledWithConfig(type, config)   Assert action called with specific config
NoErrors()                             Assert no action errors occurred
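
The assertion methods return errors, so they drop straight into standard Go tests; a minimal sketch (workflowConfig is the loaded workflow, as above):

func TestAdmitRoutesToWebhook(t *testing.T) {
    sim, err := workflow.NewSimulationEngine(workflowConfig)
    if err != nil {
        t.Fatal(err)
    }

    sim.Process(map[string]interface{}{"type": "patient_admit", "source": "epic"})

    if err := sim.Assert().ActionCalled("webhook"); err != nil {
        t.Error(err)
    }
    if err := sim.Assert().NoErrors(); err != nil {
        t.Error(err)
    }
}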

Scenario Testing

Run test scenarios against the simulation engine:

runner := workflow.NewScenarioRunner(sim)

scenarios := []*workflow.Scenario{
    {
        Name: "patient admits route to webhook",
        Setup: func(sim *workflow.SimulationEngine) {
            // Optional: configure mocks
        },
        Events: []interface{}{
            map[string]interface{}{"type": "patient_admit", "source": "epic"},
            map[string]interface{}{"type": "patient_admit", "source": "cerner"},
        },
        Assertions: func(a *workflow.Assertions) error {
            if err := a.ActionCalledTimes("webhook", 2); err != nil {
                return err
            }
            return a.ActionNotCalled("fhir")
        },
    },
    {
        Name: "lab results route to FHIR",
        Events: []interface{}{
            map[string]interface{}{"type": "lab_result"},
        },
        Assertions: func(a *workflow.Assertions) error {
            return a.ActionCalled("fhir")
        },
    },
}

// Run all scenarios
results := runner.RunAll(scenarios)
for name, err := range results {
    if err != nil {
        fmt.Printf("FAIL: %s - %v\n", name, err)
    } else {
        fmt.Printf("PASS: %s\n", name)
    }
}

Implementation: internal/workflow/replay.go, internal/workflow/simulation.go


Performance Benchmarking

The workflow engine includes comprehensive benchmarks for measuring and tracking performance.

Running Benchmarks

# Run all workflow benchmarks
go test -bench=. -benchmem ./internal/workflow/...

# Run specific benchmark categories
go test -bench='BenchmarkEngine' -benchmem ./internal/workflow/...
go test -bench='BenchmarkCEL' -benchmem ./internal/workflow/...
go test -bench='BenchmarkTransform' -benchmem ./internal/workflow/...

# Run with a longer duration for more stable results (default is 1s)
go test -bench=. -benchmem -benchtime=10s ./internal/workflow/...

# Save baseline for comparison
go test -bench=. -benchmem ./internal/workflow/... > baseline.txt

# Compare against baseline (requires benchstat)
go install golang.org/x/perf/cmd/benchstat@latest
go test -bench=. -benchmem ./internal/workflow/... > current.txt
benchstat baseline.txt current.txt

Benchmark Categories

Category                  Description                   Key Metrics
BenchmarkEngineProcess*   Event processing throughput   ns/op, allocs/op
BenchmarkCELEvaluate*     CEL condition evaluation      Cache hit vs miss
BenchmarkFilterMatch*     Filter matching performance   EventType, Source, CEL
BenchmarkTransform*       Transform pipeline            SetField, Redact
BenchmarkThroughput*      Events per second             events/sec
BenchmarkParallel*        Concurrent processing         Scalability

Performance Baselines

Typical performance on modern hardware (Apple M-series, AMD Ryzen):

Benchmark                 Expected      Threshold
Engine.Process (simple)   ~1-2 µs       < 5 µs
CEL Evaluate (cached)     ~150-250 ns   < 500 ns
CEL Evaluate (compile)    ~40-50 µs     N/A
Filter Match (combined)   ~1-1.5 µs     < 3 µs
Transform SetField        ~180-200 ns   < 500 ns
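
New benchmarks follow the standard testing.B pattern; a minimal sketch of an engine-processing benchmark (myWorkflow stands in for a loaded workflow configuration):

func BenchmarkEngineProcessSimple(b *testing.B) {
    engine, err := workflow.NewEngine(myWorkflow)
    if err != nil {
        b.Fatal(err)
    }
    event := map[string]interface{}{"type": "patient_admit", "source": "epic_adt"}

    b.ReportAllocs()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        engine.Process(event)
    }
}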

Using Benchmark Utilities

import "gitlab.flexinfer.ai/libs/fi-fhir/internal/workflow"

// Create benchmark suite for tracking
suite := workflow.NewBenchmarkSuite("v1.0.0")
suite.AddResult(workflow.BenchmarkResult{
    Name:         "BenchmarkEngineProcess",
    NsPerOp:      1500,
    AllocsPerOp:  30,
    EventsPerSec: 666666,
})

// Compare two suites
baseline := workflow.NewBenchmarkSuite("v1.0.0")
current := workflow.NewBenchmarkSuite("v1.1.0")
// ... add results ...

comparison := workflow.Compare(baseline, current)
fmt.Println(comparison.Summary())

if comparison.HasRegressions() {
    fmt.Println("Performance regression detected!")
}

Performance Thresholds

Define acceptable performance bounds for CI validation:

thresholds := workflow.DefaultWorkflowThresholds()
// Or customize:
thresholds := &workflow.PerformanceThresholds{
    MaxNsPerOp: map[string]float64{
        "BenchmarkEngineProcess": 5000,  // 5µs max
    },
    MinThroughput: map[string]float64{
        "BenchmarkThroughput_Simple": 100000,  // 100k events/sec
    },
}

violations := thresholds.Validate(suite)
if len(violations) > 0 {
    for _, v := range violations {
        fmt.Printf("Threshold violation: %s\n", v)
    }
}

Quick Runtime Benchmark

For runtime performance monitoring:

engine, _ := workflow.NewEngine(myWorkflow)
event := map[string]interface{}{"type": "patient_admit"}

// Run 100ms benchmark
opsPerSec := workflow.QuickBenchmark(engine, event, 100*time.Millisecond)
fmt.Printf("Current throughput: %.0f events/sec\n", opsPerSec)

Performance Optimization Tips

  1. CEL Expression Caching: CEL expressions are automatically cached after first compilation. Avoid dynamically generating expressions.

  2. Filter Short-Circuit: Place fast filters (EventType, Source) before CEL conditions in route order.

  3. Transform Pipeline: Transforms create deep copies of events. Minimize transforms for high-throughput routes.

  4. Parallel Processing: The engine is safe for concurrent use. Use goroutine pools for parallel event processing (see the sketch after this list).

  5. Metrics Overhead: In-memory metrics add ~10-15% overhead. Use NoOpMetrics for maximum throughput.
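
A sketch of tip 4's goroutine-pool pattern, fanning events from a channel into one shared engine (assuming NewEngine returns a *workflow.Engine):

import (
    "sync"

    "gitlab.flexinfer.ai/libs/fi-fhir/internal/workflow"
)

// processParallel drains the events channel with a fixed-size worker pool.
func processParallel(engine *workflow.Engine, events <-chan map[string]interface{}, workers int) {
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for ev := range events {
                engine.Process(ev) // the engine is safe for concurrent use
            }
        }()
    }
    wg.Wait()
}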

Implementation: internal/workflow/benchmark_test.go, internal/workflow/benchmark_util.go


Load Testing

The workflow engine includes load testing utilities for validating performance under sustained load.

CLI Load Testing

# Quick smoke test (10s, 100 RPS)
fi-fhir workflow loadtest -c workflow.yaml -s smoke -v

# Standard load test (60s, 1000 RPS)
fi-fhir workflow loadtest -c workflow.yaml -s standard -v

# Custom load test
fi-fhir workflow loadtest -c workflow.yaml -d 60s -r 2000 -w 8 -v

# Stress test with JSON output
fi-fhir workflow loadtest -c workflow.yaml -s stress --json > results.json

# Burst test (maximum throughput)
fi-fhir workflow loadtest -c workflow.yaml -r 0 -d 10s -v

# List available scenarios
fi-fhir workflow loadtest --list-scenarios

Predefined Scenarios

Scenario   Duration   Target RPS   Workers   Use Case
smoke      10s        100          2         Quick validation
standard   60s        1000         4         Normal load testing
stress     120s       5000         8         High load conditions
burst      30s        unlimited    16        Maximum throughput
soak       5min       500          4         Long-running stability

Programmatic Load Testing

import "gitlab.flexinfer.ai/libs/fi-fhir/internal/workflow"

engine, _ := workflow.NewEngine(myWorkflow)
tester := workflow.NewLoadTester(engine)

config := &workflow.LoadTestConfig{
    Duration:         60 * time.Second,
    TargetRPS:        1000,
    Workers:          4,
    WarmupDuration:   5 * time.Second,
    EventGenerator:   workflow.NewHealthcareEventGenerator(),
    ProgressInterval: 5 * time.Second,
    OnProgress: func(stats workflow.LoadTestProgress) {
        fmt.Printf("[%v] Rate: %.0f/s, P99: %v\n",
            stats.Elapsed, stats.EventsPerSec, stats.P99Latency)
    },
}

result, err := tester.Run(context.Background(), config)
if err != nil {
    log.Fatal(err)
}

fmt.Println(result.Summary())

// Validate against thresholds
if !result.Passed(0.90, 0.01, 10*time.Millisecond) {
    log.Fatal("Load test failed performance targets")
}

Event Generators

Different generators for varied load patterns:

// Static event (single event type)
gen := &workflow.StaticEventGenerator{
    Event: map[string]interface{}{"type": "patient_admit"},
}

// Random selection from pool
gen := workflow.NewRandomEventGenerator([]interface{}{
    map[string]interface{}{"type": "patient_admit"},
    map[string]interface{}{"type": "lab_result"},
})

// Weighted distribution (realistic traffic)
gen := workflow.NewWeightedEventGenerator([]workflow.WeightedEvent{
    {Event: map[string]interface{}{"type": "lab_result"}, Weight: 60},
    {Event: map[string]interface{}{"type": "patient_admit"}, Weight: 30},
    {Event: map[string]interface{}{"type": "claim_submitted"}, Weight: 10},
})

// Sequence (round-robin)
gen := &workflow.SequenceEventGenerator{
    Events: []interface{}{event1, event2, event3},
}

// Healthcare mix (realistic distribution)
gen := workflow.NewHealthcareEventGenerator()

Load Test Results

Results include comprehensive metrics; a sketch of programmatic access follows the list:

  • Throughput: Total events, events/sec, achieved ratio vs target
  • Latency: Min, mean, P50, P90, P95, P99, P99.9, max, std dev
  • Errors: Count, rate, individual error details
  • Timing: Duration, warmup events
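
Beyond Summary(), individual fields can drive programmatic gates. A sketch continuing the snippet above, assuming the field names match the JSON keys used in the CI example below (ErrorRate, LatencyP99):

// ErrorRate and LatencyP99 are assumed field names; see the JSON output for the actual keys
if result.ErrorRate > 0.01 {
    log.Fatalf("error rate %.2f%% exceeds the 1%% budget", result.ErrorRate*100)
}
if result.LatencyP99 > 10*time.Millisecond {
    log.Fatalf("P99 latency %v exceeds the 10ms budget", result.LatencyP99)
}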

CI/CD Integration

# Run load test and fail if thresholds not met
fi-fhir workflow loadtest -c workflow.yaml -s standard --json > results.json

# Parse results with jq
if [ "$(jq '.ErrorRate > 0.01' results.json)" = "true" ]; then
    echo "Error rate exceeded threshold"
    exit 1
fi

if [ "$(jq '.LatencyP99 > 10000000' results.json)" = "true" ]; then
    echo "P99 latency exceeded 10ms"
    exit 1
fi

Implementation: internal/workflow/loadtest.go, internal/workflow/loadtest_test.go


Application Configuration

fi-fhir supports layered configuration following 12-Factor App principles:

Priority (highest to lowest):
1. CLI flags (not yet implemented for all options)
2. Environment variables (FI_FHIR_*)
3. Configuration file (YAML)
4. Default values

CLI Commands

# Show effective configuration (defaults + env overrides)
fi-fhir config show

# Show configuration from file with env overrides
fi-fhir config show -c config.yaml

# Validate configuration file
fi-fhir config validate config.yaml

# List all environment variables
fi-fhir config env

# Generate .env template
fi-fhir config env -f export > .env.template

# Generate sample config file
fi-fhir config init -o config.yaml
fi-fhir config init -m -o minimal.yaml  # minimal version

Configuration File

# config.yaml
server:
  host: '0.0.0.0'
  port: 8080
  read_timeout: 30s
  shutdown_timeout: 10s

workflow:
  config_path: workflow.yaml
  max_concurrency: 10
  retry_max_attempts: 3
  dlq_enabled: true

fhir:
  base_url: https://fhir.example.com/r4
  auth_type: oauth2 # none, basic, bearer, oauth2
  oauth2:
    token_url: https://auth.example.com/token
    client_id: my-app
    client_secret: ${secret:OAUTH_CLIENT_SECRET} # Secret reference

database:
  driver: postgres
  host: localhost
  password: ${secret:DATABASE_PASSWORD} # Secret reference

observability:
  metrics_enabled: true
  log_level: info
  log_format: json

Environment Variables

All configuration can be overridden via environment variables with the FI_FHIR_ prefix:

# Server
FI_FHIR_SERVER_PORT=9000
FI_FHIR_SERVER_READ_TIMEOUT=60s

# Workflow
FI_FHIR_WORKFLOW_CONFIG_PATH=/etc/workflow.yaml
FI_FHIR_WORKFLOW_DRY_RUN=true

# Database
FI_FHIR_DATABASE_HOST=db.example.com
FI_FHIR_DATABASE_PASSWORD=secret123

# Observability
FI_FHIR_LOG_LEVEL=debug
FI_FHIR_TRACING_ENABLED=true

See full list: fi-fhir config env

Secrets Management

Sensitive values can be stored separately using the ${secret:KEY} syntax:

# Values in config file
fhir:
  password: ${secret:FHIR_PASSWORD}
database:
  password: ${secret:DB_PASSWORD}

# Secret provider configuration
secrets:
  provider: env # env, file, vault, aws-ssm, k8s
  options:
    prefix: APP_SECRET_ # For env provider
    path: /run/secrets # For file provider

Secret Providers:

Provider   Description                                     Use Case
env        Environment variables                           Local development, container environments
file       Files in directory (filename=key)               Docker secrets, Kubernetes secrets mounted as volumes
vault      HashiCorp Vault (planned)                       Enterprise deployments
aws-ssm    AWS Systems Manager Parameter Store (planned)   AWS deployments
k8s        Kubernetes secrets (uses file provider)         Kubernetes deployments

For example, with the env provider and prefix APP_SECRET_, ${secret:FHIR_PASSWORD} resolves to the APP_SECRET_FHIR_PASSWORD environment variable.

Implementation: pkg/config/config.go, pkg/config/secrets.go


Deployment

Docker

Build and run with Docker:

# Build image
docker build -t fi-fhir:latest .

# Run with defaults
docker run -p 8080:8080 fi-fhir:latest

# Run with config file
docker run -p 8080:8080 \
  -v $(pwd)/config.yaml:/app/config/config.yaml \
  -e FI_FHIR_WORKFLOW_CONFIG_PATH=/app/config/workflow.yaml \
  fi-fhir:latest

Docker Compose

For local development with full stack:

# Start all services (fi-fhir, Postgres, Kafka, FHIR server, Jaeger, Prometheus, Grafana)
docker-compose up -d

# View logs
docker-compose logs -f fi-fhir

# Access services:
# - fi-fhir API: http://localhost:8080
# - fi-fhir metrics: http://localhost:9090/metrics
# - HAPI FHIR: http://localhost:8090
# - Jaeger UI: http://localhost:16686
# - Prometheus: http://localhost:9091
# - Grafana: http://localhost:3000 (admin/admin)

# Stop all services
docker-compose down

Kubernetes

Deploy using Kustomize:

# Deploy base configuration
kubectl apply -k deploy/kubernetes/base/

# Deploy production overlay
kubectl apply -k deploy/kubernetes/overlays/production/

# Check status
kubectl -n fi-fhir get pods
kubectl -n fi-fhir get services
kubectl -n fi-fhir logs -f deployment/fi-fhir

Key manifests in deploy/kubernetes/base/:

  • namespace.yaml - Namespace definition
  • deployment.yaml - Deployment with health checks, resource limits
  • service.yaml - ClusterIP service for internal access
  • ingress.yaml - Ingress with TLS (nginx + cert-manager)
  • configmap.yaml - Application and workflow configuration
  • secret.yaml - Sensitive configuration (replace in production)
  • pdb.yaml - Pod disruption budget for HA

Helm Chart

For more flexible deployments with parameterized values:

# Install with defaults
helm install fi-fhir deploy/helm/fi-fhir/

# Install with custom values
helm install fi-fhir deploy/helm/fi-fhir/ \
  --set replicaCount=3 \
  --set config.server.port=8080 \
  --set config.database.enabled=true \
  --set ingress.enabled=true \
  --set ingress.hosts[0].host=fi-fhir.example.com

# Install with values file
helm install fi-fhir deploy/helm/fi-fhir/ -f custom-values.yaml

# Upgrade deployment
helm upgrade fi-fhir deploy/helm/fi-fhir/ --reuse-values \
  --set image.tag=v1.2.0

# View generated manifests without installing
helm template fi-fhir deploy/helm/fi-fhir/ --debug

# Uninstall
helm uninstall fi-fhir

Key configurable values:

  • replicaCount - Number of pod replicas (default: 2)
  • config.server.* - Server settings (port, timeouts)
  • config.workflow.* - Workflow engine settings (concurrency, retries, DLQ)
  • config.database.enabled - Enable database action (requires secrets)
  • config.queue.enabled - Enable queue action (requires secrets)
  • autoscaling.enabled - Enable HPA for auto-scaling
  • ingress.enabled - Enable Ingress resource with TLS
  • serviceMonitor.enabled - Enable Prometheus ServiceMonitor
  • podDisruptionBudget.enabled - Enable PDB for HA

Implementation: deploy/helm/fi-fhir/

CI/CD Pipelines

GitLab CI (primary) - .gitlab-ci.yml:

# Pipeline stages
stages:
  - lint # golangci-lint, go fmt, go vet, helm lint
  - test # unit tests (Go 1.21, 1.22), integration, benchmarks
  - security # govulncheck, gosec, trivy
  - build # binary, docker image
  - release # multi-arch binaries, docker push, helm package

# Triggered on:
# - Merge requests to main
# - Pushes to main branch
# - Tag pushes (v*)

Security scanning (HIPAA compliance):

  • govulncheck - Go vulnerability database
  • gosec - Static security analysis
  • trivy - Container vulnerability scanning
  • golangci-lint - Comprehensive Go linting

Release process:

# Create and push a tag to trigger release
git tag v1.0.0
git push origin v1.0.0

# CI will automatically:
# 1. Run full test suite
# 2. Build multi-arch binaries (linux, darwin, windows × amd64, arm64)
# 3. Push Docker image to registry with version tags
# 4. Package and publish Helm chart
# 5. Create GitLab release with artifacts

Implementation: .gitlab-ci.yml, .golangci.yml


See Also