Distributed Workflow Example
Learn how to build and deploy distributed workflows using Reflow's distributed networking capabilities.
Overview
This tutorial demonstrates how to create a complete distributed workflow that spans multiple network instances. We'll build a real-world example: a distributed data processing and machine learning pipeline.
What You'll Build
A distributed system with three network instances:
- Data Instance: Collects and processes raw data
- ML Instance: Trains and evaluates machine learning models
- API Instance: Serves predictions and provides monitoring
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Data Instance │───▶│ ML Instance │───▶│ API Instance │
│ │ │ │ │ │
│ • Data Collector│ │ • Feature Eng. │ │ • Prediction API│
│ • Data Processor│ │ • Model Trainer │ │ • Monitoring │
│ • Data Validator│ │ • Model Eval. │ │ • Dashboard │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Prerequisites
- Rust development environment
- Basic understanding of Reflow actors and networks
- Familiarity with distributed systems concepts
Step 1: Project Setup
Create the project structure:
mkdir distributed_ml_pipeline
cd distributed_ml_pipeline
# Create instance directories
mkdir -p instances/{data,ml,api}
mkdir -p shared/actors
mkdir -p shared/types
# Initialize Cargo workspace
cargo init --name distributed_ml_pipeline
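After these commands the layout looks roughly like this (the member crates themselves are filled in over the following steps):

distributed_ml_pipeline/
├── Cargo.toml          # workspace manifest (next section)
├── instances/
│   ├── data/
│   ├── ml/
│   └── api/
└── shared/
    ├── actors/
    └── types/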
Cargo.toml
[workspace]
members = [
    "instances/data",
    "instances/ml",
    "instances/api",
    "shared/actors",
    "shared/types"
]
[workspace.dependencies]
reflow_network = { path = "../../crates/reflow_network" }
actor_macro = { path = "../../crates/actor_macro" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
uuid = { version = "1.0", features = ["v4"] }
chrono = { version = "0.4", features = ["serde"] }
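The workspace manifest only pins shared dependency versions; each member crate still needs its own Cargo.toml that opts into them. Below is a minimal sketch for instances/data/Cargo.toml, assuming the package is named data_instance and the shared crates are named shared_types and shared_actors (these names are tutorial choices, not fixed by Reflow). Note that the data and ML instances also call rand::random, so rand has to be added even though it is not in the workspace dependency list:

[package]
name = "data_instance"
version = "0.1.0"
edition = "2021"

[dependencies]
reflow_network = { workspace = true }
actor_macro = { workspace = true }
shared_types = { path = "../../shared/types" }
shared_actors = { path = "../../shared/actors" }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
rand = "0.8"

The ml and api instance manifests follow the same pattern.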
Step 2: Shared Types and Actors
Shared Types
Create shared/types/src/lib.rs:
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataRecord {
    pub id: String,
    pub timestamp: DateTime<Utc>,
    pub features: Vec<f64>,
    pub metadata: std::collections::HashMap<String, serde_json::Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessedData {
    pub record_id: String,
    pub processed_features: Vec<f64>,
    pub quality_score: f64,
    pub processing_timestamp: DateTime<Utc>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainingData {
    pub features: Vec<Vec<f64>>,
    pub labels: Vec<f64>,
    pub metadata: TrainingMetadata,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainingMetadata {
    pub total_samples: usize,
    pub feature_count: usize,
    pub training_timestamp: DateTime<Utc>,
    pub data_source: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainedModel {
    pub model_id: String,
    pub model_data: Vec<u8>, // Serialized model
    pub performance_metrics: ModelMetrics,
    pub training_timestamp: DateTime<Utc>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelMetrics {
    pub accuracy: f64,
    pub precision: f64,
    pub recall: f64,
    pub f1_score: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredictionRequest {
    pub request_id: String,
    pub features: Vec<f64>,
    pub model_version: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredictionResponse {
    pub request_id: String,
    pub prediction: f64,
    pub confidence: f64,
    pub model_version: String,
    pub processing_time_ms: u64,
}
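These types cross instance boundaries as JSON, so it is worth checking that they round-trip cleanly through serde_json — that is exactly how the actors below pack them into Message::Object payloads. A small test along these lines (hypothetical, not part of the original project) can sit at the bottom of shared/types/src/lib.rs:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn data_record_round_trips_through_json() {
        let record = DataRecord {
            id: "record-1".to_string(),
            timestamp: chrono::Utc::now(),
            features: vec![0.1, 0.2, 0.3],
            metadata: std::collections::HashMap::new(),
        };

        // Serialize the same way the actors do before wrapping in a Message::Object
        let value = serde_json::to_value(&record).expect("DataRecord should serialize");
        let decoded: DataRecord = serde_json::from_value(value).expect("DataRecord should deserialize");

        assert_eq!(decoded.id, record.id);
        assert_eq!(decoded.features, record.features);
    }
}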
Shared Actors
Create shared/actors/src/lib.rs:
use reflow_network::{
    actor::{Actor, ActorConfig, ActorContext, ActorLoad, MemoryState, Port},
    message::{Message, EncodableValue},
};
use shared_types::*;
use std::{collections::HashMap, sync::Arc};
use actor_macro::actor;
use anyhow::Error;

/// Logging actor that can be shared across all instances
#[actor(
    DistributedLoggerActor,
    inports::<100>(Input),
    outports::<50>(Output),
    state(MemoryState)
)]
pub async fn distributed_logger_actor(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let config = context.get_config();

    let instance_name = config.get_string("instance_name").unwrap_or("unknown".to_string());
    let log_level = config.get_string("log_level").unwrap_or("info".to_string());

    for (_port, message) in payload.iter() {
        let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f");
        match message {
            Message::String(s) => {
                println!("[{}] [{}] [{}]: {}", timestamp, instance_name, log_level.to_uppercase(), s);
            },
            Message::Object(obj) => {
                if let Ok(json_str) = serde_json::to_string_pretty(obj) {
                    println!("[{}] [{}] [{}]:\n{}", timestamp, instance_name, log_level.to_uppercase(), json_str);
                }
            },
            _ => {
                println!("[{}] [{}] [{}]: {:?}", timestamp, instance_name, log_level.to_uppercase(), message);
            }
        }
    }

    Ok(HashMap::new())
}

/// Metrics collector for monitoring distributed system performance
#[actor(
    MetricsCollectorActor,
    inports::<100>(Input),
    outports::<50>(Output, Alert),
    state(MemoryState)
)]
pub async fn metrics_collector_actor(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let state = context.get_state();
    let mut output = HashMap::new();

    for (_port, message) in payload.iter() {
        if let Message::Object(metric_data) = message {
            // Store metrics in state
            {
                let mut state_lock = state.lock();
                if let Some(state_data) = state_lock.as_mut_any().downcast_mut::<MemoryState>() {
                    let metrics_key = format!("metrics_{}", chrono::Utc::now().timestamp());
                    state_data.insert(metrics_key, metric_data.as_value().clone());

                    // Keep only the last 100 metrics entries
                    let keys: Vec<String> = state_data.data().keys()
                        .filter(|k| k.starts_with("metrics_"))
                        .cloned()
                        .collect();

                    if keys.len() > 100 {
                        let excess = keys.len() - 100;
                        let mut sorted_keys = keys;
                        sorted_keys.sort();
                        for key in sorted_keys.into_iter().take(excess) {
                            state_data.data_mut().remove(&key);
                        }
                    }
                }
            }

            // Check for alert conditions
            if let Some(error_rate) = metric_data.as_value().get("error_rate").and_then(|v| v.as_f64()) {
                if error_rate > 0.1 { // 10% error rate threshold
                    let alert = Message::object(EncodableValue::from(serde_json::json!({
                        "type": "high_error_rate",
                        "error_rate": error_rate,
                        "timestamp": chrono::Utc::now().to_rfc3339(),
                        "severity": "warning"
                    })));
                    output.insert("Alert".to_string(), alert);
                }
            }

            // Forward metrics for further processing
            output.insert("Output".to_string(), message.clone());
        }
    }

    Ok(output)
}
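The shared_types and shared_actors paths in the use statements above must match the package names declared in the shared crates' manifests. A minimal sketch for shared/actors/Cargo.toml under that naming assumption (shared/types/Cargo.toml is analogous, with just serde, serde_json, and chrono):

[package]
name = "shared_actors"
version = "0.1.0"
edition = "2021"

[dependencies]
reflow_network = { workspace = true }
actor_macro = { workspace = true }
shared_types = { path = "../types" }
serde = { workspace = true }
serde_json = { workspace = true }
anyhow = { workspace = true }
chrono = { workspace = true }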
Step 3: Data Instance
Create the data processing instance in instances/data/src/main.rs:
use reflow_network::{
    actor::{Actor, ActorConfig, ActorContext, ActorLoad, MemoryState, Port},
    distributed_network::{DistributedConfig, DistributedNetwork},
    message::{Message, EncodableValue},
    network::NetworkConfig,
};
use shared_actors::{DistributedLoggerActor, MetricsCollectorActor};
use shared_types::*;
use std::{collections::HashMap, sync::Arc, time::Duration};
use actor_macro::actor;
use anyhow::Error;
use tokio::time::sleep;

/// Data collector that simulates collecting raw data
#[actor(
    DataCollectorActor,
    inports::<100>(Trigger),
    outports::<50>(Output, Metrics),
    state(MemoryState)
)]
async fn data_collector_actor(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let state = context.get_state();
    let mut output = HashMap::new();

    if payload.contains_key("Trigger") {
        // Generate sample data
        let record = DataRecord {
            id: uuid::Uuid::new_v4().to_string(),
            timestamp: chrono::Utc::now(),
            features: (0..10).map(|_| rand::random::<f64>()).collect(),
            metadata: HashMap::from([
                ("source".to_string(), serde_json::json!("sensor_array")),
                ("quality".to_string(), serde_json::json!("high")),
            ]),
        };

        // Update collection count
        let count = {
            let mut state_lock = state.lock();
            if let Some(state_data) = state_lock.as_mut_any().downcast_mut::<MemoryState>() {
                let count = state_data.get("collection_count")
                    .and_then(|v| v.as_i64())
                    .unwrap_or(0) + 1;
                state_data.insert("collection_count".to_string(), serde_json::json!(count));
                count
            } else {
                1
            }
        };

        // Send data for processing
        let data_message = Message::object(EncodableValue::from(serde_json::to_value(record)?));
        output.insert("Output".to_string(), data_message);

        // Send metrics
        let metrics = Message::object(EncodableValue::from(serde_json::json!({
            "actor": "data_collector",
            "records_collected": count,
            "timestamp": chrono::Utc::now().to_rfc3339(),
            "instance": "data"
        })));
        output.insert("Metrics".to_string(), metrics);
    }

    Ok(output)
}

/// Data processor that cleans and validates data
#[actor(
    DataProcessorActor,
    inports::<100>(Input),
    outports::<50>(Output, Metrics, Log),
    state(MemoryState)
)]
async fn data_processor_actor(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let mut output = HashMap::new();

    for (port, message) in payload.iter() {
        if port == "Input" {
            if let Message::Object(obj) = message {
                if let Ok(record) = serde_json::from_value::<DataRecord>(obj.as_value().clone()) {
                    // Simulate data processing
                    let processed = ProcessedData {
                        record_id: record.id.clone(),
                        processed_features: record.features.iter()
                            .map(|&f| f * 2.0 + 1.0) // Simple transformation
                            .collect(),
                        quality_score: record.features.iter().sum::<f64>() / record.features.len() as f64,
                        processing_timestamp: chrono::Utc::now(),
                    };

                    // Send processed data (serialize by reference so we can still read quality_score below)
                    let processed_message = Message::object(EncodableValue::from(serde_json::to_value(&processed)?));
                    output.insert("Output".to_string(), processed_message);

                    // Send log message
                    let log_message = Message::String(
                        format!("Processed data record {} with quality score {:.2}",
                            record.id, processed.quality_score).into()
                    );
                    output.insert("Log".to_string(), log_message);

                    // Send metrics
                    let metrics = Message::object(EncodableValue::from(serde_json::json!({
                        "actor": "data_processor",
                        "processing_time_ms": 10, // Simulated
                        "quality_score": processed.quality_score,
                        "timestamp": chrono::Utc::now().to_rfc3339(),
                        "instance": "data"
                    })));
                    output.insert("Metrics".to_string(), metrics);
                }
            }
        }
    }

    Ok(output)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();

    println!("🚀 Starting Data Instance");

    // Configure distributed network
    let config = DistributedConfig {
        network_id: "data_instance".to_string(),
        instance_id: "data_001".to_string(),
        bind_address: "127.0.0.1".to_string(),
        bind_port: 9001,
        discovery_endpoints: vec![],
        auth_token: Some("data_token".to_string()),
        max_connections: 10,
        heartbeat_interval_ms: 30000,
        local_network_config: NetworkConfig::default(),
    };

    // Create distributed network
    let mut network = DistributedNetwork::new(config).await?;

    // Register local actors
    network.register_local_actor("data_collector", DataCollectorActor::new(), None)?;
    network.register_local_actor("data_processor", DataProcessorActor::new(), None)?;
    network.register_local_actor("logger", DistributedLoggerActor::new(), Some(HashMap::from([
        ("instance_name".to_string(), serde_json::json!("data")),
    ])))?;
    network.register_local_actor("metrics", MetricsCollectorActor::new(), None)?;

    // Start the network
    network.start().await?;

    // Get local network for workflow setup
    {
        let local_net = network.get_local_network();
        let mut net = local_net.write();

        // Create workflow connections
        net.add_connection(reflow_network::connector::Connector {
            from: reflow_network::connector::ConnectionPoint {
                actor: "data_collector".to_string(),
                port: "Output".to_string(),
                ..Default::default()
            },
            to: reflow_network::connector::ConnectionPoint {
                actor: "data_processor".to_string(),
                port: "Input".to_string(),
                ..Default::default()
            },
        })?;

        net.add_connection(reflow_network::connector::Connector {
            from: reflow_network::connector::ConnectionPoint {
                actor: "data_processor".to_string(),
                port: "Log".to_string(),
                ..Default::default()
            },
            to: reflow_network::connector::ConnectionPoint {
                actor: "logger".to_string(),
                port: "Input".to_string(),
                ..Default::default()
            },
        })?;

        net.add_connection(reflow_network::connector::Connector {
            from: reflow_network::connector::ConnectionPoint {
                actor: "data_processor".to_string(),
                port: "Metrics".to_string(),
                ..Default::default()
            },
            to: reflow_network::connector::ConnectionPoint {
                actor: "metrics".to_string(),
                port: "Input".to_string(),
                ..Default::default()
            },
        })?;
    }

    println!("✅ Data Instance ready on 127.0.0.1:9001");

    // Start data collection loop
    tokio::spawn(async move {
        loop {
            sleep(Duration::from_secs(5)).await;

            // Trigger data collection
            let trigger_message = Message::Boolean(true);
            if let Ok(local_net) = network.get_local_network().try_read() {
                let _ = local_net.send_to_actor("data_collector", "Trigger", trigger_message);
            }
        }
    });

    // Keep running
    loop {
        sleep(Duration::from_secs(1)).await;
    }
}
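The logger registered above also reads an optional log_level key from its config (it defaults to "info"). To pick a different level per instance, extend the same register_local_actor call — the snippet below is a variation of the registration already shown in main(), not an extra required step:

// Same registration as in main() above, with an additional config entry.
network.register_local_actor("logger", DistributedLoggerActor::new(), Some(HashMap::from([
    ("instance_name".to_string(), serde_json::json!("data")),
    ("log_level".to_string(), serde_json::json!("debug")),
])))?;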
Step 4: ML Instance
Create the ML training instance in instances/ml/src/main.rs:
use reflow_network::{
    actor::{Actor, ActorConfig, ActorContext, ActorLoad, MemoryState, Port},
    distributed_network::{DistributedConfig, DistributedNetwork},
    message::{Message, EncodableValue},
    network::NetworkConfig,
};
use shared_actors::{DistributedLoggerActor, MetricsCollectorActor};
use shared_types::*;
use std::{collections::HashMap, sync::Arc, time::Duration};
use actor_macro::actor;
use anyhow::Error;
use tokio::time::sleep;

/// Feature engineer that prepares data for ML training
#[actor(
    FeatureEngineerActor,
    inports::<100>(Input),
    outports::<50>(Output, Log, Metrics),
    state(MemoryState)
)]
async fn feature_engineer_actor(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let state = context.get_state();
    let mut output = HashMap::new();

    for (port, message) in payload.iter() {
        if port == "Input" {
            if let Message::Object(obj) = message {
                if let Ok(processed) = serde_json::from_value::<ProcessedData>(obj.as_value().clone()) {
                    // Accumulate features for batch training
                    {
                        let mut state_lock = state.lock();
                        if let Some(state_data) = state_lock.as_mut_any().downcast_mut::<MemoryState>() {
                            let mut features: Vec<Vec<f64>> = state_data.get("accumulated_features")
                                .and_then(|v| serde_json::from_value(v.clone()).ok())
                                .unwrap_or_default();
                            let mut labels: Vec<f64> = state_data.get("accumulated_labels")
                                .and_then(|v| serde_json::from_value(v.clone()).ok())
                                .unwrap_or_default();

                            features.push(processed.processed_features.clone());
                            labels.push(processed.quality_score); // Use quality score as label

                            state_data.insert("accumulated_features".to_string(), serde_json::to_value(&features)?);
                            state_data.insert("accumulated_labels".to_string(), serde_json::to_value(&labels)?);

                            // Send training data when we have enough samples
                            if features.len() >= 10 {
                                let training_data = TrainingData {
                                    features: features.clone(),
                                    labels: labels.clone(),
                                    metadata: TrainingMetadata {
                                        total_samples: features.len(),
                                        feature_count: features[0].len(),
                                        training_timestamp: chrono::Utc::now(),
                                        data_source: "data_instance".to_string(),
                                    },
                                };

                                let training_message = Message::object(EncodableValue::from(serde_json::to_value(training_data)?));
                                output.insert("Output".to_string(), training_message);

                                // Reset accumulation
                                state_data.insert("accumulated_features".to_string(), serde_json::json!([]));
                                state_data.insert("accumulated_labels".to_string(), serde_json::json!([]));

                                let log_message = Message::String(
                                    format!("Generated training batch with {} samples", features.len()).into()
                                );
                                output.insert("Log".to_string(), log_message);
                            }
                        }
                    }

                    // Send metrics
                    let metrics = Message::object(EncodableValue::from(serde_json::json!({
                        "actor": "feature_engineer",
                        "features_processed": 1,
                        "timestamp": chrono::Utc::now().to_rfc3339(),
                        "instance": "ml"
                    })));
                    output.insert("Metrics".to_string(), metrics);
                }
            }
        }
    }

    Ok(output)
}

/// Model trainer that trains ML models
#[actor(
    ModelTrainerActor,
    inports::<100>(Input),
    outports::<50>(Output, Log, Metrics),
    state(MemoryState)
)]
async fn model_trainer_actor(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let mut output = HashMap::new();

    for (port, message) in payload.iter() {
        if port == "Input" {
            if let Message::Object(obj) = message {
                if let Ok(training_data) = serde_json::from_value::<TrainingData>(obj.as_value().clone()) {
                    // Simulate model training
                    sleep(Duration::from_millis(100)).await; // Simulate training time

                    let model = TrainedModel {
                        model_id: uuid::Uuid::new_v4().to_string(),
                        model_data: vec![1, 2, 3, 4, 5], // Dummy model data
                        performance_metrics: ModelMetrics {
                            accuracy: 0.85 + rand::random::<f64>() * 0.1,
                            precision: 0.82 + rand::random::<f64>() * 0.15,
                            recall: 0.78 + rand::random::<f64>() * 0.2,
                            f1_score: 0.80 + rand::random::<f64>() * 0.15,
                        },
                        training_timestamp: chrono::Utc::now(),
                    };

                    // Send trained model
                    let model_message = Message::object(EncodableValue::from(serde_json::to_value(model.clone())?));
                    output.insert("Output".to_string(), model_message);

                    // Send log message
                    let log_message = Message::String(
                        format!("Trained model {} with accuracy {:.3}",
                            model.model_id, model.performance_metrics.accuracy).into()
                    );
                    output.insert("Log".to_string(), log_message);

                    // Send metrics
                    let metrics = Message::object(EncodableValue::from(serde_json::json!({
                        "actor": "model_trainer",
                        "model_id": model.model_id,
                        "accuracy": model.performance_metrics.accuracy,
                        "training_samples": training_data.metadata.total_samples,
                        "timestamp": chrono::Utc::now().to_rfc3339(),
                        "instance": "ml"
                    })));
                    output.insert("Metrics".to_string(), metrics);
                }
            }
        }
    }

    Ok(output)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();

    println!("🚀 Starting ML Instance");

    // Configure distributed network
    let config = DistributedConfig {
        network_id: "ml_instance".to_string(),
        instance_id: "ml_001".to_string(),
        bind_address: "127.0.0.1".to_string(),
        bind_port: 9002,
        discovery_endpoints: vec![],
        auth_token: Some("ml_token".to_string()),
        max_connections: 10,
        heartbeat_interval_ms: 30000,
        local_network_config: NetworkConfig::default(),
    };

    // Create distributed network
    let mut network = DistributedNetwork::new(config).await?;

    // Register local actors
    network.register_local_actor("feature_engineer", FeatureEngineerActor::new(), None)?;
    network.register_local_actor("model_trainer", ModelTrainerActor::new(), None)?;
    network.register_local_actor("logger", DistributedLoggerActor::new(), Some(HashMap::from([
        ("instance_name".to_string(), serde_json::json!("ml")),
    ])))?;
    network.register_local_actor("metrics", MetricsCollectorActor::new(), None)?;

    // Start the network
    network.start().await?;

    // Get local network for workflow setup
    {
        let local_net = network.get_local_network();
        let mut net = local_net.write();

        // Create workflow connections
        net.add_connection(reflow_network::connector::Connector {
            from: reflow_network::connector::ConnectionPoint {
                actor: "feature_engineer".to_string(),
                port: "Output".to_string(),
                ..Default::default()
            },
            to: reflow_network::connector::ConnectionPoint {
                actor: "model_trainer".to_string(),
                port: "Input".to_string(),
                ..Default::default()
            },
        })?;

        net.add_connection(reflow_network::connector::Connector {
            from: reflow_network::connector::ConnectionPoint {
                actor: "feature_engineer".to_string(),
                port: "Log".to_string(),
                ..Default::default()
            },
            to: reflow_network::connector::ConnectionPoint {
                actor: "logger".to_string(),
                port: "Input".to_string(),
                ..Default::default()
            },
        })?;

        net.add_connection(reflow_network::connector::Connector {
            from: reflow_network::connector::ConnectionPoint {
                actor: "model_trainer".to_string(),
                port: "Log".to_string(),
                ..Default::default()
            },
            to: reflow_network::connector::ConnectionPoint {
                actor: "logger".to_string(),
                port: "Input".to_string(),
                ..Default::default()
            },
        })?;
    }

    println!("✅ ML Instance ready on 127.0.0.1:9002");

    // Connect to data instance
    println!("🔌 Connecting to data instance...");
    network.connect_to_network("127.0.0.1:9001").await?;

    // Register remote actors from data instance
    network.register_remote_actor("data_processor", "data_instance").await?;

    // Connect data processor to feature engineer
    {
        let local_net = network.get_local_network();
        let _net = local_net.read();
        // Note: This would connect via the proxy actor created for data_processor
    }

    println!("✅ Connected to data instance");

    // Keep running
    loop {
        sleep(Duration::from_secs(1)).await;
    }
}
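The block that connects the remote data_processor to the local feature_engineer is left as a comment above because this tutorial does not spell out the name under which register_remote_actor exposes the remote actor's proxy. Assuming the proxy is addressable locally by the same name passed to register_remote_actor ("data_processor" here — an assumption worth verifying against your Reflow version), the wiring would reuse the add_connection pattern from the local setup:

// Hypothetical wiring: assumes the remote proxy is registered locally as "data_processor".
{
    let local_net = network.get_local_network();
    let mut net = local_net.write();

    net.add_connection(reflow_network::connector::Connector {
        from: reflow_network::connector::ConnectionPoint {
            actor: "data_processor".to_string(), // proxy for the remote actor (assumed name)
            port: "Output".to_string(),
            ..Default::default()
        },
        to: reflow_network::connector::ConnectionPoint {
            actor: "feature_engineer".to_string(),
            port: "Input".to_string(),
            ..Default::default()
        },
    })?;
}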
Step 5: API Instance
Create the API serving instance in instances/api/src/main.rs:
use reflow_network::{
    actor::{Actor, ActorConfig, ActorContext, ActorLoad, MemoryState, Port},
    distributed_network::{DistributedConfig, DistributedNetwork},
    message::{Message, EncodableValue},
    network::NetworkConfig,
};
use shared_actors::{DistributedLoggerActor, MetricsCollectorActor};
use shared_types::*;
use std::{collections::HashMap, sync::Arc, time::Duration};
use actor_macro::actor;
use anyhow::Error;
use tokio::time::sleep;

/// Prediction service that serves ML predictions
#[actor(
    PredictionServiceActor,
    inports::<100>(ModelUpdate, PredictionRequest),
    outports::<50>(PredictionResponse, Log, Metrics),
    state(MemoryState)
)]
async fn prediction_service_actor(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = context.get_payload();
    let state = context.get_state();
    let mut output = HashMap::new();

    for (port, message) in payload.iter() {
        match port.as_str() {
            "ModelUpdate" => {
                if let Message::Object(obj) = message {
                    if let Ok(model) = serde_json::from_value::<TrainedModel>(obj.as_value().clone()) {
                        // Store the latest model
                        {
                            let mut state_lock = state.lock();
                            if let Some(state_data) = state_lock.as_mut_any().downcast_mut::<MemoryState>() {
                                state_data.insert("current_model".to_string(), serde_json::to_value(model.clone())?);
                                state_data.insert("model_version".to_string(), serde_json::json!(model.model_id));
                            }
                        }

                        let log_message = Message::String(
                            format!("Updated prediction model to {} (accuracy: {:.3})",