Distributed Networks
Reflow's distributed network system enables bi-directional communication between separate Reflow instances, allowing you to build scalable, multi-node workflows while maintaining the familiar actor-based programming model.
Overview
The distributed network architecture extends Reflow's local actor model to support remote communication across network boundaries. This enables:
- Cross-Network Actor Communication: Actors in one Reflow instance can send messages to actors in remote instances
- Network-Transparent Operation: Remote actors appear as local actors in your workflows
- Bi-directional Message Flow: Full duplex communication between distributed nodes
- Automatic Discovery: Networks can discover and register with each other automatically
- Conflict Resolution: Actor name conflicts across networks are resolved through automatic or manual aliasing
Architecture Components
┌─────────────────────────────────────────────────────────────────────┐
│ Distributed Reflow Network │
├─────────────────────────────────────────────────────────────────────┤
│ Instance A (Server) │ Instance B (Client) │
│ ┌─────────────────────────────┐ │ ┌─────────────────────────────────┐ │
│ │ Local Network │ │ │ Local Network │ │
│ │ ├─ Actor A1 ─┐ │ │ │ ├─ Actor B1 ─┐ │ │
│ │ ├─ Actor A2 ─┤ │ │ │ ├─ Actor B2 ─┤ │ │
│ │ └─ Actor A3 ─┘ │ │ │ └─ Actor B3 ─┘ │ │
│ └─────────────────────────────┘ │ └─────────────────────────────────┘ │
│ │ │ │ │
│ ┌─────────────────────────────┐ │ ┌─────────────────────────────────┐ │
│ │ Network Bridge │◄─┤ │ Network Bridge │ │
│ │ ├─ Discovery Service │ │ │ ├─ Discovery Service │ │
│ │ ├─ Message Router │ │ │ ├─ Message Router │ │
│ │ ├─ Connection Manager │ │ │ ├─ Connection Manager │ │
│ │ └─ Remote Actor Proxy │ │ │ └─ Remote Actor Proxy │ │
│ └─────────────────────────────┘ │ └─────────────────────────────────┘ │
│ │ │ │ │
│ ┌─────────────────────────────┐ │ ┌─────────────────────────────────┐ │
│ │ Transport Layer │◄─┤ │ Transport Layer │ │
│ │ ├─ WebSocket/TCP Server │ │ │ ├─ WebSocket/TCP Client │ │
│ │ ├─ Protocol Handler │ │ │ ├─ Protocol Handler │ │
│ │ └─ Serialization │ │ │ └─ Serialization │ │
│ └─────────────────────────────┘ │ └─────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Core Components
- DistributedNetwork: Main orchestrator that combines local networks with distributed communication
- NetworkBridge: Handles all cross-network communication and actor registration
- DiscoveryService: Automatic network discovery and registration
- MessageRouter: Routes messages between local and remote actors
- RemoteActorProxy: Local representatives of remote actors
- TransportLayer: WebSocket/TCP communication infrastructure
Basic Setup
Creating a Distributed Network
    use reflow_network::distributed_network::{DistributedNetwork, DistributedConfig};
    use reflow_network::network::NetworkConfig;

    // Configure the distributed network
    let config = DistributedConfig {
        network_id: "main_workflow_engine".to_string(),
        instance_id: "server_001".to_string(),
        bind_address: "0.0.0.0".to_string(),
        bind_port: 8080,
        discovery_endpoints: vec![
            "http://discovery.example.com:3000".to_string()
        ],
        auth_token: Some("secure_token".to_string()),
        max_connections: 100,
        heartbeat_interval_ms: 30000,
        local_network_config: NetworkConfig::default(),
    };

    // Create and start the distributed network
    let mut distributed_network = DistributedNetwork::new(config).await?;
    distributed_network.start().await?;
Registering Local Actors
    use std::collections::HashMap;
    use your_actors::DataProcessorActor;

    // Register actors that will be available to remote networks.
    // The optional map carries metadata describing the actor.
    distributed_network.register_local_actor(
        "data_processor",
        DataProcessorActor::new(),
        Some(HashMap::from([
            ("capability".to_string(), serde_json::Value::String("data_processing".to_string())),
            ("version".to_string(), serde_json::Value::String("1.0.0".to_string())),
        ]))
    )?;
Connecting to Remote Networks
    // Connect to another network
    distributed_network.connect_to_network("192.168.1.100:8080").await?;

    // Register a remote actor for local use
    distributed_network.register_remote_actor(
        "remote_validator",   // Remote actor ID
        "validation_network"  // Remote network ID
    ).await?;
Actor Communication Patterns
Direct Remote Messaging
    use reflow_network::message::Message;

    // Send message to remote actor
    distributed_network.send_to_remote_actor(
        "validation_network",  // Target network
        "remote_validator",    // Target actor
        "input",               // Target port
        Message::String("validate this data".to_string().into())
    ).await?;
Workflow Integration
Remote actors integrate seamlessly into local workflows:
    // Get local network handle
    let local_network = distributed_network.get_local_network();
    let mut network = local_network.write();

    // Add local actor
    network.add_node("local_collector", "data_collector", None)?;

    // Add remote actor (appears as local)
    network.add_node("remote_processor", "remote_validator@validation_network", None)?;

    // Connect them in a workflow
    network.add_connection(Connector {
        from: ConnectionPoint {
            actor: "local_collector".to_string(),
            port: "output".to_string(),
            ..Default::default()
        },
        to: ConnectionPoint {
            actor: "remote_processor".to_string(),
            port: "input".to_string(),
            ..Default::default()
        },
    })?;
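The `remote_validator@validation_network` string above suggests an `actor@network` addressing convention. As an illustration only, the sketch below splits such a reference into its parts using the standard library. `ActorRef` and `parse_actor_ref` are hypothetical names for this example and are not part of the Reflow API; how Reflow itself parses these strings is not described here.

```rust
/// A parsed actor reference. `network` is `None` for a purely local actor.
/// (Hypothetical type, used only for this illustration.)
#[derive(Debug, PartialEq)]
pub struct ActorRef<'a> {
    pub actor: &'a str,
    pub network: Option<&'a str>,
}

/// Hypothetical helper: split `actor@network` at the first `@`.
/// A string without `@` is treated as a local actor name.
/// Returns `None` when the input or either side of the `@` is empty.
pub fn parse_actor_ref(s: &str) -> Option<ActorRef<'_>> {
    match s.split_once('@') {
        Some((actor, network)) if !actor.is_empty() && !network.is_empty() => {
            Some(ActorRef { actor, network: Some(network) })
        }
        // An `@` is present but one side is empty.
        Some(_) => None,
        None if !s.is_empty() => Some(ActorRef { actor: s, network: None }),
        None => None,
    }
}
```

Under these assumptions, `parse_actor_ref("remote_validator@validation_network")` yields the actor `remote_validator` on network `validation_network`, while a bare name such as `data_collector` has no network part.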
Network Discovery
Automatic Discovery
The discovery service can automatically find and register remote networks:
    // Enable automatic discovery
    let config = DistributedConfig {
        // ... other config
        discovery_endpoints: vec![
            "http://service-discovery.local:3000".to_string(),
            "http://backup-discovery.local:3000".to_string(),
        ],
        // ...
    };
Manual Network Registration
    // Manually connect to specific networks
    let networks_to_connect = vec![
        "analytics.company.com:8080",
        "ml-pipeline.company.com:8080",
        "data-warehouse.company.com:8080",
    ];

    for endpoint in networks_to_connect {
        match distributed_network.connect_to_network(endpoint).await {
            Ok(_) => println!("Connected to {}", endpoint),
            Err(e) => eprintln!("Failed to connect to {}: {}", endpoint, e),
        }
    }
Conflict Resolution
When multiple networks have actors with the same name, Reflow provides several resolution strategies:
Automatic Aliasing
    // Register remote actor with automatic conflict resolution
    let alias = distributed_network.register_remote_actor_with_strategy(
        "data_processor",                       // Remote actor name (conflicts with local)
        "analytics_network",                    // Remote network
        ConflictResolutionStrategy::AutoAlias   // Strategy
    ).await?;

    println!("Remote actor available as: {}", alias);
    // Output: "Remote actor available as: analytics_network_data_processor"
Manual Aliasing
    // Provide custom aliases for clarity
    distributed_network.register_remote_actor_with_strategy(
        "validator",
        "security_network",
        ConflictResolutionStrategy::ManualAlias("security_validator".to_string())
    ).await?;
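To make the two strategies concrete, here is a minimal standalone sketch of how an alias could be chosen. It assumes the `{network}_{actor}` naming implied by the sample output above (`analytics_network_data_processor`). The `Strategy` enum and `resolve_alias` function are stand-ins written for this example; they are not Reflow's `ConflictResolutionStrategy` or its actual resolution logic.

```rust
use std::collections::HashSet;

/// Stand-in for the strategy enum used above; only the two variants shown
/// in this section are modelled. (Hypothetical type.)
pub enum Strategy {
    AutoAlias,
    ManualAlias(String),
}

/// Pick the local name for a remote actor, or report a clash.
/// `taken` holds the names already registered in the local network.
pub fn resolve_alias(
    taken: &HashSet<String>,
    actor: &str,
    network: &str,
    strategy: &Strategy,
) -> Result<String, String> {
    let candidate = match strategy {
        // No conflict: the remote actor keeps its original name.
        Strategy::AutoAlias if !taken.contains(actor) => actor.to_string(),
        // Conflict: prefix the actor name with the network ID (assumed format).
        Strategy::AutoAlias => format!("{}_{}", network, actor),
        // The caller chose the name explicitly.
        Strategy::ManualAlias(alias) => alias.clone(),
    };
    if taken.contains(&candidate) {
        Err(format!("alias '{}' is already in use", candidate))
    } else {
        Ok(candidate)
    }
}
```

With `data_processor` already registered locally, `AutoAlias` for the same actor name on `analytics_network` produces `analytics_network_data_processor` in this sketch, and a manual alias that collides with an existing name is rejected rather than silently overwritten.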
Security Considerations
Authentication
    let config = DistributedConfig {
        // Use authentication tokens
        auth_token: Some("your_secure_token_here".to_string()),
        // ... other config
    };
Network Isolation
    // Restrict which networks can connect
    let config = DistributedConfig {
        // Only allow specific discovery endpoints
        discovery_endpoints: vec![
            "https://trusted-discovery.company.com:3000".to_string()
        ],
        max_connections: 10, // Limit concurrent connections
        // ... other config
    };
Monitoring and Health Checks
Connection Status
    // Check network health
    let bridge_status = distributed_network.get_bridge_status().await?;
    println!("Connected networks: {}", bridge_status.connected_networks.len());

    for (network_id, status) in &bridge_status.connected_networks {
        println!("  {}: {:?}", network_id, status);
    }
Heartbeat Monitoring
    let config = DistributedConfig {
        heartbeat_interval_ms: 15000, // 15 second heartbeats
        // ... other config
    };
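A heartbeat interval is typically paired with a rule for deciding when a silent peer counts as lost. The sketch below shows one common rule: a peer is stale once more than a chosen number of intervals have passed without a heartbeat. The function `is_peer_stale` and the `missed_allowed` parameter are assumptions made for this example; this section does not say how Reflow decides that a peer is down.

```rust
use std::time::{Duration, Instant};

/// Hypothetical liveness check: a peer is stale when the time since its
/// last heartbeat exceeds `missed_allowed` full heartbeat intervals.
pub fn is_peer_stale(
    last_seen: Instant,
    now: Instant,
    heartbeat_interval_ms: u64,
    missed_allowed: u32,
) -> bool {
    // Total silence tolerated before the peer is considered gone.
    let limit = Duration::from_millis(heartbeat_interval_ms).saturating_mul(missed_allowed);
    // `saturating_duration_since` returns zero if `now` is earlier than `last_seen`.
    now.saturating_duration_since(last_seen) > limit
}
```

With a 15000 ms interval and three tolerated misses, a peer last heard from 40 seconds ago is still considered alive in this sketch, while one silent for 46 seconds is not. A shorter interval detects failures sooner at the cost of more heartbeat traffic.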
Error Handling
Connection Failures
    use reflow_network::distributed_network::DistributedError;

    match distributed_network.connect_to_network("unreachable:8080").await {
        Ok(_) => println!("Connected successfully"),
        Err(DistributedError::ConnectionTimeout) => {
            eprintln!("Connection timed out - network may be down");
        },
        Err(DistributedError::AuthenticationFailed) => {
            eprintln!("Authentication failed - check token");
        },
        Err(e) => eprintln!("Other error: {}", e),
    }
Message Delivery Failures
    // Messages automatically retry with backoff
    match distributed_network.send_to_remote_actor(
        "target_network",
        "target_actor",
        "input",
        message
    ).await {
        Ok(_) => println!("Message sent successfully"),
        Err(e) => {
            eprintln!("Failed to send message: {}", e);
            // Message will be retried automatically
        }
    }
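The retry schedule itself is not specified here. For readers unfamiliar with the idea, the sketch below shows a capped exponential backoff, one common way to space out retries. The function name `backoff_delay` and its `base` and `max` parameters are illustrative assumptions, not Reflow configuration options.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): `base * 2^attempt`,
/// never exceeding `max`. Overflow at any step falls back to `max`.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // 2^attempt, saturating at u32::MAX for very large attempt counts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}
```

With a 100 ms base and a 5 s cap, the delays grow as 100 ms, 200 ms, 400 ms, 800 ms and so on until they reach the cap. Production retry loops often add random jitter on top of this so that many clients do not retry in lockstep.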
Performance Considerations
Connection Pooling
    let config = DistributedConfig {
        max_connections: 50, // Adjust based on load
        // ... other config
    };
Message Batching
Messages are automatically batched for efficiency, and large messages are automatically compressed, so a bulk payload is sent like any other message:
    // Large messages are automatically compressed
    let large_data = Message::Object(/* large JSON object */);
    distributed_network.send_to_remote_actor(
        "target_network",
        "target_actor",
        "bulk_input",
        large_data
    ).await?;
Best Practices
Network Design
- Use Descriptive Network IDs: Choose meaningful names like analytics_cluster instead of network1
- Plan for Conflicts: Use descriptive actor names to minimize naming conflicts
- Group Related Services: Co-locate related actors in the same network for efficiency
- Design for Failure: Always handle network partitions and connection failures gracefully
Actor Organization
    // Good: Descriptive, specific names
    distributed_network.register_local_actor("customer_data_validator", validator, None)?;
    distributed_network.register_local_actor("payment_processor", processor, None)?;

    // Avoid: Generic names likely to conflict
    // distributed_network.register_local_actor("validator", validator, None)?;
    // distributed_network.register_local_actor("processor", processor, None)?;
Resource Management
    // Always clean up connections
    struct DistributedWorkflow {
        network: DistributedNetwork,
    }

    impl Drop for DistributedWorkflow {
        fn drop(&mut self) {
            // Gracefully shutdown connections.
            // Note: `block_in_place` requires Tokio's multi-threaded runtime and
            // panics on a `current_thread` runtime; `Handle::current()` panics if
            // no runtime is active when the value is dropped.
            if let Err(e) = tokio::task::block_in_place(|| {
                tokio::runtime::Handle::current().block_on(self.network.shutdown())
            }) {
                eprintln!("Error during cleanup: {}", e);
            }
        }
    }
Troubleshooting
Common Issues
- Connection Refused: Check firewall settings and ensure target network is running
- Authentication Failed: Verify auth tokens match between networks
- Actor Not Found: Ensure remote actor is registered and network is connected
- Message Timeouts: Check network latency and increase timeout values if needed
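When diagnosing a refused connection, it can help to confirm that the target port accepts TCP connections at all before looking at Reflow itself. The sketch below is a generic reachability probe built on the standard library. `can_reach` is a hypothetical helper for this example and only checks that a TCP handshake succeeds; it says nothing about authentication or whether the listener is actually a Reflow network.

```rust
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::time::Duration;

/// Hypothetical probe: returns true if a TCP connection to `endpoint`
/// (in `host:port` form) can be opened within `timeout`.
pub fn can_reach(endpoint: &str, timeout: Duration) -> bool {
    // Resolve the endpoint; a malformed or unresolvable address counts as unreachable.
    let addrs: Vec<SocketAddr> = match endpoint.to_socket_addrs() {
        Ok(iter) => iter.collect(),
        Err(_) => return false,
    };
    // Succeed if any resolved address accepts the connection in time.
    addrs
        .iter()
        .any(|addr| TcpStream::connect_timeout(addr, timeout).is_ok())
}
```

For example, `can_reach("192.168.1.100:8080", Duration::from_secs(2))` returning `false` points at firewall rules, routing, or a stopped process rather than at tokens or actor registration.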
Debug Logging
Enable detailed logging for troubleshooting:
    use tracing_subscriber;

    // Enable debug logging
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();
Health Check Endpoint
Networks automatically expose health endpoints:
# Check network health
curl http://your-network:8080/health
# Get network status
curl http://your-network:8080/status
PeerMesh — Server-Side Distributed Orchestration
When running as a Reflow server node connected to Zeal, the PeerMesh manages peer-to-peer connections for distributed execution. It creates one DistributedNetwork per execution and responds to orchestration commands from Zeal.
Architecture
    pub struct PeerMesh {
        networks: RwLock<HashMap<String, DistributedNetwork>>,
        node_id: String,
        bind_address: String,
        base_port: u16,
    }
The PeerMesh:
- Creates a DistributedNetwork per execution, each binding on an incrementing port
- Responds to subgraph.assign commands from Zeal (via ZipSession) to take ownership of subgraph execution
- Responds to peer.connect commands to establish peer-to-peer links between nodes
- Tears down per-execution networks on completion
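The "incrementing port" behaviour can be pictured with a small bookkeeping structure. The sketch below is an assumption about how such allocation might work, written for illustration; `PortAllocator` and its methods are hypothetical and do not reflect the actual PeerMesh internals, in particular whether released ports are ever reused.

```rust
use std::collections::HashMap;

/// Hypothetical allocator: hands out `base_port`, `base_port + 1`, ...
/// to executions in the order they are first seen.
pub struct PortAllocator {
    next_port: Option<u16>,
    assigned: HashMap<String, u16>,
}

impl PortAllocator {
    pub fn new(base_port: u16) -> Self {
        Self { next_port: Some(base_port), assigned: HashMap::new() }
    }

    /// Returns the port for `execution_id`, assigning the next free one on
    /// first use. Returns `None` once the port range is exhausted.
    pub fn port_for(&mut self, execution_id: &str) -> Option<u16> {
        if let Some(&port) = self.assigned.get(execution_id) {
            return Some(port);
        }
        let port = self.next_port?;
        // `checked_add` yields `None` after port 65535, ending allocation.
        self.next_port = port.checked_add(1);
        self.assigned.insert(execution_id.to_string(), port);
        Some(port)
    }

    /// Forgets the execution on teardown. In this sketch the port is not reused.
    pub fn release(&mut self, execution_id: &str) -> Option<u16> {
        self.assigned.remove(execution_id)
    }
}
```

Starting from a base port of 9000, the first execution gets 9000 and the second 9001, and asking again for the first execution returns the same port. Not reusing released ports keeps the sketch simple but would eventually exhaust the range on a long-lived node.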
Integration with Zeal
When Zeal orchestrates a distributed workflow:
- Zeal sends subgraph.assign to each Reflow node via the ZIP WebSocket
- The PeerMesh creates a DistributedNetwork for the assigned execution
- Zeal sends peer.connect to establish connections between nodes
- The PeerMesh calls connect_peer() to link networks via WebSocket bridges
- Remote actors are registered as RemoteActorProxy instances in the local network
- On execution completion, teardown_execution() cleans up the distributed network
    // PeerMesh responds to Zeal commands
    peer_mesh.connect_peer(execution_id, peer_address).await?;
    peer_mesh.register_remote_actor(execution_id, actor_id, network_id).await?;
    peer_mesh.teardown_execution(execution_id).await;
Distributed Composition Planning
For workflows spanning multiple Reflow nodes, the DistributedComposition system plans execution across network boundaries:
    pub struct DistributedGraphComposition {
        pub local_sources: Vec<GraphSource>,
        pub remote_sources: Vec<RemoteGraphConfig>,
        pub local_connections: Vec<CompositionConnection>,
        pub distributed_connections: Vec<DistributedConnection>,
        pub execution_targets: HashMap<String, String>, // graph → network_id
    }
The DistributedNamespaceResolver maps processes to their home networks using qualified names like {network_id}/{namespace}/{process}:
    let mut resolver = DistributedNamespaceResolver::new();
    resolver.register_local_graph("data_pipeline", &graph)?;
    resolver.register_remote_graph("ml_pipeline", "ml_node_1", &remote_graph)?;

    // Find edges that cross network boundaries
    let cross_edges = resolver.find_cross_network_connections(&connections)?;
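To illustrate what "crossing a network boundary" means for qualified names of the form `{network_id}/{namespace}/{process}`, the sketch below parses such names and keeps only the edges whose endpoints live on different networks. `QualifiedName`, `parse_qualified`, and `cross_network_edges` are hypothetical names for this example, and edges are simplified to pairs of strings; the real resolver works on Reflow's own graph and connection types.

```rust
/// Parts of a `{network_id}/{namespace}/{process}` name. (Hypothetical type.)
#[derive(Debug, PartialEq)]
pub struct QualifiedName<'a> {
    pub network_id: &'a str,
    pub namespace: &'a str,
    pub process: &'a str,
}

/// Split a qualified name at its first two `/` separators.
/// Returns `None` if any of the three parts is missing or empty.
pub fn parse_qualified(name: &str) -> Option<QualifiedName<'_>> {
    let mut parts = name.splitn(3, '/');
    let network_id = parts.next()?;
    let namespace = parts.next()?;
    let process = parts.next()?;
    if network_id.is_empty() || namespace.is_empty() || process.is_empty() {
        return None;
    }
    Some(QualifiedName { network_id, namespace, process })
}

/// Keep only the edges whose two endpoints belong to different networks.
/// Edges with an unparseable endpoint are skipped in this sketch.
pub fn cross_network_edges<'a>(edges: &[(&'a str, &'a str)]) -> Vec<(&'a str, &'a str)> {
    edges
        .iter()
        .copied()
        .filter(|(from, to)| match (parse_qualified(from), parse_qualified(to)) {
            (Some(f), Some(t)) => f.network_id != t.network_id,
            _ => false,
        })
        .collect()
}
```

Given an edge from `local/data_pipeline/cleaner` to `ml_node_1/ml_pipeline/trainer`, the network IDs differ, so the edge is reported as cross-network; an edge between two processes under `local/...` is not.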
The planner produces a DistributedCompositionPlan with:
- local_composition: the graph to execute on this node
- proxy_actors: ProxyActorSpec entries for actors that proxy to remote networks
- cross_network_edges: connections requiring proxy bridges
- remote_executions: graphs delegated to other nodes
Next Steps
- Multi-Graph Composition - Workspace discovery and graph composition
- SubgraphActor - Hierarchical graph composition
- Graph System - Core graph operations
- Remote Actors - Detailed remote actor API
- Discovery & Registration - Network discovery details
- Zeal IDE Integration - ZIP session and orchestration commands
- Distributed Workflow Tutorial - Step-by-step example