Message Passing
This document details Reflow's message passing system, which is the primary communication mechanism between actors.
Message Types
Reflow uses a strongly-typed message system with built-in serialization support:
    #[derive(Clone, Debug, Serialize, Deserialize, Encode, Decode, PartialEq)]
    pub enum Message {
        Flow,
        Event(EncodableValue),
        Boolean(bool),
        Integer(i64),
        Float(f64),
        String(Arc<String>),
        Object(Arc<EncodableValue>),
        Array(Arc<Vec<EncodableValue>>),
        Stream(Arc<Vec<u8>>),
        Encoded(Arc<Vec<u8>>),
        Optional(Option<Arc<EncodableValue>>),
        Any(Arc<EncodableValue>),
        Error(Arc<String>),
    }
EncodableValue
Reflow uses EncodableValue as a wrapper for complex data types:
    #[derive(Clone, Debug, Serialize, Deserialize, Encode, Decode, PartialEq, Eq)]
    pub struct EncodableValue {
        pub(crate) data: Vec<u8>,
    }

    impl EncodableValue {
        pub fn new<T: Encode>(value: &T) -> Self {
            Self {
                data: bitcode::encode(value),
            }
        }

        pub fn decode<'a, T: Decode<'a>>(&'a self) -> Option<T> {
            bitcode::decode(&self.data).ok()
        }
    }
Message Conversion
    use serde_json::Value;

    // From JSON values
    let msg = Message::from(serde_json::json!(42));

    // To JSON values (clone first, because `into()` consumes the message)
    let json: Value = msg.clone().into();

    // Type checking
    if let Message::Integer(n) = &msg {
        println!("Number: {}", n);
    }

    // Working with EncodableValue
    let data = serde_json::json!({"key": "value"});
    let encodable = EncodableValue::from(data);
    let object_msg = Message::object(encodable);

    // Create arrays with EncodableValue
    let array_items = vec![
        EncodableValue::from(serde_json::json!("hello")),
        EncodableValue::from(serde_json::json!(42)),
    ];
    let array_msg = Message::array(array_items);

    // Alternative: using helper methods for simple values
    let string_msg = Message::string("hello world".to_string());
    let int_msg = Message::integer(42);
    let bool_msg = Message::boolean(true);
    let float_msg = Message::float(3.14);
    let error_msg = Message::error("Something went wrong".to_string());
Communication Channels
Ports
Ports are the communication endpoints for actors:
    pub type Port = (
        flume::Sender<HashMap<String, Message>>,
        flume::Receiver<HashMap<String, Message>>,
    );

    // Actor payload format
    pub type ActorPayload = HashMap<String, Message>;
Channel Properties
- Asynchronous: Non-blocking send/receive operations
- Bounded: Configurable buffer sizes for backpressure
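- Multi-producer, single-consumer by convention: Any number of senders can feed a port and one actor drains it (flume channels themselves also permit multiple consumers)
- Type-safe: Every payload is a HashMap<String, Message> checked at compile time; the variant stored under each key is matched at runtime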
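The bounded-buffer property can be illustrated without any Reflow types. The sketch below is an assumption-laden stand-in: it uses the standard library's `sync_channel` instead of flume, and the function name `fill_bounded` is made up for this example. It only shows the general idea that a full buffer rejects a non-blocking send, which is the signal an actor can use to slow down.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to push `count` integers into a channel that buffers at most
/// `capacity` items while nobody is receiving.
/// Returns how many sends were accepted before the buffer filled up.
pub fn fill_bounded(capacity: usize, count: usize) -> usize {
    // Keep the receiver alive (but idle) so the channel stays connected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..count {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Buffer full: a real actor would wait, retry, or shed load here.
            Err(TrySendError::Full(_)) => break,
            // Receiver dropped: nothing more can be delivered.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a capacity of 3 and no consumer, only the first three sends succeed; the fourth reports a full buffer rather than blocking.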
Message Flow Patterns
Point-to-Point
Direct communication between two actors:
    // Actor A sends to Actor B - using helper method
    let message = HashMap::from([
        ("data".to_string(), Message::string("hello".to_string()))
    ]);
    sender.send_async(message).await?;
Broadcast
One actor sends to multiple receivers:
    // Using actor macro for broadcast
    #[actor(
        BroadcastActor,
        inports::<100>(input),
        outports::<50>(output1, output2, output3)
    )]
    async fn broadcast_actor(context: ActorContext) -> Result<HashMap<String, Message>, anyhow::Error> {
        let payload = context.get_payload();

        if let Some(input_msg) = payload.get("input") {
            // Broadcast to all output ports
            Ok([
                ("output1".to_owned(), input_msg.clone()),
                ("output2".to_owned(), input_msg.clone()),
                ("output3".to_owned(), input_msg.clone()),
            ].into())
        } else {
            Err(anyhow::anyhow!("No input to broadcast"))
        }
    }

    // Manual implementation for dynamic outputs
    struct ManualBroadcastActor {
        inports: Port,
        outports: Port,
        outputs: Vec<flume::Sender<HashMap<String, Message>>>,
        load: Arc<Mutex<ActorLoad>>,
    }

    impl ManualBroadcastActor {
        async fn broadcast(&self, message: HashMap<String, Message>) -> Result<(), anyhow::Error> {
            for output in &self.outputs {
                output.send_async(message.clone()).await?;
            }
            Ok(())
        }
    }
Fan-In (Merge)
Multiple actors send to one receiver:
    use futures::stream::{select_all, StreamExt};

    struct MergeActor {
        inputs: Vec<flume::Receiver<HashMap<String, Message>>>,
        output: flume::Sender<HashMap<String, Message>>,
    }

    impl MergeActor {
        async fn merge_loop(&self) {
            // Merge every input into one stream. A single `recv_async()` future
            // per input would yield only one message from each receiver.
            let mut merged = select_all(self.inputs.iter().map(|rx| rx.stream()));

            while let Some(message) = merged.next().await {
                // Stop once the downstream receiver has been dropped
                if self.output.send_async(message).await.is_err() {
                    break;
                }
            }
        }
    }
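A simpler way to get fan-in, when the producers can share one channel, is to clone the sender. The sketch below is not Reflow code: it assumes plain threads and `std::sync::mpsc` in place of actors and flume, and `fan_in` is a hypothetical name. It shows that messages from several producers arrive on one receiver, and that each producer's own messages keep their relative order.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads that each send `per_producer` labelled values
/// into one shared channel, then collects everything on the single receiver.
pub fn fan_in(producers: usize, per_producer: usize) -> Vec<(usize, usize)> {
    let (tx, rx) = mpsc::channel();
    let handles: Vec<_> = (0..producers)
        .map(|id| {
            // Each producer gets its own clone of the sender.
            let tx = tx.clone();
            thread::spawn(move || {
                for n in 0..per_producer {
                    tx.send((id, n)).expect("receiver is alive");
                }
            })
        })
        .collect();

    // Drop the original sender so the receiver sees the channel close
    // once every producer thread has finished.
    drop(tx);

    // Iterating the receiver ends when all senders are gone.
    let merged: Vec<(usize, usize)> = rx.iter().collect();
    for handle in handles {
        handle.join().expect("producer thread panicked");
    }
    merged
}
```

The interleaving between producers is not deterministic, so consumers that need a global order still have to add sequence numbers.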
Serialization and Transport
Local Serialization
For local communication, messages use efficient in-memory representation:
    // Small variants are stored inline
    let msg = Message::Integer(42); // No heap allocation

    // Larger payloads are reference-counted, so cloning a message is cheap
    let complex = Message::Object(data); // `data` is an Arc<EncodableValue>
Network Serialization
For distributed communication:
    use bitcode;
    use flate2::Compression;

    // Compress and serialize
    let compressed = compress_message(&message, Compression::default())?;
    let bytes = bitcode::serialize(&compressed)?;

    // Send over network
    network_send(bytes).await?;

    // Receive and deserialize
    let received = network_receive().await?;
    let message = bitcode::deserialize(&received)?;
    let decompressed = decompress_message(&message)?;
Message Routing
Router Actor
    pub struct RouterActor {
        routes: HashMap<String, flume::Sender<HashMap<String, Message>>>,
        default_route: Option<flume::Sender<HashMap<String, Message>>>,
    }

    impl RouterActor {
        pub fn route_message(&self, key: &str, message: HashMap<String, Message>) -> anyhow::Result<()> {
            if let Some(sender) = self.routes.get(key) {
                sender.try_send(message)?;
            } else if let Some(default) = &self.default_route {
                default.try_send(message)?;
            }
            // With no matching route and no default route, the message is dropped
            Ok(())
        }
    }
Content-Based Routing
    impl RouterActor {
        fn route_by_content(&self, message: &HashMap<String, Message>) -> Option<&str> {
            // Route based on message content
            if let Some(Message::String(msg_type)) = message.get("type") {
                match msg_type.as_str() {
                    "user_event" => Some("user_handler"),
                    "system_event" => Some("system_handler"),
                    "error" => Some("error_handler"),
                    _ => None,
                }
            } else {
                None
            }
        }
    }
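The two routing steps (pick a key from the content, then look up a sender) can be combined into one dispatch call. The following is a minimal sketch under simplifying assumptions: payloads are `HashMap<String, String>` rather than `HashMap<String, Message>`, channels come from `std::sync::mpsc` rather than flume, and `Router`, `dispatch`, and `demo_router` are illustrative names that do not exist in Reflow.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Simplified stand-in for an actor payload: field name -> string value.
pub type Payload = HashMap<String, String>;

pub struct Router {
    routes: HashMap<String, Sender<Payload>>,
    default_route: Option<Sender<Payload>>,
}

impl Router {
    /// Picks a handler name from the payload's "type" field.
    fn route_by_content(payload: &Payload) -> Option<&'static str> {
        match payload.get("type").map(String::as_str) {
            Some("user_event") => Some("user_handler"),
            Some("system_event") => Some("system_handler"),
            Some("error") => Some("error_handler"),
            _ => None,
        }
    }

    /// Sends to the matching route, falling back to the default route.
    /// Returns false when no destination exists or the receiver is gone.
    pub fn dispatch(&self, payload: Payload) -> bool {
        let target = Self::route_by_content(&payload)
            .and_then(|key| self.routes.get(key))
            .or(self.default_route.as_ref());
        match target {
            Some(tx) => tx.send(payload).is_ok(),
            None => false,
        }
    }
}

/// Demo wiring: one named route ("user_handler") plus a default route.
pub fn demo_router() -> (Router, Receiver<Payload>, Receiver<Payload>) {
    let (user_tx, user_rx) = channel();
    let (default_tx, default_rx) = channel();
    let router = Router {
        routes: HashMap::from([("user_handler".to_string(), user_tx)]),
        default_route: Some(default_tx),
    };
    (router, user_rx, default_rx)
}
```

Returning a `bool` instead of silently succeeding makes undeliverable messages visible to the caller, which can then hand them to a dead letter queue.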
Error Handling
Error Message Format
    // Standard error message structure
    let error_msg = HashMap::from([
        ("error".to_string(), Message::error("Processing failed".to_string())),
        ("code".to_string(), Message::integer(500)),
        ("source".to_string(), Message::string("database_actor".to_string())),
        ("timestamp".to_string(), Message::string(Utc::now().to_rfc3339())),
        ("details".to_string(), Message::object(error_details)),
    ]);
Dead Letter Queue
    pub struct DeadLetterQueue {
        storage: Arc<Mutex<Vec<(String, HashMap<String, Message>)>>>,
        max_size: usize,
    }

    impl DeadLetterQueue {
        pub async fn store_failed_message(
            &self,
            reason: String,
            message: HashMap<String, Message>
        ) {
            let mut storage = self.storage.lock();

            if storage.len() >= self.max_size {
                storage.remove(0); // Remove oldest
            }

            storage.push((reason, message));
        }
    }
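Removing index 0 from a `Vec` shifts every remaining element, so a queue that evicts often may prefer a `VecDeque`. The sketch below is one possible variant, not Reflow's implementation: `BoundedDeadLetters` is a hypothetical name, it is generic over the message type, and it leaves out the `Arc<Mutex<...>>` wrapper to keep the eviction logic visible.

```rust
use std::collections::VecDeque;

/// Fixed-capacity store for failed messages; the oldest entry is evicted first.
pub struct BoundedDeadLetters<M> {
    entries: VecDeque<(String, M)>,
    max_size: usize,
}

impl<M> BoundedDeadLetters<M> {
    pub fn new(max_size: usize) -> Self {
        Self {
            entries: VecDeque::with_capacity(max_size),
            max_size,
        }
    }

    /// Stores a failed message with its failure reason.
    /// Returns the entry that had to be evicted to make room, if any.
    pub fn store(&mut self, reason: String, message: M) -> Option<(String, M)> {
        if self.max_size == 0 {
            // Nothing can be retained; hand the entry straight back.
            return Some((reason, message));
        }
        let evicted = if self.entries.len() >= self.max_size {
            self.entries.pop_front() // O(1), unlike Vec::remove(0)
        } else {
            None
        };
        self.entries.push_back((reason, message));
        evicted
    }

    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

Returning the evicted entry lets the caller log or persist it instead of losing it silently.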
Backpressure Management
Flow Control
    pub struct FlowControlActor {
        input: flume::Receiver<HashMap<String, Message>>,
        output: flume::Sender<HashMap<String, Message>>,
        buffer_size: usize,
        current_load: Arc<AtomicUsize>,
    }

    impl FlowControlActor {
        async fn process_with_backpressure(&self) {
            while let Ok(message) = self.input.recv_async().await {
                // Check current load
                let load = self.current_load.load(Ordering::Relaxed);
                if load > self.buffer_size {
                    // Apply backpressure - slow down
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }

                self.current_load.fetch_add(1, Ordering::Relaxed);

                // Forward the message. On a bounded channel `send_async` waits for
                // free capacity, so a full output buffer slows this loop down
                // instead of dropping the message as a failed `try_send` would.
                let delivered = self.output.send_async(message).await.is_ok();

                self.current_load.fetch_sub(1, Ordering::Relaxed);

                if !delivered {
                    break; // Output receiver was dropped
                }
            }
        }
    }
Message Ordering
Ordered Delivery
    pub struct OrderedDeliveryActor {
        sequence_number: AtomicU64,
        expected_sequence: AtomicU64,
        buffer: Arc<Mutex<BTreeMap<u64, HashMap<String, Message>>>>,
    }

    impl OrderedDeliveryActor {
        fn add_sequence_number(&self, mut message: HashMap<String, Message>) -> HashMap<String, Message> {
            let seq = self.sequence_number.fetch_add(1, Ordering::Relaxed);
            message.insert("sequence".to_string(), Message::Integer(seq as i64));
            message
        }

        async fn deliver_in_order(&self, message: HashMap<String, Message>) {
            if let Some(Message::Integer(seq)) = message.get("sequence") {
                let seq = *seq as u64;
                let expected = self.expected_sequence.load(Ordering::Relaxed);

                if seq == expected {
                    // Deliver immediately
                    self.deliver_message(message).await;
                    self.expected_sequence.fetch_add(1, Ordering::Relaxed);

                    // Check buffer for next messages
                    self.deliver_buffered_messages().await;
                } else {
                    // Buffer out-of-order message
                    self.buffer.lock().insert(seq, message);
                }
            }
        }
    }
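The buffering step that releases held-back messages is not shown above. One way it could work is sketched here as a synchronous, single-threaded reorder buffer. `ReorderBuffer` and `accept` are hypothetical names, the sequence is assumed to start at 0, and stale or duplicate sequence numbers are simply dropped, which may not match what Reflow does.

```rust
use std::collections::BTreeMap;

/// Holds out-of-order messages until the gap before them is filled.
pub struct ReorderBuffer<M> {
    expected: u64,
    pending: BTreeMap<u64, M>,
}

impl<M> ReorderBuffer<M> {
    pub fn new() -> Self {
        Self {
            expected: 0,
            pending: BTreeMap::new(),
        }
    }

    /// Accepts a message tagged with `seq` and returns every message that
    /// is now deliverable, in sequence order (possibly none).
    pub fn accept(&mut self, seq: u64, message: M) -> Vec<M> {
        if seq < self.expected {
            // Already delivered: treat as a duplicate and drop it.
            return Vec::new();
        }
        self.pending.insert(seq, message);

        // Drain the contiguous run that starts at the expected number.
        let mut ready = Vec::new();
        while let Some(next) = self.pending.remove(&self.expected) {
            ready.push(next);
            self.expected += 1;
        }
        ready
    }
}
```

A production version would also bound `pending`, since a single lost message would otherwise cause everything after it to be buffered forever.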
Performance Optimization
Message Batching
    use crate::message::{Message, EncodableValue};

    pub struct BatchingActor {
        batch_size: usize,
        batch_timeout: Duration,
        current_batch: Vec<HashMap<String, Message>>,
        input: flume::Receiver<HashMap<String, Message>>,
        output: flume::Sender<HashMap<String, Message>>,
    }

    impl BatchingActor {
        async fn process_with_batching(&mut self) {
            let mut interval = tokio::time::interval(self.batch_timeout);

            loop {
                tokio::select! {
                    // Receive new message
                    Ok(message) = self.input.recv_async() => {
                        self.current_batch.push(message);

                        if self.current_batch.len() >= self.batch_size {
                            self.flush_batch().await;
                        }
                    }

                    // Timeout - flush partial batch
                    _ = interval.tick() => {
                        if !self.current_batch.is_empty() {
                            self.flush_batch().await;
                        }
                    }
                }
            }
        }

        async fn flush_batch(&mut self) {
            if !self.current_batch.is_empty() {
                // Convert to EncodableValue for proper serialization
                let batch_items: Vec<EncodableValue> = self.current_batch
                    .drain(..)
                    .map(|msg| EncodableValue::from(serde_json::to_value(msg).unwrap()))
                    .collect();

                // `Message::array` wraps the Vec in the Arc that `Message::Array` expects
                let batch = Message::array(batch_items);
                let batch_message = HashMap::from([
                    ("batch".to_string(), batch)
                ]);

                let _ = self.output.send_async(batch_message).await;
            }
        }
    }
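The size-triggered half of batching can be separated from the async plumbing and tested on its own. This is a sketch under that assumption: `Batcher` is a hypothetical helper, generic over the message type, with `push` covering the "batch is full" case and `flush` covering what the timer branch would call.

```rust
/// Accumulates items and hands back a full batch once `batch_size` is reached.
pub struct Batcher<M> {
    batch_size: usize,
    current: Vec<M>,
}

impl<M> Batcher<M> {
    pub fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be at least 1");
        Self {
            batch_size,
            current: Vec::with_capacity(batch_size),
        }
    }

    /// Adds one item; returns the completed batch when the size limit is hit.
    pub fn push(&mut self, item: M) -> Option<Vec<M>> {
        self.current.push(item);
        if self.current.len() >= self.batch_size {
            // Leave an empty Vec behind and return the full one.
            Some(std::mem::take(&mut self.current))
        } else {
            None
        }
    }

    /// Returns whatever has accumulated so far (used on timeout), if anything.
    pub fn flush(&mut self) -> Option<Vec<M>> {
        if self.current.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.current))
        }
    }
}
```

Keeping this logic free of channels and timers means the async loop only has to forward whatever `push` or `flush` returns.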
Zero-Copy Optimization
    use bytes::Bytes;
    use std::sync::Arc;

    // `Bytes` handles share one underlying buffer when cloned
    let data = Bytes::from(vec![1, 2, 3, 4]);

    // Raw bytes travel as `Message::Stream`, which wraps an `Arc<Vec<u8>>`.
    // Note that `to_vec()` copies the bytes once into the new Vec.
    let message = Message::Stream(Arc::new(data.to_vec()));

    // Reference counting for large objects
    struct LargeData {
        content: Vec<u8>,
    }

    let large_data = Arc::new(LargeData { content: vec![0; 1000000] });
    // Pass Arc around instead of cloning large data
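What "pass the `Arc` around" buys can be checked directly. The sketch below uses only the standard library; the `share` function is a made-up helper for this example. It shows that cloning an `Arc` produces a second handle to the same allocation rather than a second copy of the data.

```rust
use std::sync::Arc;

pub struct LargeData {
    pub content: Vec<u8>,
}

/// Wraps `data` in an Arc and returns two handles to the same allocation.
pub fn share(data: LargeData) -> (Arc<LargeData>, Arc<LargeData>) {
    let first = Arc::new(data);
    // Arc::clone only increments the reference count; the Vec is not copied.
    let second = Arc::clone(&first);
    (first, second)
}
```

`Arc::ptr_eq` on the two handles returns true and `Arc::strong_count` reports 2, regardless of how large `content` is.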
Message Validation
Schema Validation
    use serde_json::Value;

    pub struct MessageValidator {
        schemas: HashMap<String, Value>, // JSON Schema
    }

    impl MessageValidator {
        pub fn validate_message(
            &self,
            message_type: &str,
            message: &HashMap<String, Message>
        ) -> Result<(), ValidationError> {
            if let Some(schema) = self.schemas.get(message_type) {
                let json_value: Value = message.clone().into();
                validate_json_schema(&json_value, schema)?;
            }
            Ok(())
        }
    }
Type Safety
    // Type-safe message builders
    pub struct UserEventBuilder {
        user_id: Option<String>,
        event_type: Option<String>,
        timestamp: Option<String>,
    }

    impl UserEventBuilder {
        pub fn user_id(mut self, id: String) -> Self {
            self.user_id = Some(id);
            self
        }

        pub fn event_type(mut self, event_type: String) -> Self {
            self.event_type = Some(event_type);
            self
        }

        pub fn build(self) -> Result<HashMap<String, Message>, BuildError> {
            let user_id = self.user_id.ok_or(BuildError::MissingUserId)?;
            let event_type = self.event_type.ok_or(BuildError::MissingEventType)?;

            Ok(HashMap::from([
                ("user_id".to_string(), Message::string(user_id)),
                ("event_type".to_string(), Message::string(event_type)),
                ("timestamp".to_string(), Message::string(Utc::now().to_rfc3339())),
            ]))
        }
    }
Testing Message Passing
Mock Channels
    pub struct MockChannel {
        sent_messages: Arc<Mutex<Vec<HashMap<String, Message>>>>,
        responses: Arc<Mutex<VecDeque<HashMap<String, Message>>>>,
    }

    impl MockChannel {
        pub fn new() -> Self {
            Self {
                sent_messages: Arc::new(Mutex::new(Vec::new())),
                responses: Arc::new(Mutex::new(VecDeque::new())),
            }
        }

        pub fn expect_message(&self, message: HashMap<String, Message>) {
            self.responses.lock().push_back(message);
        }

        pub fn verify_sent(&self, expected: &HashMap<String, Message>) -> bool {
            self.sent_messages.lock().contains(expected)
        }
    }
Integration Testing
    #[tokio::test]
    async fn test_message_pipeline() {
        let (tx1, rx1) = flume::unbounded();
        let (tx2, rx2) = flume::unbounded();

        // Create test actors
        let source = TestSourceActor::new(tx1);
        let processor = TestProcessorActor::new(rx1, tx2);
        let sink = TestSinkActor::new(rx2);

        // Start actors
        tokio::spawn(source.run());
        tokio::spawn(processor.run());
        tokio::spawn(sink.run());

        // Test message flow
        let test_message = HashMap::from([
            ("data".to_string(), Message::string("test".to_string()))
        ]);

        source.send(test_message.clone()).await;

        // Verify message received
        let received = sink.receive_next().await;
        assert_eq!(received.get("data"), test_message.get("data"));
    }
Best Practices
Message Design
- Keep messages immutable - Never modify after creation
- Use appropriate granularity - Not too fine, not too coarse
- Include enough context - Messages should be self-contained
- Design for evolution - Use versioned message formats
Performance
- Batch when possible - Reduce overhead
- Use appropriate data types - Byte-oriented variants such as Stream or Encoded for large binary data
- Implement backpressure - Prevent resource exhaustion
- Monitor message rates - Track performance metrics
Error Handling
- Use structured errors - Include error codes and context
- Implement dead letter queues - Don't lose failed messages
- Design for retry - Make operations idempotent
- Log message failures - Enable debugging
Next Steps
- Graph System - Workflow composition
- Multi-Language Support - Script integration
- Performance Considerations - Optimization
- Creating Actors - Practical implementation