Live edits over a stream
Tutorial 01 paced a graph at the browser's animation-frame rate. This one drives a graph from inbound network data: Wikimedia's public Server-Sent Events feed of Wikipedia edits. The actor model is the same; the trigger comes from outside instead of from a local clock.
What we are building
flowchart LR
sse([Wikimedia SSE]) -->|event| source[source]
source -->|event| filter[substantive?]
filter -->|event| display[display]
Three actors:
- source opens an EventSource to Wikimedia and emits each parsed JSON event.
- filter keeps edits to en.wikipedia articles by humans whose byte delta is at least 200 in either direction (skipping stubs and typo fixes).
- display puts each surviving event at the top of a list on the page.
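For orientation, here is a hand-written sample of the fields this tutorial reads from a recentchange event, plus the byte-delta calculation the filter and display both rely on. The values are invented for illustration, and `byteDelta` is a name chosen here; real events carry many more fields.

```javascript
// Hand-written sample, not captured from the live stream.
// Only the fields this tutorial reads are shown.
const sampleEvent = {
  wiki: "enwiki",          // which wiki the edit happened on
  namespace: 0,            // 0 is the article namespace
  bot: false,              // true when the edit is flagged as a bot edit
  title: "Example article",
  user: "ExampleUser",
  length: { old: 10240, new: 10512 }, // page size in bytes before and after
};

// Byte delta: positive when the page grew, negative when it shrank.
// Missing length fields are treated as 0.
const byteDelta = (e) => (e.length?.new ?? 0) - (e.length?.old ?? 0);

console.log(byteDelta(sampleEvent)); // 272
```

An event without a length field yields a delta of 0, so it never passes a size threshold.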
Setup
One file in any directory:
<!doctype html>
<meta charset="utf-8">
<title>Live Wikipedia edits</title>
<style>
  body { margin: 0; background: #0b1020; color: #c9d2e6;
         font: 14px/1.5 system-ui; padding: 24px 32px; }
  ol { list-style: none; padding: 0; max-width: 720px; }
  li { padding: 8px 12px; margin: 6px 0; background: #131a30; border-radius: 4px; }
</style>
<ol id="feed"></ol>
<script type="module">
  import { ready, Network, Actor, Message }
    from "https://esm.sh/@offbit-ai/reflow";
  await ready();
  // the rest goes here
</script>
The Wikimedia stream serves CORS-permissive headers, so a static file server is enough.
The actors
Source
Owns an EventSource and bridges it into the graph via an internal
queue. Events arrive whenever the network pushes them; the actor
emits one per run(ctx) call.
class Source extends Actor {
  static inports = ["tick"];
  static outports = ["tick", "event"];

  constructor(url) {
    super();
    this.queue = [];    // events received but not yet emitted
    this.resume = null; // continuation of a run parked on an empty queue
    const es = new EventSource(url);
    es.addEventListener("message", (e) => {
      let data;
      try {
        data = JSON.parse(e.data);
      } catch {
        return; // drop malformed lines
      }
      this.queue.push(data);
      // Clear the slot before resuming, in case the resumed run parks again.
      const resume = this.resume;
      this.resume = null;
      resume?.();
    });
  }

  run(ctx) {
    const send = () => {
      ctx.send({
        event: Message.object(this.queue.shift()),
        tick: Message.flow(),
      });
      ctx.done();
    };
    if (this.queue.length) send();
    else this.resume = send;
  }
}
The tick outport is wired back to the tick inport (set up
below), so each successful run schedules the next one. We seed
the loop with one initial Flow on tick at startup.
Two run states. If the queue has events, drain one and emit a fresh
tick. If the queue is empty, park the run by stashing the
continuation in this.resume; the next inbound EventSource message
calls it. The packed send({event, tick}) keeps the self-loop alive.
This pattern — actor-as-source with a queue and a tick self-loop —
plugs any push-based input (sockets, EventSource, observers, native
events) into a Reflow graph. The runtime drains the queue at the
rate downstream consumers can keep up with. If display falls
behind, the queue grows; the rest of the graph keeps running.
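The queue-and-park mechanism can be seen in isolation, without Reflow. The sketch below is a minimal illustration under that assumption: `PushBridge`, `push`, and `take` are hypothetical names, not part of the library, and the consumer callback stands in for a run of the source actor.

```javascript
// Standalone sketch of the queue-plus-parked-continuation idea.
// `PushBridge` is a hypothetical name, not a Reflow class.
class PushBridge {
  constructor() {
    this.queue = [];     // items that arrived while nobody was waiting
    this.waiting = null; // consumer callback parked on an empty queue
  }

  // Producer side: called whenever the outside world pushes an item.
  push(item) {
    this.queue.push(item);
    if (this.waiting) {
      const resume = this.waiting;
      this.waiting = null; // clear before calling so the consumer may park again
      resume();
    }
  }

  // Consumer side: deliver one item now, or park until the next push.
  take(onItem) {
    const deliver = () => onItem(this.queue.shift());
    if (this.queue.length) deliver();
    else this.waiting = deliver;
  }
}

const bridge = new PushBridge();
const seen = [];

bridge.take((x) => seen.push(x)); // queue empty: parks
bridge.push("a");                 // wakes the parked consumer
bridge.push("b");                 // nobody waiting: stays queued
bridge.take((x) => seen.push(x)); // queue non-empty: delivers at once

console.log(seen); // [ 'a', 'b' ]
```

The queue here is unbounded, as in the actor above. A source that can outpace its consumers for long stretches may want a cap that drops the oldest entries.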
Filter
A pure transform. Receives an event, checks a predicate, forwards if it passes.
class Filter extends Actor {
  static inports = ["event"];
  static outports = ["event"];

  constructor(predicate) { super(); this.predicate = predicate; }

  run(ctx) {
    const event = ctx.input.event?.data;
    if (event && this.predicate(event)) {
      ctx.send({ event: Message.object(event) });
    }
    ctx.done();
  }
}
The predicate is injected at construction. The same Filter class
works in any pipeline.
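Because the predicate is an ordinary function, small checks can be combined before they are handed to Filter. The following is a sketch only: `allOf` and the three named checks are hypothetical helpers introduced for illustration, not part of Reflow.

```javascript
// `allOf` is a hypothetical helper, not part of Reflow.
// It builds one predicate that passes only when every part passes.
const allOf = (...predicates) => (event) => predicates.every((p) => p(event));

const isEnglish = (e) => e.wiki === "enwiki";
const isArticle = (e) => e.namespace === 0;
const isHuman = (e) => !e.bot;

const humanArticleEdit = allOf(isEnglish, isArticle, isHuman);

console.log(humanArticleEdit({ wiki: "enwiki", namespace: 0, bot: false })); // true
console.log(humanArticleEdit({ wiki: "enwiki", namespace: 1, bot: false })); // false
```

The combined function can be passed to the constructor like any other predicate, for example new Filter(humanArticleEdit).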
Display
Renders. Each event becomes one <li> at the top of the list,
capped at 50 entries.
class Display extends Actor {
  static inports = ["event"];
  static outports = [];

  constructor(list, max = 50) {
    super();
    this.list = list;
    this.max = max;
  }

  run(ctx) {
    const e = ctx.input.event?.data;
    if (e) {
      const li = document.createElement("li");
      const delta = (e.length?.new ?? 0) - (e.length?.old ?? 0);
      li.textContent = `${delta >= 0 ? "+" : ""}${delta} ${e.title} — ${e.user}`;
      this.list.prepend(li);
      while (this.list.children.length > this.max) this.list.lastChild.remove();
    }
    ctx.done();
  }
}
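The prepend-and-trim step is easier to check away from the DOM. As a rough sketch, the same logic on a plain array looks like this; `prependCapped` is a hypothetical helper name, and the array stands in for the list element's children.

```javascript
// `prependCapped` is a hypothetical helper for illustration.
// It mutates `items`: newest first, at most `max` entries kept.
function prependCapped(items, item, max = 50) {
  items.unshift(item);
  while (items.length > max) items.pop(); // drop from the old end
  return items;
}

const recent = [];
for (let i = 1; i <= 5; i++) prependCapped(recent, i, 3);

console.log(recent); // [ 5, 4, 3 ]
```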
Wiring
const STREAM = "https://stream.wikimedia.org/v2/stream/recentchange";

const substantive = (e) =>
  e.wiki === "enwiki" &&
  e.namespace === 0 &&
  !e.bot &&
  Math.abs((e.length?.new ?? 0) - (e.length?.old ?? 0)) >= 200;

const net = new Network();

net.addNode("source", "tpl_wikipedia_source");
net.addNode("filter", "tpl_substantive");
net.addNode("display", "tpl_display");

net.addConnection("source", "tick", "source", "tick"); // self-loop
net.addConnection("source", "event", "filter", "event");
net.addConnection("filter", "event", "display", "event");

net.registerActor("tpl_wikipedia_source", new Source(STREAM));
net.registerActor("tpl_substantive", new Filter(substantive));
net.registerActor("tpl_display", new Display(document.getElementById("feed")));

net.addInitial("source", "tick", Message.flow());
await net.start();
addInitial wakes the source for its first run; the EventSource
drives everything after that.
Run it
npx serve .
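Open the page. Substantive edits appear within a few seconds, newest at the top. As written above, Display sets only textContent, so titles are plain text; making a title clickable means rendering it as a link to the article.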
The full runnable example is at sdk/node/examples/tutorial-02-live-edits.
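One way to get clickable titles is to build the article URL from the event itself. The sketch below assumes each event carries a server_url field next to title, and `articleUrl` is a hypothetical helper name; the full example may do this differently.

```javascript
// Hypothetical helper. Assumes the event carries `server_url`
// (for example "https://en.wikipedia.org") alongside `title`.
function articleUrl(e) {
  // MediaWiki page URLs use underscores in place of spaces.
  const slug = encodeURIComponent(e.title.replace(/ /g, "_"));
  return `${e.server_url}/wiki/${slug}`;
}

console.log(articleUrl({ server_url: "https://en.wikipedia.org", title: "Ada Lovelace" }));
// https://en.wikipedia.org/wiki/Ada_Lovelace
```

Inside Display, the result could be assigned to the href of an anchor element that is appended to the list item in place of the plain title text.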
Notes on the design
- The graph topology is identical to a clock-driven graph; only the source actor changes. A user-driven source (click handler feeding a queue) follows the same shape.
- Swapping the predicate changes what surfaces. Drop the enwiki check to see every language; drop namespace === 0 to include talk pages and every other namespace; replace !e.bot with e.bot to watch bots instead of humans. No other code changes.
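The predicate variations might look like the following. This is a sketch with names chosen here (`anyLanguage`, `anyNamespace`, `botsOnly`), and the bot variant inverts the bot flag, which is one reading of "watch bots". The sample events are hand-written, not captured from the stream.

```javascript
const delta = (e) => (e.length?.new ?? 0) - (e.length?.old ?? 0);
const big = (e) => Math.abs(delta(e)) >= 200;

// Every language: no check on e.wiki.
const anyLanguage = (e) => e.namespace === 0 && !e.bot && big(e);
// Every namespace on English Wikipedia: no check on e.namespace.
const anyNamespace = (e) => e.wiki === "enwiki" && !e.bot && big(e);
// Bot edits only: the bot test is inverted.
const botsOnly = (e) => e.wiki === "enwiki" && e.namespace === 0 && e.bot && big(e);

// Hand-written sample events for illustration.
const frenchEdit = { wiki: "frwiki", namespace: 0, bot: false, length: { old: 100, new: 900 } };
const talkEdit = { wiki: "enwiki", namespace: 1, bot: false, length: { old: 900, new: 100 } };
const botEdit = { wiki: "enwiki", namespace: 0, bot: true, length: { old: 0, new: 500 } };

console.log(anyLanguage(frenchEdit), anyNamespace(talkEdit), botsOnly(botEdit)); // true true true
```

Each variant is passed to Filter in place of the original predicate; the wiring stays as it is.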
What is next
The next tutorial moves to Python. Reflow runs three LLM agents in parallel against a local Ollama model, and a synthesizer combines their findings. The actor primitives are the same, now server-side, with streaming output through the graph.