A shared log library for Java 21 that provides a single append-only, totally-ordered log as the source of truth for your entire system. Stateless executors derive all their state by replaying the log, process new entries, and write results back — making every component crash-safe and independently restartable with no coordination.
Inspired by Boki (SOSP 2021), Gumbo brings the shared log primitive to Java with a focus on simplicity, flexibility, and production readiness. See Boki Comparison for details on how Gumbo relates to and differs from Boki.
- Single source of truth: All state derived from an append-only log
- Crash-safe by design: Replay the log to recover exact state after any failure
- Flexible deployment: Single-node (zero dependencies) to multi-node (FoundationDB)
- Type-safe API: TypedLogView<T> with pluggable serialization (Kryo included)
- Virtual threads: Lightweight concurrency for executors and subscriptions
- Pluggable persistence: In-memory, file-based, or FoundationDB backends
- Tag-based filtering: Multiple logical streams in one physical log
- Stateless executors: Functional fold pattern for deterministic state rebuilding
// 1. Open the log with in-memory storage
SharedLogConfig config = SharedLogConfig.builder()
.persistenceAdapter(new InMemoryPersistenceAdapter())
.build();
try (SharedLogService log = SharedLogService.open(config)) {
LogTag orders = LogTag.of("orders");
// 2. Append
AppendResult r = log.append(AppendRequest.to(orders, "order-placed".getBytes())).join();
System.out.println("seqnum=" + r.seqnum() + " localId=" + r.localId());
// 3. Read
List<LogEntry> entries = log.readAll(orders).join();
// 4. Subscribe (runs listener on a virtual thread)
SharedLog.Subscription sub = log.subscribe(orders, LogPosition.BEGINNING, entry ->
System.out.println("received: " + new String(entry.data())));
// 5. Use a view
LogView view = log.getView(orders);
view.append("order-confirmed".getBytes()).join();
sub.close();
}
Switch to file-backed storage:
SharedLogConfig config = SharedLogConfig.builder()
.persistenceAdapter(new FileBasedPersistenceAdapter("/var/data/myapp"))
.build();
- Installation
- Core concepts
- API overview
- Persistence adapters
- Sequencers
- Typed log views
- Executor model
- Actor checkpoints
- Virtual threads
- Architecture
- Module map
- Data model
- Examples
- Building
- Further reading
Gumbo is distributed via JitPack. No local build required.
Add the JitPack repository to your pom.xml:
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>
Then add the dependency:
<dependency>
<groupId>com.github.CajunSystems</groupId>
<artifactId>gumbo</artifactId>
<version>0.2.0</version>
</dependency>
Or, with Gradle:
repositories {
maven { url 'https://jitpack.io' }
}
dependencies {
implementation 'com.github.CajunSystems:gumbo:0.2.0'
}
Note: Gumbo requires Java 21+.
Every meaningful event in the system is represented as an immutable LogEntry. Nothing
is stored anywhere else. If you need the current state of an entity, you read all log
entries for that entity's tag and fold them together. This is exactly how event sourcing
and CQRS work, but with the log itself as the only durable store.
time ──────────────────────────────────────────────────────────▶
seqnum 0 1 2 3 4
│ │ │ │ │
tag orders inventory orders orders inventory
│ │ │ │ │
data placed -10 units confirmed shipped +20 units
Any reader who wants the state of orders filters to seqnums {0, 2, 3} and replays.
Any reader who wants inventory gets {1, 4}. The global order is preserved; tag
filtering is a read-time projection.
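The replay-and-fold idea can be sketched with plain collections. The Entry record below is an illustrative stand-in, not the library's LogEntry:

```java
import java.util.List;

public class TagProjection {
    // Minimal stand-in for a log entry (illustrative only, not the library type)
    record Entry(long seqnum, String tag, String data) {}

    // Tag filtering is a read-time projection: the global seqnum order is preserved
    static List<Entry> project(List<Entry> log, String tag) {
        return log.stream().filter(e -> e.tag().equals(tag)).toList();
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
            new Entry(0, "orders", "placed"),
            new Entry(1, "inventory", "-10 units"),
            new Entry(2, "orders", "confirmed"),
            new Entry(3, "orders", "shipped"),
            new Entry(4, "inventory", "+20 units"));
        // The orders reader sees exactly seqnums {0, 2, 3}
        System.out.println(project(log, "orders").stream().map(Entry::seqnum).toList()); // [0, 2, 3]
    }
}
```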
A LogTag has a namespace (entity type) and an optional key (entity instance):
LogTag allOrders = LogTag.of("orders"); // namespace-wide
LogTag singleOrder = LogTag.of("orders", "order-42"); // instance-scoped
One log entry can carry multiple tags so it is visible from several views:
AppendRequest.to(
Set.of(LogTag.of("orders"), LogTag.of("orders", "order-42")),
payload);
A LogView is a lightweight, tag-scoped window over the shared log. It supports
reads, appends (which write back to the underlying log with the view's tag included),
and push subscriptions.
Every entry has two identifiers:
- seqnum — globally unique, monotonically increasing across all tags. Provides total ordering. Assigned by the Sequencer.
- localId — monotonically increasing within a tag's stream. Useful as a per-entity cursor (equivalent to Boki's localid).
In Boki, seqnum = [logspace_id:32 | position:32] encodes the physical log shard.
Here it is a plain long from an AtomicLong; a distributed implementation would
encode the node and term in the high bits.
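Such a high-bits encoding could look like the following sketch, which mirrors Boki's [logspace_id:32 | position:32] layout; the field names are hypothetical:

```java
public class ShardedSeqnum {
    // Pack a 32-bit logspace/node id into the high bits and a 32-bit position
    // into the low bits of one long (hypothetical layout, mirroring Boki's)
    static long encode(int logspaceId, int position) {
        return ((long) logspaceId << 32) | (position & 0xFFFFFFFFL);
    }

    static int logspaceId(long seqnum) { return (int) (seqnum >>> 32); }
    static int position(long seqnum)  { return (int) seqnum; }

    public static void main(String[] args) {
        long s = encode(7, 42);
        // Decoding recovers both components
        System.out.println(logspaceId(s) + ":" + position(s)); // 7:42
    }
}
```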
Gumbo provides three levels of API abstraction:
The main interface for interacting with the shared log:
SharedLogService log = SharedLogService.open(config);
// Append entries
AppendResult result = log.append(AppendRequest.to(tag, data)).join();
// Read entries
List<LogEntry> all = log.readAll(tag).join();
List<LogEntry> from = log.readFrom(tag, position, maxEntries).join();
// Subscribe to new entries
Subscription sub = log.subscribe(tag, position, entry -> {
// Process entry on virtual thread
});
// Batch operations
List<AppendResult> results = log.appendBatch(requests).join();
// Trim old entries
log.trim(upToSeqnum).join();
LogView is a lightweight view over a specific tag:
LogView view = log.getView(LogTag.of("orders"));
// Append (automatically includes the view's tag)
view.append(data).join();
// Read operations
List<LogEntry> entries = view.readAll().join();
LogEntry next = view.readNextAfter(seqnum).join();
LogEntry prev = view.readPrevBefore(seqnum).join();
long tail = view.checkTail().join();
// Subscribe to new entries only (no backlog)
Subscription sub = view.subscribeTail(entry -> process(entry));
// Durable KV storage (for checkpoints)
view.setValue("checkpoint", bytes).join();
byte[] saved = view.getValue("checkpoint").join();
TypedLogView eliminates byte[] handling with automatic serialization:
LogSerializer<OrderEvent> serializer = new KryoLogSerializer<>(OrderEvent.class);
TypedLogView<OrderEvent> view = log.getTypedView(tag, serializer);
// Append domain objects directly
view.append(new OrderEvent("order-42", "placed")).join();
// Read typed objects
List<OrderEvent> events = view.readAll().join();
// Typed subscriptions
Subscription sub = view.subscribeTail(event ->
System.out.println("Order: " + event.orderId()));
// Tag: identifies a logical stream
LogTag tag = LogTag.of("orders"); // namespace-wide
LogTag tag = LogTag.of("orders", "order-42"); // instance-scoped
// Entry: immutable log record
record LogEntry(
long seqnum, // global sequence number
long localId, // per-tag sequence number
Set<LogTag> tags, // tags this entry belongs to
byte[] data, // payload
long timestamp // append timestamp
)
// Position: read cursor
LogPosition pos = new LogPosition(seqnum);
LogPosition.BEGINNING // start from seqnum 0
LogPosition.END // start from latest
// Append request
AppendRequest req = AppendRequest.to(tag, data);
AppendRequest req = AppendRequest.to(Set.of(tag1, tag2), data); // multi-tag
// Append result
record AppendResult(
long seqnum,
long localId,
LogTag primaryTag,
long timestamp
)
┌─────────────────────────────────────────────────────────────────────┐
│ SharedLogService │
│ │
│ append(AppendRequest) │
│ │ │
│ ├── writeLock.lock() │
│ ├── sequencer.next() ←── LocalSequencer (AtomicLong) │
│ ├── localIdCounters[tag]++ │
│ ├── new LogEntry(seqnum, localId, tags, data, now) │
│ ├── persistenceAdapter.append(entry) │
│ └── notifySubscribers(entry) ──▶ [virtual thread per listener] │
│ │
│ read(tag, from, max) │
│ └── persistenceAdapter.readByTag(tag, from.seqnum()) │
│ (lock-free, concurrent) │
│ │
│ getView(tag) ──▶ DefaultLogView │
│ ├── readNextAfter(minSeqnum) ← Boki SharedLogReadNext │
│ ├── readPrevBefore(maxSeqnum) ← Boki SharedLogReadPrev │
│ └── checkTail() ← Boki SharedLogCheckTail │
└─────────────────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌──────────────────────┐ ┌──────────────────────────────────────┐
│ PersistenceAdapter │ │ ExecutorEngine │
│ (pluggable) │ │ │
│ │ │ register(Executor<S>) │
│ ┌────────────────┐ │ │ start() → one virtual thread each │
│ │ InMemory │ │ │ │
│ │ (test/dev) │ │ │ ┌──────────────────────────────┐ │
│ └────────────────┘ │ │ │ ExecutorRunner<S> │ │
│ │ │ │ │ │
│ ┌────────────────┐ │ │ │ 1. buildFullState() │ │
│ │ FileBased │ │ │ │ readAll(inputTag) │ │
│ │ (production) │ │ │ │ fold: apply(state, e)* │ │
│ │ │ │ │ │ │ │
│ │ log.dat │ │ │ │ 2. subscribe(inputTag) │ │
│ │ index.dat │ │ │ │ park on inbox.poll() │ │
│ │ trim.dat │ │ │ │ │ │
│ └────────────────┘ │ │ │ 3. on new entries: │ │
└──────────────────────┘ │ │ apply(state, entry)* │ │
│ │ execute(state, ctx) │ │
│ │ → List<AppendRequest> │ │
│ │ append each to log │ │
│ └──────────────────────────────┘ │
└──────────────────────────────────────┘
1. Caller invokes sharedLog.append(request) — returns a CompletableFuture.
2. The async pool submits the work to a virtual-thread executor.
3. Inside the write lock: sequencer.next() issues the next seqnum; the per-tag localId counter increments; a LogEntry is constructed.
4. persistenceAdapter.append(entry) persists durably (fsync for the file adapter).
5. Active subscribers for the entry's tags are notified on individual virtual threads.
6. AppendResult(seqnum, localId, primaryTag, timestamp) is returned.
Reads are lock-free. The persistence adapter's in-memory indices (ConcurrentSkipListMap)
serve tag-scoped reads without touching the write lock.
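A minimal illustration of why such reads need no lock: ConcurrentSkipListMap's tailMap gives a live, weakly consistent range view that is safe under concurrent writers. The index contents below are made up:

```java
import java.util.concurrent.ConcurrentSkipListMap;

public class TagIndexSketch {
    public static void main(String[] args) {
        // Hypothetical per-tag index mapping seqnum to payload; the real adapter
        // maps seqnum to LogEntry, but the access pattern is the same
        ConcurrentSkipListMap<Long, String> ordersIndex = new ConcurrentSkipListMap<>();
        ordersIndex.put(0L, "placed");
        ordersIndex.put(2L, "confirmed");
        ordersIndex.put(3L, "shipped");

        // Read everything at or after seqnum 2 without taking any lock;
        // tailMap is safe to iterate even while writers insert concurrently
        System.out.println(ordersIndex.tailMap(2L, true).values()); // [confirmed, shipped]
    }
}
```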
src/main/java/com/cajunsystems/gumbo/
│
├── core/ Pure value types — no dependencies
│ ├── LogTag.java Logical stream identifier (namespace + key)
│ ├── LogEntry.java Immutable log record (seqnum, localId, tags, data)
│ ├── LogPosition.java Read cursor (wraps seqnum)
│ ├── AppendResult.java Result of a successful append
│ └── AppendRequest.java Payload + target tags for an append call
│
├── api/ Interfaces — depend only on core/
│ ├── SharedLog.java Main log interface (append/read/subscribe/trim)
│ ├── LogView.java Tag-scoped read/append window (byte[])
│ ├── TypedLogView.java Type-safe wrapper over LogView (uses LogSerializer)
│ ├── Executor.java Stateless executor interface (fold + execute)
│ └── ExecutorContext.java Runtime context injected into execute()
│
├── serialization/ Pluggable object ↔ byte[] conversion
│ ├── LogSerializer.java Interface — serialize(T) / deserialize(byte[])
│ └── KryoLogSerializer.java Kryo 5 implementation with internal Pool<Kryo>
│
├── persistence/ Storage back-ends
│ ├── PersistenceAdapter.java Interface — open/close/append/appendBatch/read/trim
│ ├── InMemoryPersistenceAdapter.java
│ ├── FileBasedPersistenceAdapter.java WAL + index + trim files; group-commit via appendBatch
│ ├── BatchingPersistenceAdapter.java Batches writes → one fdatasync per N entries
│ └── FoundationDBPersistenceAdapter.java FDB-backed; distributed durability, optional dep
│
├── sequencer/ Sequence-number generation
│ ├── Sequencer.java Interface — next() / current()
│ ├── LocalSequencer.java AtomicLong; advanceTo() for post-crash reseeding
│ └── FoundationDBSequencer.java Distributed; FDB read-modify-write transaction per seqnum
│
└── service/ Wiring
├── SharedLogConfig.java Builder-pattern configuration
├── SharedLogService.java Main implementation; getTypedView() factory
├── DefaultLogView.java LogView backed by SharedLogService
├── DefaultTypedLogView.java TypedLogView backed by LogView + LogSerializer
└── ExecutorEngine.java Runs Executor<S> instances on virtual threads
┌──────────┬──────────┬──────────────┬──────────┬──────────────────────────┬──────────┬───────────┬──────────┐
│ MAGIC │ seqnum │ timestamp │ localId │ tags (variable) │ dataLen │ data │ CRC32 │
│ 4 bytes │ 8 bytes │ 8 bytes │ 8 bytes │ see below │ 4 bytes │ N bytes │ 4 bytes │
│ 0xC0FFEE42│ big-end │ millis epoch │ big-end │ │ big-end │ │ │
└──────────┴──────────┴──────────────┴──────────┴──────────────────────────┴──────────┴───────────┴──────────┘
Tag (per-tag block):
┌──────────────┬──────────────────┬──────────┬──────────────────┐
│ nsLen 2 B │ namespace UTF-8 │ keyLen │ key UTF-8 │
└──────────────┴──────────────────┴──────────┴──────────────────┘
Index file (index.dat): [seqnum : 8 bytes][fileOffset : 8 bytes] × N entries
Trim file (trim.dat): [trimSeqnum : 8 bytes] (single value, atomically replaced)
The index file provides O(log n) seek-to-seqnum without scanning the log. On startup, if the index is absent or truncated the adapter falls back to a full log scan to rebuild it (same recovery path Boki uses when an engine node restarts cold).
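The framing above can be sketched with standard java.nio and java.util.zip. Tags are omitted for brevity, so this is an illustration of the MAGIC/length/CRC32 pattern, not the adapter's actual code:

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class RecordFrame {
    static final int MAGIC = 0xC0FFEE42;

    // Simplified frame (tags omitted): MAGIC | seqnum | timestamp | localId | dataLen | data | CRC32
    static byte[] encode(long seqnum, long ts, long localId, byte[] data) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 8 + 8 + 4 + data.length + 4);
        buf.putInt(MAGIC).putLong(seqnum).putLong(ts).putLong(localId)
           .putInt(data.length).put(data);
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 0, buf.position()); // checksum covers every preceding byte
        buf.putInt((int) crc.getValue());
        return buf.array();
    }

    static boolean verify(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        if (buf.getInt() != MAGIC) return false;
        CRC32 crc = new CRC32();
        crc.update(record, 0, record.length - 4);
        int stored = ByteBuffer.wrap(record, record.length - 4, 4).getInt();
        return (int) crc.getValue() == stored;
    }

    public static void main(String[] args) {
        byte[] rec = encode(0, System.currentTimeMillis(), 0, "order-placed".getBytes());
        System.out.println(verify(rec)); // true
        rec[10] ^= 1; // flip one bit inside the seqnum field
        System.out.println(verify(rec)); // false: corruption is detected by the CRC
    }
}
```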
Backed by a ConcurrentSkipListMap<seqnum, LogEntry> plus per-tag
ConcurrentSkipListMap<seqnum, localId> indices. Fully concurrent reads; writes are
externally serialised by SharedLogService's write lock. All data is lost on JVM
exit. Use for unit tests and local development.
Append-only binary WAL (log.dat) with:
- Durability: FileChannel.force(false) (fdatasync) after every entry and every index record.
- Global index (index.dat): [seqnum:8][offset:8] appended after each write; loaded entirely into memory on open for O(log n) random access.
- Trim (trim.dat): the trim seqnum is written to a temp file and atomically renamed, so a crash during trim never corrupts the log.
- Crash recovery: on open, loads the index file; if it is missing or not a multiple of 16 bytes, falls back to scanning log.dat and rebuilding from scratch. Per-tag in-memory indices are always rebuilt from the verified log entries.
Wraps any PersistenceAdapter and accumulates writes in memory, flushing to
the delegate as a single appendBatch() call. With FileBasedPersistenceAdapter
as the delegate, N entries cost 2 fdatasync calls instead of 2N:
PersistenceAdapter base = new FileBasedPersistenceAdapter("/var/data");
PersistenceAdapter batching = BatchingPersistenceAdapter.of(base); // 64 entries / 10 ms defaults
// or with explicit tuning:
PersistenceAdapter batching = new BatchingPersistenceAdapter(base, 128, 5 /* ms */);
SharedLogConfig config = SharedLogConfig.builder()
.persistenceAdapter(batching)
.build();
| Parameter | Default | Effect |
|---|---|---|
| maxBatchSize | 64 | Flush immediately when pending entries reach this count |
| maxDelayMs | 10 ms | A background virtual thread fires and flushes after this interval |
Durability tradeoff: entries appended but not yet flushed are in memory only.
A JVM crash within the delay window loses those entries. For strict durability
keep the default FileBasedPersistenceAdapter (1 fdatasync per entry) or use
BatchingPersistenceAdapter with maxDelayMs=0 / maxBatchSize=1.
See docs/PERSISTENCE_EVOLUTION.md for a full analysis of the persistence roadmap including memory-mapped files and io_uring.
Backed by FoundationDB, this adapter replaces the local WAL with a distributed, replicated key-value store. It is the recommended persistence back-end for multi-node or production-scale deployments.
{root} / "log" / seqnum → entry bytes
{root} / "tag" / namespace / key / seqnum → localId (8 bytes)
{root} / "meta" / "latest" → latestSeqnum (8 bytes)
{root} / "meta" / "trim" → trimSeqnum (8 bytes)
{root} / "meta" / "tagcount" / namespace / key → localIdCount (8 bytes)
FDB's tuple-layer key encoding preserves ordering, so range reads over the log
subspace naturally return entries in seqnum order without a separate index file.
// Minimal — uses the default fdb.cluster file
PersistenceAdapter fdb = new FoundationDBPersistenceAdapter();
SharedLogConfig config = SharedLogConfig.builder()
.persistenceAdapter(fdb)
.build();
For true multi-node operation, use FoundationDBSequencer alongside the adapter.
Sharing a single Database instance avoids opening two connections:
FDB fdb = FDB.selectAPIVersion(730);
Database db = fdb.open("/etc/foundationdb/fdb.cluster");
// Both components share the same connection
FoundationDBSequencer seq = new FoundationDBSequencer(db, "myapp");
FoundationDBPersistenceAdapter store = new FoundationDBPersistenceAdapter(db, "myapp");
seq.open();
store.open();
SharedLogConfig config = SharedLogConfig.builder()
.sequencer(seq) // seqnums assigned by FDB — safe across nodes
.persistenceAdapter(store)
.build();
fdb-java is an optional Maven dependency. It pulls in a platform-specific native library (libfdb_c.so on Linux). Projects that do not use the FDB adapter are not affected.
| Scenario | Recommendation |
|---|---|
| Single-node service, moderate write rate | BatchingPersistenceAdapter wrapping FileBasedPersistenceAdapter — simpler, lower latency |
| Multiple writer processes / nodes sharing one log | FoundationDBPersistenceAdapter + FoundationDBSequencer — only option that is safe |
| Need cross-datacenter replication | FDB — configure FDB's built-in DR replication |
| Write latency is the primary concern (< 1 ms P99) | Local file adapter — FDB adds a network round-trip (~1–5 ms) per commit |
| Log data must outlive the JVM process and survive node failure | FDB — 3-way replication by default |
| Operational simplicity (no cluster to manage) | Local file adapter |
| FaaS / serverless: any node can be the writer | FDB — distributed seqnum assignment is mandatory |
| Configuration | Approximate writes/s |
|---|---|
| Single append() (one FDB transaction each) | ~200–1 000 |
| appendBatch(64 entries) (one FDB transaction) | ~12 000–64 000 |
| BatchingPersistenceAdapter wrapping the FDB adapter | Same as appendBatch above |
FDB transactions have a 10 MB write limit; appendBatch automatically chunks
batches that would exceed 8 MB.
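The chunking can be sketched as a size-bounded splitter that keeps each sub-batch under a byte budget (illustrative, not the adapter's actual code):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchChunker {
    // Split a batch so each chunk stays under maxBytes, mirroring the idea of
    // staying at an 8 MB safety margin below FDB's 10 MB transaction limit
    static List<List<byte[]>> chunk(List<byte[]> entries, long maxBytes) {
        List<List<byte[]>> chunks = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long size = 0;
        for (byte[] e : entries) {
            if (!current.isEmpty() && size + e.length > maxBytes) {
                chunks.add(current);          // budget exceeded: seal this chunk
                current = new ArrayList<>();
                size = 0;
            }
            current.add(e);
            size += e.length;
        }
        if (!current.isEmpty()) chunks.add(current);
        return chunks;
    }

    public static void main(String[] args) {
        // Four 3 MB entries against an 8 MB budget split into two chunks of two
        List<byte[]> entries = List.of(new byte[3_000_000], new byte[3_000_000],
                                       new byte[3_000_000], new byte[3_000_000]);
        System.out.println(chunk(entries, 8_000_000).stream().map(List::size).toList()); // [2, 2]
    }
}
```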
- FoundationDB cluster running and reachable (default: /etc/foundationdb/fdb.cluster)
- libfdb_c.so 7.x installed on every JVM host (apt install foundationdb-clients on Debian/Ubuntu)
- Java binding: org.foundationdb:fdb-java:7.3.43 (already in pom.xml as optional)
Any storage engine can back the log by implementing PersistenceAdapter, for example a RocksDB adapter:
public class RocksDbPersistenceAdapter implements PersistenceAdapter {
@Override public void open() throws IOException { /* ... */ }
@Override public void close() throws IOException { /* ... */ }
@Override public void append(LogEntry entry) throws IOException { /* ... */ }
@Override public List<LogEntry> readAll() throws IOException { /* ... */ }
@Override public List<LogEntry> readFrom(long fromSeqnum) throws IOException { /* ... */ }
@Override public List<LogEntry> readByTag(LogTag tag, long fromSeqnum) throws IOException { /* ... */ }
@Override public void trim(long upToSeqnum) throws IOException { /* ... */ }
@Override public long getLatestSeqnum() { /* ... */ }
@Override public long getLocalIdCountForTag(LogTag tag) { /* ... */ }
}
SharedLogConfig config = SharedLogConfig.builder()
.persistenceAdapter(new RocksDbPersistenceAdapter(path))
.build();
The Sequencer interface has two implementations:
LocalSequencer is AtomicLong-backed and suitable for any single-process deployment. It starts at 0 by default; call advanceTo(latestSeqnum + 1) after a restart to avoid reusing seqnums.
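The AtomicLong pattern described above can be sketched as follows (illustrative, not the library source):

```java
import java.util.concurrent.atomic.AtomicLong;

public class LocalSequencerSketch {
    private final AtomicLong counter = new AtomicLong(0);

    long next()    { return counter.getAndIncrement(); }
    long current() { return counter.get(); }

    // After a restart, reseed past the highest persisted seqnum
    // so that already-issued numbers are never reused
    void advanceTo(long seqnum) { counter.updateAndGet(c -> Math.max(c, seqnum)); }

    public static void main(String[] args) {
        var seq = new LocalSequencerSketch();
        seq.next(); // 0
        seq.next(); // 1
        seq.advanceTo(10); // simulate post-crash reseeding from the persisted tail
        System.out.println(seq.next()); // 10
    }
}
```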
// SharedLogService does this automatically on startup
sequencer.advanceTo(persistenceAdapter.getLatestSeqnum() + 1);
FoundationDBSequencer uses a read-modify-write FDB transaction to atomically claim the next seqnum. FDB's optimistic concurrency control serialises concurrent callers across all nodes transparently — conflicting transactions are retried by the FDB client.
FoundationDBSequencer seq = new FoundationDBSequencer("/etc/foundationdb/fdb.cluster", "myapp");
seq.open();
// next() — claims one seqnum, blocks for one FDB round-trip (~1–5 ms)
long seqnum = seq.next();
// currentGlobal() — reads the authoritative global counter without claiming
long highWatermark = seq.currentGlobal();
When to use FoundationDBSequencer:
- Multiple writer processes share the same log — LocalSequencer would produce duplicate seqnums across processes.
- You want the sequencer's counter to survive JVM restarts without manual advanceTo reseeding (it is stored durably in FDB).
- You are already using FoundationDBPersistenceAdapter and want the entire write path to be distributed.
Batch seqnum reservation: FoundationDBSequencer implements
Sequencer.nextBatch(int count), which claims count seqnums in a single
FDB transaction. SharedLog.appendBatch(List<AppendRequest>) uses this
automatically — an N-entry write costs 2 FDB round-trips total (1 seqnum claim + 1 data commit) instead of N + 1. This is the same optimisation Boki uses in its metalog to prevent the sequencer from becoming a throughput bottleneck.
Every method in SharedLog and LogView works with raw byte[]. A
TypedLogView<T> wraps a LogView and uses a LogSerializer<T> to
transparently handle serialization so call sites never touch byte[]:
// 1. Pick a serializer
LogSerializer<OrderEvent> s = new KryoLogSerializer<>(OrderEvent.class);
// 2. Get a typed view
TypedLogView<OrderEvent> view = service.getTypedView(LogTag.of("orders"), s);
// 3. Append domain objects directly — no .getBytes()
view.append(new OrderEvent("ord-42", "placed")).join();
// 4. Read back typed objects — no new String(e.data())
List<OrderEvent> events = view.readAll().join();
Subscriptions are also typed:
SharedLog.Subscription sub = view.subscribeTail(event ->
System.out.println("received: " + event.orderId()));
LogSerializer<T> is a single-responsibility interface with two methods:
public interface LogSerializer<T> {
byte[] serialize(T value);
T deserialize(byte[] data);
}
Plug in any format — JSON, Protobuf, Avro — by implementing this interface.
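For example, a serializer backed by built-in JDK serialization could look like the sketch below. The implements clause is commented out so it compiles standalone; in a real project it would implement gumbo's LogSerializer<T>:

```java
import java.io.*;

public class JdkLogSerializer<T extends Serializable> /* implements LogSerializer<T> */ {

    public byte[] serialize(T value) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    @SuppressWarnings("unchecked")
    public T deserialize(byte[] data) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) ois.readObject();
        } catch (IOException e) { throw new UncheckedIOException(e); }
          catch (ClassNotFoundException e) { throw new IllegalStateException(e); }
    }

    public static void main(String[] args) {
        // Hypothetical event type for the round-trip demonstration
        record OrderEvent(String orderId, String type) implements Serializable {}
        var s = new JdkLogSerializer<OrderEvent>();
        OrderEvent roundTripped = s.deserialize(s.serialize(new OrderEvent("ord-42", "placed")));
        System.out.println(roundTripped.orderId()); // ord-42
    }
}
```

JDK serialization is verbose on the wire; Kryo, JSON, or Protobuf implementations follow the same two-method shape.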
KryoLogSerializer<T> ships out of the box:
// Default (no registration required — works for most types and records)
new KryoLogSerializer<>(MyClass.class)
// Pre-registered (smaller wire format, faster)
new KryoLogSerializer<>(MyClass.class, kryo -> {
kryo.register(MyClass.class, 10);
kryo.register(MyEnum.class, 11);
})
Kryo instances are pooled internally so the serializer is fully thread-safe.
An actor built on gumbo maps to a LogTag inbox. After startup it must replay
messages it missed (backlog), then switch seamlessly to live delivery.
Three APIs make this efficient:
| API | What it does | Complexity |
|---|---|---|
| logView.getLatestSeqnum() | Highest seqnum in the tag's log | O(1) |
| logView.readFrom(position, max) | Read entries starting at a seqnum | O(K) |
| logView.subscribeTail(listener) | Deliver only future entries, no backlog | O(1) |
| logView.setValue(key, value) | Persist a durable KV checkpoint | O(1) |
| logView.getValue(key) | Read a durable KV checkpoint | O(1) |
LogView inbox = service.getView(LogTag.of("actor", "orders"));
// 1. Read saved checkpoint (null on first start)
byte[] saved = inbox.getValue("checkpoint").join();
long resumeFrom = saved != null
? ByteBuffer.wrap(saved).getLong() + 1 // resume after last processed seqnum
: 0L;
// 2. Replay the backlog from checkpoint
List<LogEntry> backlog = inbox.readFrom(new LogPosition(resumeFrom), 1000).join();
for (LogEntry e : backlog) {
process(e);
inbox.setValue("checkpoint", ByteBuffer.allocate(8).putLong(e.seqnum()).array()).join();
}
// 3. Switch to live delivery (no backlog, no polling)
SharedLog.Subscription sub = inbox.subscribeTail(e -> {
process(e);
inbox.setValue("checkpoint", ByteBuffer.allocate(8).putLong(e.seqnum()).array()).join();
});
The checkpoint seqnum is stored in the tag's per-actor KV store — durable in
FileBasedPersistenceAdapter and FoundationDBPersistenceAdapter, in-memory only in
InMemoryPersistenceAdapter.
See ActorCheckpointExample.java for a runnable version.
Executors are the stateless workers of the system. Each executor is associated
with one input LogTag. The ExecutorEngine drives a continuous cycle:
read all entries for inputTag
│
▼
fold: state = executor.initialState()
for each entry: state = executor.apply(state, entry)
│
▼
execute: List<AppendRequest> = executor.execute(state, context)
│
▼
append each AppendRequest to the shared log
│
▼
wait for next entry (blocking park on virtual thread)
│
└──▶ repeat with incremental state update
Because state is always derived from the log, executors survive crashes without any additional checkpointing. Restart the executor, replay the log, and it resumes exactly where it left off.
record OrderState(List<Order> pending, List<Order> fulfilled) {}
public class FulfilmentExecutor implements Executor<OrderState> {
@Override
public LogTag getInputTag() { return LogTag.of("orders"); }
@Override
public OrderState initialState() {
return new OrderState(List.of(), List.of());
}
@Override
public OrderState apply(OrderState state, LogEntry entry) {
var event = OrderEvent.parse(entry.data());
return switch (event.type()) {
case PLACED -> state.withPending(append(state.pending(), event.order()));
case FULFILLED -> state.moveToFulfilled(event.orderId());
default -> state;
};
}
@Override
public List<AppendRequest> execute(OrderState state, ExecutorContext ctx) {
// Fulfil the first pending order, if any
return state.pending().stream()
.limit(1)
.map(o -> AppendRequest.to(
Set.of(LogTag.of("orders"), LogTag.of("orders", o.id())),
OrderEvent.fulfilled(o).toBytes()))
.toList();
}
}
Register and start:
ExecutorEngine engine = new ExecutorEngine(service);
engine.register(new FulfilmentExecutor());
engine.start();
A workflow executor chains two executors together so that the output of the first becomes the input of the second, advancing items through a multi-stage pipeline.
Stage 1 reads workflow.commands (entries like "submit:<id>") and emits
"processing:<id>" entries to workflow.events:
static class WorkflowSubmitExecutor implements Executor<List<String>> {
private final Set<String> emitted = ConcurrentHashMap.newKeySet();
@Override public LogTag getInputTag() { return LogTag.of("workflow.commands"); }
@Override public String getName() { return "WorkflowSubmit"; }
@Override public List<String> initialState() { return List.of(); }
@Override
public List<String> apply(List<String> pending, LogEntry entry) {
String msg = new String(entry.data());
if (msg.startsWith("submit:")) {
String id = msg.substring("submit:".length());
List<String> next = new ArrayList<>(pending);
next.add(id);
return next;
}
return pending;
}
@Override
public List<AppendRequest> execute(List<String> pending, ExecutorContext ctx) {
return pending.stream()
.filter(emitted::add) // idempotency: only emit each id once
.map(id -> AppendRequest.to(
LogTag.of("workflow.events"),
("processing:" + id).getBytes()))
.toList();
}
}
Stage 2 reads workflow.events, picks up items in processing state, and
emits "complete:<id>" back to the same tag:
static class WorkflowProcessExecutor implements Executor<List<String>> {
private final Set<String> emitted = ConcurrentHashMap.newKeySet();
@Override public LogTag getInputTag() { return LogTag.of("workflow.events"); }
@Override public String getName() { return "WorkflowProcess"; }
@Override public List<String> initialState() { return List.of(); }
@Override
public List<String> apply(List<String> processing, LogEntry entry) {
String msg = new String(entry.data());
if (msg.startsWith("processing:")) {
String id = msg.substring("processing:".length());
List<String> next = new ArrayList<>(processing);
next.add(id);
return next;
}
return processing;
}
@Override
public List<AppendRequest> execute(List<String> processing, ExecutorContext ctx) {
return processing.stream()
.filter(emitted::add)
.map(id -> AppendRequest.to(
LogTag.of("workflow.events"),
("complete:" + id).getBytes()))
.toList();
}
}
Register both executors with a single engine — they run concurrently on separate virtual threads:
ExecutorEngine engine = new ExecutorEngine(service);
engine.register(new WorkflowSubmitExecutor());
engine.register(new WorkflowProcessExecutor());
engine.start();
// Trigger the pipeline
service.append(AppendRequest.to(LogTag.of("workflow.commands"), "submit:order-99".getBytes())).join();
// → WorkflowSubmitExecutor emits "processing:order-99" to workflow.events
// → WorkflowProcessExecutor picks it up and emits "complete:order-99"
Executors can read and write other tags via ExecutorContext.openView(tag):
@Override
public List<AppendRequest> execute(OrderState state, ExecutorContext ctx) {
LogView auditLog = ctx.openView(LogTag.of("audit"));
auditLog.append(("processed " + state.pending().size() + " orders").getBytes()).join();
return List.of();
}
The ExecutorEngine spawns one Thread.ofVirtual() per registered executor. Each
runner parks on LinkedBlockingQueue.poll(500ms, MILLISECONDS) while waiting for new
entries — a blocking call that costs nothing on a virtual thread because the JVM
unmounts it from its carrier thread while parked.
// From ExecutorEngine — each runner is one virtual thread
Thread.ofVirtual()
.name("executor-" + executor.getName())
.start(runner::run);
Subscription delivery (both backlog replay and live notifications) also uses
Thread.ofVirtual(), so slow consumers never block the write path or each other.
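The parking behaviour can be demonstrated in isolation with a hypothetical mini-runner (not the engine's code):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        var inbox = new LinkedBlockingQueue<String>();

        // While blocked in poll(), the virtual thread is unmounted from its
        // carrier thread, so thousands of idle runners cost almost nothing
        Thread consumer = Thread.ofVirtual().name("demo-runner").start(() -> {
            try {
                String entry;
                while ((entry = inbox.poll(500, TimeUnit.MILLISECONDS)) != null) {
                    System.out.println("processed: " + entry);
                }
            } catch (InterruptedException ignored) {}
        });

        inbox.put("order-placed"); // wakes the parked runner
        consumer.join();           // runner exits after one quiet poll interval
    }
}
```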
The default async thread pool in SharedLogConfig is:
Executors.newVirtualThreadPerTaskExecutor()
All CompletableFuture-returning methods in SharedLogService dispatch onto this
pool, meaning append().join() inside an executor's execute() method is safe and
does not risk deadlock or thread starvation.
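A small standalone demonstration of why nested join() is safe on such a pool (illustrative; the names are made up):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualPoolDemo {
    public static void main(String[] args) {
        try (ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor()) {
            // join() inside a task on this pool blocks only a virtual thread;
            // with a fixed-size platform pool the same nesting could deadlock
            // once every worker is blocked waiting on another queued task
            CompletableFuture<String> outer = CompletableFuture.supplyAsync(() -> {
                CompletableFuture<String> inner =
                    CompletableFuture.supplyAsync(() -> "appended", pool);
                return inner.join() + " + processed";
            }, pool);
            System.out.println(outer.join()); // appended + processed
        }
    }
}
```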
Runnable end-to-end examples live in
src/test/java/com/cajunsystems/gumbo/examples/.
They are compiled and executed as part of mvn verify, so they always stay in
sync with the library.
| Example | What it demonstrates |
|---|---|
| QuickStartExample.java | Append, read, readFromPosition, subscribe, multi-tag entries, LogView |
| OrderFulfilmentExample.java | Stateless executor: backlog replay on startup, then incremental processing |
| FilePersistedExample.java | File-backed WAL: durability, crash recovery, and sequencer reseeding across restarts |
| WorkflowExecutorExample.java | Multi-stage workflow: chained executors advance items through SUBMITTED → PROCESSING → COMPLETE states |
| ActorCheckpointExample.java | Actor checkpoint pattern: resume from KV checkpoint, replay backlog, switch to live via subscribeTail() |
Run a single example directly:
mvn test -Dtest=QuickStartExample
mvn test -Dtest=OrderFulfilmentExample
mvn test -Dtest=FilePersistedExample
mvn test -Dtest=WorkflowExecutorExample
Requires Java 21 and Maven 3.9+.
mvn verify # compile + test
mvn package # produce gumbo-0.2.0.jar
GitHub Actions runs mvn verify on every push using Java 21 (Temurin).
Test reports are uploaded as workflow artifacts.
- Boki Comparison — How Gumbo relates to and differs from Boki
- Boki paper — SOSP 2021
- Boki source code
- Halfmoon — SOSP 2023, extends Boki with cross-log transactions
- Persistence Evolution — Roadmap for persistence optimizations