Declarative event transformation and routing platform built on Apache Kafka. Define field-level mappings between source and destination topics through a visual mapper UI, CLI, or REST API. The platform reads events, applies transforms, and writes results. No custom consumer code needed.
Includes a React-based visual field mapper with drag-and-drop connections, a Picocli command-line interface, REST and gRPC ingestion, pipeline versioning with active-passive deployment, and Spring Kafka processing with dead letter handling.
- Configuration over code. Transforms are defined declaratively as field mappings, not as compiled Java consumers. Adding a new transformation should never require a code change or redeployment.
- Events are immutable. Once an event enters the platform, it is never modified in place. Transforms produce new events on destination topics. The original event stays untouched on the source topic.
- Services are stateless. Each service can be restarted, scaled, or replaced without data loss. All state lives in Kafka (events) or PostgreSQL (pipeline definitions). No in-memory state that cannot be recovered.
- Fail-safe by default. If a transform fails, the event goes to a dead letter topic with error details. Processing continues for other events. No single failure stops the pipeline.
- Hot reload. Pipeline definitions can be created, updated, or deleted at runtime through the admin API. The engine picks up changes without restart.
- Language-agnostic boundaries. The ingest API accepts standard JSON over HTTP. Any system in any language can produce or consume events. The platform does not impose client libraries or SDK requirements.
- Each service owns its responsibility. Ingest only ingests. Engine only transforms. Admin only manages configuration. No service does two jobs.
| Decision | Choice | Reasoning |
|---|---|---|
| Messaging | Apache Kafka with KRaft | Industry standard for event streaming. KRaft removes Zookeeper dependency. |
| Stream processing | Spring Kafka consumer/producer | Simpler than Kafka Streams for per-pipeline dynamic topology. Pipelines are loaded at runtime, not compiled into a fixed topology. |
| Pipeline storage | PostgreSQL with JSONB | Field mappings are flexible JSON structures. JSONB allows indexing and querying without rigid schemas. |
| Event format | JSON | Universal, human-readable, supported by every language. No schema registry required for v1. |
| Transform model | Field-level source-to-destination mapping | Maps directly to the visual mapper UI planned for later. Each mapping is an independent unit that can be tested, displayed, and modified individually. |
| Module structure | Maven multi-module | Each service is independently deployable but shares the common library. Single build command for the full platform. |
| Error handling | Dead letter topic | Failed events are never dropped. They can be inspected and retried through the admin API. |
| Testing | Mock Kafka for unit tests, real Kafka for E2E | Unit tests run without infrastructure. Docker Compose provides real Kafka for integration and manual testing. |
Producer (any language, any system)
|
| HTTP POST /api/events
v
[event-ingest] ---- validates event ---- assigns ID + timestamp
|
| publishes to Kafka
v
[events.raw] (Kafka topic)
|
| event-engine consumes
v
[event-engine]
|
| 1. loads pipeline definitions from event-admin
| 2. matches event type to pipeline source topic
| 3. applies field mappings (rename, convert, mask, etc.)
| 4. produces transformed event to destination topic
|
v
[destination topic] (e.g. analytics.orders, warehouse.fulfillment)
|
| any consumer picks it up
v
Consumer (any language, any system)
On failure:
[event-engine] --> [events.failed] (dead letter topic)
Event storage (runs in parallel):
[all Kafka topics]
|
| event-store consumes every topic
v
[event-store]
|
| deserializes, deduplicates by ID, persists
v
[PostgreSQL] (stored_events table, JSONB payload)
|
| event-search reads
v
[event-search API]
|
| GET /api/events?type=order.created&source=test&from=...&to=...
v
Query results (paginated, filterable by type, source, status, time range)
{
"id": "evt_8f14e45f-ceea-4d9b-8b56-34a1c4b5c5a2",
"type": "order.created",
"source": "order-service",
"timestamp": "2026-04-03T10:30:00Z",
"payload": {
"orderId": 5001,
"customerId": 42,
"total": "129.99",
"currency": "EUR",
"items": [
{ "sku": "AUD-001", "name": "Headphones", "qty": 1 }
],
"address": {
"street": "Mannerheimintie 1",
"city": "Helsinki",
"country": "fi"
}
}
}

| Field | Type | Required | Description |
|---|---|---|---|
| `id` | UUID | Auto-generated | Unique event identifier |
| `type` | String | Yes | Dot-notation event type |
| `source` | String | Yes | Originating system |
| `timestamp` | ISO 8601 | Auto-generated | When the event was received |
| `payload` | JSON object | Yes | Event data, any structure |
| `metadata` | JSON object | No | Correlation IDs, versioning, tracing |
A pipeline defines how events transform from a source topic to a destination topic through field-level mappings.
{
"name": "orders-to-warehouse",
"description": "Map order events to warehouse fulfillment format",
"sourceTopic": "events.order.created",
"destinationTopic": "warehouse.fulfillment",
"enabled": true,
"fieldMappings": [
{ "sourceField": "orderId", "destinationField": "fulfillmentId" },
{ "sourceField": "total", "destinationField": "amount", "conversion": "TO_DOUBLE" },
{ "sourceField": "address.city", "destinationField": "shipping.city" },
{ "sourceField": "address.country", "destinationField": "shipping.countryCode", "conversion": "TO_UPPER" },
{ "sourceField": "debug", "destinationField": "debug", "excluded": true }
],
"errorHandling": {
"retries": 3,
"backoffMs": 1000,
"deadLetterTopic": "events.failed.warehouse"
}
}

Each field mapping specifies:

| Property | Description |
|---|---|
| `sourceField` | JSON path in the source event (supports dot notation for nested fields) |
| `destinationField` | JSON path in the output event (creates nested structure automatically) |
| `conversion` | Optional type conversion to apply |
| `defaultValue` | Value to use if the source field is missing. Supports `${now}` and `${uuid}`. |
| `excluded` | If true, the field is explicitly dropped from the output |
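The dot-notation behavior of `destinationField` can be sketched with plain maps (a hypothetical illustration, not the actual mapping executor in event-engine):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: writing to a dot-notation path such as
// "shipping.city" creates the intermediate objects automatically.
public class PathWriter {
    @SuppressWarnings("unchecked")
    public static void set(Map<String, Object> out, String path, Object value) {
        String[] parts = path.split("\\.");
        Map<String, Object> node = out;
        for (int i = 0; i < parts.length - 1; i++) {
            // create the intermediate object if it does not exist yet
            node = (Map<String, Object>) node.computeIfAbsent(parts[i], k -> new LinkedHashMap<>());
        }
        node.put(parts[parts.length - 1], value);
    }
}
```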
| Conversion | From | To | Example |
|---|---|---|---|
| `TO_STRING` | any | string | `123` to `"123"` |
| `TO_INTEGER` | string/number | integer | `"42"` to `42` |
| `TO_LONG` | string/number | long | `"9999999999"` to `9999999999` |
| `TO_DOUBLE` | string/number | double | `"49.99"` to `49.99` |
| `TO_BOOLEAN` | string | boolean | `"true"` to `true` |
| `TO_TIMESTAMP` | string | ISO 8601 | Date string validated and normalized |
| `TO_UPPER` | string | string | `"hello"` to `"HELLO"` |
| `TO_LOWER` | string | string | `"HELLO"` to `"hello"` |
| `MASK` | string | string | `"secret123"` to `"s*******3"` |
| `FLATTEN` | object | fields | `{a: {b: 1}}` to `{b: 1}` |
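As an illustration of the conversion semantics, the `MASK` rule from the table (keep the first and last character, star the rest) might look like this sketch (not the platform's actual converter):

```java
// Sketch of the MASK behavior: keep the first and last character,
// replace everything in between with '*'. Values of length <= 2 pass through.
public class MaskSketch {
    public static String mask(String value) {
        if (value == null || value.length() <= 2) return value;
        return value.charAt(0)
                + "*".repeat(value.length() - 2)
                + value.charAt(value.length() - 1);
    }
}
```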
event-processing/
├── pom.xml Maven aggregator
├── docker-compose.yml Kafka (KRaft), PostgreSQL, all services
├── start.sh Build, start, stop, test
├── Dockerfile Multi-stage build (shared across Java services)
├── event-common/ Shared library (Java, Maven)
├── event-ingest/ REST + gRPC event submission (Java, Maven)
├── event-engine/ Spring Kafka transform execution (Java, Maven)
├── event-admin/ Control center API (Java, Maven)
├── event-store/ Event persistence from Kafka to PostgreSQL (Java, Maven)
├── event-search/ Query API for stored events (Java, Maven)
├── event-detect/ Anomaly detection (Java, Maven)
├── event-cli/ Command-line interface (Java, Picocli)
└── event-mapper-ui/ Visual field mapper (React, Vite)
| Module | Type | Port | Description |
|---|---|---|---|
| event-common | Library | n/a | Event model, field mapping model, type converters, serialization |
| event-ingest | Service | 8090, 9190 | Accepts events via REST and gRPC, validates, publishes to Kafka |
| event-engine | Service | n/a | Consumes from source topics, applies field mappings, produces to destination topics. One instance per pipeline. |
| event-admin | Service | 8091 | Pipeline CRUD, versioning, deployment status, dead letter inspection |
| event-store | Service | n/a | Kafka consumer, persists all events to PostgreSQL. Deduplicates by event ID. |
| event-search | Service | 8092 | REST API for querying stored events by type, source, status, time range |
| event-detect | Service | 8093 | Anomaly detection: statistical baselines, schema drift, embeddings, LLM analysis |
| event-cli | CLI | n/a | Command-line interface for managing the platform (Picocli) |
| event-mapper-ui | UI | 3070 | Visual field mapper with drag-and-drop, schema discovery, live preview |
| Method | Endpoint | Description |
|---|---|---|
| POST | `/api/events` | Submit a single event |
| POST | `/api/events/batch` | Submit multiple events (max batch size configurable) |
| GET | `/api/health` | Service health |
The gRPC service `EventService` is available on port 9190 with `SubmitEvent` and `SubmitBatch` RPCs. Proto definition at `event-ingest/src/main/proto/event_service.proto`.
| Method | Endpoint | Description |
|---|---|---|
| GET | `/api/pipelines` | List all pipelines |
| GET | `/api/pipelines/{name}` | Get pipeline by name |
| POST | `/api/pipelines` | Create pipeline |
| PUT | `/api/pipelines/{name}` | Update pipeline |
| DELETE | `/api/pipelines/{name}` | Delete pipeline |
| POST | `/api/pipelines/{name}/pause` | Pause pipeline |
| POST | `/api/pipelines/{name}/resume` | Resume pipeline |
| GET | `/api/status` | Platform health |
| Method | Endpoint | Description |
|---|---|---|
| GET | `/api/alerts` | List alerts (filter by detectorType, severity, eventType, resolved) |
| GET | `/api/alerts/{id}` | Get alert details |
| POST | `/api/alerts/{id}/resolve` | Mark alert as resolved |
| GET | `/api/alerts/stats` | Alert counts (total, open, resolved) |
Swagger UI available at /swagger-ui.html on ingest, admin, search, and detect services.
| Method | Endpoint | Description |
|---|---|---|
| GET | `/api/events` | Search events with filters (type, source, status, from, to) |
| GET | `/api/events/{id}` | Get event by ID |
| GET | `/api/events/types` | List all event types |
| GET | `/api/events/sources` | List all event sources |
| GET | `/api/events/stats` | Event counts by type and source |
Command-line interface for managing the platform. Built with Picocli, packaged as a single executable jar.
# Build the CLI
./mvnw package -pl event-cli -q
# Run it
java -jar event-cli/target/event-cli-*.jar [command]

# Platform status
ep status
# Pipeline management
ep pipelines list
ep pipelines get orders-to-warehouse
ep pipelines create --name my-pipeline --source events.raw --dest output.topic
ep pipelines deploy orders-to-warehouse --version 1
ep pipelines pause orders-to-warehouse
ep pipelines resume orders-to-warehouse
ep pipelines draft orders-to-warehouse
ep pipelines delete orders-to-warehouse
ep pipelines versions orders-to-warehouse
ep pipelines test orders-to-warehouse '{"orderId":1,"total":"49.99"}'
# Topic operations
ep topics list
ep topics schema events.raw
ep topics sample events.raw --count 5
# Event submission
ep events send --type order.created --source test --payload '{"orderId":1}'
ep events send --type order.created --source test --file event.json

# Point the CLI at a remote platform
ep --admin-url http://remote:8091 --ingest-url http://remote:8090 status

Each pipeline supports multiple versions. Only one version can be active at a time.
| State | Meaning |
|---|---|
| `DRAFT` | Being designed in the mapper UI. Not deployed, no engine running. |
| `ACTIVE` | Deployed. Engine instance running, processing events. |
| `PAUSED` | Engine stopped. Can be resumed with the same configuration. |
| `DEPLOYING` | New version being rolled out via active-passive switch. |
Creating or editing a pipeline in the mapper UI creates a DRAFT version. Deploying promotes it to ACTIVE.
Changing a live pipeline uses an active-passive pattern to avoid downtime or event loss.
- User opens an ACTIVE pipeline in the mapper UI
- UI warns: "This pipeline is currently processing events. Changes will be deployed as a new version."
- User edits field mappings, tests with sample data (working on a DRAFT version)
- User clicks Deploy
- System starts a new engine instance (passive) with the updated mappings
- Passive engine catches up (consumer lag reaches zero)
- Traffic switches to the new engine
- Old engine stops
- Old version is archived, new version becomes ACTIVE
Rollback: start the old version's engine instance. The archived config is preserved.
Docker handles the engine lifecycle. Each engine instance is a container running one pipeline version.
React application for building field mappings visually. Connects to the admin API.
Stack: React 19, Vite, @xyflow/react v12, Zustand, Tailwind CSS v4
Features:
- Connect a source Kafka topic. Schema is discovered automatically from sample events.
- Source fields displayed on the left, destination fields on the right.
- Draw connections between fields by dragging.
- Configure type conversion on each connection (click the line).
- Set defaults, formatting, exclusions per field.
- Live preview: real sample events flow through the mappings, output shown side by side.
- Test runner: validate mappings before deploying.
- Pipeline state indicator (DRAFT, ACTIVE, PAUSED, DEPLOYING).
- Warning when editing an active pipeline.
| Component | Technology | Version |
|---|---|---|
| Language | Java (OpenJDK Temurin) | 21 |
| Framework | Spring Boot | 3.5.0 |
| Messaging | Spring Kafka | managed |
| Streaming | Apache Kafka (Confluent, KRaft mode) | 7.7.1 |
| RPC | gRPC, Protobuf | 1.62.2 |
| Persistence | Spring Data JPA / Hibernate | 6.x |
| Database | PostgreSQL | 17 |
| JSON storage | PostgreSQL JSONB | n/a |
| Migrations | Flyway | managed |
| Validation | Jakarta Bean Validation | 3.x |
| CLI | Picocli | 4.7.6 |
| API docs | SpringDoc OpenAPI | 2.8.6 |
| Testing | JUnit 5, MockMvc | 5.12+ |
| Build | Maven (wrapper included) | 3.9+ |
| Component | Technology | Version |
|---|---|---|
| Framework | React | 19 |
| Build tool | Vite | latest |
| Visual editor | @xyflow/react | 12 |
| State management | Zustand | latest |
| Styling | Tailwind CSS | 4 |
| Component | Technology | Version |
|---|---|---|
| Containers | Docker, Docker Compose | 24.0+ |
| Service | Port | Description |
|---|---|---|
| Kafka (KRaft) | 9492 | Event streaming |
| PostgreSQL (admin) | 5877 | Pipeline definitions |
| PostgreSQL (store) | 5878 | Event storage |
| event-ingest (REST) | 8090 | Event submission |
| event-ingest (gRPC) | 9190 | Event submission |
| event-engine | n/a | Transform processing |
| event-admin | 8091 | Control center |
| event-store | n/a | Kafka consumer, persists events |
| event-search | 8092 | Event query API |
| event-detect | 8093 | Anomaly detection |
| event-mapper-ui | 3070 | Visual mapper |
git clone https://github.com/psandis/event-processing.git
cd event-processing
docker compose up --build -d # starts Kafka, PostgreSQL, ingest, admin
./mvnw package -pl event-cli -q # build the CLI

Using the CLI:
java -jar event-cli/target/event-cli-*.jar pipelines create \
--name orders-to-warehouse \
--source events.raw \
--dest warehouse.fulfillment

Or using curl:
curl -X POST http://localhost:8091/api/pipelines \
-H "Content-Type: application/json" \
-d '{
"name": "orders-to-warehouse",
"description": "Transform order events for warehouse fulfillment",
"sourceTopic": "events.raw",
"destinationTopic": "warehouse.fulfillment",
"fieldMappings": [
{ "sourceField": "orderId", "destinationField": "fulfillmentId" },
{ "sourceField": "total", "destinationField": "amount", "conversion": "TO_DOUBLE" },
{ "sourceField": "currency", "destinationField": "currency" },
{ "sourceField": "customer.city", "destinationField": "shipping.city" },
{ "sourceField": "customer.country", "destinationField": "shipping.countryCode", "conversion": "TO_UPPER" },
{ "sourceField": "debug", "destinationField": "debug", "excluded": true }
]
}'

Each engine instance processes one pipeline. Start it with the pipeline name.

PIPELINE_NAME=orders-to-warehouse docker compose run -d event-engine

The engine fetches the pipeline definition from admin, wires up its Spring Kafka consumer and producer, and starts consuming from the source topic.
Using the CLI:
java -jar event-cli/target/event-cli-*.jar events send \
--type order.created --source order-service \
--payload '{"orderId":5001,"total":"129.99","currency":"EUR","customer":{"city":"Helsinki","country":"fi"}}'

Or using curl:
curl -X POST http://localhost:8090/api/events \
-H "Content-Type: application/json" \
-d '{
"type": "order.created",
"source": "order-service",
"payload": {
"orderId": 5001,
"total": "129.99",
"currency": "EUR",
"customer": { "city": "Helsinki", "country": "fi" },
"debug": true
}
}'

docker exec event-processing-kafka-1 kafka-console-consumer \
--bootstrap-server localhost:29092 \
--topic warehouse.fulfillment \
--from-beginning --timeout-ms 5000

The transformed output:
{
"id": "evt_...",
"type": "order.created",
"source": "order-service",
"status": "PROCESSED",
"payload": {
"fulfillmentId": 5001,
"amount": 129.99,
"currency": "EUR",
"shipping": {
"city": "Helsinki",
"countryCode": "FI"
}
}
}

The `debug` field is excluded. `total` (string) became `amount` (double). `customer.country` (`"fi"`) became `shipping.countryCode` (`"FI"`). The nested destination structure is created automatically.
# Pause a pipeline (stops processing, engine keeps running)
curl -X POST http://localhost:8091/api/pipelines/orders-to-warehouse/pause
# Resume
curl -X POST http://localhost:8091/api/pipelines/orders-to-warehouse/resume
# Update pipeline (stop engine, update, start new engine)
docker stop <engine-container-id>
curl -X PUT http://localhost:8091/api/pipelines/orders-to-warehouse ...
PIPELINE_NAME=orders-to-warehouse docker compose run -d event-engine
# Delete pipeline
docker stop <engine-container-id>
curl -X DELETE http://localhost:8091/api/pipelines/orders-to-warehouse

./start.sh test

93 tests across all modules. No Kafka or Docker required to run them.
| Module | Tests | Coverage |
|---|---|---|
| event-common | 14 | Type converters (12 conversions), event serialization (2) |
| event-ingest | 15 | REST endpoints (7), gRPC submit and batch (5), Kafka send behavior (2), Struct conversion (1) |
| event-engine | 23 | Mapping executor (12 including flatten), schema discovery (4), transform topology with dead letter (3), pipeline loader (4) |
| event-admin | 10 | Pipeline CRUD with versioning, deploy, pause/resume, mapping test |
| event-store | 3 | Event consumption, deduplication, invalid JSON handling |
| event-search | 7 | Search with filters, get by ID, types, sources, stats, 404 handling |
| event-detect | 6 | Alert CRUD, resolve, schema drift detection logic |
| event-cli | 15 | Command parsing, help output, required options, custom URLs, defaults |
- Java 21 features used where appropriate (records, text blocks, pattern matching)
- Constructor injection, no field injection
- `final` on service dependencies
- DTOs as Java records
- One responsibility per class
- Logging with SLF4J, structured messages with context (event ID, pipeline name)
- All public API endpoints documented with OpenAPI annotations
- Tests named by behavior, not method name
Event ingestion (REST + gRPC), pipeline definition storage, transform engine built on Spring Kafka, dead letter handling. CLI with Picocli.
Pipeline versioning (DRAFT/ACTIVE/PAUSED/DEPLOYING). Visual field mapper UI (React, xyflow). Event store (Kafka to PostgreSQL). Event search API (query by type, source, status, time range).
Anomaly detection module with statistical baselines, schema drift detection, and AI embedding/LLM scaffolds. Alert API with CRUD and resolve. Verified end-to-end: events flow from ingest through Kafka to store, stored in PostgreSQL, queryable via search API, monitored by detect service. 93 tests across 8 Java modules.
AI-powered anomaly detection across event streams. Four detection layers, each independent, all feeding into a unified alert system.
1. Statistical baselines. Track event volume per type and source over sliding time windows. Detect spikes, drops, and unusual patterns in throughput. No AI needed, pure math. Configurable thresholds and sensitivity.
Example: "order.created events from payment-service dropped 90% in the last 5 minutes"
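The spike check at the heart of this layer can be sketched with a plain z-score against the configured threshold (an illustrative sketch, not the actual StatisticalDetector):

```java
// Illustrative z-score spike check: flag the current window when its count
// sits more than `threshold` standard deviations above the historical mean.
public class SpikeCheck {
    public static boolean isSpike(double[] history, double current, double threshold) {
        double mean = 0;
        for (double h : history) mean += h;
        mean /= history.length;
        double variance = 0;
        for (double h : history) variance += (h - mean) * (h - mean);
        double stddev = Math.sqrt(variance / history.length);
        if (stddev == 0) return false; // flat history: nothing to compare against
        return (current - mean) / stddev > threshold;
    }
}
```

Drop detection works the same way in reverse: flag when current volume falls below the configured fraction of the historical average.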
2. Schema drift detection. Compare incoming event structures against discovered schemas. Detect when fields appear, disappear, or change type. Uses the existing SchemaDiscovery engine.
Example: "order.created events from checkout-v2 now include a 'discount' field that was never seen before"
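The field-presence half of this comparison reduces to set differences between the discovered schema and the incoming event (hypothetical helper, not the actual SchemaDriftDetector):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative core of schema drift detection: report fields present in an
// incoming event but never seen in the discovered schema, and vice versa.
public class DriftCheck {
    public static Set<String> addedFields(Set<String> known, Set<String> incoming) {
        Set<String> added = new HashSet<>(incoming);
        added.removeAll(known);
        return added;
    }

    public static Set<String> missingFields(Set<String> known, Set<String> incoming) {
        Set<String> missing = new HashSet<>(known);
        missing.removeAll(incoming);
        return missing;
    }
}
```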
3. Content anomaly detection (vector embeddings). Generate vector embeddings of event payloads using an LLM API (Claude or OpenAI). Store embeddings in PostgreSQL with Pgvector. Compute the cosine distance from the cluster center for each event type. Events far from the center are flagged as anomalous.
Architecture:
event-store (PostgreSQL)
|
| event-detect reads stored events
v
[event-detect]
|
| 1. batch events by type
| 2. generate embeddings via LLM API
| 3. store vectors in Pgvector
| 4. compute distance from centroid
| 5. flag outliers above threshold
|
v
[anomaly alerts] --> admin API --> mapper UI notifications
Example: "This order.created event has unusual field values. The 'total' of 999999.99 is 47x higher than the average for this event type."
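The distance computation behind this flagging can be sketched as follows (illustrative; in practice Pgvector can compute cosine distance in the database):

```java
// Cosine distance = 1 - cosine similarity. An event whose embedding sits
// farther than `threshold` from its event type's centroid is flagged.
public class CosineDistance {
    public static double distance(double[] a, double[] b) {
        double dot = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return 1.0 - dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    public static boolean isAnomalous(double[] embedding, double[] centroid, double threshold) {
        return distance(embedding, centroid) > threshold;
    }
}
```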
4. LLM analysis. For flagged anomalies, send the event payload and historical context to an LLM for natural language analysis. Get a human-readable explanation of what's unusual and potential root causes.
Example: "This event contains a negative stock quantity (-5) which has never occurred before. This may indicate a race condition in the inventory service."
event-detect/
├── detector/
│ ├── StatisticalDetector.java Volume baselines, spike/drop detection
│ ├── SchemaDriftDetector.java Field presence and type changes
│ ├── EmbeddingDetector.java Vector similarity with Pgvector
│ └── LlmAnalyzer.java Natural language anomaly explanation
├── embedding/
│ ├── EmbeddingService.java LLM API client for generating vectors
│ └── VectorStore.java Pgvector read/write operations
├── alert/
│ ├── AnomalyAlert.java Alert model
│ └── AlertService.java Stores and publishes alerts
└── config/
└── DetectProperties.java Thresholds, API keys, intervals
| Component | Technology |
|---|---|
| Vector storage | Pgvector (PostgreSQL extension) |
| Embeddings | Claude API or OpenAI API |
| Scheduling | Spring @Scheduled for periodic detection runs |
| Alerts | Kafka topic (events.anomalies) + REST API |
Statistical and schema drift detection run automatically with no setup. Embedding and LLM detection require an API key.
# application.yml or environment variables
detect:
  statistical:
    window-minutes: 5          # time window for volume analysis
    spike-threshold: 3.0       # z-score above this triggers spike alert
    drop-threshold: 0.3        # volume below this fraction of average triggers drop alert
    minimum-events: 10         # ignore types with fewer events than this
    check-interval-ms: 30000   # how often to check (30 seconds)
  schema:
    sample-size: 20            # number of recent events to analyze per type
    check-interval-ms: 60000   # how often to check (60 seconds)
  embedding:
    enabled: false             # set to true to enable AI detection
    api-key: ${EMBEDDING_API_KEY}
    model: claude-haiku-4-5-20251001
    anomaly-threshold: 0.85    # cosine distance above this is anomalous
    check-interval-ms: 300000  # how often to check (5 minutes)

To enable AI-powered detection:
# Set environment variables before starting
export EMBEDDING_API_KEY=your-anthropic-api-key
export EMBEDDING_API_URL=https://api.anthropic.com
# Or pass via Docker
docker run -e EMBEDDING_API_KEY=... -e DETECT_EMBEDDING_ENABLED=true event-detect

| Severity | Meaning |
|---|---|
| `HIGH` | Volume spike (z-score above threshold). Immediate attention needed. |
| `MEDIUM` | Volume drop or schema change. Investigate when possible. |
| `LOW` | Minor anomaly detected by embedding similarity. Informational. |
# List open alerts
curl http://localhost:8093/api/alerts?resolved=false
# Get alert details
curl http://localhost:8093/api/alerts/1
# Resolve an alert
curl -X POST http://localhost:8093/api/alerts/1/resolve
# Alert stats
curl http://localhost:8093/api/alerts/stats