ActorConfig System
The ActorConfig system provides a unified configuration framework for all actors in Reflow, enabling dynamic configuration, runtime parameter adjustment, and consistent actor behavior across different deployment environments.
Overview
ActorConfig replaces the previous ad-hoc configuration approach with a structured, type-safe system that supports:
- Type-Safe Configuration: Strongly typed configuration parameters with validation
- Dynamic Updates: Runtime configuration changes without actor restart
- Environment Variables: Automatic environment variable injection
- JSON/YAML/TOML Support: Flexible configuration file formats
- Validation & Defaults: Built-in validation with sensible defaults
- Metadata Integration: Rich metadata for configuration documentation
Basic Usage
Simple Actor Configuration
use reflow_network::actor::{Actor, ActorConfig, ActorContext};
use std::future::Future;
use std::pin::Pin;

#[derive(Debug)]
struct ProcessorActor {
    config: ActorConfig,
}

impl ProcessorActor {
    fn new() -> Self {
        Self {
            config: ActorConfig::default(),
        }
    }
}

impl Actor for ProcessorActor {
    fn create_process(&self, config: ActorConfig) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        // Extract configuration values, falling back to a default when a key is absent
        let batch_size = config.get_number("batch_size").unwrap_or(10.0) as usize;
        let timeout_ms = config.get_number("timeout_ms").unwrap_or(5000.0) as u64;
        let enable_retry = config.get_boolean("enable_retry").unwrap_or(true);
        let processor_name = config.get_string("name").unwrap_or("default_processor".to_string().into());

        Box::pin(async move {
            println!(
                "Processor {} starting with batch_size={}, timeout={}ms, retry={}",
                processor_name, batch_size, timeout_ms, enable_retry
            );

            // Actor implementation using configuration...
        })
    }

    // ... other Actor trait methods
}
Configuration from JSON
{
    "name": "data_processor",
    "batch_size": 50,
    "timeout_ms": 10000,
    "enable_retry": true,
    "processing_mode": "parallel",
    "max_retries": 3,
    "retry_delay_ms": 1000
}
// Load configuration from JSON
let config_json = r#"
{
    "name": "data_processor",
    "batch_size": 50,
    "timeout_ms": 10000,
    "enable_retry": true,
    "processing_mode": "parallel"
}
"#;

let config = ActorConfig::from_json(config_json)?;
let actor = ProcessorActor::new();

// Use configuration when creating actor process
let process = actor.create_process(config);
tokio::spawn(process);
Configuration Sources
Environment Variables
ActorConfig automatically reads from environment variables with configurable prefixes:
// Environment variables:
// PROCESSOR_BATCH_SIZE=100
// PROCESSOR_TIMEOUT_MS=15000
// PROCESSOR_ENABLE_RETRY=false

let config = ActorConfig::from_env("PROCESSOR")?;

// Access values
let batch_size = config.get_number("batch_size").unwrap(); // 100.0
let timeout = config.get_number("timeout_ms").unwrap(); // 15000.0
let retry = config.get_boolean("enable_retry").unwrap(); // false
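The exact mapping from variable names to configuration keys is not spelled out here. The following is a minimal, std-only sketch that assumes the key is the variable name with `PREFIX_` stripped and the remainder lowercased, which matches the `PROCESSOR_BATCH_SIZE` to `batch_size` example. `collect_prefixed` is a hypothetical helper for illustration, not a Reflow API.

```rust
use std::collections::HashMap;

/// Hypothetical helper (not part of Reflow): keeps only variables named
/// `<prefix>_<REST>` and stores them under the lowercased `<rest>` key.
fn collect_prefixed<I>(vars: I, prefix: &str) -> HashMap<String, String>
where
    I: IntoIterator<Item = (String, String)>,
{
    let full_prefix = format!("{}_", prefix);
    vars.into_iter()
        .filter_map(|(name, value)| {
            name.strip_prefix(&full_prefix)
                // Skip a bare "PREFIX_" variable, which would yield an empty key
                .filter(|rest| !rest.is_empty())
                .map(|rest| (rest.to_lowercase(), value))
        })
        .collect()
}
```

In a real program the input would typically be `std::env::vars()`, for example `collect_prefixed(std::env::vars(), "PROCESSOR")`. Values stay as strings in this sketch; converting them to numbers or booleans is a separate step.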
Configuration Files
// From YAML file
let config = ActorConfig::from_yaml_file("configs/processor.yaml").await?;

// From JSON file
let config = ActorConfig::from_json_file("configs/processor.json").await?;

// From TOML file
let config = ActorConfig::from_toml_file("configs/processor.toml").await?;
Combined Sources with Precedence
// Build configuration with precedence: CLI args > env vars > config file > defaults
let config = ActorConfig::builder()
    .from_file("configs/defaults.yaml").await?
    .from_env("PROCESSOR")?
    .from_args(std::env::args())?
    .build()?;
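One common way to realise such a precedence chain is to apply the sources from lowest to highest priority and let later values overwrite earlier ones. The sketch below models that idea with plain string maps; `Layer`, `layer_from`, and `merge_layers` are hypothetical names, and whether the Reflow builder works this way internally is an assumption.

```rust
use std::collections::HashMap;

/// One configuration source, modelled as string key/value pairs.
type Layer = HashMap<String, String>;

/// Convenience constructor for the example layers.
fn layer_from(pairs: &[(&str, &str)]) -> Layer {
    pairs
        .iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect()
}

/// Merges layers given in ascending precedence:
/// a key in a later layer overrides the same key in an earlier one.
fn merge_layers(layers: &[Layer]) -> Layer {
    let mut merged = Layer::new();
    for layer in layers {
        for (key, value) in layer {
            merged.insert(key.clone(), value.clone());
        }
    }
    merged
}
```

Calling `merge_layers(&[defaults, file, env, args])` then yields the ordering described in the comment above: a key set on the command line wins over the same key from the environment, which wins over the file, which wins over the defaults.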
Configuration Schema and Validation
Defining Configuration Schema
use reflow_network::actor::{ActorConfigSchema, ConfigField, ConfigType};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct ProcessorConfigSchema {
    #[serde(default = "default_batch_size")]
    batch_size: u32,

    #[serde(default = "default_timeout")]
    timeout_ms: u64,

    #[serde(default)]
    enable_retry: bool,

    #[serde(default = "default_name")]
    name: String,

    processing_mode: ProcessingMode,
}

// Lowercase variant names so that values such as "parallel" deserialize
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
enum ProcessingMode {
    Sequential,
    Parallel,
    Batch,
}

fn default_batch_size() -> u32 { 10 }
fn default_timeout() -> u64 { 5000 }
fn default_name() -> String { "processor".to_string() }

impl ActorConfigSchema for ProcessorConfigSchema {
    fn schema() -> Vec<ConfigField> {
        vec![
            ConfigField {
                name: "batch_size".to_string(),
                config_type: ConfigType::Number,
                required: false,
                default_value: Some(serde_json::Value::Number(10.into())),
                description: Some("Number of items to process in each batch".to_string()),
                validation: Some("Must be between 1 and 1000".to_string()),
            },
            ConfigField {
                name: "timeout_ms".to_string(),
                config_type: ConfigType::Number,
                required: false,
                default_value: Some(serde_json::Value::Number(5000.into())),
                description: Some("Processing timeout in milliseconds".to_string()),
                validation: Some("Must be positive".to_string()),
            },
            ConfigField {
                name: "enable_retry".to_string(),
                config_type: ConfigType::Boolean,
                required: false,
                default_value: Some(serde_json::Value::Bool(false)),
                description: Some("Enable automatic retry on failure".to_string()),
                validation: None,
            },
            ConfigField {
                name: "name".to_string(),
                config_type: ConfigType::String,
                required: false,
                default_value: Some(serde_json::Value::String("processor".to_string())),
                description: Some("Actor instance name".to_string()),
                validation: Some("Must be non-empty alphanumeric".to_string()),
            },
            ConfigField {
                name: "processing_mode".to_string(),
                config_type: ConfigType::String,
                required: true,
                default_value: None,
                description: Some("Processing execution mode".to_string()),
                validation: Some("Must be one of: sequential, parallel, batch".to_string()),
            },
        ]
    }

    fn validate(&self) -> Result<(), String> {
        if self.batch_size == 0 || self.batch_size > 1000 {
            return Err("batch_size must be between 1 and 1000".to_string());
        }

        if self.timeout_ms == 0 {
            return Err("timeout_ms must be positive".to_string());
        }

        if self.name.is_empty() {
            return Err("name cannot be empty".to_string());
        }

        Ok(())
    }
}
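The rule "Must be one of: sequential, parallel, batch" can also be enforced at the string level, independently of any serialization library. A minimal sketch, assuming lowercase spellings are the only accepted form; `Mode` is a hypothetical stand-in for the enum in the schema, not a Reflow type.

```rust
use std::str::FromStr;

/// Hypothetical stand-in for the schema's processing mode enum.
#[derive(Debug, PartialEq)]
enum Mode {
    Sequential,
    Parallel,
    Batch,
}

impl FromStr for Mode {
    type Err = String;

    /// Accepts exactly the three lowercase spellings and rejects anything else.
    fn from_str(raw: &str) -> Result<Self, Self::Err> {
        match raw {
            "sequential" => Ok(Mode::Sequential),
            "parallel" => Ok(Mode::Parallel),
            "batch" => Ok(Mode::Batch),
            other => Err(format!(
                "processing_mode must be one of: sequential, parallel, batch (got {:?})",
                other
            )),
        }
    }
}
```

With this in place, `"parallel".parse::<Mode>()` succeeds, while a capitalised or misspelled value produces an error message that names the allowed options.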
Using Typed Configuration
use std::time::Duration;

impl Actor for ProcessorActor {
    fn create_process(&self, config: ActorConfig) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        // Parse and validate configuration against schema.
        // create_process does not return a Result, so `?` cannot be used here.
        let typed_config: ProcessorConfigSchema = match config.parse_typed() {
            Ok(parsed) => parsed,
            Err(err) => {
                eprintln!("Invalid configuration: {:?}", err);
                return Box::pin(async {});
            }
        };

        // Configuration is now type-safe and validated
        let batch_size = typed_config.batch_size;
        let timeout = Duration::from_millis(typed_config.timeout_ms);
        let enable_retry = typed_config.enable_retry;
        let name = typed_config.name;
        let mode = typed_config.processing_mode;

        Box::pin(async move {
            match mode {
                ProcessingMode::Sequential => {
                    // Sequential processing logic
                },
                ProcessingMode::Parallel => {
                    // Parallel processing logic
                },
                ProcessingMode::Batch => {
                    // Batch processing logic
                },
            }
        })
    }
}
Dynamic Configuration Updates
Runtime Configuration Changes
use std::time::Duration;
use tokio::sync::watch;

struct DynamicProcessorActor {
    config_receiver: watch::Receiver<ActorConfig>,
}

impl DynamicProcessorActor {
    fn new(config_receiver: watch::Receiver<ActorConfig>) -> Self {
        Self { config_receiver }
    }
}

impl Actor for DynamicProcessorActor {
    fn create_process(&self, initial_config: ActorConfig) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        let mut config_receiver = self.config_receiver.clone();

        Box::pin(async move {
            let mut current_config = initial_config;

            loop {
                // Check for configuration updates
                if config_receiver.has_changed().unwrap_or(false) {
                    // borrow_and_update marks the value as seen, so the same
                    // update is not reported again on the next iteration
                    current_config = config_receiver.borrow_and_update().clone();
                    println!("Configuration updated: {:?}", current_config);

                    // Apply new configuration
                    let batch_size = current_config.get_number("batch_size").unwrap_or(10.0) as usize;
                    println!("New batch size: {}", batch_size);
                }

                // Process with current configuration
                // ... actor logic ...

                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        })
    }
}

// Update configuration at runtime
async fn update_actor_config() -> Result<(), Box<dyn std::error::Error>> {
    let (config_sender, config_receiver) = watch::channel(ActorConfig::default());

    let actor = DynamicProcessorActor::new(config_receiver);
    tokio::spawn(actor.create_process(ActorConfig::default()));

    // Update configuration after 5 seconds
    tokio::time::sleep(Duration::from_secs(5)).await;

    let new_config = ActorConfig::from_json(r#"
    {
        "batch_size": 100,
        "timeout_ms": 20000,
        "enable_retry": true
    }
    "#)?;

    config_sender.send(new_config)?;
    println!("Configuration updated!");

    Ok(())
}
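The pattern above relies on two properties: only the most recent configuration matters, and each update should be applied once. The std-only sketch below models those semantics with a version counter. It is an illustration of the idea only, not how tokio or Reflow implement it, and every name in it (`latest_value_channel`, `Publisher`, `Subscriber`) is hypothetical.

```rust
use std::sync::{Arc, Mutex};

/// Shared slot holding the latest value plus a counter bumped on every publish.
struct Shared<T> {
    version: u64,
    value: T,
}

struct Publisher<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

struct Subscriber<T> {
    shared: Arc<Mutex<Shared<T>>>,
    seen: u64, // last version this subscriber has applied
}

/// The initial value counts as already seen by the subscriber.
fn latest_value_channel<T: Clone>(initial: T) -> (Publisher<T>, Subscriber<T>) {
    let shared = Arc::new(Mutex::new(Shared { version: 0, value: initial }));
    (
        Publisher { shared: Arc::clone(&shared) },
        Subscriber { shared, seen: 0 },
    )
}

impl<T> Publisher<T> {
    /// Overwrites the stored value; intermediate values are not queued.
    fn publish(&self, value: T) {
        let mut guard = self.shared.lock().unwrap();
        guard.value = value;
        guard.version += 1;
    }
}

impl<T: Clone> Subscriber<T> {
    /// Returns the newest value only if it has not been observed yet,
    /// and records it as seen.
    fn take_update(&mut self) -> Option<T> {
        let guard = self.shared.lock().unwrap();
        if guard.version == self.seen {
            return None;
        }
        self.seen = guard.version;
        Some(guard.value.clone())
    }
}
```

An actor loop built on this would call `take_update()` once per iteration and re-derive values such as the batch size only when it returns `Some`. If two updates arrive between iterations, the loop sees just the later one.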
Configuration in Networks and Graphs
Network-Level Configuration
use reflow_network::network::{Network, NetworkConfig};

// Configure network with global defaults
let network_config = NetworkConfig {
    default_actor_config: Some(ActorConfig::from_json(r#"
    {
        "default_timeout_ms": 10000,
        "enable_monitoring": true,
        "log_level": "info"
    }
    "#)?),
    // ... other network config
};

let mut network = Network::new(network_config);

// Add actor with specific configuration
let actor_config = ActorConfig::from_json(r#"
{
    "batch_size": 50,
    "timeout_ms": 15000,
    "name": "data_processor_1"
}
"#)?;

network.add_node_with_config("processor1", "DataProcessorActor", Some(actor_config))?;
Graph-Level Configuration
{
    "caseSensitive": false,
    "properties": {
        "name": "data_processing_pipeline"
    },
    "processes": {
        "collector": {
            "component": "DataCollectorActor",
            "metadata": {
                "config": {
                    "source_url": "https://api.example.com/data",
                    "poll_interval_ms": 5000,
                    "batch_size": 100
                }
            }
        },
        "processor": {
            "component": "DataProcessorActor",
            "metadata": {
                "config": {
                    "processing_mode": "parallel",
                    "worker_count": 4,
                    "timeout_ms": 30000
                }
            }
        },
        "validator": {
            "component": "DataValidatorActor",
            "metadata": {
                "config": {
                    "strict_validation": true,
                    "schema_file": "./schemas/data.json"
                }
            }
        }
    },
    "connections": [
        {
            "from": { "nodeId": "collector", "portId": "Output" },
            "to": { "nodeId": "processor", "portId": "Input" }
        },
        {
            "from": { "nodeId": "processor", "portId": "Output" },
            "to": { "nodeId": "validator", "portId": "Input" }
        }
    ]
}
Loading Graph with Configurations
use reflow_network::graph::Graph;

// Load graph - configurations are automatically extracted from metadata
let graph = Graph::load_from_file("data_pipeline.graph.json").await?;

// Each actor will receive its specific configuration
// Network automatically extracts config from process metadata
let mut network = Network::new(NetworkConfig::default());
network.load_graph(graph).await?;
Environment-Specific Configurations
Development vs Production
// Development configuration
let dev_config = ActorConfig::from_json(r#"
{
    "log_level": "debug",
    "enable_profiling": true,
    "timeout_ms": 60000,
    "batch_size": 5
}
"#)?;

// Production configuration
let prod_config = ActorConfig::from_json(r#"
{
    "log_level": "warn",
    "enable_profiling": false,
    "timeout_ms": 10000,
    "batch_size": 100
}
"#)?;

// Select configuration based on environment
let config = match std::env::var("ENVIRONMENT").as_deref() {
    // Use prod config for staging as well
    Ok("production") | Ok("staging") => prod_config,
    _ => dev_config, // Default to dev config
};
Configuration Profiles
// Base configuration
let base_config = ActorConfig::from_yaml_file("configs/base.yaml").await?;

// Environment-specific overrides
let env = std::env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string());
let env_config_path = format!("configs/{}.yaml", env);

let final_config = if std::path::Path::new(&env_config_path).exists() {
    base_config.merge_with(ActorConfig::from_yaml_file(&env_config_path).await?)?
} else {
    base_config
};
Configuration Migration
Migrating from Direct HashMap
Before (Old Pattern):
// Old approach - direct HashMap usage
impl Actor for OldActor {
    fn set_config(&mut self, config: HashMap<String, serde_json::Value>) {
        self.batch_size = config.get("batch_size")
            .and_then(|v| v.as_f64())
            .unwrap_or(10.0) as usize;

        self.timeout = Duration::from_millis(
            config.get("timeout_ms")
                .and_then(|v| v.as_f64())
                .unwrap_or(5000.0) as u64
        );
    }
}
After (New Pattern):
// New approach - ActorConfig
impl Actor for NewActor {
    fn create_process(&self, config: ActorConfig) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        let batch_size = config.get_number("batch_size").unwrap_or(10.0) as usize;
        let timeout = Duration::from_millis(config.get_number("timeout_ms").unwrap_or(5000.0) as u64);

        Box::pin(async move {
            // Actor implementation with configuration
        })
    }
}
Migration Helper
use std::collections::HashMap;

// Helper function to migrate from old HashMap format
impl ActorConfig {
    pub fn from_legacy_hashmap(legacy: HashMap<String, serde_json::Value>) -> Self {
        let mut config = ActorConfig::default();

        for (key, value) in legacy {
            config.set(&key, value);
        }

        config
    }
}

// Usage in migration
let legacy_config = HashMap::from([
    ("batch_size".to_string(), serde_json::Value::Number(50.into())),
    ("timeout_ms".to_string(), serde_json::Value::Number(10000.into())),
]);

let actor_config = ActorConfig::from_legacy_hashmap(legacy_config);
Advanced Features
Conditional Configuration
#[derive(Debug, Serialize, Deserialize)]
struct ConditionalConfig {
    #[serde(default)]
    enable_cache: bool,

    #[serde(skip_serializing_if = "Option::is_none")]
    cache_size_mb: Option<u32>,

    #[serde(skip_serializing_if = "Option::is_none")]
    cache_ttl_seconds: Option<u64>,
}

impl ActorConfigSchema for ConditionalConfig {
    fn validate(&self) -> Result<(), String> {
        if self.enable_cache {
            if self.cache_size_mb.is_none() {
                return Err("cache_size_mb is required when cache is enabled".to_string());
            }
            if self.cache_ttl_seconds.is_none() {
                return Err("cache_ttl_seconds is required when cache is enabled".to_string());
            }
        }
        Ok(())
    }
}
Configuration Inheritance
// Base actor configuration
let base_config = ActorConfig::from_json(r#"
{
    "timeout_ms": 10000,
    "enable_logging": true,
    "log_level": "info"
}
"#)?;

// Specialized configuration inheriting from base
let specialized_config = base_config.extend_with(ActorConfig::from_json(r#"
{
    "batch_size": 50,
    "processing_mode": "parallel",
    "timeout_ms": 20000
}
"#)?)?;

// Result combines both configs with specialized values taking precedence
// timeout_ms: 20000 (overridden)
// enable_logging: true (inherited)
// log_level: "info" (inherited)
// batch_size: 50 (added)
// processing_mode: "parallel" (added)
Secret Management
use reflow_network::actor::SecretResolver;

// Configuration with secret references
let config_with_secrets = ActorConfig::from_json(r#"
{
    "database_url": "${secret:DATABASE_URL}",
    "api_key": "${secret:API_KEY}",
    "batch_size": 100
}
"#)?;

// Resolve secrets from environment or secret store
let secret_resolver = SecretResolver::new()
    .with_env_prefix("SECRET_")
    .with_vault_client(vault_client);

let resolved_config = secret_resolver.resolve(config_with_secrets).await?;

// Secrets are now resolved:
// database_url: "postgresql://user:password@localhost/db"
// api_key: "sk-1234567890abcdef"
// batch_size: 100
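To make the `${secret:NAME}` syntax concrete, here is a minimal std-only sketch of placeholder substitution in a single string value. It assumes placeholders are not nested and that an unknown secret should be an error; `resolve_placeholders` and its `lookup` callback are hypothetical and say nothing about how `SecretResolver` is actually implemented.

```rust
/// Replaces every `${secret:NAME}` in `input` with `lookup(NAME)`.
/// Returns an error for an unterminated placeholder or an unknown secret.
fn resolve_placeholders<F>(input: &str, lookup: F) -> Result<String, String>
where
    F: Fn(&str) -> Option<String>,
{
    const OPEN: &str = "${secret:";

    let mut output = String::with_capacity(input.len());
    let mut rest = input;

    while let Some(start) = rest.find(OPEN) {
        // Copy the literal text before the placeholder
        output.push_str(&rest[..start]);

        let after_open = &rest[start + OPEN.len()..];
        let end = after_open
            .find('}')
            .ok_or_else(|| "unterminated secret placeholder".to_string())?;

        let name = &after_open[..end];
        let value = lookup(name).ok_or_else(|| format!("unknown secret: {}", name))?;
        output.push_str(&value);

        // Continue scanning after the closing brace
        rest = &after_open[end + 1..];
    }

    output.push_str(rest);
    Ok(output)
}
```

A lookup backed by the environment could be written as `|name| std::env::var(format!("SECRET_{}", name)).ok()`, mirroring the `SECRET_` prefix used above. Failing on unknown names, rather than leaving the placeholder in place, avoids passing a literal `${secret:...}` string to a database driver or HTTP client.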
Testing with ActorConfig
Test Configuration Helpers
use reflow_network::actor::testing::TestActorConfig;

#[tokio::test]
async fn test_actor_with_config() {
    let test_config = TestActorConfig::builder()
        .with_number("batch_size", 10.0)
        .with_boolean("enable_retry", false)
        .with_string("name", "test_actor")
        .build();

    let actor = MyActor::new();
    let process = actor.create_process(test_config.into());

    // Test actor behavior with specific configuration
    // ...
}

#[tokio::test]
async fn test_actor_configuration_validation() {
    let invalid_config = ActorConfig::from_json(r#"
    {
        "batch_size": -1,
        "timeout_ms": 0
    }
    "#).unwrap();

    let schema = MyActorConfigSchema::default();
    assert!(schema.validate_config(&invalid_config).is_err());
}
Configuration Mocking
// Mock configuration for testing
struct MockConfigProvider {
    configs: HashMap<String, ActorConfig>,
}

impl MockConfigProvider {
    fn new() -> Self {
        Self {
            configs: HashMap::new(),
        }
    }

    fn add_config(&mut self, actor_id: &str, config: ActorConfig) {
        self.configs.insert(actor_id.to_string(), config);
    }
}

impl ConfigProvider for MockConfigProvider {
    async fn get_config(&self, actor_id: &str) -> Result<ActorConfig, ConfigError> {
        self.configs.get(actor_id)
            .cloned()
            .ok_or_else(|| ConfigError::NotFound(actor_id.to_string()))
    }
}
Best Practices
Configuration Organization
- Use Typed Schemas: Define strongly typed configuration schemas for validation
- Provide Sensible Defaults: Always provide reasonable default values
- Document Configuration: Include descriptions and validation rules
- Environment Separation: Use different configurations for different environments
- Secret Security: Never store secrets in plain text configuration files
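Secrets can also leak through logging, for example when a whole configuration is printed with `{:?}`. One possible safeguard is a wrapper type whose `Debug` output is redacted. The sketch below is a generic Rust pattern under that assumption; `Secret` is a hypothetical type, not something Reflow is known to provide.

```rust
use std::fmt;

/// Hypothetical wrapper that keeps a secret out of debug output.
struct Secret(String);

impl Secret {
    /// Explicit accessor, so reads of the raw value are easy to find in review.
    fn expose(&self) -> &str {
        &self.0
    }
}

impl fmt::Debug for Secret {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Never print the inner value
        f.write_str("Secret(***)")
    }
}
```

A struct that derives `Debug` and holds a `Secret` field then prints `Secret(***)` for that field, while code that genuinely needs the value calls `expose()`.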
Performance Considerations
// Cache parsed configuration for performance
use std::sync::Arc;
use once_cell::sync::OnceCell;

struct CachedConfigActor {
    cached_config: OnceCell<Arc<ProcessorConfigSchema>>,
}

impl Actor for CachedConfigActor {
    fn create_process(&self, config: ActorConfig) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        // Parse configuration once and cache it
        let parsed_config = self.cached_config.get_or_init(|| {
            Arc::new(config.parse_typed().expect("Invalid configuration"))
        }).clone();

        Box::pin(async move {
            // Use cached configuration
            let batch_size = parsed_config.batch_size;
            // ...
        })
    }
}
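A consequence of write-once caching is worth keeping in mind: the first configuration wins, and later calls with a different configuration still get the cached value. The std-only sketch below demonstrates this with `std::sync::OnceLock` (stable since Rust 1.70) in place of `once_cell`; `ParsedConfig` and `ConfigCache` are hypothetical stand-ins for the typed schema and the actor.

```rust
use std::sync::{Arc, OnceLock};

/// Hypothetical stand-in for a parsed, typed configuration.
struct ParsedConfig {
    batch_size: usize,
}

struct ConfigCache {
    parsed: OnceLock<Arc<ParsedConfig>>,
}

impl ConfigCache {
    fn new() -> Self {
        Self { parsed: OnceLock::new() }
    }

    /// Builds the parsed config on the first call only;
    /// later calls ignore `raw_batch_size` and return the cached value.
    fn get_or_parse(&self, raw_batch_size: usize) -> Arc<ParsedConfig> {
        Arc::clone(self.parsed.get_or_init(|| {
            Arc::new(ParsedConfig { batch_size: raw_batch_size })
        }))
    }
}
```

This is fine when an actor's configuration is fixed for its lifetime. When configuration can change at runtime, a write-once cell is the wrong tool and the cache needs an explicit invalidation path instead.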
Error Handling
use reflow_network::actor::ConfigError;

impl Actor for RobustActor {
    fn create_process(&self, config: ActorConfig) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        Box::pin(async move {
            // Graceful configuration error handling
            let batch_size = match config.get_number("batch_size") {
                Some(size) if size > 0.0 => size as usize,
                Some(_) => {
                    eprintln!("Invalid batch_size, using default");
                    10
                },
                None => {
                    println!("No batch_size specified, using default");
                    10
                }
            };

            // Continue with actor logic
        })
    }
}
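The same fallback logic can be factored into a small helper that returns a typed error, which keeps the "why" of a rejected value available to callers and tests. This is a std-only sketch with hypothetical names (`FieldError`, `positive_count`, `positive_count_or`). It requires a finite value of at least 1, because a fractional value such as 0.5 would pass a `> 0.0` check and then truncate to 0 when cast to `usize`.

```rust
use std::fmt;

/// Hypothetical error type for a single numeric field.
#[derive(Debug, PartialEq)]
enum FieldError {
    Missing(&'static str),
    OutOfRange { field: &'static str, value: f64 },
}

impl fmt::Display for FieldError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FieldError::Missing(field) => write!(f, "{} is not set", field),
            FieldError::OutOfRange { field, value } => {
                write!(f, "{} must be a finite number >= 1, got {}", field, value)
            }
        }
    }
}

impl std::error::Error for FieldError {}

/// Converts an optional raw number into a count of at least 1.
fn positive_count(field: &'static str, raw: Option<f64>) -> Result<usize, FieldError> {
    match raw {
        None => Err(FieldError::Missing(field)),
        Some(value) if value.is_finite() && value >= 1.0 => Ok(value as usize),
        Some(value) => Err(FieldError::OutOfRange { field, value }),
    }
}

/// Same check, but logs the problem and falls back to `default`.
fn positive_count_or(field: &'static str, raw: Option<f64>, default: usize) -> usize {
    positive_count(field, raw).unwrap_or_else(|err| {
        eprintln!("{}; using default {}", err, default);
        default
    })
}
```

Inside an actor this might be called as `positive_count_or("batch_size", config.get_number("batch_size"), 10)`, assuming `get_number` returns an `Option<f64>` as the examples above suggest.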
Next Steps
- Actor Creation Guide - Learn how to create actors that use ActorConfig
- Multi-Graph Composition - Using ActorConfig in multi-graph setups
- ActorConfig Migration Guide - Migrating existing actors