ActorConfig Migration Guide
This guide helps you migrate existing actors from the old HashMap-based configuration approach to the new ActorConfig system, providing a smooth transition path with minimal breaking changes.
Migration Overview
The ActorConfig system replaces the previous set_config(HashMap<String, serde_json::Value>) method with a more robust create_process(ActorConfig) approach that provides:
- Type Safety: Strongly typed configuration with validation
- Better Error Handling: Clear configuration error messages
- Dynamic Updates: Runtime configuration changes
- Multiple Sources: Support for JSON, YAML, environment variables
- Schema Validation: Built-in validation and defaults
Quick Migration Steps
1. Update Actor Trait Implementation
Before (Old Pattern):
#![allow(unused)] fn main() { use reflow_network::actor::{Actor, ActorContext}; use std::collections::HashMap; pub struct DataProcessor { batch_size: usize, timeout: Duration, enable_retry: bool, } impl Actor for DataProcessor { fn set_config(&mut self, config: HashMap<String, serde_json::Value>) { self.batch_size = config.get("batch_size") .and_then(|v| v.as_f64()) .unwrap_or(10.0) as usize; self.timeout = Duration::from_millis( config.get("timeout_ms") .and_then(|v| v.as_f64()) .unwrap_or(5000.0) as u64 ); self.enable_retry = config.get("enable_retry") .and_then(|v| v.as_bool()) .unwrap_or(true); } async fn run(&self, context: ActorContext) -> Result<HashMap<String, Message>, anyhow::Error> { // Actor logic using self.batch_size, self.timeout, etc. // ... } } }
After (New Pattern):
#![allow(unused)] fn main() { use reflow_network::actor::{Actor, ActorConfig, ActorContext}; use std::collections::HashMap; pub struct DataProcessor; impl DataProcessor { pub fn new() -> Self { Self } } impl Actor for DataProcessor { fn create_process(&self, config: ActorConfig) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { // Extract configuration values let batch_size = config.get_number("batch_size").unwrap_or(10.0) as usize; let timeout = Duration::from_millis(config.get_number("timeout_ms").unwrap_or(5000.0) as u64); let enable_retry = config.get_boolean("enable_retry").unwrap_or(true); Box::pin(async move { // Actor logic using configuration values // ... }) } // Remove the old set_config method // fn set_config(&mut self, config: HashMap<String, serde_json::Value>) { ... } // Remove the old run method // async fn run(&self, context: ActorContext) -> Result<HashMap<String, Message>, anyhow::Error> { ... } } }
2. Update Actor Registration
Before:
#![allow(unused)] fn main() { let mut network = Network::new(); let mut processor = DataProcessor::new(); // Configure actor with HashMap let config = HashMap::from([ ("batch_size".to_string(), serde_json::Value::Number(50.into())), ("timeout_ms".to_string(), serde_json::Value::Number(10000.into())), ("enable_retry".to_string(), serde_json::Value::Bool(false)), ]); processor.set_config(config); network.register_actor("processor", processor)?; }
After:
#![allow(unused)] fn main() { let mut network = Network::new(); let processor = DataProcessor::new(); // Configuration is provided when adding to network let config = ActorConfig::from_json(r#" { "batch_size": 50, "timeout_ms": 10000, "enable_retry": false } "#)?; network.register_actor("processor", processor)?; network.add_node_with_config("processor", "processor", Some(config))?; }
Migration Patterns
Pattern 1: Simple State-Based Actor
Before:
#![allow(unused)] fn main() { struct TimerActor { interval_ms: u64, max_ticks: Option<u64>, current_ticks: u64, } impl Actor for TimerActor { fn set_config(&mut self, config: HashMap<String, serde_json::Value>) { self.interval_ms = config.get("interval_ms") .and_then(|v| v.as_f64()) .unwrap_or(1000.0) as u64; self.max_ticks = config.get("max_ticks") .and_then(|v| v.as_f64()) .map(|v| v as u64); } async fn run(&self, context: ActorContext) -> Result<HashMap<String, Message>, anyhow::Error> { let mut output = HashMap::new(); if self.current_ticks < self.max_ticks.unwrap_or(u64::MAX) { // Emit tick output.insert("tick".to_string(), Message::Integer(self.current_ticks as i64)); } Ok(output) } } }
After:
#![allow(unused)] fn main() { struct TimerActor; impl Actor for TimerActor { fn create_process(&self, config: ActorConfig) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { let interval_ms = config.get_number("interval_ms").unwrap_or(1000.0) as u64; let max_ticks = config.get_number("max_ticks").map(|v| v as u64); Box::pin(async move { let mut current_ticks = 0u64; let interval = Duration::from_millis(interval_ms); loop { if let Some(max) = max_ticks { if current_ticks >= max { break; } } // Emit tick current_ticks += 1; tokio::time::sleep(interval).await; } }) } } }
Pattern 2: Complex Configuration with Validation
Before:
#![allow(unused)] fn main() { struct DatabaseActor { connection_string: String, pool_size: u32, query_timeout: Duration, } impl Actor for DatabaseActor { fn set_config(&mut self, config: HashMap<String, serde_json::Value>) { self.connection_string = config.get("connection_string") .and_then(|v| v.as_str()) .unwrap_or("postgresql://localhost/db") .to_string(); let pool_size = config.get("pool_size") .and_then(|v| v.as_f64()) .unwrap_or(10.0) as u32; // Manual validation self.pool_size = if pool_size > 0 && pool_size <= 100 { pool_size } else { eprintln!("Invalid pool_size {}, using default", pool_size); 10 }; self.query_timeout = Duration::from_millis( config.get("query_timeout_ms") .and_then(|v| v.as_f64()) .unwrap_or(30000.0) as u64 ); } } }
After (with typed configuration):
#![allow(unused)] fn main() { use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] struct DatabaseConfig { #[serde(default = "default_connection_string")] connection_string: String, #[serde(default = "default_pool_size")] pool_size: u32, #[serde(default = "default_query_timeout")] query_timeout_ms: u64, } fn default_connection_string() -> String { "postgresql://localhost/db".to_string() } fn default_pool_size() -> u32 { 10 } fn default_query_timeout() -> u64 { 30000 } impl ActorConfigSchema for DatabaseConfig { fn validate(&self) -> Result<(), String> { if self.connection_string.is_empty() { return Err("connection_string cannot be empty".to_string()); } if self.pool_size == 0 || self.pool_size > 100 { return Err("pool_size must be between 1 and 100".to_string()); } if self.query_timeout_ms == 0 { return Err("query_timeout_ms must be positive".to_string()); } Ok(()) } } struct DatabaseActor; impl Actor for DatabaseActor { fn create_process(&self, config: ActorConfig) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { // Parse and validate configuration let db_config: DatabaseConfig = config.parse_typed().expect("Invalid configuration"); let connection_string = db_config.connection_string; let pool_size = db_config.pool_size; let query_timeout = Duration::from_millis(db_config.query_timeout_ms); Box::pin(async move { // Database actor implementation // Configuration is guaranteed to be valid }) } } }
Pattern 3: Actors with Complex State Management
Before:
#![allow(unused)] fn main() { struct StatefulProcessor { state: Arc<Mutex<ProcessorState>>, config: ProcessorConfig, } #[derive(Clone)] struct ProcessorConfig { batch_size: usize, processing_mode: ProcessingMode, } impl Actor for StatefulProcessor { fn set_config(&mut self, config: HashMap<String, serde_json::Value>) { self.config.batch_size = config.get("batch_size") .and_then(|v| v.as_f64()) .unwrap_or(10.0) as usize; let mode_str = config.get("processing_mode") .and_then(|v| v.as_str()) .unwrap_or("sequential"); self.config.processing_mode = match mode_str { "parallel" => ProcessingMode::Parallel, "batch" => ProcessingMode::Batch, _ => ProcessingMode::Sequential, }; } async fn run(&self, context: ActorContext) -> Result<HashMap<String, Message>, anyhow::Error> { // Use self.config and self.state // ... } } }
After:
#![allow(unused)] fn main() { struct StatefulProcessor; impl Actor for StatefulProcessor { fn create_process(&self, config: ActorConfig) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { let batch_size = config.get_number("batch_size").unwrap_or(10.0) as usize; let processing_mode = match config.get_string("processing_mode").as_deref() { Some("parallel") => ProcessingMode::Parallel, Some("batch") => ProcessingMode::Batch, _ => ProcessingMode::Sequential, }; Box::pin(async move { // Create state inside the process let state = Arc::new(Mutex::new(ProcessorState::new())); // Actor implementation with local state // ... }) } } }
Graph Migration
Updating Graph Definitions
Before:
{
"processes": {
"processor": {
"component": "DataProcessor",
"metadata": {
"batch_size": 50,
"timeout_ms": 10000,
"enable_retry": false
}
}
}
}
After:
{
"processes": {
"processor": {
"component": "DataProcessor",
"metadata": {
"config": {
"batch_size": 50,
"timeout_ms": 10000,
"enable_retry": false
}
}
}
}
}
The configuration is now nested under a "config" key in the metadata, which the system automatically extracts and converts to an ActorConfig.
Migration Utilities
Automatic Configuration Migration
#![allow(unused)] fn main() { use reflow_network::actor::ActorConfig; // Helper to migrate old graph metadata format pub fn migrate_graph_metadata(old_metadata: &serde_json::Value) -> serde_json::Value { if let Some(obj) = old_metadata.as_object() { // Check if it already has a "config" key if obj.contains_key("config") { return old_metadata.clone(); // Already migrated } // Wrap existing metadata in "config" key let mut new_metadata = serde_json::Map::new(); new_metadata.insert("config".to_string(), old_metadata.clone()); serde_json::Value::Object(new_metadata) } else { old_metadata.clone() } } // Helper to migrate legacy HashMap config to ActorConfig impl ActorConfig { pub fn from_legacy_hashmap(legacy: HashMap<String, serde_json::Value>) -> Self { let mut config = ActorConfig::default(); for (key, value) in legacy { config.set(&key, value); } config } } }
Migration Script
// migration_script.rs - Tool to migrate existing graph files use std::path::Path; use tokio::fs; pub async fn migrate_graph_file(path: &Path) -> Result<(), Box<dyn std::error::Error>> { let content = fs::read_to_string(path).await?; let mut graph: serde_json::Value = serde_json::from_str(&content)?; // Migrate processes metadata if let Some(processes) = graph.get_mut("processes").and_then(|p| p.as_object_mut()) { for (_, process) in processes.iter_mut() { if let Some(metadata) = process.get_mut("metadata") { *metadata = migrate_graph_metadata(metadata); } } } // Write back the migrated graph let migrated_content = serde_json::to_string_pretty(&graph)?; fs::write(path, migrated_content).await?; println!("Migrated: {}", path.display()); Ok(()) } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let graph_files = glob::glob("**/*.graph.json")?; for entry in graph_files { if let Ok(path) = entry { migrate_graph_file(&path).await?; } } println!("Migration completed!"); Ok(()) }
Backward Compatibility
Temporary Compatibility Layer
If you need to support both the old and the new system while the migration is in progress:
#![allow(unused)] fn main() { use reflow_network::actor::ActorConfig; pub struct CompatibilityActor { // Store configuration in both formats during transition legacy_config: Option<HashMap<String, serde_json::Value>>, actor_config: Option<ActorConfig>, } impl CompatibilityActor { pub fn new() -> Self { Self { legacy_config: None, actor_config: None, } } // Helper to get config value from either format fn get_config_value<T>(&self, key: &str) -> Option<T> where T: serde::de::DeserializeOwned + Clone, { // Try new format first if let Some(config) = &self.actor_config { if let Ok(value) = config.get::<T>(key) { return Some(value); } } // Fall back to legacy format if let Some(legacy) = &self.legacy_config { if let Some(value) = legacy.get(key) { if let Ok(parsed) = serde_json::from_value::<T>(value.clone()) { return Some(parsed); } } } None } } impl Actor for CompatibilityActor { // Support old method during transition fn set_config(&mut self, config: HashMap<String, serde_json::Value>) { self.legacy_config = Some(config); } // Implement new method fn create_process(&self, config: ActorConfig) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { // Use helper method to get values from either format let batch_size = self.get_config_value::<f64>("batch_size").unwrap_or(10.0) as usize; let timeout_ms = self.get_config_value::<f64>("timeout_ms").unwrap_or(5000.0) as u64; Box::pin(async move { // Actor implementation }) } } }
Testing Migration
Unit Tests for Migrated Actors
#![allow(unused)] fn main() { #[cfg(test)] mod migration_tests { use super::*; use reflow_network::actor::testing::TestActorConfig; #[tokio::test] async fn test_migrated_actor_with_legacy_values() { // Test that migrated actor works with old-style values let config = TestActorConfig::builder() .with_number("batch_size", 50.0) .with_number("timeout_ms", 10000.0) .with_boolean("enable_retry", false) .build(); let actor = DataProcessor::new(); // Should not panic with valid configuration let process = actor.create_process(config.into()); // Test that process can be spawned let handle = tokio::spawn(process); // Clean shutdown for test tokio::time::sleep(Duration::from_millis(100)).await; handle.abort(); } #[test] fn test_configuration_migration_helper() { let legacy_config = HashMap::from([ ("batch_size".to_string(), serde_json::Value::Number(25.into())), ("timeout_ms".to_string(), serde_json::Value::Number(15000.into())), ]); let actor_config = ActorConfig::from_legacy_hashmap(legacy_config); assert_eq!(actor_config.get_number("batch_size"), Some(25.0)); assert_eq!(actor_config.get_number("timeout_ms"), Some(15000.0)); } } }
Common Migration Issues
Issue 1: Missing Configuration Values
Problem: Actor expects configuration values that aren't provided.
Solution: Use default values and graceful degradation:
#![allow(unused)] fn main() { // Before: Could panic let batch_size = config.get("batch_size").unwrap().as_f64().unwrap() as usize; // After: Graceful with defaults let batch_size = config.get_number("batch_size").unwrap_or(10.0) as usize; }
Issue 2: Type Conversion Errors
Problem: Configuration values have different types than expected.
Solution: Use explicit type checking and conversion:
#![allow(unused)] fn main() { // Robust type handling let batch_size = match config.get_number("batch_size") { Some(size) if size > 0.0 => size as usize, Some(invalid) => { eprintln!("Invalid batch_size: {}, using default", invalid); 10 }, None => { println!("No batch_size specified, using default"); 10 } }; }
Issue 3: State Management Migration
Problem: Actors with complex internal state need restructuring.
Solution: Move state into the process:
#![allow(unused)] fn main() { // Before: State as struct fields struct StatefulActor { state: ProcessorState, config: Config, } // After: State managed in process impl Actor for StatefulActor { fn create_process(&self, config: ActorConfig) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { Box::pin(async move { let mut state = ProcessorState::new(); loop { // Process using local state // ... } }) } } }
Migration Checklist
Pre-Migration
- Identify all actors using set_config
- Document current configuration formats
- Create backup of existing graph files
- Plan migration order (dependencies first)
During Migration
- Update actor trait implementations
- Migrate configuration extraction logic
- Add typed configuration schemas (recommended)
- Update graph file metadata format
- Update actor registration code
Post-Migration
- Test all actors with new configuration system
- Verify graph loading and execution
- Remove old set_config implementations
- Update documentation and examples
- Performance testing with new system
Validation
- All actors receive expected configuration
- Configuration validation works correctly
- Default values are applied appropriately
- Error handling for invalid configurations
- Dynamic configuration updates (if used)
Performance Considerations
Before and After Performance
The new ActorConfig system provides better performance in several areas:
- Configuration Parsing: One-time parsing vs repeated HashMap lookups
- Type Safety: Compile-time validation reduces runtime errors
- Memory Usage: More efficient internal representation
- Validation: Built-in validation vs manual checking
Benchmarking Migration
#![allow(unused)] fn main() { // benchmark_migration.rs use criterion::{black_box, criterion_group, criterion_main, Criterion}; fn benchmark_old_config(c: &mut Criterion) { let config = HashMap::from([ ("batch_size".to_string(), serde_json::Value::Number(50.into())), ("timeout_ms".to_string(), serde_json::Value::Number(10000.into())), ]); c.bench_function("old_config_extraction", |b| b.iter(|| { let batch_size = black_box(config.get("batch_size") .and_then(|v| v.as_f64()) .unwrap_or(10.0) as usize); let timeout = black_box(config.get("timeout_ms") .and_then(|v| v.as_f64()) .unwrap_or(5000.0) as u64); })); } fn benchmark_new_config(c: &mut Criterion) { let config = ActorConfig::from_json(r#" { "batch_size": 50, "timeout_ms": 10000 } "#).unwrap(); c.bench_function("new_config_extraction", |b| b.iter(|| { let batch_size = black_box(config.get_number("batch_size").unwrap_or(10.0) as usize); let timeout = black_box(config.get_number("timeout_ms").unwrap_or(5000.0) as u64); })); } criterion_group!(benches, benchmark_old_config, benchmark_new_config); criterion_main!(benches); }
Next Steps
After completing the migration:
- Remove Legacy Code: Clean up old set_config implementations
- Add Validation: Implement typed configuration schemas for better validation
- Dynamic Configuration: Consider adding runtime configuration updates
- Documentation: Update all examples and documentation
- Monitoring: Add configuration monitoring and alerting
Getting Help
If you encounter issues during migration:
- Check Examples: Look at migrated examples in the documentation
- Configuration Validation: Use typed schemas to catch issues early
- Testing: Write comprehensive tests for migrated actors
- Community: Ask for help in the Reflow community forums
- GitHub Issues: Report bugs or ask for clarification
The migration to ActorConfig provides significant benefits in terms of type safety, validation, and maintainability. While it requires some initial effort, the improved developer experience and runtime reliability make it worthwhile.