Basic Concepts
This guide introduces the fundamental concepts of Reflow's actor-based workflow engine.
Core Concepts
Actors
Actors are the building blocks of Reflow workflows. Each actor is an isolated unit of computation that:
- Processes incoming messages
- Maintains its own state
- Communicates only through message passing
- Runs concurrently with other actors
#![allow(unused)] fn main() { // Example: Simple actor that doubles numbers (using actor macro) use std::collections::HashMap; use reflow_network::{ actor::ActorContext, message::Message, }; use actor_macro::actor; #[actor( DoublerActor, inports::<100>(number), outports::<50>(result) )] async fn doubler_actor(context: ActorContext) -> Result<HashMap<String, Message>, anyhow::Error> { let payload = context.get_payload(); if let Some(Message::Integer(n)) = payload.get("number") { Ok([ ("result".to_owned(), Message::integer(n * 2)) ].into()) } else { Err(anyhow::anyhow!("Expected integer input")) } } // Alternative: Manual implementation use reflow_network::actor::{Actor, ActorBehavior, Port, ActorLoad}; use parking_lot::Mutex; use std::sync::Arc; pub struct ManualDoublerActor { inports: Port, outports: Port, load: Arc<Mutex<ActorLoad>>, } impl ManualDoublerActor { pub fn new() -> Self { Self { inports: flume::unbounded(), outports: flume::unbounded(), load: Arc::new(Mutex::new(ActorLoad::new(0))), } } } impl Actor for ManualDoublerActor { fn get_behavior(&self) -> ActorBehavior { Box::new(|context: ActorContext| { Box::pin(async move { let payload = context.get_payload(); if let Some(Message::Integer(n)) = payload.get("number") { Ok([ ("result".to_owned(), Message::Integer(n * 2)) ].into()) } else { Err(anyhow::anyhow!("Expected integer input")) } }) }) } fn get_inports(&self) -> Port { self.inports.clone() } fn get_outports(&self) -> Port { self.outports.clone() } fn load_count(&self) -> Arc<Mutex<ActorLoad>> { self.load.clone() } fn create_process(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + 'static + Send>> { // Process creation implementation... todo!("See creating-actors.md for complete implementation") } } }
Messages
Messages are the data that flows between actors. Reflow supports various message types:
#![allow(unused)] fn main() { pub enum Message { String(String), Integer(i64), Float(f64), Boolean(bool), Array(Vec<Message>), Object(HashMap<String, Message>), Binary(Vec<u8>), Null, Error(String), } }
Ports
Ports are the communication channels between actors:
- Input ports (inports): Receive messages from other actors
- Output ports (outports): Send messages to other actors
┌─────────────┐
│ Actor │
│ ┌───────┐ │
│ │ Logic │ │
│ └───────┘ │
│ │
│ in1 ──────→ │ ──────→ out1
│ in2 ──────→ │ ──────→ out2
└─────────────┘
Workflows (Graphs)
Workflows are directed graphs of connected actors that define:
- Data flow between actors
- Processing logic and transformations
- Execution order and dependencies
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Source │───▶│Transform│───▶│ Sink │
│ Actor │ │ Actor │ │ Actor │
└─────────┘ └─────────┘ └─────────┘
Actor State
Each actor can maintain its own state that persists between message processing:
#![allow(unused)] fn main() { // Example: Counter actor with state (using actor macro) use reflow_network::actor::MemoryState; #[actor( CounterActor, state(MemoryState), inports::<100>(increment), outports::<50>(count) )] async fn counter_actor(context: ActorContext) -> Result<HashMap<String, Message>, anyhow::Error> { let payload = context.get_payload(); let state = context.get_state(); let mut state_guard = state.lock(); let memory_state = state_guard .as_mut_any() .downcast_mut::<MemoryState>() .expect("Expected MemoryState"); // Initialize state if needed if !memory_state.contains_key("count") { memory_state.insert("count", serde_json::json!(0)); } // Get current count let current_count = memory_state.get("count") .and_then(|v| v.as_i64()) .unwrap_or(0); // Increment by 1 or by specified amount let increment_by = if let Some(Message::Integer(amount)) = payload.get("increment") { *amount } else { 1 // Default increment }; let new_count = current_count + increment_by; // Update state memory_state.insert("count", serde_json::json!(new_count)); Ok([ ("count".to_owned(), Message::Integer(new_count)) ].into()) } }
Actor Types
Native Actors (Rust)
Built directly in Rust for maximum performance:
#![allow(unused)] fn main() { struct ProcessorActor { // Implementation in Rust } }
Script Actors
Execute scripts in various languages:
JavaScript/TypeScript (Deno)
// JavaScript actor function
function process(inputs, context) {
const data = inputs.data;
return { result: data.toUpperCase() };
}
Python
# Python actor script
import numpy as np
inputs = Context.get_inputs()
data = np.array(inputs["data"])
__return_value = data.sum()
WebAssembly
#![allow(unused)] fn main() { // WASM actor using Reflow Plugin SDK (reflow_wasm_actor) use reflow_wasm_actor::*; use std::collections::HashMap; // Define plugin metadata fn metadata() -> PluginMetadata { PluginMetadata { component: "ProcessorActor".to_string(), description: "Processes input data".to_string(), inports: vec![ port_def!("input", "Input data", "String", required), ], outports: vec![ port_def!("output", "Processed output", "String"), ], config_schema: None, } } // Implement actor behavior fn process_actor(context: ActorContext) -> Result<ActorResult, Box<dyn std::error::Error>> { let mut outputs = HashMap::new(); // Get input and process it if let Some(Message::String(input)) = context.payload.get("input") { let processed = format!("Processed: {}", input); outputs.insert("output".to_string(), Message::String(processed)); } Ok(ActorResult { outputs, state: None, // No state changes }) } // Register the plugin actor_plugin!( metadata: metadata(), process: process_actor ); }
Message Passing Patterns
Point-to-Point
One actor sends to one specific actor:
Actor A ───────▶ Actor B
Broadcast
One actor sends to multiple actors:
┌─────▶ Actor B
Actor A ┤
└─────▶ Actor C
Collect/Merge
Multiple actors send to one actor:
Actor A ┐
├─────▶ Actor C
Actor B ┘
Concurrency Model
Actor Isolation
- Each actor runs in its own execution context
- No shared memory between actors
- Thread-safe by design
Message Processing
- Actors process messages asynchronously
- Messages are queued for processing
- Backpressure handling prevents overflow
Parallelism
- Multiple actors can run simultaneously
- Work is distributed across available CPU cores
- Network can span multiple machines
Error Handling
Actor-Level Errors
#![allow(unused)] fn main() { // Errors are returned as Error messages Err(anyhow::anyhow!("Processing failed")) }
Network-Level Errors
#![allow(unused)] fn main() { // Error propagation through the network HashMap::from([ ("error".to_string(), Message::Error("Network timeout".to_string())) ]) }
Recovery Patterns
- Dead letter queues for failed messages
- Circuit breakers for failing actors
- Supervisor actors for monitoring
Lifecycle Management
Actor Creation
#![allow(unused)] fn main() { let actor = MyActor::new(config); let process = actor.create_process(); tokio::spawn(process); }
Actor Termination
#![allow(unused)] fn main() { // Graceful shutdown drop(inports); // Closes input channels // Actor completes current message and exits }
State Persistence
#![allow(unused)] fn main() { // State can be persisted and restored let state = actor.get_state(); // Serialize state for persistence }
Advanced Concepts
Hot Code Reloading
- Script actors can be updated without stopping the workflow
- State preservation during updates
Multi-tenancy
- Isolated workspaces for different users/projects
- Resource quotas and permissions
Distributed Execution
- Actors can run on different machines
- Network-transparent message passing
Best Practices
Actor Design
- Keep actors small and focused
- Avoid blocking operations in actor logic
- Use async/await for I/O operations
Message Design
- Use typed messages when possible
- Keep messages small and serializable
- Include error context in error messages
Workflow Design
- Design for failure (circuit breakers, timeouts)
- Monitor actor performance and health
- Use appropriate parallelism levels
Next Steps
Now that you understand the basic concepts:
- Set up development: Development Setup
- Create your first workflow: First Workflow
- Learn about specific actors: Actor API
- Explore scripting: JavaScript Runtime