A long-running Kafka stream router (Java)
A daemon that consumes events from one Kafka topic, routes each one to a different output topic based on content, and stays up indefinitely. Same SDK as tutorial 05, opposite shape: where 05 spins up a fresh per-request graph and tears it down, this one boots a single long-running graph at startup and lets the Kafka poll loop drive ticks through it.
flowchart LR
kin([orders topic]) --> source[OrderSource]
source --> router[Router]
router -->|confirmed| s1[Sink: orders.confirmed]
router -->|confirmed| l1[Logger]
router -->|cancelled| s2[Sink: orders.cancelled]
router -->|cancelled| l2[Logger]
router -->|refunded| s3[Sink: orders.refunded]
router -->|refunded| l3[Logger]
router -->|other| s4[Sink: orders.dlq]
router -->|other| l4[Logger]
s1 --> ko1([orders.confirmed])
s2 --> ko2([orders.cancelled])
s3 --> ko3([orders.refunded])
s4 --> ko4([orders.dlq])
Each router outport feeds two downstream actors — a KafkaSink and a Logger. Reflow connectors are broadcast: every connector from a source fires for every packet on that source's outport, so adding the logger doesn't take packets away from the sinks.
What this replaces
Plain Kafka Streams would express this as a topology defined inside
a single KafkaStreams builder; routing is KStream#branch with
predicates compiled into the topology object. That works, but the
topology and the producers/consumers are coupled — the same
"split into N topics by content" shape on RabbitMQ, NATS, or in
front of a SQS-like service requires a different framework.
Reflow's wiring is independent of the transport. Swap KafkaSink
for an HTTP sink, an internal channel sink, or one of the built-in
catalog sinks; the routing topology doesn't change.
Prerequisites
Java 17+. The published JVM SDK auto-loads the native runtime — no separate cargo step:
// build.gradle.kts
plugins {
java
application
}
dependencies {
implementation("ai.offbit:reflow:0.2.7")
implementation("org.apache.kafka:kafka-clients:3.7.1")
implementation("org.slf4j:slf4j-simple:2.0.13")
}
application {
mainClass.set("ai.offbit.reflow.tutorial06.Tutorial06Application")
}
A local Kafka broker — KRaft, single-node — via
docker compose up -d:
services:
kafka:
image: apache/kafka:3.7.1
container_name: tut06-kafka
ports: ["9092:9092"]
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
CLUSTER_ID: MkU3OEVCNTcwNTJCNEM2Qg  # kafka-storage requires a base64-encoded 16-byte UUID (22 chars)
OrderSource
A long-running consumer that publishes one Reflow message per inbound record. Two things are different from any actor we've seen so far in this series:
- The actor declares a _trigger inport even though it has no upstream. Reflow's actor model is event-driven on input, so a pure source needs an initial Flow packet to schedule its first run. We add that initial in the network setup.
- run() never returns. The Kafka poll loop runs indefinitely, and the actor publishes packets via ctx.send(port, msg), not ctx.emit.
public class OrderSource extends Actor {
private final String bootstrap, topic, groupId;
private volatile boolean stopped = false;
public OrderSource(String bootstrap, String topic, String groupId) {
this.bootstrap = bootstrap; this.topic = topic; this.groupId = groupId;
}
public void stop() { stopped = true; }
@Override public String component() { return "order_source"; }
@Override public List<String> inports() { return List.of("_trigger"); }
@Override public List<String> outports() { return List.of("order"); }
@Override public void run(ActorCallContext ctx) {
// config keys below are static imports from org.apache.kafka.clients.consumer.ConsumerConfig
Properties p = new Properties();
p.put(BOOTSTRAP_SERVERS_CONFIG, bootstrap);
p.put(GROUP_ID_CONFIG, groupId);
p.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
p.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
p.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p)) {
consumer.subscribe(List.of(topic));
while (!stopped) {
var records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> r : records) {
ctx.send("order", Message.string(r.value()));
}
}
}
ctx.done();
}
}
Why ctx.send, not ctx.emit
ctx.emit accumulates packets in a HashMap that drains only when
ctx.done() fires. For an actor whose run() returns once per
tick that's the right model — it's the bulk-flush pattern. For a
continuously-publishing source whose run() never returns, emits
would sit in the buffer forever, never reaching downstream
connectors.
ctx.send(port, msg) writes straight to the outport channel,
bypassing the done() drain. It's how every long-running
source-actor pattern is implemented across the SDKs: Python's
ctx.send, and the JVM ctx.send added in 0.2.6.
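The distinction can be modeled in a few lines of plain Java. This is a toy illustration of the two delivery paths, not the SDK's internals; ToyContext and its queues are made up for this sketch:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy model: emit() parks packets in a buffer that only drains on done();
// send() makes them visible on the channel immediately.
final class ToyContext {
    final Queue<String> channel = new ConcurrentLinkedQueue<>(); // what downstream reads
    private final Queue<String> buffer = new ArrayDeque<>();     // emit accumulator

    void emit(String msg) { buffer.add(msg); }   // invisible until done()
    void send(String msg) { channel.add(msg); }  // visible right away
    void done() { channel.addAll(buffer); buffer.clear(); }
}
```

An actor whose run() never reaches done() leaves every emit() stranded in the buffer; send() is unaffected.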
Router
Inspects the order's status field and emits on the matching
outport. Routing policy is one method — swap status-based for
hash-by-customer, geo-by-region, etc., by editing run().
public class Router extends Actor {
@Override public String component() { return "router"; }
@Override public List<String> inports() { return List.of("order"); }
@Override public List<String> outports() {
return List.of("confirmed", "cancelled", "refunded", "other");
}
@Override public void run(ActorCallContext ctx) {
String body = stripQuotes(ctx.inputDataJson("order"));
String status = extractStatus(body);
String port = switch (status) {
case "confirmed" -> "confirmed";
case "cancelled" -> "cancelled";
case "refunded" -> "refunded";
default -> "other";
};
ctx.emit(port, Message.string(body));
ctx.done();
}
}
ctx.emit is fine here because Router fires per tick — one input,
one output, done. The bulk-flush model fits.
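Router and the sinks lean on two helpers the listings elide, stripQuotes and extractStatus. A minimal sketch, assuming payloads are flat JSON objects serialized as strings; the regex and the double-encoding assumption are mine, not the tutorial's:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class OrderJson {
    private static final Pattern STATUS =
            Pattern.compile("\"status\"\\s*:\\s*\"([^\"]*)\"");

    // Message.string(...) payloads may come back JSON-encoded a second time,
    // i.e. wrapped in quotes with the inner quotes escaped; undo that here.
    static String stripQuotes(String s) {
        if (s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"")) {
            return s.substring(1, s.length() - 1).replace("\\\"", "\"");
        }
        return s;
    }

    // Pull the status field without a full JSON parser; an empty string
    // makes the Router's switch fall through to "other".
    static String extractStatus(String body) {
        Matcher m = STATUS.matcher(body);
        return m.find() ? m.group(1) : "";
    }
}
```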
Logger
Operational tap that fans off the router's outports in parallel with the KafkaSinks. Reflow's broadcast-on-source-outport semantics means we can add this without touching the routing logic — the connectors do the splitting.
public class Logger extends Actor {
private final String label;
public Logger(String label) { this.label = label; }
@Override public String component() { return "logger_" + label; }
@Override public List<String> inports() { return List.of("in"); }
@Override public List<String> outports() { return List.of(); }
@Override public void run(ActorCallContext ctx) {
String body = stripQuotes(ctx.inputDataJson("in"));
System.out.printf("[%-9s] %s%n", label, body);
ctx.done();
}
}
Output at the terminal as the router fires:
[confirmed] {"id":"a","status":"confirmed"}
[cancelled] {"id":"b","status":"cancelled"}
[refunded ] {"id":"c","status":"refunded"}
[dlq ] {"id":"d","status":"weird"}
KafkaSink
Four instances, one per output topic. Each holds a Kafka producer constructed lazily on first tick.
public class KafkaSink extends Actor {
private final String bootstrap, topic;
private volatile KafkaProducer<String, String> producer;
public KafkaSink(String bootstrap, String topic) {
this.bootstrap = bootstrap; this.topic = topic;
}
public void close() { var p = producer; if (p != null) p.close(); }
@Override public String component() { return "kafka_sink_" + topic; }
@Override public List<String> inports() { return List.of("in"); }
@Override public List<String> outports() { return List.of(); }
@Override public void run(ActorCallContext ctx) {
if (producer == null) { producer = makeProducer(); }
String body = stripQuotes(ctx.inputDataJson("in"));
producer.send(new ProducerRecord<>(topic, body));
ctx.done();
}
}
The wiring
One graph, started once at boot. The shutdown hook stops the source's poll loop and tears down the network — the JVM exits cleanly when the consumer/producer threads have drained.
public class Tutorial06Application {
public static void main(String[] args) throws Exception {
var bootstrap = System.getenv().getOrDefault("KAFKA_BOOTSTRAP", "localhost:9092");
var inputTopic = System.getenv().getOrDefault("INPUT_TOPIC", "orders");
var groupId = System.getenv().getOrDefault("GROUP_ID", "reflow-tut06");
var source = new OrderSource(bootstrap, inputTopic, groupId);
var confirmed = new KafkaSink(bootstrap, "orders.confirmed");
var cancelled = new KafkaSink(bootstrap, "orders.cancelled");
var refunded = new KafkaSink(bootstrap, "orders.refunded");
var dlq = new KafkaSink(bootstrap, "orders.dlq");
Network net = new Network();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
source.stop(); net.shutdown();
confirmed.close(); cancelled.close(); refunded.close(); dlq.close();
net.close();
}));
net.registerActor("tpl_source", source);
net.registerActor("tpl_router", new Router());
net.registerActor("tpl_sink_conf", confirmed);
net.registerActor("tpl_sink_canc", cancelled);
net.registerActor("tpl_sink_ref", refunded);
net.registerActor("tpl_sink_dlq", dlq);
net.registerActor("tpl_log_conf", new Logger("confirmed"));
net.registerActor("tpl_log_canc", new Logger("cancelled"));
net.registerActor("tpl_log_ref", new Logger("refunded"));
net.registerActor("tpl_log_dlq", new Logger("dlq"));
net.addNode("source", "tpl_source");
net.addNode("router", "tpl_router");
net.addNode("confirmed", "tpl_sink_conf");
net.addNode("cancelled", "tpl_sink_canc");
net.addNode("refunded", "tpl_sink_ref");
net.addNode("dlq", "tpl_sink_dlq");
net.addNode("log_conf", "tpl_log_conf");
net.addNode("log_canc", "tpl_log_canc");
net.addNode("log_ref", "tpl_log_ref");
net.addNode("log_dlq", "tpl_log_dlq");
net.addConnection("source", "order", "router", "order");
net.addConnection("router", "confirmed", "confirmed", "in");
net.addConnection("router", "cancelled", "cancelled", "in");
net.addConnection("router", "refunded", "refunded", "in");
net.addConnection("router", "other", "dlq", "in");
// Same outports fan to the loggers in parallel — broadcast
// means adding these doesn't reduce traffic to the sinks.
net.addConnection("router", "confirmed", "log_conf", "in");
net.addConnection("router", "cancelled", "log_canc", "in");
net.addConnection("router", "refunded", "log_ref", "in");
net.addConnection("router", "other", "log_dlq", "in");
// Source has no upstream — kick it with a Flow initial.
net.addInitial("source", "_trigger", "{\"type\":\"Flow\"}");
net.start();
Thread.currentThread().join(); // park until SIGTERM
}
}
Run it
docker compose up -d
for t in orders orders.confirmed orders.cancelled orders.refunded orders.dlq; do
docker exec tut06-kafka /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server localhost:9092 --create --topic $t \
--partitions 1 --replication-factor 1
done
gradle run & # router process
docker exec -i tut06-kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 --topic orders <<EOF
{"id":"a","status":"confirmed"}
{"id":"b","status":"cancelled"}
{"id":"c","status":"refunded"}
{"id":"d","status":"weird"}
EOF
Verify each output topic received its match:
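One way to dump all four topics, assuming the same container name as the compose file above (kafka-console-consumer.sh ships in the image; --timeout-ms makes each read exit instead of blocking):

```shell
for t in orders.confirmed orders.cancelled orders.refunded orders.dlq; do
  echo "=== $t ==="
  docker exec tut06-kafka /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic $t \
    --from-beginning --timeout-ms 5000 2>/dev/null
done
```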
=== orders.confirmed ===
{"id":"a","status":"confirmed"}
=== orders.cancelled ===
{"id":"b","status":"cancelled"}
=== orders.refunded ===
{"id":"c","status":"refunded"}
=== orders.dlq ===
{"id":"d","status":"weird"}
Notes on the design
- Long-running graph. No per-request setup; Network.start() fires once at app boot, and the source actor's poll loop drives ticks forever. Adding a new output topic is one addNode, one addConnection, and one outport on the Router.
- ctx.send for source actors. Continuous publishers can't use the per-tick emit/done cycle; ctx.send pushes straight to the outport channel.
- Routing topology lives in Router.run. Hash-by-customer, geo-by-region, schema-version split: all one-method changes, none of which touch the wiring or the sinks.
- Transport independence. KafkaSink could be RabbitSink, HttpSink, or an in-memory test sink; the graph shape stays the same. The Reflow contract is "messages on ports", not "Kafka records."
- Shutdown. source.stop() exits the poll loop on the next iteration (while (!stopped)); producer.close() flushes the outbound Kafka batch. The JVM exits when both drain.
What's next
The next post takes the SDK in a different
direction — back to Python, but with a triggered-batch lifecycle
and a workflow assembled almost entirely from catalog templates
(api_github_*, tpl_loop, tpl_switch, api_slack_send_message).
Reads, decides, takes action.