Discovery & Registration
Learn how to use network discovery services and automatic actor registration in distributed Reflow networks.
Overview
Discovery and registration services enable:
- Automatic network discovery: Find available networks without manual configuration
- Service registration: Advertise your network's capabilities to others
- Dynamic actor discovery: Automatically find and register remote actors
- Health monitoring: Track network and actor availability
- Load balancing: Distribute connections across available instances
Discovery Service Types
1. Built-in Discovery
Use Reflow's built-in discovery where one network acts as a registry:
    use reflow_network::distributed_network::{DistributedNetwork, DistributedConfig};

    // Discovery server (registry)
    let registry_config = DistributedConfig {
        network_id: "discovery_registry".to_string(),
        instance_id: "registry_001".to_string(),
        bind_address: "0.0.0.0".to_string(),
        bind_port: 8090,
        discovery_endpoints: vec![], // Empty - this IS the discovery server
        // ... other config
    };

    let mut registry_network = DistributedNetwork::new(registry_config).await?;
    registry_network.start().await?;
    println!("🔍 Discovery registry started on port 8090");
2. Client Networks Using Registry
    // Client networks connect to registry for discovery
    let client_config = DistributedConfig {
        network_id: "worker_network".to_string(),
        instance_id: "worker_001".to_string(),
        bind_address: "127.0.0.1".to_string(),
        bind_port: 8091,
        discovery_endpoints: vec!["http://registry:8090".to_string()],
        // ... other config
    };

    let mut client_network = DistributedNetwork::new(client_config).await?;
    client_network.start().await?;
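To make the registry role concrete, the following is a minimal in-memory sketch of what a discovery registry keeps track of. `NetworkEntry` and `Registry` are hypothetical names used only for illustration; they are not Reflow types, and the real registry may store and exchange different data.

```rust
use std::collections::HashMap;

/// Hypothetical registry record; not Reflow's actual wire format.
#[derive(Debug, Clone, PartialEq)]
pub struct NetworkEntry {
    pub network_id: String,
    pub endpoint: String,
    pub capabilities: Vec<String>,
}

/// Minimal in-memory registry keyed by network_id.
#[derive(Default)]
pub struct Registry {
    entries: HashMap<String, NetworkEntry>,
}

impl Registry {
    /// Insert or overwrite an entry; returns true if the id was not registered before.
    pub fn register(&mut self, entry: NetworkEntry) -> bool {
        self.entries.insert(entry.network_id.clone(), entry).is_none()
    }

    /// Remove an entry; returns true if it existed.
    pub fn unregister(&mut self, network_id: &str) -> bool {
        self.entries.remove(network_id).is_some()
    }

    /// All entries advertising the capability, sorted by id for stable output.
    pub fn with_capability(&self, capability: &str) -> Vec<&NetworkEntry> {
        let mut found: Vec<&NetworkEntry> = self
            .entries
            .values()
            .filter(|e| e.capabilities.iter().any(|c| c == capability))
            .collect();
        found.sort_by(|a, b| a.network_id.cmp(&b.network_id));
        found
    }
}
```

Keying on `network_id` means a restarted instance overwrites its stale entry instead of accumulating duplicates, which is one reasonable policy among several.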
3. External Discovery Services
Integrate with external service discovery systems:
    // Using Consul
    let consul_config = DistributedConfig {
        network_id: "consul_client".to_string(),
        discovery_endpoints: vec![
            "http://consul.service.consul:8500/v1/agent/services".to_string()
        ],
        // ... other config
    };

    // Using etcd
    let etcd_config = DistributedConfig {
        network_id: "etcd_client".to_string(),
        discovery_endpoints: vec![
            "http://etcd.cluster.local:2379/v2/keys/reflow/services".to_string()
        ],
        // ... other config
    };

    // Using Kubernetes DNS
    let k8s_config = DistributedConfig {
        network_id: "k8s_service".to_string(),
        discovery_endpoints: vec![
            "http://reflow-discovery.default.svc.cluster.local:8080".to_string()
        ],
        // ... other config
    };
Network Registration
Basic Registration
Networks automatically register themselves when started:
    let network_config = DistributedConfig {
        network_id: "ml_processing_cluster".to_string(),
        instance_id: "gpu_worker_001".to_string(),
        bind_address: "0.0.0.0".to_string(),
        bind_port: 8080,
        discovery_endpoints: vec!["http://discovery:8090".to_string()],
        // ... other config
    };

    let mut network = DistributedNetwork::new(network_config).await?;

    // Registration happens automatically on start
    network.start().await?;

    // Network is now registered with discovery service
Registration with Metadata
Include additional metadata during registration:
    // Register with capabilities and metadata
    let registration_metadata = serde_json::json!({
        "capabilities": ["ml_training", "gpu_compute", "data_processing"],
        "resources": {
            "cpu_cores": 32,
            "gpu_count": 4,
            "memory_gb": 128
        },
        "version": "1.2.0",
        "tags": ["ml", "gpu", "production"],
        "health_check_url": "http://worker:8080/health"
    });

    // This metadata is included in registration (implementation detail)
    // The discovery service can use this for intelligent routing
Manual Registration Control
Control registration timing and behavior, for example by deferring registration until local setup and health checks have finished:
    // Start network without auto-registration
    let mut network = DistributedNetwork::new(config).await?;
    network.start().await?;

    // Perform initialization
    setup_local_actors(&mut network).await?;
    run_health_checks(&network).await?;

    // Register manually when ready
    network.register_with_discovery().await?;
    println!("✅ Network registered and ready for connections");
Network Discovery
Discover Available Networks
Find networks that are currently available:
    // Discover all available networks
    let discovered_networks = client_network.discover_networks().await?;

    for network_info in discovered_networks {
        println!("🌐 Found network: {} ({})",
            network_info.network_id,
            network_info.endpoint
        );
        println!("   Capabilities: {:?}", network_info.capabilities);
        println!("   Last seen: {}", network_info.last_seen);
    }
Filtered Discovery
Find networks with specific capabilities:
    // Discover networks with ML capabilities
    let ml_networks = client_network.discover_networks_with_capability("ml_training").await?;

    for network in ml_networks {
        println!("🧠 ML Network: {} at {}", network.network_id, network.endpoint);

        // Connect to ML networks
        client_network.connect_to_network(&network.endpoint).await?;
    }
Discovery by Tags
Find networks using tag-based filtering:
    // Discover production GPU networks
    let gpu_networks = client_network.discover_networks_by_tags(vec!["gpu", "production"]).await?;

    for network in gpu_networks {
        if network.is_healthy() {
            client_network.connect_to_network(&network.endpoint).await?;
            println!("✅ Connected to GPU network: {}", network.network_id);
        }
    }
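The text does not say whether a tag query matches networks carrying all of the requested tags or any of them. The sketch below shows both interpretations on a hypothetical `TaggedNetwork` type (not a Reflow type), so the difference is easy to check against whatever `discover_networks_by_tags` actually returns.

```rust
/// Hypothetical view of a discovered network; only the fields this sketch needs.
#[derive(Debug, Clone, PartialEq)]
pub struct TaggedNetwork {
    pub network_id: String,
    pub tags: Vec<String>,
}

/// True when the network carries every tag in `wanted` (AND semantics).
/// An empty `wanted` list matches every network.
pub fn has_all_tags(network: &TaggedNetwork, wanted: &[&str]) -> bool {
    wanted
        .iter()
        .all(|w| network.tags.iter().any(|t| t.as_str() == *w))
}

/// True when the network carries at least one tag in `wanted` (OR semantics).
pub fn has_any_tag(network: &TaggedNetwork, wanted: &[&str]) -> bool {
    wanted
        .iter()
        .any(|w| network.tags.iter().any(|t| t.as_str() == *w))
}

/// Keep only the networks that carry every requested tag.
pub fn filter_by_all_tags<'a>(
    networks: &'a [TaggedNetwork],
    wanted: &[&str],
) -> Vec<&'a TaggedNetwork> {
    networks.iter().filter(|n| has_all_tags(n, wanted)).collect()
}
```

With AND semantics, a query for `gpu` and `production` excludes a development GPU network; with OR semantics it would be included.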
Automatic Actor Discovery
Discover Actors on Connected Networks
Once connected to a network, discover its available actors:
    // Connect to a network first
    client_network.connect_to_network("ml_cluster:8080").await?;

    // Discover actors on that network
    let actors = client_network.discover_actors_on_network("ml_cluster").await?;

    for actor in actors {
        println!("🎭 Actor: {} ({})", actor.name, actor.component_type);
        println!("   Capabilities: {:?}", actor.capabilities);
        println!("   Ports: in={:?}, out={:?}", actor.inports, actor.outports);
    }
Automatic Registration
Register all discovered actors automatically:
    // Discover and register all compatible actors
    let discovered_actors = client_network.discover_actors_on_network("data_cluster").await?;

    for actor in discovered_actors {
        // Only register actors we can use
        if actor.capabilities.contains(&"data_processing".to_string()) {
            match client_network.register_remote_actor(&actor.name, "data_cluster").await {
                Ok(_) => println!("✅ Registered actor: {}", actor.name),
                Err(e) => eprintln!("❌ Failed to register {}: {}", actor.name, e),
            }
        }
    }
Selective Auto-Registration
Register actors based on complex criteria:
    async fn smart_actor_registration(
        network: &mut DistributedNetwork,
        remote_network_id: &str
    ) -> Result<Vec<String>, anyhow::Error> {
        let actors = network.discover_actors_on_network(remote_network_id).await?;
        let mut registered_actors = Vec::new();

        for actor in actors {
            // Complex registration logic
            let should_register = match actor.component_type.as_str() {
                "DataProcessorActor" => {
                    // Only register if we don't have local data processors
                    !network.has_local_actor_of_type("DataProcessorActor").await
                },
                "MLTrainerActor" => {
                    // Only register GPU trainers
                    actor.capabilities.contains(&"gpu_compute".to_string())
                },
                "DatabaseActor" => {
                    // Register if it's a different database type than our local ones
                    let local_dbs = network.get_local_database_types().await;
                    !local_dbs.contains(&actor.get_database_type())
                },
                _ => false, // Don't auto-register unknown types
            };

            if should_register {
                let alias = network.register_remote_actor(&actor.name, remote_network_id).await?;
                // Log before pushing: push() moves `alias` into the vector
                println!("🤖 Smart-registered: {} as {}", actor.name, alias);
                registered_actors.push(alias);
            }
        }

        Ok(registered_actors)
    }
Health Monitoring
Network Health Checks
Monitor the health of discovered networks:
    use std::time::Duration;

    // Periodic health monitoring; loops until an error propagates or the task is cancelled
    async fn monitor_network_health(network: &DistributedNetwork) -> Result<(), anyhow::Error> {
        let mut interval = tokio::time::interval(Duration::from_secs(30));

        loop {
            interval.tick().await;

            let connected_networks = network.get_connected_networks().await;

            for network_id in connected_networks {
                match network.ping_network(&network_id).await {
                    Ok(latency) => {
                        println!("✅ Network {} healthy ({}ms)", network_id, latency.as_millis());
                    },
                    Err(e) => {
                        eprintln!("❌ Network {} unhealthy: {}", network_id, e);

                        // Attempt reconnection
                        if let Ok(network_info) = network.get_network_info(&network_id).await {
                            match network.reconnect_to_network(&network_info.endpoint).await {
                                Ok(_) => println!("🔄 Reconnected to {}", network_id),
                                Err(e) => eprintln!("🔌 Reconnection failed: {}", e),
                            }
                        }
                    }
                }
            }
        }
    }
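A single failed ping can be a transient blip, so a monitor may want to treat a network as unhealthy only after several consecutive failures. The sketch below is one way to track that; `HealthTracker` and its threshold are assumptions for illustration and are not part of Reflow.

```rust
use std::collections::HashMap;

/// Hypothetical helper: counts consecutive failed pings per network.
pub struct HealthTracker {
    threshold: u32,
    failures: HashMap<String, u32>,
}

impl HealthTracker {
    /// `threshold` is the number of consecutive failures that marks a network unhealthy.
    pub fn new(threshold: u32) -> Self {
        Self { threshold, failures: HashMap::new() }
    }

    /// Record one ping result and return whether the network still counts as healthy.
    /// A successful ping resets the failure streak.
    pub fn record(&mut self, network_id: &str, ping_ok: bool) -> bool {
        let count = self.failures.entry(network_id.to_string()).or_insert(0);
        if ping_ok {
            *count = 0;
        } else {
            *count = count.saturating_add(1);
        }
        *count < self.threshold
    }
}
```

In a loop like the one above, the result of `ping_network(...).is_ok()` could be fed into `record`, with reconnection attempted only once it returns `false`.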
Actor Health Monitoring
Monitor remote actor availability:
    async fn monitor_remote_actors(network: &DistributedNetwork) -> Result<(), anyhow::Error> {
        let remote_actors = network.list_registered_remote_actors().await;

        for (actor_alias, actor_ref) in remote_actors {
            match network.ping_remote_actor(&actor_ref.network_id, &actor_ref.actor_id).await {
                Ok(_) => {
                    println!("✅ Remote actor {} is responsive", actor_alias);
                },
                Err(e) => {
                    eprintln!("❌ Remote actor {} is unresponsive: {}", actor_alias, e);

                    // Try to re-register the actor
                    match network.refresh_remote_actor(&actor_alias).await {
                        Ok(_) => println!("🔄 Refreshed remote actor: {}", actor_alias),
                        Err(e) => {
                            eprintln!("🚫 Failed to refresh {}: {}", actor_alias, e);
                            // Consider removing the actor or marking it as unavailable
                        }
                    }
                }
            }
        }

        Ok(())
    }
Load Balancing and Failover
Discover Multiple Instances
Find multiple instances of the same service:
    // Find all instances of a specific service type
    let data_processors = client_network.discover_networks_with_capability("data_processing").await?;

    println!("Found {} data processing networks:", data_processors.len());

    for (i, network) in data_processors.iter().enumerate() {
        println!("  {}. {} at {} (load: {}%)",
            i + 1,
            network.network_id,
            network.endpoint,
            network.cpu_usage.unwrap_or(0.0)
        );
    }
Load-Balanced Registration
Register actors from multiple networks for load balancing:
    // Register the same actor type from multiple networks
    let processing_networks = vec!["cluster_1", "cluster_2", "cluster_3"];

    for (i, network_id) in processing_networks.iter().enumerate() {
        if client_network.is_network_available(network_id).await {
            let alias = format!("data_processor_{}", i + 1);

            client_network.register_remote_actor_with_alias(
                &alias,
                "data_processor",
                network_id
            ).await?;

            println!("⚖️ Registered load-balanced actor: {}", alias);
        }
    }
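Registering several aliases only balances load if callers rotate between them. The source does not describe how selection happens, so the following is a minimal round-robin sketch over the registered aliases; `RoundRobin` is a hypothetical helper, not a Reflow type.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical round-robin selector over registered actor aliases.
pub struct RoundRobin {
    aliases: Vec<String>,
    next: AtomicUsize,
}

impl RoundRobin {
    pub fn new(aliases: Vec<String>) -> Self {
        Self { aliases, next: AtomicUsize::new(0) }
    }

    /// Return the next alias in rotation, or None when no alias is registered.
    /// Takes &self so the selector can be shared between tasks.
    pub fn pick(&self) -> Option<&str> {
        if self.aliases.is_empty() {
            return None;
        }
        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.aliases.len();
        Some(self.aliases[i].as_str())
    }
}
```

Round-robin ignores actual load; a selector weighted by reported CPU usage is an alternative when that data is reliable.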
Failover Registration
Implement failover with primary and backup actors:
    use std::collections::HashMap;
    use std::sync::Arc;

    struct FailoverActorRegistry {
        network: Arc<DistributedNetwork>,
        primary_actors: HashMap<String, String>,     // service -> primary actor alias
        backup_actors: HashMap<String, Vec<String>>, // service -> backup actor aliases
    }

    impl FailoverActorRegistry {
        async fn register_with_failover(&mut self,
            service_name: &str,
            actor_type: &str
        ) -> Result<(), anyhow::Error> {
            let networks = self.network.discover_networks_with_capability(actor_type).await?;

            if networks.is_empty() {
                return Err(anyhow::anyhow!("No networks found with capability: {}", actor_type));
            }

            // Primary: Use the network with lowest load.
            // Incomparable values (e.g. NaN) are treated as equal instead of panicking;
            // a network that reports no cpu_usage (None) sorts before any reported value.
            let primary_network = networks.iter()
                .min_by(|a, b| {
                    a.cpu_usage
                        .partial_cmp(&b.cpu_usage)
                        .unwrap_or(std::cmp::Ordering::Equal)
                })
                .unwrap(); // Safe: `networks` is non-empty

            let primary_alias = format!("{}_primary", service_name);
            self.network.register_remote_actor_with_alias(
                &primary_alias,
                actor_type,
                &primary_network.network_id
            ).await?;

            self.primary_actors.insert(service_name.to_string(), primary_alias);

            // Backups: Register from every network except the chosen primary
            let mut backup_aliases = Vec::new();
            let backups = networks.iter()
                .filter(|n| n.network_id != primary_network.network_id);
            for (i, network) in backups.enumerate() {
                let backup_alias = format!("{}_backup_{}", service_name, i + 1);

                self.network.register_remote_actor_with_alias(
                    &backup_alias,
                    actor_type,
                    &network.network_id
                ).await?;

                backup_aliases.push(backup_alias);
            }

            // Read the count before the vector is moved into the map
            let backup_count = backup_aliases.len();
            self.backup_actors.insert(service_name.to_string(), backup_aliases);

            println!("🛡️ Registered failover service: {} with {} backups",
                service_name, backup_count);

            Ok(())
        }

        async fn handle_primary_failure(&self, service_name: &str) -> Result<String, anyhow::Error> {
            if let Some(backups) = self.backup_actors.get(service_name) {
                if let Some(first_backup) = backups.first() {
                    // Promote first backup to primary
                    println!("🔄 Promoting backup to primary for service: {}", service_name);
                    return Ok(first_backup.clone());
                }
            }

            Err(anyhow::anyhow!("No backup available for service: {}", service_name))
        }
    }
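The primary and backup choice can be isolated into a pure function that is easy to test without a running network. The sketch below assumes load is reported as an optional CPU percentage and, as a design choice of this sketch only, treats a missing report as worst-case so that such a network is never preferred as primary. `Candidate` is a hypothetical type.

```rust
/// Hypothetical candidate network for a failover group.
#[derive(Debug, Clone, PartialEq)]
pub struct Candidate {
    pub network_id: String,
    pub cpu_usage: Option<f64>,
}

/// Split candidates into (primary, backups), ordered by ascending load.
/// Returns None when there are no candidates.
pub fn split_primary_and_backups(
    mut candidates: Vec<Candidate>,
) -> Option<(Candidate, Vec<Candidate>)> {
    if candidates.is_empty() {
        return None;
    }
    // total_cmp gives a total order on f64, so the sort cannot panic on NaN.
    // Unknown load is mapped to +infinity and therefore sorts last.
    candidates.sort_by(|a, b| {
        let load_a = a.cpu_usage.unwrap_or(f64::INFINITY);
        let load_b = b.cpu_usage.unwrap_or(f64::INFINITY);
        load_a.total_cmp(&load_b)
    });
    let primary = candidates.remove(0);
    Some((primary, candidates))
}
```

Because the primary is removed from the sorted list, it can never also appear among the backups, and the backups are already ordered by preference for promotion.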
Configuration Management
Discovery Service Configuration
Configure discovery service behavior:
    #[derive(Debug, Clone)]
    pub struct DiscoveryConfig {
        pub refresh_interval_ms: u64,
        pub health_check_interval_ms: u64,
        pub max_discovery_retries: u32,
        pub discovery_timeout_ms: u64,
        pub enable_auto_registration: bool,
        pub registration_metadata: serde_json::Value,
    }

    impl Default for DiscoveryConfig {
        fn default() -> Self {
            DiscoveryConfig {
                refresh_interval_ms: 30000,      // 30 seconds
                health_check_interval_ms: 15000, // 15 seconds
                max_discovery_retries: 3,
                discovery_timeout_ms: 5000,      // 5 seconds
                enable_auto_registration: true,
                registration_metadata: serde_json::json!({
                    "version": "1.0.0",
                    "capabilities": []
                }),
            }
        }
    }
Environment-Specific Discovery
Configure discovery for different environments:
    fn create_discovery_config(environment: &str) -> DiscoveryConfig {
        match environment {
            "development" => DiscoveryConfig {
                refresh_interval_ms: 10000,     // Faster refresh for dev
                health_check_interval_ms: 5000,
                discovery_timeout_ms: 2000,     // Shorter timeout
                enable_auto_registration: true,
                registration_metadata: serde_json::json!({
                    "environment": "development",
                    "auto_discovery": true
                }),
                ..Default::default()
            },
            "production" => DiscoveryConfig {
                refresh_interval_ms: 60000,      // Slower refresh for prod
                health_check_interval_ms: 30000,
                discovery_timeout_ms: 10000,     // Longer timeout
                enable_auto_registration: false, // Manual control
                registration_metadata: serde_json::json!({
                    "environment": "production",
                    "manual_registration": true
                }),
                ..Default::default()
            },
            _ => DiscoveryConfig::default(),
        }
    }
Error Handling
Discovery Errors
Handle common discovery and registration errors:
    use std::time::Duration;

    async fn robust_discovery(network: &DistributedNetwork) -> Result<Vec<NetworkInfo>, anyhow::Error> {
        let mut retries = 3;
        let mut last_error = None;

        while retries > 0 {
            match network.discover_networks().await {
                Ok(networks) if !networks.is_empty() => return Ok(networks),
                Ok(_) => println!("⚠️ No networks discovered, retrying..."),
                Err(e) => {
                    eprintln!("❌ Discovery attempt failed: {}", e);
                    last_error = Some(e);
                }
            }

            retries -= 1;
            if retries > 0 {
                // Wait before the next attempt, whether the last one failed or came back empty
                tokio::time::sleep(Duration::from_secs(2)).await;
            }
        }

        Err(last_error.unwrap_or_else(|| anyhow::anyhow!("Discovery failed after retries")))
    }
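A fixed two-second wait is simple, but when a discovery service is overloaded, retries that back off exponentially put less pressure on it. The helper below is a sketch of a capped exponential schedule; the function name and the choice of doubling are assumptions, not something Reflow prescribes.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
/// Overflow at any step falls back to the cap instead of panicking.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}
```

The returned value can be passed directly to `tokio::time::sleep` in place of a constant delay. Adding random jitter on top is a common refinement when many clients retry at once.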
Registration Conflicts
Handle registration conflicts gracefully:
    async fn safe_actor_registration(
        network: &mut DistributedNetwork,
        actor_name: &str,
        remote_network: &str
    ) -> Result<String, anyhow::Error> {
        match network.register_remote_actor(actor_name, remote_network).await {
            Ok(alias) => Ok(alias),
            Err(e) if e.to_string().contains("name conflict") => {
                // Try with numbered suffix
                for i in 1..=10 {
                    let attempt_name = format!("{}_{}", actor_name, i);
                    match network.register_remote_actor_with_alias(
                        &attempt_name,
                        actor_name,
                        remote_network
                    ).await {
                        Ok(alias) => {
                            println!("✅ Registered with conflict resolution: {}", alias);
                            return Ok(alias);
                        },
                        Err(_) => continue,
                    }
                }
                Err(anyhow::anyhow!("Could not resolve naming conflict for: {}", actor_name))
            },
            Err(e) => Err(e),
        }
    }
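When the set of aliases already in use is known locally, a free name can be chosen up front instead of probing the network once per suffix. The sketch below assumes such a set is available; `first_free_alias` is a hypothetical helper, and it uses the same `name_N` suffix scheme as the example above.

```rust
use std::collections::HashSet;

/// Return `base` if unused, otherwise the first free `base_N` for N in 1..=max_suffix.
/// Returns None when every candidate is already taken.
pub fn first_free_alias(base: &str, taken: &HashSet<String>, max_suffix: u32) -> Option<String> {
    if !taken.contains(base) {
        return Some(base.to_string());
    }
    (1..=max_suffix)
        .map(|i| format!("{}_{}", base, i))
        .find(|candidate| !taken.contains(candidate))
}
```

A remote registration can still race with another client, so the error-handling path remains necessary; this only removes the avoidable round trips.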
Best Practices
1. Discovery Strategy
    // Good: Use hierarchical discovery with fallbacks
    let discovery_endpoints = vec![
        "http://local-discovery:8090",    // Local first
        "http://regional-discovery:8090", // Regional second
        "http://global-discovery:8090",   // Global fallback
    ];

    // Configure discovery timeouts appropriately
    let config = DiscoveryConfig {
        discovery_timeout_ms: 5000, // 5 seconds max
        max_discovery_retries: 3,   // Try 3 times
        refresh_interval_ms: 30000, // Refresh every 30s
        ..Default::default()
    };
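How Reflow walks the endpoint list is not specified here. One plausible reading of "local first, global fallback" is to try endpoints in order and stop at the first that answers, which the generic sketch below illustrates. `first_successful` is a hypothetical helper, and the `query` closure stands in for whatever call actually contacts an endpoint.

```rust
/// Try each endpoint in order and return the first success together with the
/// endpoint that produced it. If all fail, return every error in order.
pub fn first_successful<T, E>(
    endpoints: &[&str],
    mut query: impl FnMut(&str) -> Result<T, E>,
) -> Result<(String, T), Vec<E>> {
    let mut errors = Vec::new();
    for &endpoint in endpoints {
        match query(endpoint) {
            Ok(value) => return Ok((endpoint.to_string(), value)),
            Err(e) => errors.push(e),
        }
    }
    Err(errors)
}
```

Returning all collected errors, instead of only the last one, makes it easier to tell a single misconfigured endpoint apart from a full discovery outage.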
2. Health Monitoring
    // Implement comprehensive health monitoring
    async fn comprehensive_health_check(network: &DistributedNetwork) -> HealthStatus {
        let mut status = HealthStatus::new();

        // Check discovery service connectivity
        status.discovery_healthy = network.ping_discovery_service().await.is_ok();

        // Check connected networks
        let networks = network.get_connected_networks().await;
        for network_id in networks {
            let network_healthy = network.ping_network(&network_id).await.is_ok();
            status.network_health.insert(network_id, network_healthy);
        }

        // Check remote actors
        let actors = network.list_registered_remote_actors().await;
        for (alias, actor_ref) in actors {
            let actor_healthy = network.ping_remote_actor(
                &actor_ref.network_id,
                &actor_ref.actor_id
            ).await.is_ok();
            status.actor_health.insert(alias, actor_healthy);
        }

        status
    }
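`HealthStatus` is used above without being defined. The sketch below shows one possible shape, with field names taken from how the function uses them and types inferred from that usage; the two summary methods are additions for illustration only.

```rust
use std::collections::HashMap;

/// Possible definition of the HealthStatus used above (field types are inferred).
#[derive(Debug, Default)]
pub struct HealthStatus {
    pub discovery_healthy: bool,
    pub network_health: HashMap<String, bool>,
    pub actor_health: HashMap<String, bool>,
}

impl HealthStatus {
    /// Starts pessimistic: discovery is unhealthy until a check says otherwise.
    pub fn new() -> Self {
        Self::default()
    }

    /// True only when discovery, every network, and every actor are healthy.
    pub fn is_fully_healthy(&self) -> bool {
        self.discovery_healthy
            && self.network_health.values().all(|ok| *ok)
            && self.actor_health.values().all(|ok| *ok)
    }

    /// Ids of networks whose last check failed, sorted for stable output.
    pub fn unhealthy_networks(&self) -> Vec<&str> {
        let mut ids: Vec<&str> = self
            .network_health
            .iter()
            .filter(|(_, ok)| !**ok)
            .map(|(id, _)| id.as_str())
            .collect();
        ids.sort();
        ids
    }
}
```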
3. Resource Cleanup
    // Proper cleanup on shutdown
    async fn graceful_shutdown(mut network: DistributedNetwork) -> Result<(), anyhow::Error> {
        // Stop discovery refresh
        network.stop_discovery_refresh().await?;

        // Unregister from discovery service
        network.unregister_from_discovery().await?;

        // Clean up remote actor registrations
        let remote_actors = network.list_registered_remote_actors().await;
        for (alias, _) in remote_actors {
            network.unregister_remote_actor(&alias).await?;
        }

        // Disconnect from all networks
        let connected = network.get_connected_networks().await;
        for network_id in connected {
            network.disconnect_from_network(&network_id).await?;
        }

        // Finally shutdown the network
        network.shutdown().await?;

        Ok(())
    }
Integration Examples
Docker Swarm Integration
    # docker-compose.yml
    version: '3.8'
    services:
      reflow-discovery:
        image: reflow:latest
        command: --mode discovery --port 8090
        ports:
          - "8090:8090"
        deploy:
          replicas: 1
      reflow-worker:
        image: reflow:latest
        command: --mode worker --discovery http://reflow-discovery:8090
        deploy:
          replicas: 3
        depends_on:
          - reflow-discovery
Kubernetes Integration
    # reflow-discovery-service.yaml
    apiVersion: v1
    kind: Service
    metadata:
      name: reflow-discovery
    spec:
      selector:
        app: reflow-discovery
      ports:
        - port: 8090
          targetPort: 8090
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: reflow-discovery
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: reflow-discovery
      template:
        metadata:
          labels:
            app: reflow-discovery
        spec:
          containers:
            - name: reflow
              image: reflow:latest
              args: ["--mode", "discovery", "--port", "8090"]
              ports:
                - containerPort: 8090
Next Steps
- Conflict Resolution - Handle actor name conflicts
- Setting Up Distributed Networks - Basic network setup
- Remote Actors - Working with remote actors