Parallel data enrichment behind a Spring Boot endpoint (Java)
A REST service that enriches a product SKU by fanning out to three slow downstream services in parallel, joining the results, and returning a merged JSON payload. The per-request Reflow network has the same shape as tutorial 04 but a different convergence point: where the Go gRPC tutorial fans out into a streaming response, this one fans out and back in to a single response.
What this replaces
The vanilla Spring version is the standard CompletableFuture chain:
@PostMapping("/enrich")
public Map<String, Object> enrich(@RequestBody EnrichRequest req) {
    var inv = CompletableFuture.supplyAsync(() -> inventory(req.sku()));
    var price = CompletableFuture.supplyAsync(() -> price(req.sku()));
    var reviews = CompletableFuture.supplyAsync(() -> reviews(req.sku()));
    CompletableFuture.allOf(inv, price, reviews).join();
    return Map.of(
        "inventory", inv.join(),
        "price", price.join(),
        "reviews", reviews.join());
}
It works. But every dependency is implicit in the allOf argument
list, the executor is pulled from somewhere — usually the common
ForkJoinPool, which is the wrong choice for blocking I/O — and
adding a fourth service means editing four places.
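The executor problem at least has a plain-Java fix: hand supplyAsync a dedicated pool sized for blocking calls. A standalone sketch — enrichPool and the two stub lookups are illustrative names, not the tutorial's code:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorFix {
    // Dedicated pool for blocking downstream calls; supplyAsync defaults to
    // the common ForkJoinPool, which starves under blocking I/O.
    static final ExecutorService enrichPool = Executors.newFixedThreadPool(16, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true); // don't keep the JVM alive after the demo
        return t;
    });

    // Stubs standing in for the slow downstream calls.
    static String inventory(String sku) { return "inv:" + sku; }
    static String price(String sku) { return "price:" + sku; }

    static Map<String, Object> enrich(String sku) {
        var invF = CompletableFuture.supplyAsync(() -> inventory(sku), enrichPool);
        var priceF = CompletableFuture.supplyAsync(() -> price(sku), enrichPool);
        CompletableFuture.allOf(invF, priceF).join();
        return Map.of("inventory", invF.join(), "price", priceF.join());
    }
}
```

On Java 21+, Executors.newVirtualThreadPerTaskExecutor() is a lighter-weight option for the same blocking fan-out. Either way, the structural complaints stand: the dependency list is still implicit and still edited in several places per new service.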
The Reflow version is a graph:
flowchart LR
    req([POST /enrich]) --> split[Splitter]
    split -->|inv| inv[InventoryActor]
    split -->|price| pri[PriceActor]
    split -->|reviews| rev[ReviewsActor]
    inv --> merge[Merger]
    pri --> merge
    rev --> merge
    merge -.complete.-> req
Merger declares awaitAllInports = true — it fires once when each
distinct inport has a packet. That's Reflow's allOf().join().
Adding a fourth service is one addNode and two addConnection calls.
Backpressure is automatic on every edge.
Prerequisites
Java 17+ and Gradle. Add the JVM SDK from Maven Central; the published artifact bundles the native runtime for every supported platform, so no manual build is needed:
// build.gradle.kts
plugins {
    java
    id("org.springframework.boot") version "3.3.5"
    id("io.spring.dependency-management") version "1.1.6"
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("ai.offbit:reflow:0.2.7")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testRuntimeOnly("org.junit.platform:junit-platform-launcher")
}
Splitter
Reads the SKU and broadcasts it on three named outports — one per downstream service. Reflow connectors are broadcast: every connector from a source fires for every packet on that source's outport. To route the same SKU to three different consumers we declare one outport per consumer and emit on each.
public class Splitter extends Actor {
    @Override public String component() { return "splitter"; }
    @Override public List<String> inports() { return List.of("sku"); }
    @Override public List<String> outports() { return List.of("inv", "price", "reviews"); }

    @Override public void run(ActorCallContext ctx) {
        String sku = stripQuotes(ctx.inputDataJson("sku"));
        ctx.emit("inv", Message.string(sku));
        ctx.emit("price", Message.string(sku));
        ctx.emit("reviews", Message.string(sku));
        ctx.done();
    }
}
ctx.inputDataJson("sku") returns the bare JSON payload for the
named inport — a primitive scalar in JSON form ("WIDGET-42"),
ready to use without unwrapping the runtime's {type, data}
envelope. stripQuotes peels the JSON quoting off the string scalar.
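The tutorial doesn't show stripQuotes; a minimal sketch of what such a helper might look like:

```java
public class JsonScalar {
    // Peels the surrounding quotes off a JSON string scalar, so the JSON
    // text "\"WIDGET-42\"" becomes WIDGET-42. Non-string scalars pass
    // through unchanged. Ignores escape sequences, which is fine for
    // plain SKU strings.
    static String stripQuotes(String json) {
        if (json != null && json.length() >= 2
                && json.startsWith("\"") && json.endsWith("\"")) {
            return json.substring(1, json.length() - 1);
        }
        return json;
    }
}
```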
Service stubs
Three stand-ins for slow I/O. Each sleeps for a different duration so the test can confirm the wall-clock dominator is the slowest branch (220 ms), not the sum of all three (550 ms).
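That arithmetic is easy to sanity-check outside Reflow with plain CompletableFuture. A standalone sketch — the 180 ms middle sleep is inferred from the stated 550 ms total, not given in the tutorial:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TimingCheck {
    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs the three stub sleeps in parallel and returns the wall clock in ms.
    // A dedicated 3-thread pool guarantees all branches actually overlap.
    static long parallelElapsedMs() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            long start = System.nanoTime();
            CompletableFuture.allOf(
                CompletableFuture.runAsync(() -> sleep(150), pool), // inventory
                CompletableFuture.runAsync(() -> sleep(180), pool), // price (inferred)
                CompletableFuture.runAsync(() -> sleep(220), pool)  // reviews
            ).join();
            // Wall clock tracks the slowest branch (~220 ms), not the 550 ms sum.
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }
}
```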
public class InventoryActor extends Actor {
    @Override public String component() { return "inventory"; }
    @Override public List<String> inports() { return List.of("sku"); }
    @Override public List<String> outports() { return List.of("out"); }

    @Override public void run(ActorCallContext ctx) {
        String sku = ctx.inputDataJson("sku");
        sleep(150);
        long stock = (long) (stripQuotes(sku).length() * 7);
        String json = String.format("{\"sku\":%s,\"stock\":%d}", sku, stock);
        ctx.emit("out", Message.fromJson(
                "{\"type\":\"Object\",\"data\":" + json + "}"));
        ctx.done();
    }
}
PriceActor and ReviewsActor follow the same shape with different
sleeps and payload fields.
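For reference, PriceActor's stub payload might be built like this — a hypothetical helper, with the 14.49/USD values taken from the sample output later in the post (the real stub may derive them differently):

```java
import java.util.Locale;

public class PricePayload {
    // Hypothetical stub payload for PriceActor, mirroring InventoryActor's
    // format: quotedSku arrives as a JSON string scalar ("\"WIDGET-42\"")
    // and is embedded as-is, quotes included. Locale.ROOT keeps the
    // decimal point a dot regardless of the system locale.
    static String priceJson(String quotedSku) {
        return String.format(Locale.ROOT,
            "{\"sku\":%s,\"amount\":%.2f,\"currency\":\"USD\"}",
            quotedSku, 14.49);
    }
}
```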
Merger
Joins the three branches. awaitAllInports = true flips the
runtime's tick policy: instead of firing on any input, it waits
until every declared inport has a packet, then fires once. The
controller passes in a CompletableFuture<String>; completing it
signals the request handler to return.
public class Merger extends Actor {
    private final CompletableFuture<String> done;

    public Merger(CompletableFuture<String> done) { this.done = done; }

    @Override public String component() { return "merger"; }
    @Override public List<String> inports() {
        return List.of("inventory", "price", "reviews");
    }
    @Override public List<String> outports() { return List.of(); }
    @Override public boolean awaitAllInports() { return true; }

    @Override public void run(ActorCallContext ctx) {
        String inv = ctx.inputDataJson("inventory");
        String price = ctx.inputDataJson("price");
        String reviews = ctx.inputDataJson("reviews");
        String merged = String.format(
            "{\"inventory\":%s,\"price\":%s,\"reviews\":%s}",
            inv, price, reviews);
        done.complete(merged);
        ctx.done();
    }
}
ctx.inputDataJson(port) is the JVM SDK's per-port JSON accessor —
no need to scan the full inputsJson() envelope to find one branch's
data.
The controller
One handler. Per-request network in a try-with-resources block — when the response returns or the timeout fires, the network shuts down cleanly. No shared state between requests.
@RestController
public class EnrichController {

    public record EnrichRequest(String sku) {}

    @PostMapping(value = "/enrich",
            consumes = MediaType.APPLICATION_JSON_VALUE,
            produces = MediaType.APPLICATION_JSON_VALUE)
    public String enrich(@RequestBody EnrichRequest req) throws Exception {
        var done = new CompletableFuture<String>();
        try (var net = new Network()) {
            net.registerActor("tpl_split", new Splitter());
            net.registerActor("tpl_inv", new InventoryActor());
            net.registerActor("tpl_price", new PriceActor());
            net.registerActor("tpl_reviews", new ReviewsActor());
            net.registerActor("tpl_merge", new Merger(done));
            net.addNode("split", "tpl_split");
            net.addNode("inv", "tpl_inv");
            net.addNode("price", "tpl_price");
            net.addNode("reviews", "tpl_reviews");
            net.addNode("merge", "tpl_merge");
            net.addConnection("split", "inv", "inv", "sku");
            net.addConnection("split", "price", "price", "sku");
            net.addConnection("split", "reviews", "reviews", "sku");
            net.addConnection("inv", "out", "merge", "inventory");
            net.addConnection("price", "out", "merge", "price");
            net.addConnection("reviews", "out", "merge", "reviews");
            net.addInitial("split", "sku",
                "{\"type\":\"String\",\"data\":\"" + req.sku() + "\"}");
            net.start();
            return done.get(5, TimeUnit.SECONDS);
        }
    }
}
Network implements AutoCloseable, so the try-with-resources
wraps the whole graph lifecycle. done.get(...) blocks the request
thread until Merger completes the future or the timeout fires.
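If Merger never fires, done.get throws TimeoutException after five seconds and the try-with-resources still closes the network on the way out. The blocking pattern in isolation, as a standalone sketch with the timeout made a parameter (no Reflow or Spring involved):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutDemo {
    // Same shape as the controller's return: block on a future the graph
    // completes, with a hard upper bound on the wait.
    static String await(CompletableFuture<String> done, long timeoutMs) {
        try {
            return done.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // In a real controller you would likely map this to a 504 instead.
            return "{\"error\":\"enrichment timed out\"}";
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
```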
Run it
cd sdk/jvm/examples/tutorial-05-spring-enrich
gradle bootRun
# in another terminal
curl -s localhost:8080/enrich \
  -H 'content-type: application/json' \
  -d '{"sku":"WIDGET-42"}' | jq
Output:
{
  "inventory": {"sku": "WIDGET-42", "stock": 63},
  "price": {"amount": 14.49, "currency": "USD", "sku": "WIDGET-42"},
  "reviews": {"avg": 3.5, "count": 27, "sku": "WIDGET-42"}
}
The repo includes an EnrichTest that boots the full Spring context
via @SpringBootTest and asserts the merged response shape — covers
the full per-request lifecycle:
gradle test
Notes on the design
- Per-request lifecycle. try (var net = new Network()) makes the graph a request-scoped object. No process-wide actor state to clean up; no leaks between requests.
- awaitAllInports. The fan-in barrier. Unlike a CompletableFuture.allOf join, the merger doesn't have to know how many upstream sources there are: it declares the ports it cares about, and the runtime tracks completion per inport.
- Backpressure. Every connector is a bounded channel. If you swap one of the stubs for a real downstream and the merger ends up slower than the rest of the network, the runtime throttles upstream automatically.
- Adding a service. One addNode, two addConnection calls, one new outport on the splitter, one new inport on the merger. The handler shape doesn't change.
- Routing topology lives in the wiring. The splitter is a template; you can change "fan to all three" to "hash by SKU prefix" by editing one method, with no controller changes.
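The prefix-routing variant mentioned in that last note might reduce to a helper like this — hypothetical, with routeFor and the port array as illustrative names; the idea is that the splitter emits on exactly one outport per packet instead of all of them:

```java
public class PrefixRouter {
    // Hypothetical routing rule: pick one outport per packet, keyed on the
    // SKU's prefix (the part before the first '-'). Same prefix always
    // lands on the same port, so per-prefix ordering is preserved.
    static String routeFor(String sku, String[] outports) {
        int dash = sku.indexOf('-');
        String prefix = dash >= 0 ? sku.substring(0, dash) : sku;
        return outports[Math.floorMod(prefix.hashCode(), outports.length)];
    }
}
```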
What's next
The next post takes the same SDK to a long-running shape: a Kafka stream router where the graph stays up indefinitely, consuming from one topic and routing events into N output topics by content.