Observability Architecture

Reflow's observability is built on an event pipeline that translates low-level network events into rich engine events and forwards them to Zeal IDE for visibility, tracing, and replay.

System Components

1. ExecutionEngine

The engine creates an isolated Network per workflow execution and translates NetworkEvents (from reflow_network) into enriched EngineEvents with timing, size, and connection metadata.

The engine maintains a HashMap<String, Instant> to track per-actor start times, computing duration_ms when actors complete.
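The timing map can be pictured with a small sketch. The `ActorTimings` type and its method names are hypothetical and only illustrate the start-time bookkeeping described above; the engine's actual field and function names are not shown in this document.

```rust
use std::collections::HashMap;
use std::time::Instant;

/// Hypothetical helper wrapping the HashMap<String, Instant> described above.
#[derive(Default)]
pub struct ActorTimings {
    started: HashMap<String, Instant>,
}

impl ActorTimings {
    /// Record the moment an actor starts.
    pub fn record_start(&mut self, actor_id: &str) {
        self.started.insert(actor_id.to_string(), Instant::now());
    }

    /// Remove the actor's start entry and return the elapsed milliseconds.
    /// Returns None when no start time was recorded for this actor.
    pub fn finish(&mut self, actor_id: &str) -> Option<u64> {
        self.started
            .remove(actor_id)
            .map(|start| start.elapsed().as_millis() as u64)
    }
}
```

Removing the entry on completion keeps the map from growing over a long execution and makes a second completion for the same actor return `None` rather than a stale duration.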

2. EventBridge

The bridge connects the engine's per-execution event channel to two consumers:

pub struct EventBridge {
    trace_collector: Option<Arc<TraceCollector>>,
    zip_session: Option<Arc<ZipSession>>,
}

One bridge task is spawned per execution via bridge.attach(workflow_id, execution_id, event_rx). The task:

  1. Begins a trace session via TraceCollector
  2. Drains the flume::Receiver<EngineEvent> channel
  3. Forwards each event to both TraceCollector and ZipSession
  4. Tracks terminal state (success/failure based on Completed and Failed events)
  5. Completes the trace session when the channel closes (sender dropped)
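The drain loop in steps 2 to 5 might look roughly like the sketch below. To stay dependency-free it uses `std::sync::mpsc` in place of flume, and plain `Vec`s in place of TraceCollector and ZipSession. The `EngineEvent` variants, `Terminal`, and `drain_bridge` are illustrative names, not the project's API. Beginning and completing the trace session (steps 1 and 5) are left out.

```rust
use std::sync::mpsc::Receiver;

/// Simplified stand-in for the engine's event type.
#[derive(Debug, Clone, PartialEq)]
pub enum EngineEvent {
    Started,
    ActorCompleted { actor_id: String },
    Completed,
    Failed { error: String },
}

/// Terminal state observed while draining the channel.
#[derive(Debug, PartialEq)]
pub enum Terminal {
    Success,
    Failure,
    Unknown,
}

/// Drain the channel, forward every event to both consumers, and report
/// the terminal state once all senders have been dropped.
pub fn drain_bridge(
    rx: Receiver<EngineEvent>,
    trace_sink: &mut Vec<EngineEvent>,
    zip_sink: &mut Vec<EngineEvent>,
) -> Terminal {
    let mut terminal = Terminal::Unknown;
    // Iterating a Receiver blocks for the next event and ends when the
    // channel closes, i.e. when every sender has been dropped.
    for event in rx {
        match &event {
            EngineEvent::Completed => terminal = Terminal::Success,
            EngineEvent::Failed { .. } => terminal = Terminal::Failure,
            _ => {}
        }
        trace_sink.push(event.clone());
        zip_sink.push(event);
    }
    terminal
}
```

Because the loop ends only when the channel closes, the caller needs no separate "done" signal: dropping the sender is the signal.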

3. TraceCollector

Submits per-node execution data to Zeal's TracesAPI over HTTP. Manages trace session lifecycle:

pub struct TraceCollector {
    traces_api: tokio::sync::Mutex<TracesAPI>,
    sessions: tokio::sync::Mutex<HashMap<String, ActiveSession>>,
    batch_size: usize,  // default: 50
}

Each ActiveSession tracks:

  • session_id — returned by Zeal on session creation
  • start_time — for duration calculation
  • nodes_completed / nodes_failed — aggregate counters
  • total_data_processed — bytes processed across all nodes
  • pending_events — buffered TraceEvents awaiting flush

Events are flushed when the buffer reaches batch_size (50) or when the session completes.
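A minimal sketch of this flush rule, assuming a generic buffer type (`BatchBuffer` is a hypothetical name; the real collector keeps `pending_events` inside each ActiveSession and submits batches over HTTP):

```rust
/// Hypothetical stand-in for a session's pending-event buffer.
pub struct BatchBuffer<T> {
    pending: Vec<T>,
    batch_size: usize,
}

impl<T> BatchBuffer<T> {
    pub fn new(batch_size: usize) -> Self {
        Self {
            pending: Vec::with_capacity(batch_size),
            batch_size,
        }
    }

    /// Buffer one event. When the buffer reaches batch_size, hand the
    /// full batch back to the caller for submission and start a new one.
    pub fn push(&mut self, event: T) -> Option<Vec<T>> {
        self.pending.push(event);
        if self.pending.len() >= self.batch_size {
            Some(std::mem::take(&mut self.pending))
        } else {
            None
        }
    }

    /// Drain whatever is left; intended for session completion.
    pub fn finish(&mut self) -> Vec<T> {
        std::mem::take(&mut self.pending)
    }
}
```

With a batch size of 50, pushing 120 events yields two full batches from `push` and a final batch of 20 from `finish`.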

4. ZipSession

Manages the outbound connection to Zeal IDE:

  • Template Registration: Registers all actor templates (native + API) on startup
  • WebSocket Channel: Opens a tokio-tungstenite connection to /ws/zip
  • Event Translation: Converts EngineEvents to ZipExecutionEvents using zeal-sdk helpers
  • Event Emission: Pushes ZIP events as JSON text frames over WebSocket

pub struct ZipSession {
    config: ZipSessionConfig,
    client: ZealClient,
    ws: ZipWebSocket,       // Mutex<Option<WsSink>>
    engine: Arc<ExecutionEngine>,
    shutdown: Arc<Notify>,
}

Event Flow

sequenceDiagram
    participant N as Network
    participant E as ExecutionEngine
    participant EB as EventBridge
    participant TC as TraceCollector
    participant ZS as ZipSession
    participant Z as Zeal IDE

    N->>E: NetworkEvent (ActorStarted, ActorCompleted, MessageSent, NetworkIdle)
    E->>E: Translate to EngineEvent (enrich with duration_ms, output_size)
    E->>EB: flume channel
    par TraceCollector
        EB->>TC: process_event()
        TC->>TC: Buffer TraceEvent (batch_size=50)
        TC->>Z: POST /api/traces/sessions/{id}/events
    and ZipSession
        EB->>ZS: emit_engine_event()
        ZS->>ZS: translate_event() → ZipExecutionEvent
        ZS->>Z: WebSocket text frame (JSON)
    end

NetworkEvent → EngineEvent Translation

The engine's run_execution() loop translates each NetworkEvent into an EngineEvent:

| NetworkEvent | EngineEventType | Enrichments |
|---|---|---|
| ActorStarted { actor_id } | (records start time) | Stored in timing map |
| ActorCompleted { actor_id, output } | ActorCompleted | duration_ms, output_size, output_connections |
| ActorFailed { actor_id, error } | ActorFailed | error, output_connections |
| MessageSent { from, to, data } | MessageSent | size (serialized bytes) |
| NetworkIdle / NetworkShutdown | Completed | duration_ms, nodes_executed, nodes_failed |

The engine waits for NetworkIdle or NetworkShutdown before emitting the Completed event, so Completed is never emitted prematurely right after network.start().
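The mapping can be sketched as a single match. Both enums below are simplified stand-ins with byte-vector payloads. `Translator` is a hypothetical name, and counting only successful completions in `nodes_executed` is an assumption. Timing (duration_ms) and output_connections are omitted for brevity.

```rust
/// Simplified stand-in for reflow_network's event type.
#[derive(Debug, PartialEq)]
pub enum NetworkEvent {
    ActorStarted { actor_id: String },
    ActorCompleted { actor_id: String, output: Vec<u8> },
    ActorFailed { actor_id: String, error: String },
    MessageSent { from: String, to: String, data: Vec<u8> },
    NetworkIdle,
    NetworkShutdown,
}

/// Simplified stand-in for the enriched engine event type.
#[derive(Debug, PartialEq)]
pub enum EngineEvent {
    ActorCompleted { actor_id: String, output_size: usize },
    ActorFailed { actor_id: String, error: String },
    MessageSent { from: String, to: String, size: usize },
    Completed { nodes_executed: usize, nodes_failed: usize },
}

/// Hypothetical per-execution state carried across events.
#[derive(Default)]
pub struct Translator {
    nodes_executed: usize,
    nodes_failed: usize,
}

impl Translator {
    /// Returns None for events that only update internal state.
    pub fn translate(&mut self, event: NetworkEvent) -> Option<EngineEvent> {
        match event {
            // A real engine would record the start time here.
            NetworkEvent::ActorStarted { .. } => None,
            NetworkEvent::ActorCompleted { actor_id, output } => {
                self.nodes_executed += 1;
                Some(EngineEvent::ActorCompleted { actor_id, output_size: output.len() })
            }
            NetworkEvent::ActorFailed { actor_id, error } => {
                self.nodes_failed += 1;
                Some(EngineEvent::ActorFailed { actor_id, error })
            }
            NetworkEvent::MessageSent { from, to, data } => {
                Some(EngineEvent::MessageSent { from, to, size: data.len() })
            }
            // Completed is produced only once the network reports idle or shutdown.
            NetworkEvent::NetworkIdle | NetworkEvent::NetworkShutdown => {
                Some(EngineEvent::Completed {
                    nodes_executed: self.nodes_executed,
                    nodes_failed: self.nodes_failed,
                })
            }
        }
    }
}
```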

EngineEvent → ZIP Event Translation

The ZipSession::translate_event() method maps engine events to Zeal SDK types:

| EngineEventType | ZipExecutionEvent | Options |
|---|---|---|
| Started | ExecutionStarted | workflow_id, execution_id |
| NodeExecuting | NodeExecuting | input connections |
| ActorCompleted | NodeCompleted | NodeCompletedOptions { duration, output_size } |
| ActorFailed | NodeFailed | NodeError { message, code, stack } |
| Completed | ExecutionCompleted | ExecutionSummary { success_count, error_count } |
| Failed | ExecutionFailed | ExecutionError { message }, ExecutionFailedOptions { duration } |

MessageSent and NetworkIdle have no ZIP mapping and are silently dropped.
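One way to picture "silently dropped" is a translation function that returns an `Option`, so unmapped events vanish in a `filter_map`. The enums below are payload-free stand-ins and `zip_kind` is a hypothetical name. The actual translate_event() builds zeal-sdk types, which are not reproduced here.

```rust
/// Payload-free stand-in for the engine event kinds named in this section.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum EngineEventKind {
    Started,
    NodeExecuting,
    ActorCompleted,
    ActorFailed,
    MessageSent,
    NetworkIdle,
    Completed,
    Failed,
}

/// Payload-free stand-in for the ZIP execution event kinds.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ZipKind {
    ExecutionStarted,
    NodeExecuting,
    NodeCompleted,
    NodeFailed,
    ExecutionCompleted,
    ExecutionFailed,
}

/// Map an engine event kind to its ZIP counterpart; None means "drop it".
pub fn zip_kind(event: EngineEventKind) -> Option<ZipKind> {
    match event {
        EngineEventKind::Started => Some(ZipKind::ExecutionStarted),
        EngineEventKind::NodeExecuting => Some(ZipKind::NodeExecuting),
        EngineEventKind::ActorCompleted => Some(ZipKind::NodeCompleted),
        EngineEventKind::ActorFailed => Some(ZipKind::NodeFailed),
        EngineEventKind::Completed => Some(ZipKind::ExecutionCompleted),
        EngineEventKind::Failed => Some(ZipKind::ExecutionFailed),
        // No ZIP mapping: these never reach the WebSocket.
        EngineEventKind::MessageSent | EngineEventKind::NetworkIdle => None,
    }
}

/// Translate a batch, keeping only events that have a ZIP mapping.
pub fn translate_all(events: &[EngineEventKind]) -> Vec<ZipKind> {
    events.iter().copied().filter_map(zip_kind).collect()
}
```

Listing the dropped variants explicitly, rather than using a wildcard arm, means the compiler flags any newly added engine event until someone decides whether it needs a mapping.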

EngineEvent → TraceEvent Translation

The TraceCollector::process_event() method maps engine events to zeal-sdk trace types:

| EngineEventType | TraceEventType | TraceData |
|---|---|---|
| NodeExecuting | Input | data_type: "lifecycle", preview: {"status": "executing"} |
| ActorCompleted | Output | data_type: "application/json", size, duration, preview |
| ActorFailed | Error | TraceError { message } |
| MessageSent | Output | data_type: "message", size, from/to preview |

Trace Session Lifecycle

stateDiagram-v2
    [*] --> Created: POST /api/traces/sessions
    Created --> Active: Events flowing
    Active --> Active: Buffer events (batch_size=50)
    Active --> Flushing: Buffer full
    Flushing --> Active: POST events batch
    Active --> Completing: Channel closed
    Completing --> Done: POST complete with SessionSummary
    Done --> [*]

The SessionSummary submitted on completion includes:

  • total_nodes — nodes completed + failed
  • successful_nodes / failed_nodes
  • total_duration — wall clock ms
  • total_data_processed — bytes across all nodes
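As a sketch of how these fields could be derived from the per-session counters listed earlier: the struct layouts and the `summarize` function below are assumptions made for illustration, and the zeal-sdk's actual SessionSummary type may differ.

```rust
use std::time::Instant;

/// Stand-in for the counters an ActiveSession tracks.
pub struct ActiveSession {
    pub start_time: Instant,
    pub nodes_completed: u64,
    pub nodes_failed: u64,
    pub total_data_processed: u64,
}

/// Stand-in for the summary submitted on completion.
#[derive(Debug, PartialEq)]
pub struct SessionSummary {
    pub total_nodes: u64,
    pub successful_nodes: u64,
    pub failed_nodes: u64,
    pub total_duration: u64, // wall-clock milliseconds
    pub total_data_processed: u64,
}

/// Build the summary at time `now`; passing the clock in keeps this testable.
pub fn summarize(session: &ActiveSession, now: Instant) -> SessionSummary {
    SessionSummary {
        total_nodes: session.nodes_completed + session.nodes_failed,
        successful_nodes: session.nodes_completed,
        failed_nodes: session.nodes_failed,
        total_duration: now
            .saturating_duration_since(session.start_time)
            .as_millis() as u64,
        total_data_processed: session.total_data_processed,
    }
}
```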

Configuration

The observability pipeline is activated when zeal_url is set in ServerConfig:

let config = ServerConfig {
    zeal_url: Some("http://localhost:3000".to_string()),
    // ...
};

When zeal_url is None, no EventBridge is created and executions run without observability forwarding. The REST API still works for headless execution.
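The gating can be expressed as an `Option` mapping. In this sketch `ServerConfig` is reduced to the one relevant field, `EventBridge` is a placeholder that only remembers the URL, and `build_bridge` is a hypothetical helper. The server's actual construction path is not shown in this document.

```rust
/// Reduced stand-in for the server configuration.
pub struct ServerConfig {
    pub zeal_url: Option<String>,
}

/// Placeholder bridge; the real one holds a TraceCollector and a ZipSession.
#[derive(Debug, PartialEq)]
pub struct EventBridge {
    pub zeal_url: String,
}

/// Create a bridge only when a Zeal URL is configured.
pub fn build_bridge(config: &ServerConfig) -> Option<EventBridge> {
    config
        .zeal_url
        .as_ref()
        .map(|url| EventBridge { zeal_url: url.clone() })
}
```

Callers can then forward events with `if let Some(bridge) = &bridge { ... }`, which leaves the headless path free of any observability code.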

Graceful Degradation

  • If the WebSocket connection fails during ZipSession::start(), a warning is logged but the session continues (traces still work via HTTP)
  • If TraceCollector fails to begin a session, an error is logged but execution continues
  • Individual event forwarding failures are logged at debug level and do not interrupt execution
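The third rule amounts to handling each forwarding error locally instead of propagating it. A minimal sketch, with `forward_all` as a hypothetical helper and `eprintln!` standing in for a debug-level log call:

```rust
/// Forward every event to `sink`, logging failures instead of returning early.
/// Returns the number of events that failed to forward.
pub fn forward_all<E, F>(events: &[E], mut sink: F) -> usize
where
    F: FnMut(&E) -> Result<(), String>,
{
    let mut failures = 0;
    for event in events {
        if let Err(err) = sink(event) {
            // Stand-in for a debug-level log entry; execution continues.
            eprintln!("debug: event forwarding failed: {}", err);
            failures += 1;
        }
    }
    failures
}
```

The loop deliberately avoids the `?` operator: one failed delivery must not prevent later events from being forwarded.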

Next Steps