Remote Actors
Learn how to register, manage, and interact with remote actors across distributed networks.
Overview
Remote actors allow you to use actors from other Reflow networks as if they were local. This enables:
- Cross-network workflows: Chain actors across multiple networks
- Resource distribution: Use specialized actors on different machines
- Load balancing: Distribute work across multiple network instances
- Service isolation: Keep different services in separate networks
Basic Remote Actor Usage
1. Register Remote Actors
After establishing a network connection, register remote actors:
#![allow(unused)] fn main() { use reflow_network::distributed_network::DistributedNetwork; // Assume networks are already connected let mut client_network = DistributedNetwork::new(client_config).await?; client_network.start().await?; client_network.connect_to_network("server:8080").await?; // Register a remote actor client_network.register_remote_actor( "data_processor", // Remote actor ID "server_network" // Remote network ID ).await?; println!("✅ Remote actor registered as proxy"); }
2. Use Remote Actors in Workflows
Remote actors appear as proxy actors in your local network:
#![allow(unused)] fn main() { // Get local network reference let local_network = client_network.get_local_network(); let mut network = local_network.write(); // Add remote actor to workflow (appears as local) network.add_node("remote_processor", "data_processor@server_network", None)?; // Create workflow with local and remote actors network.add_node("local_generator", "data_generator", None)?; // Connect local actor to remote actor network.add_connection(Connector { from: ConnectionPoint { actor: "local_generator".to_string(), port: "Output".to_string(), ..Default::default() }, to: ConnectionPoint { actor: "remote_processor".to_string(), // This is the remote actor! port: "Input".to_string(), ..Default::default() }, })?; }
3. Send Messages to Remote Actors
Send messages directly to remote actors:
#![allow(unused)] fn main() { use reflow_network::message::Message; // Send message to remote actor client_network.send_to_remote_actor( "server_network", // Target network "data_processor", // Target actor "Input", // Target port Message::String("Hello from client!".to_string().into()) ).await?; }
Advanced Registration
Register with Custom Local Names
Avoid naming conflicts by using custom local names:
#![allow(unused)] fn main() { // Register with custom alias to avoid conflicts let local_alias = client_network.register_remote_actor_with_alias( "server_data_processor", // Custom local name "data_processor", // Remote actor name "server_network" // Remote network ).await?; println!("Remote actor available as: {}", local_alias); }
Batch Registration
Register multiple remote actors at once:
#![allow(unused)] fn main() { let remote_actors = vec![ ("data_processor", "server_network"), ("validator", "server_network"), ("transformer", "processing_network"), ]; for (actor_id, network_id) in remote_actors { match client_network.register_remote_actor(actor_id, network_id).await { Ok(_) => println!("✅ Registered {}/{}", network_id, actor_id), Err(e) => eprintln!("❌ Failed to register {}/{}: {}", network_id, actor_id, e), } } }
Conditional Registration
Register actors based on availability:
#![allow(unused)] fn main() { // Check if network is available before registering if client_network.is_network_connected("server_network").await { client_network.register_remote_actor("data_processor", "server_network").await?; } else { eprintln!("Server network not available, using local fallback"); // Use local actor instead } }
Remote Actor Lifecycle
1. Registration Process
#![allow(unused)] fn main() { // 1. Network connection must be established first client_network.connect_to_network("server:8080").await?; // 2. Register creates a local proxy actor client_network.register_remote_actor("remote_actor", "server_network").await?; // 3. Proxy actor is added to local network let local_network = client_network.get_local_network(); let network = local_network.read(); assert!(network.has_actor("remote_actor@server_network")); }
2. Message Flow
sequenceDiagram
participant LA as Local Actor
participant P as Proxy Actor
participant B as Network Bridge
participant RN as Remote Network
participant RA as Remote Actor
LA->>P: Send Message
P->>B: Forward Message
B->>RN: Network Transport
RN->>RA: Deliver Message
RA->>RN: Response (if any)
RN->>B: Network Transport
B->>P: Forward Response
P->>LA: Deliver Response
3. Cleanup and Unregistration
#![allow(unused)] fn main() { // Unregister remote actor when no longer needed client_network.unregister_remote_actor("data_processor@server_network").await?; // Or unregister all actors from a network client_network.unregister_all_from_network("server_network").await?; }
Error Handling
Registration Errors
#![allow(unused)] fn main() { match client_network.register_remote_actor("processor", "server").await { Ok(_) => println!("✅ Registration successful"), Err(e) => { match e.to_string().as_str() { s if s.contains("Network not connected") => { eprintln!("❌ Must connect to network first"); // Establish connection and retry }, s if s.contains("Actor not found") => { eprintln!("❌ Remote actor 'processor' doesn't exist"); // Check available actors or use different name }, s if s.contains("Name conflict") => { eprintln!("❌ Actor name conflicts with local actor"); // Use different alias or handle conflict }, _ => eprintln!("❌ Registration failed: {}", e), } } } }
Message Delivery Errors
#![allow(unused)] fn main() { match client_network.send_to_remote_actor("server", "processor", "Input", message).await { Ok(_) => println!("✅ Message sent"), Err(e) => { match e.to_string().as_str() { s if s.contains("Network disconnected") => { eprintln!("❌ Connection lost, attempting reconnection..."); // Implement reconnection logic }, s if s.contains("Actor not available") => { eprintln!("❌ Remote actor is not responding"); // Use fallback actor or retry later }, s if s.contains("Timeout") => { eprintln!("❌ Message delivery timed out"); // Implement retry logic }, _ => eprintln!("❌ Message delivery failed: {}", e), } } } }
Performance Considerations
Connection Pooling
#![allow(unused)] fn main() { // Configure connection pooling for better performance let config = DistributedConfig { max_connections: 50, // Pool multiple connections heartbeat_interval_ms: 30000, // Balance between responsiveness and overhead // ... other settings }; }
Message Batching
#![allow(unused)] fn main() { // Send multiple messages efficiently let messages = vec![ ("Input", Message::String("msg1".to_string().into())), ("Input", Message::String("msg2".to_string().into())), ("Input", Message::String("msg3".to_string().into())), ]; // Batch send (if supported by future API) // client_network.send_batch_to_remote_actor("server", "processor", messages).await?; }
Caching and Local Fallbacks
#![allow(unused)] fn main() { // Implement local caching for remote actor responses struct CachedRemoteActor { network: Arc<DistributedNetwork>, cache: Arc<RwLock<HashMap<String, Message>>>, fallback_actor: Option<String>, } impl CachedRemoteActor { async fn send_with_fallback(&self, message: Message) -> Result<Message, anyhow::Error> { // Try remote actor first match self.network.send_to_remote_actor("server", "processor", "Input", message.clone()).await { Ok(response) => Ok(response), Err(_) => { // Fall back to local actor if available if let Some(fallback) = &self.fallback_actor { println!("⚠️ Using local fallback actor: {}", fallback); // Send to local actor instead self.send_to_local_actor(fallback, message).await } else { Err(anyhow::anyhow!("Remote actor unavailable and no fallback configured")) } } } } } }
Remote Actor Discovery
Automatic Discovery
#![allow(unused)] fn main() { // Discover all available actors on a remote network let available_actors = client_network.discover_remote_actors("server_network").await?; for actor_info in available_actors { println!("Available actor: {} (capabilities: {:?})", actor_info.name, actor_info.capabilities ); // Register if it matches our needs if actor_info.capabilities.contains(&"data_processing".to_string()) { client_network.register_remote_actor(&actor_info.name, "server_network").await?; } } }
Selective Registration by Capability
#![allow(unused)] fn main() { // Register only actors with specific capabilities let required_capabilities = vec!["ml_training", "gpu_compute"]; let actors = client_network.discover_remote_actors("ml_cluster").await?; for actor in actors { let has_required_caps = required_capabilities.iter() .any(|cap| actor.capabilities.contains(&cap.to_string())); if has_required_caps { client_network.register_remote_actor(&actor.name, "ml_cluster").await?; println!("✅ Registered ML actor: {}", actor.name); } } }
Monitoring Remote Actors
Health Checking
#![allow(unused)] fn main() { // Check if remote actor is responsive async fn check_remote_actor_health( network: &DistributedNetwork, network_id: &str, actor_id: &str ) -> bool { match network.ping_remote_actor(network_id, actor_id).await { Ok(_) => { println!("✅ Remote actor {}/{} is healthy", network_id, actor_id); true }, Err(e) => { eprintln!("❌ Remote actor {}/{} health check failed: {}", network_id, actor_id, e); false } } } }
Performance Monitoring
#![allow(unused)] fn main() { // Monitor remote actor performance struct RemoteActorMetrics { actor_id: String, network_id: String, total_messages: u64, successful_messages: u64, average_latency_ms: f64, last_response_time: Option<chrono::DateTime<chrono::Utc>>, } impl RemoteActorMetrics { async fn record_message_sent(&mut self) { self.total_messages += 1; // Record timing for latency calculation } async fn record_response_received(&mut self, latency: Duration) { self.successful_messages += 1; self.last_response_time = Some(chrono::Utc::now()); // Update rolling average let latency_ms = latency.as_millis() as f64; self.average_latency_ms = (self.average_latency_ms + latency_ms) / 2.0; } fn success_rate(&self) -> f64 { if self.total_messages == 0 { 0.0 } else { (self.successful_messages as f64) / (self.total_messages as f64) } } } }
Best Practices
1. Network Design
#![allow(unused)] fn main() { // Good: Organize actors by function and location "auth_service@auth_cluster" "data_processor@processing_cluster" "ml_trainer@gpu_cluster" // Avoid: Generic names that don't indicate purpose "actor1@server" "service@network" }
2. Error Resilience
#![allow(unused)] fn main() { // Implement circuit breaker pattern for remote actors struct CircuitBreaker { failure_count: u32, failure_threshold: u32, timeout_duration: Duration, last_failure_time: Option<Instant>, state: CircuitState, } enum CircuitState { Closed, // Normal operation Open, // Failing, don't try HalfOpen, // Testing if service recovered } impl CircuitBreaker { async fn call_remote_actor(&mut self, network: &DistributedNetwork) -> Result<Message, anyhow::Error> { match self.state { CircuitState::Open => { if self.should_attempt_reset() { self.state = CircuitState::HalfOpen; } else { return Err(anyhow::anyhow!("Circuit breaker is open")); } }, _ => {} } match network.send_to_remote_actor("server", "actor", "Input", Message::String("test".to_string().into())).await { Ok(response) => { self.on_success(); Ok(response) }, Err(e) => { self.on_failure(); Err(e) } } } } }
3. Resource Management
#![allow(unused)] fn main() { // Properly clean up remote actor registrations async fn cleanup_remote_actors(network: &mut DistributedNetwork) -> Result<(), anyhow::Error> { // Get list of registered remote actors let remote_actors = network.list_remote_actors().await?; // Unregister all remote actors for (actor_id, network_id) in remote_actors { network.unregister_remote_actor(&format!("{}@{}", actor_id, network_id)).await?; println!("🧹 Unregistered remote actor: {}@{}", actor_id, network_id); } Ok(()) } }
Troubleshooting
Common Issues
-
Remote Actor Not Found
#![allow(unused)] fn main() { // Verify actor exists on remote network let actors = client_network.list_actors_on_network("server_network").await?; println!("Available actors: {:?}", actors); } -
Registration Fails
#![allow(unused)] fn main() { // Check network connection status if !client_network.is_connected_to("server_network").await { client_network.connect_to_network("server:8080").await?; } } -
Messages Not Delivered
#![allow(unused)] fn main() { // Check message serialization let message = Message::String("test".to_string().into()); match serde_json::to_string(&message) { Ok(_) => println!("✅ Message is serializable"), Err(e) => eprintln!("❌ Message serialization failed: {}", e), } } -
High Latency
#![allow(unused)] fn main() { // Monitor network latency let start = Instant::now(); client_network.ping_network("server_network").await?; let latency = start.elapsed(); println!("Network latency: {:?}", latency); }
Next Steps
- Discovery & Registration - Advanced discovery patterns
- Conflict Resolution - Handle actor name conflicts
- Tutorial: Distributed Workflow Example