Graph System Architecture
Reflow's graph system provides a comprehensive flow-based programming (FBP) foundation for building visual workflow editors, data processing pipelines, and complex computational graphs. The system supports real-time validation, automatic layout, performance analysis, and both native Rust and WebAssembly implementations.
Core Concepts
Graph Structure
A Reflow graph consists of:
- Nodes: Processing units that represent actors or components
- Connections: Data flow paths between node ports
- Ports: Input/output endpoints with typed interfaces
- Initial Information Packets (IIPs): Static data injected into the graph
- Groups: Logical collections of related nodes
#![allow(unused)] fn main() { use reflow_network::graph::{Graph, GraphNode, GraphConnection, GraphEdge, PortType}; use std::collections::HashMap; // Create a new graph let mut graph = Graph::new("MyWorkflow", false, None); // Add nodes graph.add_node("source", "DataSource", None); graph.add_node("processor", "DataProcessor", None); graph.add_node("sink", "DataSink", None); // Connect nodes graph.add_connection("source", "output", "processor", "input", None); graph.add_connection("processor", "output", "sink", "input", None); }
Port Type System
Reflow uses a sophisticated type system to ensure data compatibility between connected nodes:
#![allow(unused)] fn main() { #[derive(Debug, Clone, PartialEq)] pub enum PortType { Any, // Accepts any data type Flow, // Control flow signals Event, // Event-driven data Boolean, // Boolean values Integer, // Integer numbers Float, // Floating-point numbers String, // Text data Object(String), // Structured objects with schema Array(Box<PortType>), // Arrays of typed elements Stream, // Streaming data Encoded, // Binary encoded data Option(Box<PortType>), // Optional values } }
Type Compatibility
The system automatically validates type compatibility when connections are made:
#![allow(unused)] fn main() { // These connections are valid graph.add_connection("int_source", "out", "float_sink", "in", None); // Integer → Float graph.add_connection("any_source", "out", "string_sink", "in", None); // Any → String graph.add_connection("data", "out", "stream", "in", None); // Any → Stream // This would be invalid and rejected // graph.add_connection("string_source", "out", "int_sink", "in", None); // String ↛ Integer }
Graph Operations
Node Management
#![allow(unused)] fn main() { // Add node with metadata let metadata = HashMap::from([ ("x".to_string(), json!(100)), ("y".to_string(), json!(200)), ("description".to_string(), json!("Processes incoming data")) ]); graph.add_node("processor", "DataProcessor", Some(metadata)); // Update node metadata graph.set_node_metadata("processor", HashMap::from([ ("color".to_string(), json!("#ff0000")) ])); // Remove node (also removes all connections) graph.remove_node("processor"); }
Connection Management
#![allow(unused)] fn main() { // Add connection with metadata let conn_metadata = HashMap::from([ ("weight".to_string(), json!(0.8)), ("priority".to_string(), json!("high")) ]); graph.add_connection("source", "data", "sink", "input", Some(conn_metadata)); // Get connection details if let Some(connection) = graph.get_connection("source", "data", "sink", "input") { println!("Connection: {:?}", connection); } // Remove specific connection graph.remove_connection("source", "data", "sink", "input"); }
Initial Information Packets (IIPs)
IIPs allow you to inject static data into the graph at startup:
#![allow(unused)] fn main() { use serde_json::json; // Add configuration data graph.add_initial( json!({"database_url": "postgresql://localhost/mydb"}), "database_connector", "config", None ); // Add initial data with index for array ports graph.add_initial_index( json!("input_file.txt"), "file_reader", "filenames", 0, None ); }
Graph Ports
Expose internal node ports as graph-level interfaces:
#![allow(unused)] fn main() { // Add input port to graph graph.add_inport( "data_input", // External port name "processor", // Internal node "input", // Internal port PortType::Any, // Port type None // Metadata ); // Add output port to graph graph.add_outport( "processed_data", // External port name "processor", // Internal node "output", // Internal port PortType::Object("ProcessedData".to_string()), None ); }
Graph Validation
Automatic Validation
The graph system performs continuous validation:
#![allow(unused)] fn main() { // Validate entire graph let validation_result = graph.validate_flow()?; if !validation_result.cycles.is_empty() { println!("Cycles detected: {:?}", validation_result.cycles); } if !validation_result.orphaned_nodes.is_empty() { println!("Orphaned nodes: {:?}", validation_result.orphaned_nodes); } for mismatch in validation_result.port_mismatches { println!("Port mismatch: {}", mismatch); } }
Cycle Detection
Advanced cycle detection with path tracking:
#![allow(unused)] fn main() { // Detect first cycle if let Some(cycle) = graph.detect_cycles() { println!("Cycle found: {:?}", cycle); } // Comprehensive cycle analysis let cycle_analysis = graph.analyze_cycles(); println!("Total cycles: {}", cycle_analysis.total_cycles); println!("Nodes in cycles: {:?}", cycle_analysis.nodes_in_cycles); }
Performance Analysis
Parallelism Detection
Identify opportunities for parallel execution:
#![allow(unused)] fn main() { let parallelism = graph.analyze_parallelism(); // Parallel branches that can execute simultaneously for branch in parallelism.parallel_branches { println!("Parallel branch: {:?}", branch.nodes); } // Pipeline stages for sequential execution for stage in parallelism.pipeline_stages { println!("Stage {}: {:?}", stage.level, stage.nodes); } }
Bottleneck Analysis
Find performance bottlenecks:
#![allow(unused)] fn main() { let bottlenecks = graph.detect_bottlenecks(); for bottleneck in bottlenecks { match bottleneck { Bottleneck::HighDegree(node) => { println!("High-degree bottleneck at node: {}", node); } Bottleneck::SequentialChain(chain) => { println!("Sequential chain that could be parallelized: {:?}", chain); } } } }
Resource Analysis
Estimate execution requirements:
#![allow(unused)] fn main() { let analysis = graph.analyze_for_runtime(); println!("Estimated execution time: {:.2}s", analysis.estimated_execution_time); println!("Resource requirements: {:?}", analysis.resource_requirements); for suggestion in analysis.optimization_suggestions { match suggestion { OptimizationSuggestion::ParallelizableChain { nodes } => { println!("Consider parallelizing: {:?}", nodes); } OptimizationSuggestion::RedundantNode { node, reason } => { println!("Redundant node {}: {}", node, reason); } OptimizationSuggestion::ResourceBottleneck { resource, severity } => { println!("Resource bottleneck in {}: {:.1}%", resource, severity * 100.0); } OptimizationSuggestion::DataTypeOptimization { from, to, suggestion } => { println!("Optimize {} → {}: {}", from, to, suggestion); } } } }
Graph Layout
Automatic Layout
The system provides intelligent automatic layout:
#![allow(unused)] fn main() { // Calculate optimal positions let positions = graph.calculate_layout(); for (node_id, position) in positions { println!("Node {}: x={:.1}, y={:.1}", node_id, position.x, position.y); } // Apply layout to graph metadata graph.auto_layout()?; }
Manual Positioning
Set custom node positions:
#![allow(unused)] fn main() { // Set specific position graph.set_node_position("processor", 150.0, 100.0)?; // Set position with custom dimensions and anchor let metadata = HashMap::from([ ("position".to_string(), json!({"x": 200, "y": 150})), ("dimensions".to_string(), json!({ "width": 120, "height": 80, "anchor": {"x": 0.5, "y": 0.5} // Center anchor })) ]); graph.set_node_metadata("custom_node", metadata); }
Event System
Real-time Updates
Subscribe to graph changes:
#![allow(unused)] fn main() { use reflow_network::graph::GraphEvents; // Graph creates event channel automatically let (sender, receiver) = graph.event_channel; // Listen for events while let Ok(event) = receiver.recv() { match event { GraphEvents::AddNode(node_data) => { println!("Node added: {:?}", node_data); } GraphEvents::AddConnection(conn_data) => { println!("Connection added: {:?}", conn_data); } GraphEvents::RemoveNode(node_data) => { println!("Node removed: {:?}", node_data); } // ... handle other events _ => {} } } }
Event Types
Complete list of graph events:
AddNode/RemoveNode/RenameNode/ChangeNodeAddConnection/RemoveConnection/ChangeConnectionAddInitial/RemoveInitialAddGroup/RemoveGroup/RenameGroup/ChangeGroupAddInport/RemoveInport/RenameInport/ChangeInportAddOutport/RemoveOutport/RenameOutport/ChangeOutportChangePropertiesStartTransaction/EndTransaction/Transaction
Serialization
Export Format
Graphs can be serialized to JSON for storage and interchange:
#![allow(unused)] fn main() { // Export to JSON-compatible format let export = graph.export(); let json_string = serde_json::to_string_pretty(&export)?; // Load from JSON let loaded_graph = Graph::load(export, Some(metadata)); }
Export Structure
{
"caseSensitive": false,
"properties": {
"name": "MyWorkflow",
"description": "A sample workflow"
},
"processes": {
"source": {
"id": "source",
"component": "DataSource",
"metadata": {"x": 0, "y": 0}
}
},
"connections": [
{
"from": {"nodeId": "source", "portId": "output"},
"to": {"nodeId": "sink", "portId": "input"},
"metadata": {}
}
],
"inports": {},
"outports": {},
"groups": []
}
WebAssembly Support
Browser Integration
The graph system compiles to WebAssembly for browser usage:
import { Graph } from 'reflow-network';
// Create graph in browser
const graph = new Graph("WebWorkflow", false, {});
// Add nodes and connections
graph.addNode("input", "InputNode", {x: 0, y: 0});
graph.addNode("output", "OutputNode", {x: 200, y: 0});
graph.addConnection("input", "out", "output", "in", {});
// Subscribe to events
graph.subscribe((event) => {
console.log("Graph event:", event);
});
// Export for persistence
const exported = graph.toJSON();
localStorage.setItem('workflow', JSON.stringify(exported));
TypeScript Support
Full TypeScript definitions are generated:
interface GraphNode {
id: string;
component: string;
metadata?: Map<string, any>;
}
interface GraphConnection {
from: GraphEdge;
to: GraphEdge;
metadata?: Map<string, any>;
data?: any;
}
type PortType =
| { type: "flow" }
| { type: "event" }
| { type: "boolean" }
| { type: "integer" }
| { type: "float" }
| { type: "string" }
| { type: "object", value: string }
| { type: "array", value: PortType }
| { type: "stream" }
| { type: "encoded" }
| { type: "any" }
| { type: "option", value: PortType };
Graph History
Undo/Redo System
Track changes for undo/redo functionality:
#![allow(unused)] fn main() { // Create graph with history tracking let (mut graph, mut history) = Graph::with_history(); // Make changes graph.add_node("test", "TestNode", None); graph.add_connection("test", "out", "sink", "in", None); // Undo last change if let Some(event) = history.undo() { // Apply inverse operation history.apply_inverse(&mut graph, event)?; } // Redo change if let Some(event) = history.redo() { // Reapply operation history.apply_event(&mut graph, event)?; } }
Advanced Features
Subgraph Analysis
Extract and analyze subgraphs:
#![allow(unused)] fn main() { // Get reachable subgraph from a node if let Some(subgraph) = graph.get_reachable_subgraph("start_node") { let analysis = graph.analyze_subgraph(&subgraph); println!("Subgraph nodes: {}", analysis.node_count); println!("Max depth: {}", analysis.max_depth); println!("Is cyclic: {}", analysis.is_cyclic); println!("Branching factor: {:.2}", analysis.branching_factor); } }
Graph Traversal
Efficient traversal algorithms:
#![allow(unused)] fn main() { // Depth-first traversal graph.traverse_depth_first("start_node", |node| { println!("Visiting node: {}", node.id); })?; // Breadth-first traversal graph.traverse_breadth_first("start_node", |node| { println!("Processing: {} ({})", node.id, node.component); })?; }
Node Groups
Organize nodes into logical groups:
#![allow(unused)] fn main() { // Create group graph.add_group("data_processing", vec!["filter".to_string(), "transform".to_string()], None); // Add node to existing group graph.add_to_group("data_processing", "validator"); // Remove from group graph.remove_from_group("data_processing", "validator"); }
Best Practices
Performance Optimization
- Use indexed operations: The graph uses internal indices for O(1) lookups
- Batch modifications: Group related changes to minimize event overhead
- Validate incrementally: Use targeted validation for better performance
- Cache analysis results: Store expensive analysis results when graph is stable
Memory Management
- Clean up connections: Always remove connections before removing nodes
- Limit history size: Use
with_history_and_limit()for bounded memory usage - Dispose of event listeners: Unsubscribe from events when no longer needed
Error Handling
- Check return values: Most operations return Result types
- Validate before execution: Use validation methods before running workflows
- Handle cycles gracefully: Implement cycle detection in your workflow runtime
- Monitor resource usage: Track memory and CPU usage for large graphs
Integration Examples
Visual Editor Integration
#![allow(unused)] fn main() { // In a visual editor, sync UI with graph events graph.subscribe(|event| { match event { GraphEvents::AddNode(data) => ui.add_node_widget(data), GraphEvents::RemoveNode(data) => ui.remove_node_widget(data.id), GraphEvents::AddConnection(data) => ui.draw_connection(data), _ => {} } }); }
Workflow Execution
#![allow(unused)] fn main() { // Convert graph to executable network let network = Network::from_graph(&graph)?; // Execute with runtime let runtime = Runtime::new(); runtime.execute(network).await?; }
Next Steps
- Creating Graphs - Detailed API guide
- Graph Analysis - Validation and performance analysis
- Layout System - Positioning and visualization
- SubgraphActor - Hierarchical graph composition via reflow_network
- Multi-Graph Composition - Workspace discovery and graph composition
- Distributed Networks - Cross-network execution and PeerMesh
- Building Visual Editors - Complete tutorial