# Storage Backends
Reflow's observability framework supports multiple storage backends to accommodate different operational requirements, from development and testing to large-scale production deployments.
## Overview
The tracing system provides a pluggable storage architecture that allows you to choose the most appropriate backend for your needs:
- **Memory Storage**: Fast, ephemeral storage for development and testing
- **SQLite Storage**: Lightweight, embedded database for small to medium deployments
- **PostgreSQL Storage**: Robust, scalable database for production environments
- **ClickHouse Storage**: High-performance analytical database for massive scale
- **Custom Storage**: Implement your own storage adapter
## Memory Storage

### When to Use
- Development and testing environments
- Temporary trace analysis
- Systems with limited persistence requirements
- Quick prototyping and debugging
### Configuration

```rust
use reflow_tracing::storage::MemoryStorage;

let storage = MemoryStorage::new();
```
### Features

- **Ultra-fast**: No disk I/O overhead
- **Zero configuration**: Works out of the box
- **Bounded capacity**: Configurable memory limits
- **Automatic cleanup**: LRU eviction when capacity is reached
```rust
use reflow_tracing::storage::{EvictionPolicy, MemoryConfig, MemoryStorage};

let config = MemoryConfig {
    max_traces: 10_000,
    max_events_per_trace: 1_000,
    max_memory_mb: 256,
    eviction_policy: EvictionPolicy::LRU,
};
let storage = MemoryStorage::with_config(config);
```
### Limitations

- **No persistence**: Data is lost on restart
- **Memory bound**: Limited by available RAM
- **Single process**: No sharing between instances
- **No complex queries**: Basic filtering only
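The bounded-capacity eviction described above can be sketched with the standard library alone. `TraceRecord` and `BoundedTraceStore` are illustrative names, not the actual `MemoryStorage` internals, and the sketch evicts in insertion order (a full LRU would also reorder entries on reads):

```rust
use std::collections::VecDeque;

/// Hypothetical trace record; the real FlowTrace carries many more fields.
#[derive(Debug, Clone, PartialEq)]
struct TraceRecord {
    trace_id: u64,
    flow_id: String,
}

/// Minimal bounded store: evicts the oldest trace once `max_traces` is reached.
struct BoundedTraceStore {
    max_traces: usize,
    traces: VecDeque<TraceRecord>,
}

impl BoundedTraceStore {
    fn new(max_traces: usize) -> Self {
        Self { max_traces, traces: VecDeque::new() }
    }

    fn store(&mut self, trace: TraceRecord) {
        if self.traces.len() == self.max_traces {
            // Capacity reached: drop the least-recently-inserted trace.
            self.traces.pop_front();
        }
        self.traces.push_back(trace);
    }

    fn len(&self) -> usize {
        self.traces.len()
    }

    fn contains(&self, trace_id: u64) -> bool {
        self.traces.iter().any(|t| t.trace_id == trace_id)
    }
}

fn main() {
    let mut store = BoundedTraceStore::new(2);
    for id in 0..3 {
        store.store(TraceRecord { trace_id: id, flow_id: format!("flow-{id}") });
    }
    // Capacity is 2, so trace 0 was evicted.
    assert_eq!(store.len(), 2);
    assert!(!store.contains(0));
    assert!(store.contains(2));
    println!("bounded store holds {} traces", store.len());
}
```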
## SQLite Storage

### When to Use
- Small to medium production deployments
- Single-node applications
- Applications requiring persistence without database administration
- Development environments with persistence needs
### Configuration

```rust
use reflow_tracing::storage::SqliteStorage;

let storage = SqliteStorage::new("traces.db").await?;
```
### Features

- **Persistent**: Data survives restarts
- **ACID transactions**: Data integrity guarantees
- **Full SQL support**: Complex queries and analysis
- **Embedded**: No separate database server required
- **Backup friendly**: Single file for easy backups
```rust
use reflow_tracing::storage::{JournalMode, SqliteConfig, SqliteStorage, SynchronousMode};

let config = SqliteConfig {
    database_path: "traces.db".to_string(),
    journal_mode: JournalMode::WAL,
    synchronous: SynchronousMode::Normal,
    cache_size_mb: 64,
    busy_timeout_ms: 5000,
    max_connections: 10,
};
let storage = SqliteStorage::with_config(config).await?;
```
### Performance Tuning

```rust
// Optimize for write performance
let fast_config = SqliteConfig {
    journal_mode: JournalMode::WAL,       // Write-Ahead Logging
    synchronous: SynchronousMode::Normal, // Balanced durability/speed
    cache_size_mb: 128,                   // Larger cache
    busy_timeout_ms: 10_000,              // Handle contention
    ..Default::default()
};

// Optimize for read performance
let read_config = SqliteConfig {
    cache_size_mb: 256,            // Very large cache
    temp_store: TempStore::Memory, // In-memory temp tables
    mmap_size_mb: 512,             // Memory-mapped I/O
    ..Default::default()
};
```
### Limitations

- **Single writer**: Write concurrency is limited
- **File size**: Large databases can become unwieldy
- **Network access**: No remote access without additional tools
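A common way to live with SQLite's single-writer constraint is to funnel all writes through one dedicated writer thread. A minimal sketch using std channels; `WriteCmd` and the counter are illustrative, and in a real backend the loop would own the single SQLite connection:

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical write command; a real backend would carry full trace/event payloads.
enum WriteCmd {
    StoreEvent(String),
    Shutdown,
}

/// Spawn a single writer thread; all other threads send commands through the channel,
/// so the database only ever sees one writer.
fn spawn_writer() -> (mpsc::Sender<WriteCmd>, thread::JoinHandle<usize>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut written = 0;
        // In a real backend this loop would hold the one SQLite connection.
        for cmd in rx {
            match cmd {
                WriteCmd::StoreEvent(_payload) => written += 1,
                WriteCmd::Shutdown => break,
            }
        }
        written
    });
    (tx, handle)
}

fn main() {
    let (tx, handle) = spawn_writer();
    for i in 0..5 {
        tx.send(WriteCmd::StoreEvent(format!("event-{i}"))).unwrap();
    }
    tx.send(WriteCmd::Shutdown).unwrap();
    let written = handle.join().unwrap();
    assert_eq!(written, 5);
    println!("writer thread persisted {written} events");
}
```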
## PostgreSQL Storage

### When to Use
- Production environments with multiple instances
- High-concurrency applications
- Applications requiring advanced SQL features
- Distributed systems
- Long-term data retention requirements
### Configuration

```rust
use reflow_tracing::storage::PostgresStorage;

let storage = PostgresStorage::new("postgresql://user:pass@localhost/traces").await?;
```
### Features

- **High concurrency**: Excellent multi-client performance
- **ACID compliance**: Strong consistency guarantees
- **Advanced SQL**: Window functions, CTEs, advanced analytics
- **JSON support**: Native support for trace event JSON
- **Partitioning**: Time-based table partitioning
- **Replication**: Built-in streaming replication
```rust
use reflow_tracing::storage::{PartitionInterval, PostgresConfig, PostgresStorage};

let config = PostgresConfig {
    connection_url: "postgresql://user:pass@localhost/traces".to_string(),
    max_connections: 20,
    min_connections: 5,
    connection_timeout_ms: 5000,
    idle_timeout_ms: 600_000,
    max_lifetime_ms: 1_800_000,
    schema_name: "tracing".to_string(),
    enable_partitioning: true,
    partition_interval: PartitionInterval::Daily,
};
let storage = PostgresStorage::with_config(config).await?;
```
### Schema Setup

```sql
-- Create dedicated schema
CREATE SCHEMA IF NOT EXISTS tracing;

-- Create partitioned tables. Note: PostgreSQL requires the partition
-- key to be part of the primary key of a partitioned table.
CREATE TABLE tracing.traces (
    trace_id UUID NOT NULL,
    flow_id VARCHAR(255) NOT NULL,
    execution_id UUID NOT NULL,
    start_time TIMESTAMPTZ NOT NULL,
    end_time TIMESTAMPTZ,
    status VARCHAR(50) NOT NULL,
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (trace_id, start_time)
) PARTITION BY RANGE (start_time);

-- trace_id integrity is enforced in the application layer: a foreign key
-- to the partitioned parent would have to reference the full
-- (trace_id, start_time) key.
CREATE TABLE tracing.events (
    event_id UUID NOT NULL,
    trace_id UUID NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    actor_id VARCHAR(255) NOT NULL,
    data JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (event_id, timestamp)
) PARTITION BY RANGE (timestamp);

-- Create indexes for performance
CREATE INDEX idx_traces_flow_id ON tracing.traces(flow_id);
CREATE INDEX idx_traces_start_time ON tracing.traces(start_time);
CREATE INDEX idx_events_trace_id ON tracing.events(trace_id);
CREATE INDEX idx_events_timestamp ON tracing.events(timestamp);
CREATE INDEX idx_events_actor_id ON tracing.events(actor_id);
CREATE INDEX idx_events_type ON tracing.events(event_type);

-- GIN index for JSON queries
CREATE INDEX idx_events_data_gin ON tracing.events USING GIN(data);
```
### Partitioning Management

```rust
// Automatic partition management
let config = PostgresConfig {
    enable_partitioning: true,
    partition_interval: PartitionInterval::Daily,
    partition_retention_days: 30,
    auto_create_partitions: true,
    ..Default::default()
};
```
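Under the hood, a daily partition manager is mostly date arithmetic: map timestamps to a day key, and drop keys older than the retention window. A self-contained sketch of that logic; the key scheme and names (`partition_key`, `traces_d…`) are assumptions for illustration, not reflow_tracing internals:

```rust
/// Days since the Unix epoch serve as the partition key in this sketch.
fn partition_key(unix_seconds: u64) -> u64 {
    unix_seconds / 86_400
}

/// Hypothetical partition naming scheme.
fn partition_name(key: u64) -> String {
    format!("traces_d{key}")
}

/// Keys eligible for dropping under a `retention_days` policy.
fn expired_partitions(existing: &[u64], now_secs: u64, retention_days: u64) -> Vec<u64> {
    let cutoff = partition_key(now_secs).saturating_sub(retention_days);
    existing.iter().copied().filter(|k| *k < cutoff).collect()
}

fn main() {
    let now = 1_700_000_000; // fixed timestamp for a deterministic example
    let today = partition_key(now);
    let existing: Vec<u64> = (today - 40..=today).collect();
    let to_drop = expired_partitions(&existing, now, 30);
    // 41 daily partitions exist; the 10 older than the 30-day window expire.
    assert_eq!(to_drop.len(), 10);
    assert!(to_drop.iter().all(|k| *k < today - 30));
    println!("would drop {}, oldest is {}", to_drop.len(), partition_name(to_drop[0]));
}
```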
### Performance Optimization

```sql
-- Optimize PostgreSQL configuration
ALTER SYSTEM SET shared_buffers = '256MB';           -- requires a server restart
ALTER SYSTEM SET effective_cache_size = '1GB';
ALTER SYSTEM SET maintenance_work_mem = '64MB';
ALTER SYSTEM SET checkpoint_completion_target = 0.9;
ALTER SYSTEM SET wal_buffers = '16MB';               -- requires a server restart
ALTER SYSTEM SET default_statistics_target = 100;

-- Applies the reloadable settings; shared_buffers and wal_buffers only
-- take effect after a restart.
SELECT pg_reload_conf();
```
## ClickHouse Storage

### When to Use
- Very high-volume trace data (millions of events per second)
- Analytical workloads and reporting
- Time-series analysis
- Long-term data retention with compression
- Real-time dashboards and monitoring
### Configuration

```rust
use reflow_tracing::storage::ClickHouseStorage;

let storage = ClickHouseStorage::new("http://localhost:8123").await?;
```
### Features

- **Columnar storage**: Excellent compression and analytical performance
- **Distributed architecture**: Horizontal scaling
- **Real-time ingestion**: Handles massive write loads
- **Advanced analytics**: Built-in analytical functions
- **Time-series optimized**: Purpose-built for time-ordered data
```rust
use reflow_tracing::storage::{ClickHouseConfig, ClickHouseStorage, CompressionMethod};

let config = ClickHouseConfig {
    url: "http://clickhouse:8123".to_string(),
    database: "tracing".to_string(),
    cluster: Some("cluster".to_string()),
    username: Some("default".to_string()),
    password: None,
    compression: CompressionMethod::LZ4,
    batch_size: 10_000,
    flush_interval_ms: 5000,
    max_memory_usage: 1_000_000_000, // 1 GB
    max_execution_time_ms: 300_000,  // 5 minutes
};
let storage = ClickHouseStorage::with_config(config).await?;
```
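The `batch_size`/`flush_interval_ms` pair implies a write batcher: buffer events and issue one bulk insert per flush. A minimal sketch of the size-triggered half; the interval trigger would be a timer calling `flush`, and the recorded batch sizes here stand in for HTTP inserts:

```rust
/// Minimal write batcher mirroring the `batch_size` setting above.
struct EventBatcher {
    batch_size: usize,
    buffer: Vec<String>,
    flushes: Vec<usize>, // sizes of flushed batches, standing in for bulk INSERTs
}

impl EventBatcher {
    fn new(batch_size: usize) -> Self {
        Self { batch_size, buffer: Vec::new(), flushes: Vec::new() }
    }

    fn push(&mut self, event: String) {
        self.buffer.push(event);
        if self.buffer.len() >= self.batch_size {
            self.flush();
        }
    }

    /// In a real backend this would issue one bulk INSERT per call.
    fn flush(&mut self) {
        if !self.buffer.is_empty() {
            self.flushes.push(self.buffer.len());
            self.buffer.clear();
        }
    }
}

fn main() {
    let mut batcher = EventBatcher::new(3);
    for i in 0..7 {
        batcher.push(format!("event-{i}"));
    }
    batcher.flush(); // final partial batch, as a timer would on flush_interval_ms
    assert_eq!(batcher.flushes, vec![3, 3, 1]);
    println!("flushed batches: {:?}", batcher.flushes);
}
```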
### Schema Design

```sql
-- Optimized ClickHouse schema
CREATE TABLE tracing.events_local ON CLUSTER cluster (
    timestamp DateTime64(3),
    trace_id UUID,
    event_id UUID,
    flow_id String,
    execution_id UUID,
    event_type LowCardinality(String),
    actor_id String,
    port String,
    message_type String,
    message_size UInt32,
    execution_time_ns UInt64,
    memory_usage UInt64,
    cpu_usage Float32,
    data String -- JSON as string for flexibility
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/events', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, trace_id, event_id)
SETTINGS index_granularity = 8192;

-- Distributed table
CREATE TABLE tracing.events ON CLUSTER cluster AS tracing.events_local
ENGINE = Distributed(cluster, tracing, events_local, rand());

-- Materialized view for aggregations. SummingMergeTree sums numeric
-- columns when merging rows with the same sort key, so store only
-- summable totals and derive averages at query time (total / count).
CREATE MATERIALIZED VIEW tracing.event_metrics
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, actor_id, event_type)
AS SELECT
    toStartOfMinute(timestamp) AS timestamp,
    actor_id,
    event_type,
    count() AS event_count,
    sum(execution_time_ns) AS total_execution_time,
    sum(message_size) AS total_bytes
FROM tracing.events_local
GROUP BY timestamp, actor_id, event_type;
```
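The materialized view above keeps summable per-minute totals. The same rollup computed in-process looks like this; the `Event` fields are a reduced, illustrative subset, and averages would be derived as total / count at read time:

```rust
use std::collections::BTreeMap;

/// Hypothetical event shape: only the fields the rollup needs.
struct Event {
    timestamp_secs: u64,
    actor_id: String,
    execution_time_ns: u64,
}

/// Per-minute rollup: (minute bucket, actor) -> (event_count, total_execution_time_ns).
fn rollup(events: &[Event]) -> BTreeMap<(u64, String), (u64, u64)> {
    let mut out = BTreeMap::new();
    for e in events {
        let bucket = e.timestamp_secs / 60 * 60; // truncate to the minute
        let entry = out.entry((bucket, e.actor_id.clone())).or_insert((0u64, 0u64));
        entry.0 += 1;
        entry.1 += e.execution_time_ns;
    }
    out
}

fn main() {
    let events = vec![
        Event { timestamp_secs: 120, actor_id: "a".to_string(), execution_time_ns: 100 },
        Event { timestamp_secs: 130, actor_id: "a".to_string(), execution_time_ns: 300 },
        Event { timestamp_secs: 200, actor_id: "a".to_string(), execution_time_ns: 50 },
    ];
    let agg = rollup(&events);
    // Two events share the 120s bucket; the third lands in the 180s bucket.
    assert_eq!(agg[&(120, "a".to_string())], (2, 400));
    assert_eq!(agg[&(180, "a".to_string())], (1, 50));
    println!("{} buckets", agg.len());
}
```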
### Performance Tuning

```xml
<!-- ClickHouse configuration -->
<yandex>
    <profiles>
        <default>
            <max_memory_usage>10000000000</max_memory_usage>
            <use_uncompressed_cache>1</use_uncompressed_cache>
            <load_balancing>random</load_balancing>
        </default>
    </profiles>
    <users>
        <default>
            <profile>default</profile>
            <networks incl="networks" replace="replace">
                <ip>::/0</ip>
            </networks>
        </default>
    </users>
</yandex>
```
## Custom Storage Implementation

### Storage Trait

```rust
use async_trait::async_trait;
use reflow_tracing::storage::StorageError;

// FlowTrace, TraceId, TraceEvent, and TraceQuery are the tracing core types.
#[async_trait]
pub trait StorageBackend: Send + Sync {
    async fn store_trace(&self, trace: FlowTrace) -> Result<(), StorageError>;
    async fn get_trace(&self, trace_id: TraceId) -> Result<Option<FlowTrace>, StorageError>;
    async fn query_traces(&self, query: TraceQuery) -> Result<Vec<FlowTrace>, StorageError>;
    async fn store_event(&self, trace_id: TraceId, event: TraceEvent) -> Result<(), StorageError>;
    async fn get_events(&self, trace_id: TraceId) -> Result<Vec<TraceEvent>, StorageError>;
    async fn health_check(&self) -> Result<(), StorageError>;
}
```
### Example: Redis Storage

```rust
use async_trait::async_trait;
use redis::Client;
use reflow_tracing::storage::{StorageBackend, StorageError};

pub struct RedisStorage {
    client: Client,
}

impl RedisStorage {
    pub fn new(url: &str) -> Result<Self, StorageError> {
        let client = Client::open(url)?;
        Ok(Self { client })
    }
}

#[async_trait]
impl StorageBackend for RedisStorage {
    async fn store_trace(&self, trace: FlowTrace) -> Result<(), StorageError> {
        let mut conn = self.client.get_connection()?;
        let key = format!("trace:{}", trace.trace_id);
        let value = serde_json::to_string(&trace)?;
        redis::cmd("SET")
            .arg(&key)
            .arg(&value)
            .arg("EX")
            .arg(3600) // 1 hour TTL
            .query(&mut conn)?;
        Ok(())
    }

    async fn get_trace(&self, trace_id: TraceId) -> Result<Option<FlowTrace>, StorageError> {
        let mut conn = self.client.get_connection()?;
        let key = format!("trace:{}", trace_id);
        let value: Option<String> = redis::cmd("GET").arg(&key).query(&mut conn)?;
        match value {
            Some(json) => Ok(Some(serde_json::from_str(&json)?)),
            None => Ok(None),
        }
    }

    // Implement other methods...
}
```
## Storage Selection Guide

### Decision Matrix
| Feature | Memory | SQLite | PostgreSQL | ClickHouse | Custom |
|---|---|---|---|---|---|
| Persistence | ❌ | ✅ | ✅ | ✅ | Depends |
| Concurrency | Medium | Low | High | Very High | Depends |
| Scale | Small | Medium | Large | Massive | Depends |
| Setup Complexity | None | Low | Medium | High | Varies |
| Query Flexibility | Limited | High | Very High | High | Depends |
| Analytics | Basic | Good | Excellent | Outstanding | Depends |
| Operational Overhead | None | Low | Medium | High | Varies |
### Recommendations

**Development/Testing:**

```rust
// Quick start with memory storage
let storage = MemoryStorage::new();
```

**Small Production:**

```rust
// SQLite for simple deployments
let storage = SqliteStorage::new("traces.db").await?;
```

**Medium Production:**

```rust
// PostgreSQL for robust applications
let storage = PostgresStorage::new("postgresql://...").await?;
```

**Large Scale/Analytics:**

```rust
// ClickHouse for high-volume scenarios
let storage = ClickHouseStorage::new("http://clickhouse:8123").await?;
```
## Migration Between Backends

### Export/Import Tool

```rust
use reflow_tracing::migration::StorageMigrator;

// Migrate from SQLite to PostgreSQL
let migrator = StorageMigrator::new(
    SqliteStorage::new("traces.db").await?,
    PostgresStorage::new("postgresql://...").await?,
);
migrator.migrate_all_traces().await?;
```
### Backup and Restore

```rust
// Backup to file
let backup_path = "traces_backup.json";
storage.export_to_file(backup_path).await?;

// Restore from file
storage.import_from_file(backup_path).await?;
```
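If a backend lacks export/import helpers, a JSON-lines round trip is easy to hand-roll. A sketch under two stated assumptions: a minimal hypothetical trace record, and flow IDs that need no JSON escaping (a real exporter would use a JSON library):

```rust
use std::fs;
use std::io::Write;

/// Hypothetical minimal trace record for the backup sketch.
#[derive(Debug, PartialEq)]
struct TraceRecord {
    trace_id: u64,
    flow_id: String,
}

/// Export traces as one JSON object per line (no escaping, per the assumption above).
fn export_to_file(path: &str, traces: &[TraceRecord]) -> std::io::Result<()> {
    let mut f = fs::File::create(path)?;
    for t in traces {
        writeln!(f, "{{\"trace_id\":{},\"flow_id\":\"{}\"}}", t.trace_id, t.flow_id)?;
    }
    Ok(())
}

/// Import: count restored records (a real restore would parse each JSON line).
fn import_from_file(path: &str) -> std::io::Result<usize> {
    let data = fs::read_to_string(path)?;
    Ok(data.lines().filter(|l| !l.is_empty()).count())
}

fn main() -> std::io::Result<()> {
    let traces = vec![
        TraceRecord { trace_id: 1, flow_id: "etl".to_string() },
        TraceRecord { trace_id: 2, flow_id: "ingest".to_string() },
    ];
    let path = "traces_backup_demo.jsonl";
    export_to_file(path, &traces)?;
    let restored = import_from_file(path)?;
    assert_eq!(restored, traces.len());
    fs::remove_file(path)?; // clean up the demo file
    println!("round-tripped {restored} traces");
    Ok(())
}
```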
## Monitoring Storage Performance

### Metrics Collection

```rust
use reflow_tracing::storage::StorageMetrics;

let metrics = storage.get_metrics().await?;
println!("Storage performance:");
println!("  Write latency: {}ms", metrics.avg_write_latency_ms);
println!("  Read latency: {}ms", metrics.avg_read_latency_ms);
println!("  Storage size: {}MB", metrics.storage_size_mb);
println!("  Query performance: {}ms", metrics.avg_query_latency_ms);
```
### Health Monitoring

```rust
use std::time::Duration;

// Regular health checks
tokio::spawn(async move {
    loop {
        match storage.health_check().await {
            Ok(_) => println!("Storage healthy"),
            Err(e) => eprintln!("Storage unhealthy: {}", e),
        }
        tokio::time::sleep(Duration::from_secs(30)).await;
    }
});
```
## Best Practices

- **Choose Appropriate Backend**: Match the storage backend to your scale and requirements
- **Plan for Growth**: Start simple, but design for scale
- **Monitor Performance**: Track storage metrics and query performance
- **Regular Backups**: Implement automated backup strategies
- **Partition Large Tables**: Use time-based partitioning for better performance
- **Index Strategically**: Create indexes for common query patterns
- **Manage Retention**: Implement data retention policies to control growth
- **Test Disaster Recovery**: Regularly test backup and restore procedures
- **Optimize Queries**: Use EXPLAIN to understand and optimize query performance
- **Monitor Resources**: Keep an eye on disk space, memory, and CPU usage