# Data Flow Tracing
Data Flow Tracing is a core component of Reflow's observability framework. It automatically tracks data movement between actors in your network, giving you detailed visibility into how information flows through your system.
## Overview

Traditional actor monitoring focuses on individual actor behavior: creation, completion, and failures. Data Flow Tracing extends this by capturing the connections between actors, providing insights into:

- **Message Routing**: How messages travel through your actor network
- **Data Lineage**: Complete paths of data transformation
- **Performance Bottlenecks**: Where data flow slows down or gets congested
- **System Dependencies**: Which actors depend on which data sources
## How Data Flow Tracing Works

### Automatic Capture

Data Flow Tracing operates at the connector level, intercepting messages as they flow between actors:
```rust
// Automatic tracing in connector implementation
impl Connector {
    pub async fn send_message(&self, message: Message) -> Result<()> {
        // Send the actual message
        self.channel.send(message.clone()).await?;

        // Automatically record data flow event
        if let Some(tracing) = global_tracing() {
            tracing.trace_data_flow(
                &self.from_actor,
                &self.from_port,
                &self.to_actor,
                &self.to_port,
                message.type_name(),
                message.size_bytes()
            ).await?;
        }

        Ok(())
    }
}
```
This approach provides several advantages:

- **Zero Configuration**: Works immediately with existing actor networks
- **Complete Coverage**: Captures every message flow between actors
- **Accurate Timing**: Records actual transmission times
- **Minimal Overhead**: Efficient implementation with batching
### Event Structure

Data Flow events contain rich metadata about each message transfer:
```rust
// A populated DataFlowEvent, with example values shown inline
DataFlowEvent {
    // Standard event fields
    event_id,
    timestamp,
    event_type: TraceEventType::DataFlow {
        to_actor: "data_processor".to_string(), // Destination actor
        to_port: "input".to_string(),           // Destination port
    },
    actor_id: "sensor_reader".to_string(),      // Source actor (from_actor)

    // Data flow specific information
    data: TraceEventData {
        port: Some("output".to_string()), // Source port
        message: Some(MessageSnapshot {
            message_type: "SensorReading".to_string(),
            size_bytes: 256,
            checksum: "sha256:abc123...".to_string(),
            serialized_data: vec![], // Optional data capture
        }),
        performance_metrics: PerformanceMetrics {
            execution_time_ns: 1_500_000, // 1.5ms transfer time
            queue_depth: 3,               // Destination queue depth
            throughput_msgs_per_sec: 1000.0,
            memory_usage_bytes: 512,      // Memory for message processing
            cpu_usage_percent: 2.5,
        },
        custom_attributes: HashMap::from([
            ("source_actor", json!("sensor_reader")),
            ("source_port", json!("data")),
            ("destination_actor", json!("data_processor")),
            ("destination_port", json!("input")),
            ("message_id", json!("msg_12345")),
            ("protocol", json!("memory_channel")),
            ("compression", json!("none")),
        ]),
        ..Default::default()
    },
}
```
## Use Cases

### 1. Data Lineage Tracking

Track how data flows and transforms through your entire pipeline:
```mermaid
graph LR
    A[Sensor Reader] -->|SensorReading| B[Data Validator]
    B -->|ValidatedReading| C[Data Transformer]
    C -->|ProcessedData| D[Analytics Engine]
    D -->|Insights| E[Dashboard]

    style A fill:#e1f5fe
    style E fill:#f3e5f5
```
Query for complete data lineage:
```rust
// Find all data flow for a specific message
let query = TraceQuery {
    event_types: Some(vec![TraceEventType::DataFlow {
        to_actor: "*".to_string(),
        to_port: "*".to_string(),
    }]),
    custom_filter: Some("message_id = 'msg_12345'"),
    ..Default::default()
};

let lineage = tracing_client.query_traces(query).await?;
```
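Once the events for a message are returned, you can reconstruct its hop-by-hop path by ordering them in time. A minimal sketch, using an illustrative `FlowEvent` stand-in rather than the real Reflow event type:

```rust
// Illustrative stand-in for a recorded data-flow event; the real
// Reflow event type carries more fields than this.
#[derive(Debug, Clone)]
struct FlowEvent {
    timestamp_ns: u64,
    from_actor: String,
    to_actor: String,
}

// Sort one message's events by timestamp and walk them to recover
// the actor path the message took through the network.
fn lineage_path(mut events: Vec<FlowEvent>) -> Vec<String> {
    events.sort_by_key(|e| e.timestamp_ns);
    let mut path: Vec<String> = Vec::new();
    for e in &events {
        // Only push the source if it doesn't repeat the previous hop
        if path.last() != Some(&e.from_actor) {
            path.push(e.from_actor.clone());
        }
        path.push(e.to_actor.clone());
    }
    path
}
```

For the pipeline diagrammed above, this yields the ordered chain of actors the message visited, regardless of the order events arrived from the tracing backend.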
### 2. Performance Analysis

Identify bottlenecks in your data processing pipeline:
```rust
// Query for slow data transfers
let slow_transfers = TraceQuery {
    event_types: Some(vec![TraceEventType::DataFlow {
        to_actor: "*".to_string(),
        to_port: "*".to_string(),
    }]),
    performance_filter: Some("execution_time_ns > 10000000"), // > 10ms
    time_range: Some((Utc::now() - Duration::hours(1), Utc::now())),
    ..Default::default()
};
```
### 3. System Dependency Mapping

Understand which actors depend on which data sources:
```sql
-- Find most active data flows
SELECT
    source_actor,
    destination_actor,
    COUNT(*) AS message_count,
    AVG(execution_time_ns) AS avg_transfer_time,
    SUM(size_bytes) AS total_bytes
FROM data_flow_events
WHERE timestamp > NOW() - INTERVAL '1 hour'
GROUP BY source_actor, destination_actor
ORDER BY message_count DESC;
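If you'd rather build the dependency map in application code than in SQL, the same aggregation is a small fold over the events. A sketch with an illustrative event struct; adapt field names to the real trace schema:

```rust
use std::collections::HashMap;

// Illustrative event shape; the real trace schema has more fields.
struct FlowEvent {
    source_actor: String,
    destination_actor: String,
    size_bytes: u64,
}

// Per-edge totals: (message_count, total_bytes), keyed by
// (source_actor, destination_actor) -- the same GROUP BY as the SQL.
fn aggregate_flows(events: &[FlowEvent]) -> HashMap<(String, String), (u64, u64)> {
    let mut edges: HashMap<(String, String), (u64, u64)> = HashMap::new();
    for e in events {
        let entry = edges
            .entry((e.source_actor.clone(), e.destination_actor.clone()))
            .or_insert((0, 0));
        entry.0 += 1;            // message_count
        entry.1 += e.size_bytes; // total_bytes
    }
    edges
}
```

Each key in the result is a directed dependency edge; sorting entries by message count reproduces the SQL query's ranking.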
### 4. Real-time Monitoring

Monitor data flow in real time for operational awareness:
```rust
// Subscribe to data flow events for specific actors
let filters = SubscriptionFilters {
    actor_ids: Some(vec!["critical_processor".to_string()]),
    event_types: Some(vec![TraceEventType::DataFlow {
        to_actor: "*".to_string(),
        to_port: "*".to_string(),
    }]),
    ..Default::default()
};

tracing_client.subscribe(filters).await?;
```
## Configuration

### Enabling Data Flow Tracing

Data Flow Tracing is enabled automatically when you enable the observability framework:
```rust
let tracing_config = TracingConfig {
    server_url: "ws://localhost:8080".to_string(),
    enabled: true,             // Enables all tracing including data flow
    batch_size: 50,            // Batch size for data flow events
    batch_timeout: Duration::from_millis(1000),
    enable_compression: true,  // Recommended for data flow events
    ..Default::default()
};
```
### Selective Tracing

For high-throughput systems, you might want to selectively trace certain data flows:
```rust
// Custom connector with selective tracing
impl SelectiveConnector {
    pub async fn send_message(&self, message: Message) -> Result<()> {
        self.channel.send(message.clone()).await?;

        // Only trace certain message types or conditions
        if should_trace_message(&message) {
            if let Some(tracing) = global_tracing() {
                tracing.trace_data_flow(
                    &self.from_actor,
                    &self.from_port,
                    &self.to_actor,
                    &self.to_port,
                    message.type_name(),
                    message.size_bytes()
                ).await?;
            }
        }

        Ok(())
    }
}

fn should_trace_message(message: &Message) -> bool {
    // Trace based on message type, size, or other criteria
    match message.type_name() {
        "CriticalAlert" => true, // Always trace alerts
        "DebugInfo" => false,    // Never trace debug info
        "DataUpdate" if message.size_bytes() > 1024 => true, // Large updates only
        _ => rand::random::<f64>() < 0.1, // Sample 10% of other messages
    }
}
```
### Sampling Configuration

For extremely high-throughput scenarios, implement sampling:
```rust
pub struct DataFlowSampler {
    sample_rate: f64,          // 0.0 to 1.0
    always_trace: Vec<String>, // Actor names to always trace
    never_trace: Vec<String>,  // Actor names to never trace
}

impl DataFlowSampler {
    pub fn should_trace(&self, from_actor: &str, to_actor: &str) -> bool {
        if self.never_trace.iter().any(|a| a == from_actor || a == to_actor) {
            return false;
        }

        if self.always_trace.iter().any(|a| a == from_actor || a == to_actor) {
            return true;
        }

        rand::random::<f64>() < self.sample_rate
    }
}
```
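One caveat with random sampling is that a multi-hop message may be traced at some hops and dropped at others, breaking lineage chains. Hashing a stable key keeps the decision consistent across hops. A sketch; using the message id as the key is an assumption about what is available at the call site:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Deterministic sampling: the same message id always maps to the same
// bucket, so a sampled message is traced at every hop of its path.
fn should_sample(message_id: &str, sample_rate: f64) -> bool {
    let mut hasher = DefaultHasher::new();
    message_id.hash(&mut hasher);
    // Map the hash into [0, 1) and compare against the rate
    let bucket = (hasher.finish() % 10_000) as f64 / 10_000.0;
    bucket < sample_rate
}
```

This trades statistical uniformity for end-to-end lineage: any message that is sampled at its first hop stays sampled all the way through.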
## Advanced Features

### Message Content Capture

For debugging purposes, you can optionally capture message content:
```rust
let event = TraceEvent::data_flow_with_content(
    from_actor,
    from_port,
    to_actor,
    to_port,
    message_type,
    size_bytes,
    Some(message.serialize()?) // Optional content capture
);
```
⚠️ **Security Warning**: Be careful when capturing message content in production. Ensure no sensitive data is included.
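One mitigation is to redact known-sensitive fields before handing content to the tracer. A minimal sketch over a flat key/value payload; the deny-list and field names are illustrative, and real messages may need structure-aware redaction:

```rust
// Keys whose values must never reach the trace store (illustrative).
const SENSITIVE_KEYS: &[&str] = &["password", "ssn", "auth_token"];

// Replace values of sensitive keys in place before the payload is
// attached to a trace event.
fn redact(fields: &mut [(String, String)]) {
    for (key, value) in fields.iter_mut() {
        if SENSITIVE_KEYS.iter().any(|s| key.eq_ignore_ascii_case(s)) {
            *value = "[REDACTED]".to_string();
        }
    }
}
```

Running redaction at the capture site, rather than in the storage backend, ensures the sensitive bytes never leave the process.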
### Custom Metadata

Add custom metadata to data flow events:
```rust
// Enhanced data flow tracing with custom metadata
pub async fn trace_enhanced_data_flow(
    tracing: &TracingIntegration,
    from_actor: &str,
    from_port: &str,
    to_actor: &str,
    to_port: &str,
    message: &Message,
    custom_metadata: HashMap<String, serde_json::Value>
) -> Result<()> {
    let mut event = TraceEvent::data_flow(
        from_actor.to_string(),
        from_port.to_string(),
        to_actor.to_string(),
        to_port.to_string(),
        message.type_name(),
        message.size_bytes()
    );

    // Add custom metadata
    event.data.custom_attributes.extend(custom_metadata);

    // Add message-specific metadata
    event.data.custom_attributes.insert(
        "message_priority".to_string(),
        json!(message.priority())
    );
    event.data.custom_attributes.insert(
        "message_correlation_id".to_string(),
        json!(message.correlation_id())
    );

    tracing.record_event(event).await
}
```
### Causality Tracking

Link data flow events to their triggering events:
```rust
pub async fn trace_causally_linked_data_flow(
    tracing: &TracingIntegration,
    triggering_event_id: EventId,
    from_actor: &str,
    from_port: &str,
    to_actor: &str,
    to_port: &str,
    message: &Message
) -> Result<()> {
    let mut event = TraceEvent::data_flow(
        from_actor.to_string(),
        from_port.to_string(),
        to_actor.to_string(),
        to_port.to_string(),
        message.type_name(),
        message.size_bytes()
    );

    // Link to triggering event
    event.causality.parent_event_id = Some(triggering_event_id);
    event.causality.dependency_chain.push(triggering_event_id);

    tracing.record_event(event).await
}
```
## Performance Considerations

### Overhead Analysis

Data Flow Tracing introduces minimal overhead:

- **Memory**: ~200 bytes per event
- **CPU**: ~0.1ms per event (including serialization)
- **Network**: Batched transmission reduces network calls
- **Storage**: ~1KB per event when stored
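These per-event figures translate directly into a capacity budget. A quick sketch using the estimates above (they are quoted figures, not measurements from your deployment):

```rust
// Rough trace budget at a given event rate, from ~200 B in-memory and
// ~1 KB stored per event (the figures quoted above).
fn trace_budget(events_per_sec: u64) -> (u64, u64) {
    let memory_bytes_per_sec = events_per_sec * 200;
    let storage_bytes_per_sec = events_per_sec * 1024;
    (memory_bytes_per_sec, storage_bytes_per_sec)
}
```

At 10,000 events/sec that is ~2 MB/s of in-flight event memory and ~10 MB/s of storage growth, which is the point where the sampling and cleanup strategies below start to matter.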
### Optimization Strategies

- **Batching**: Use larger batch sizes for high-throughput scenarios
- **Compression**: Enable compression for network transmission
- **Sampling**: Sample events rather than capturing every one
- **Filtering**: Use selective tracing based on criticality
- **Async Processing**: All tracing operations are non-blocking
### Monitoring Performance Impact

Monitor the tracing system's own performance:
```rust
// Monitor tracing overhead
let tracing_metrics = global_tracing()
    .unwrap()
    .get_performance_metrics()
    .await?;

println!("Events per second: {}", tracing_metrics.events_per_second);
println!("Average latency: {}ms", tracing_metrics.avg_latency_ms);
println!("Memory usage: {}MB", tracing_metrics.memory_usage_mb);
```
## Visualization and Analysis

### Data Flow Diagrams

Generate visual representations of your data flow:
```rust
// Generate data flow graph for the last hour
let flow_data = tracing_client.query_data_flows(TraceQuery {
    time_range: Some((Utc::now() - Duration::hours(1), Utc::now())),
    ..Default::default()
}).await?;

let graph = DataFlowGraph::from_events(&flow_data);
graph.render_to_file("data_flow_diagram.svg")?;
```
### Real-time Dashboard

Build real-time monitoring dashboards:
```javascript
// WebSocket connection for real-time data flow monitoring
const ws = new WebSocket('ws://tracing-server:8080');

ws.onmessage = (event) => {
    const traceEvent = JSON.parse(event.data);
    if (traceEvent.event_type.DataFlow) {
        updateDataFlowVisualization(traceEvent);
    }
};
```
## Troubleshooting

### Common Issues

**No Data Flow Events Appearing:**

- Verify tracing is enabled: `enabled: true`
- Check that actors are connected via standard connectors
- Ensure global tracing is initialized before network operations

**Too Many Events:**

- Implement sampling: reduce `sample_rate`
- Use selective tracing for specific actors only
- Increase `batch_size` to reduce network overhead

**Performance Impact:**

- Enable compression: `enable_compression: true`
- Use the PostgreSQL backend for better concurrent performance
- Consider async event processing
### Debugging Data Flow Issues

Use data flow tracing to debug connectivity and performance issues:
```rust
// Debug missing data flows
let missing_flows = TraceQuery {
    actor_filter: Some("source_actor".to_string()),
    event_types: Some(vec![TraceEventType::MessageSent]),
    time_range: Some((start_time, end_time)),
    ..Default::default()
};

let sent_messages = tracing_client.query_traces(missing_flows).await?;

// Check if corresponding DataFlow events exist
for sent_event in sent_messages {
    let corresponding_flow = find_data_flow_for_message(&sent_event).await?;
    if corresponding_flow.is_none() {
        println!("Missing data flow for message: {:?}", sent_event);
    }
}
```
## Best Practices

- **Start Simple**: Begin with default settings and tune based on your needs
- **Monitor Overhead**: Keep an eye on the performance impact of tracing
- **Use Sampling**: For high-throughput systems, sample rather than trace everything
- **Secure Sensitive Data**: Never trace sensitive message content
- **Regular Cleanup**: Set up automatic cleanup of old trace data
- **Correlate Events**: Use causality tracking to link related events
- **Custom Metadata**: Add domain-specific metadata for better insights
Data Flow Tracing gives you detailed visibility into your actor network's communication patterns. Use it to understand, debug, and optimize your distributed systems with confidence.