Stateful, graph-based workflow engine for Laravel.
Build multi-step agent pipelines, human-in-the-loop processes, and parallel fan-out/fan-in tasks — all backed by your database and queue.
Inspired by LangGraph.
- Installation
- Core Concepts
- Building a Workflow
- Running a Workflow
- State
- Human-in-the-Loop
- Node Contracts
- Built-in Nodes
- Prism Integration
- Laravel AI Integration
- Sub-graph Workflows
- Recursion Limit
- Events
- Configuration
- Testing
composer require cainy/laragraph

Publish and run the migration:
php artisan vendor:publish --tag="laragraph-migrations"
php artisan migrate

Publish the config file:
php artisan vendor:publish --tag="laragraph-config"

LaraGraph models a workflow as a directed graph of nodes connected by edges. Each run of that graph is a WorkflowRun — a database record that tracks the current state, status, and active node pointers.
| Term | Meaning |
|---|---|
| Node | A unit of work. Receives the current state, returns a mutation. |
| Edge | A directed connection between two nodes, optionally conditional. |
| State | A plain PHP array that accumulates mutations as nodes execute. |
| Pointer | Tracks which nodes are currently in-flight for a run. |
| WorkflowRun | The persisted record for a single execution of a workflow. |
Execution is fully queue-driven. Each node runs as an independent ExecuteNode job, so parallel branches execute concurrently across your worker pool.
Workflows are classes that extend Workflow and define their graph in a definition() method:
use Cainy\Laragraph\Builder\Workflow;
class MyPipeline extends Workflow
{
public function definition(): void
{
$this->addNode('fetch', FetchNode::class)
->addNode('transform', TransformNode::class)
->addNode('store', StoreNode::class)
->transition(Workflow::START, 'fetch')
->transition('fetch', 'transform')
->transition('transform', 'store')
->transition('store', Workflow::END);
}
}

You can also call compile() directly on a Workflow instance if you prefer building inline, but the class-based approach is recommended since workflows are stored by class name.
A node is any class implementing Cainy\Laragraph\Contracts\Node:
use Cainy\Laragraph\Contracts\Node;
use Cainy\Laragraph\Engine\NodeExecutionContext;
class SummarizeNode implements Node
{
public function handle(NodeExecutionContext $context, array $state): array
{
$text = implode("\n", $state['paragraphs'] ?? []);
return ['summary' => substr($text, 0, 200)];
}
}

handle() receives a typed NodeExecutionContext and the current full state. It returns an array of mutations — only the keys you want to change.
$context->runId // int — ID of the WorkflowRun
$context->workflowKey // string — class name of the workflow
$context->nodeName // string — name of this node in the graph
$context->attempt // int — current queue attempt (1-based)
$context->maxAttempts // int — maximum attempts configured
$context->createdAt // DateTimeImmutable
$context->isolatedPayload // ?array — payload injected by a Send (see Dynamic Fan-out)
// Helpers for Send-dispatched nodes:
$context->isSendExecution() // bool — true when dispatched via a Send
$context->payload('key', $default) // mixed — read a value from the isolated payload

$this->transition(Workflow::START, 'fetch')
->transition('fetch', 'transform')
->transition('transform', Workflow::END);

Workflow::START and Workflow::END are reserved entry and exit pseudo-nodes.
Nodes can be registered as class strings (resolved via the container) or as pre-built instances.
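For example, both registration styles can be mixed in one definition. Here ReportNode and its limit argument are hypothetical, included only to show an instance carrying constructor arguments:

```php
$this->addNode('fetch', FetchNode::class)            // class string, resolved via the container
    ->addNode('report', new ReportNode(limit: 10));  // pre-built instance with constructor state
```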
Pass a Closure as the third argument to ->transition():
->transition('classify', 'approve', fn(array $state) => $state['score'] > 50)
->transition('classify', 'reject', fn(array $state) => $state['score'] <= 50)

A branch edge uses a resolver to return one or more target node names dynamically at runtime:
->branch('router', function(array $state): string {
return $state['approved'] ? 'publish' : 'revise';
}, targets: ['publish', 'revise'])

The targets array is optional but recommended — it enables graph visualization without executing the resolver.
To execute multiple nodes in parallel from a single node, add multiple transitions from the same source:
$this->addNode('split', SplitNode::class)
->addNode('branch-a', BranchANode::class)
->addNode('branch-b', BranchBNode::class)
->addNode('merge', MergeNode::class)
->transition(Workflow::START, 'split')
->transition('split', 'branch-a')
->transition('split', 'branch-b')
->transition('branch-a', 'merge')
->transition('branch-b', 'merge')
->transition('merge', Workflow::END);

branch-a and branch-b run as independent queue jobs. Use a BarrierNode as the merge node to wait for all branches before continuing.
To fan out over a dynamic list, return Send objects from a branch edge resolver:
use Cainy\Laragraph\Routing\Send;
->branch('planner', function(array $state): array {
return array_map(
fn(string $query) => new Send('worker', ['query' => $query]),
$state['queries']
);
}, targets: ['worker'])

Each Send dispatches an independent ExecuteNode job. The target node receives the payload via $context->isolatedPayload or the helper methods:
public function handle(NodeExecutionContext $context, array $state): array
{
$query = $context->payload('query');
// ...
}

The same fan-out is available via the SendNode prebuilt (see Built-in Nodes).
use Cainy\Laragraph\Facades\Laragraph;
$run = Laragraph::run(MyPipeline::class, initialState: [
'input' => 'Hello, world!',
]);
echo $run->id; // WorkflowRun ID
echo $run->status; // RunStatus::Running

Pass an optional metadata array as the third argument to attach correlation data that travels with the run without being visible to nodes:
$run = Laragraph::run(MyPipeline::class,
initialState: ['input' => 'Hello'],
metadata: ['trace_id' => $traceId, 'user_id' => $userId],
);
$run->metadata; // ['trace_id' => ..., 'user_id' => ...]

The run is created synchronously. Node jobs are dispatched to your queue immediately after.
// Pause a running workflow
Laragraph::pause($run->id);
// Resume a paused workflow, optionally merging additional state
Laragraph::resume($run->id, ['approved' => true]);
// Abort a workflow (sets status to Failed, clears all pointers)
Laragraph::abort($run->id);

Override any of these methods on your Workflow subclass to react to run lifecycle events. Hook exceptions are swallowed and never affect engine state.
class MyPipeline extends Workflow
{
public function definition(): void { /* ... */ }
public function onStarting(WorkflowRun $run): void
{
Log::info("Run {$run->id} starting");
}
public function onCompleted(WorkflowRun $run): void
{
Cache::forget("pipeline:{$run->metadata['trace_id']}");
}
public function onFailed(WorkflowRun $run, Throwable $exception): void
{
report($exception);
}
}

State is a plain PHP array that persists in the workflow_runs.state column. Every node receives the full current state and returns a mutation — a partial array of keys to update.
The reducer determines how mutations are merged into the existing state.
LaraGraph ships with three reducers:
| Class | Behaviour |
|---|---|
| SmartReducer (default) | List arrays are appended. Scalars and associative arrays are overwritten. |
| MergeReducer | Deep recursive merge for all keys. |
| OverwriteReducer | Shallow array_merge — always overwrites. |
SmartReducer is the right default for most agent workflows: message histories accumulate naturally, while scalar values like status or score simply overwrite.
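A quick sketch of that behaviour, with illustrative values:

```php
// Current state before a node runs
$state = ['messages' => ['Hello'], 'score' => 10];

// The node returns only the keys it wants to change
$mutation = ['messages' => ['Hi there'], 'score' => 42];

// Under SmartReducer, list arrays append and scalars overwrite:
// ['messages' => ['Hello', 'Hi there'], 'score' => 42]
```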
Implement StateReducerInterface and bind it in your service provider, or attach it to a specific workflow:
// Globally
$this->app->bind(StateReducerInterface::class, MyReducer::class);
// Per workflow
$this->withReducer(MyReducer::class)

LaraGraph has first-class support for pausing workflows and waiting for human input.
Pause the run before a node executes. On resume, the node runs normally.
$this->addNode('review', ReviewNode::class)
->interruptBefore('review');Pause the run after a node executes but before its outgoing edges are evaluated.
$this->addNode('drafter', DrafterNode::class)
->addNode('publish', PublishNode::class)
->transition(Workflow::START, 'drafter')
->transition('drafter', 'publish')
->transition('publish', Workflow::END)
->interruptAfter('drafter');

Call Laragraph::resume() with any additional state to merge before the run continues:
Laragraph::resume($run->id, [
'meta' => ['approved' => true],
]);

Any node can pause the run at runtime by throwing NodePausedException:
use Cainy\Laragraph\Exceptions\NodePausedException;
class ConfidenceCheckNode implements Node
{
public function handle(NodeExecutionContext $context, array $state): array
{
if ($state['confidence'] < 0.7) {
throw new NodePausedException($context->nodeName);
}
return ['status' => 'confident'];
}
}

You can also pass state mutations to persist before pausing:
throw new NodePausedException(
nodeName: $context->nodeName,
stateMutation: ['gate_reason' => 'Score too low'],
);

Nodes can implement optional contracts to declare capabilities to the engine.
Give a node a stable identifier used in edge routing and graph visualization:
use Cainy\Laragraph\Contracts\HasName;
class ResearchAgentNode implements Node, HasName
{
public function name(): string
{
return 'research-agent';
}
}

Emit metadata alongside each node execution — useful for tracking token usage, model names, cost centers, or tenant IDs. Tags are automatically persisted to the workflow_node_executions table and broadcast on the NodeCompleted event:
use Cainy\Laragraph\Contracts\HasTags;
class LLMNode implements Node, HasTags
{
private string $model = '';
private int $tokens = 0;
public function handle(NodeExecutionContext $context, array $state): array
{
// ... call LLM, populate $this->model and $this->tokens ...
return ['response' => $result];
}
public function tags(): array
{
return [
'model' => $this->model,
'tokens' => $this->tokens,
'cost_usd' => $this->tokens * 0.000003,
];
}
}

The engine calls tags() after handle() returns, so the node can accumulate values during execution and expose them at the end.
// All executions for a run
$run->nodeExecutions;
// Total cost for a run
$run->nodeExecutions->sum(fn($e) => $e->tags['cost_usd'] ?? 0);
// Per-node cost breakdown
$run->nodeExecutions
->groupBy('node_name')
->map(fn($execs) => $execs->sum(fn($e) => $e->tags['cost_usd'] ?? 0));

NodeExecution columns: run_id, node_name, attempt, tags (JSON), executed_at.
Define per-node retry behaviour with exponential backoff and optional jitter:
use Cainy\Laragraph\Contracts\HasRetryPolicy;
use Cainy\Laragraph\Engine\RetryPolicy;
class FlakyAPINode implements Node, HasRetryPolicy
{
public function retryPolicy(): RetryPolicy
{
return new RetryPolicy(
initialInterval: 1.0,
backoffFactor: 2.0,
maxInterval: 30.0,
maxAttempts: 5,
jitter: true,
);
}
}

Restrict retries to specific exception types:
new RetryPolicy(
maxAttempts: 3,
retryOn: [RateLimitException::class, TimeoutException::class],
)
// Or with a Closure for full control:
new RetryPolicy(
maxAttempts: 3,
retryOn: fn(Throwable $e) => $e->getCode() === 429,
)

Route a node's job to a specific queue or connection:
use Cainy\Laragraph\Contracts\HasQueue;
class HeavyLLMNode implements Node, HasQueue
{
public function queue(): string
{
return 'llm';
}
public function connection(): ?string
{
return null; // use default connection
}
}

Attach Laravel job middleware to a node's execution job:
use Cainy\Laragraph\Contracts\HasMiddleware;
use Illuminate\Queue\Middleware\RateLimited;
class AnthropicNode implements Node, HasMiddleware
{
public function middleware(): array
{
return [new RateLimited('anthropic')];
}
}

Declare that a node should loop — driving tool execution cycles, polling, or any other repeated sub-task. The compiler automatically injects the loop edges at compile time.
use Cainy\Laragraph\Contracts\HasLoop;
class PollingNode implements Node, HasLoop
{
public function loopNode(string $nodeName): Node
{
return new CheckStatusNode();
}
public function loopCondition(): \Closure
{
return fn(array $state) => $state['status'] !== 'done';
}
}

When compiled, the engine injects a {name}.__loop__ node and guards existing exit edges with the negated condition. Use Workflow::toolNode('name') to reference the synthetic loop node in interrupt points:
->interruptBefore(Workflow::toolNode('agent'))

Mark a node as a fan-in barrier. The engine tracks how many workers were dispatched into this node and how many have committed their results. It serialises concurrent arrivals under a database lock, and only the final arrival — the one that sees all predecessors complete — runs handle(). All earlier arrivals skip cleanly.
use Cainy\Laragraph\Contracts\IsFanInBarrier;
class MyBarrierNode implements Node, IsFanInBarrier
{
public function handle(NodeExecutionContext $context, array $state): array
{
// Only called once — after every predecessor has committed.
return ['merged' => true];
}
}

BarrierNode implements IsFanInBarrier out of the box. Implement it on any custom node that acts as a convergence point for parallel branches.
Pauses the workflow unconditionally until manually resumed. Use as a static approval gate.
use Cainy\Laragraph\Nodes\GateNode;
$this->addNode('approve', new GateNode(reason: 'Manager approval required'))
->transition('draft', 'approve')
->transition('approve', 'publish');

When the gate triggers, state['gate_reason'] is set to the reason string. Resume via Laragraph::resume($runId).
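A typical approval flow later resumes the gated run with the decision merged into state; the approved and approved_by keys here are illustrative, not required by the package:

```php
use Cainy\Laragraph\Facades\Laragraph;

// Called from your approval endpoint once the manager signs off
Laragraph::resume($run->id, [
    'approved' => true,
    'approved_by' => $manager->id,
]);
```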
Fan-out node — dispatches a Send for each item in a state list, sending each to the same target node with an isolated payload.
use Cainy\Laragraph\Nodes\SendNode;
$this->addNode('fanout', new SendNode(
sourceKey: 'queries',
targetNode: 'worker',
payloadKey: 'query',
))
->addNode('worker', WorkerNode::class)
->transition(Workflow::START, 'fanout')
->transition('fanout', 'worker');

Inside WorkerNode, access the payload via $context->payload('query').
Fan-in barrier — waits for all parallel workers to complete before allowing the downstream edge to fire. Zero configuration required.
use Cainy\Laragraph\Nodes\BarrierNode;
->addNode('barrier', new BarrierNode())
->transition('worker', 'barrier')
->transition('barrier', 'aggregator')

The engine automatically tracks how many workers were dispatched into the barrier and how many have committed their results. Early arrivals skip cleanly (removing their pointer to maintain equilibrium). Only the final arrival — when all predecessors are fully complete — runs handle() and evaluates the downstream edges. The node body itself is a no-op; all logic lives in the engine.
Works with both transition fan-out and Send-based fan-out, including multiple sequential barriers in the same workflow.
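Putting the two built-ins together, a dynamic map-reduce pipeline might look like this; ScrapeNode and AggregateNode are hypothetical worker and merge nodes:

```php
use Cainy\Laragraph\Nodes\BarrierNode;
use Cainy\Laragraph\Nodes\SendNode;

$this->addNode('fanout', new SendNode(sourceKey: 'urls', targetNode: 'scrape', payloadKey: 'url'))
    ->addNode('scrape', ScrapeNode::class)       // runs once per URL, in parallel
    ->addNode('barrier', new BarrierNode())      // waits for every scrape job to commit
    ->addNode('aggregate', AggregateNode::class) // sees the merged state
    ->transition(Workflow::START, 'fanout')
    ->transition('fanout', 'scrape')
    ->transition('scrape', 'barrier')
    ->transition('barrier', 'aggregate')
    ->transition('aggregate', Workflow::END);
```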
Makes an HTTP request and stores the response in state. The URL supports {state.key} interpolation.
use Cainy\Laragraph\Nodes\HttpNode;
->addNode('fetch', new HttpNode(
url: 'https://api.example.com/items/{state.item_id}',
method: 'GET',
headers: ['Authorization' => 'Bearer token'],
responseKey: 'api_response',
))

The response is stored as ['status' => 200, 'body' => [...], 'ok' => true] under responseKey.
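A downstream node can then read the stored response; the key names follow the example above:

```php
public function handle(NodeExecutionContext $context, array $state): array
{
    $response = $state['api_response'];

    // Fail the node (and trigger any retry policy) on a non-2xx response
    if (! $response['ok']) {
        throw new \RuntimeException("Fetch failed with HTTP {$response['status']}");
    }

    return ['item' => $response['body']];
}
```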
For POST/PUT/PATCH requests, set bodyKey to a state key whose value will be sent as the request body:
new HttpNode(url: '...', method: 'POST', bodyKey: 'payload', responseKey: 'result')

Pauses execution for a given number of seconds, then continues.
use Cainy\Laragraph\Nodes\DelayNode;
->addNode('wait', new DelayNode(seconds: 300))

On first execution the node stores a resume-after timestamp, dispatches a delayed queue job, and pauses. The job automatically calls Laragraph::resume() when the delay elapses — no scheduled command or polling required.
Reads from or writes to the Laravel cache. The cache key supports {state.key} interpolation.
use Cainy\Laragraph\Nodes\CacheNode;
->addNode('load', new CacheNode(operation: 'get', cacheKey: 'report:{state.user_id}', stateKey: 'cached_report'))
->addNode('store', new CacheNode(operation: 'put', cacheKey: 'report:{state.user_id}', stateKey: 'report', ttl: 3600))
->addNode('bust', new CacheNode(operation: 'forget', cacheKey: 'report:{state.user_id}', stateKey: 'report'))

Dispatches a Laravel event with values from state as constructor arguments.
use Cainy\Laragraph\Nodes\NotifyNode;
->addNode('notify', new NotifyNode(
eventClass: ReportReady::class,
dataKeys: ['user_id', 'report_url'],
))

LaraGraph ships with first-class support for Prism via the Cainy\Laragraph\Integrations\Prism namespace.
composer require prism-php/prism

A concrete, configurable LLM node. No subclass needed for common use cases:
use Cainy\Laragraph\Integrations\Prism\PrismNode;
use Prism\Prism\Enums\Provider;
use Prism\Prism\Tool;
class MyPipeline extends Workflow
{
public function definition(): void
{
$this->addNode('agent', new PrismNode(
provider: Provider::Anthropic,
model: 'claude-sonnet-4-6',
systemPrompt: 'You are a helpful assistant.',
maxTokens: 1024,
tools: [
(new Tool)
->as('get_weather')
->for('Get weather for a city')
->withStringParameter('city', 'City name')
->using(fn(string $city): string => "Sunny, 22°C in {$city}"),
],
))
->transition(Workflow::START, 'agent')
->transition('agent', Workflow::END);
}
}

PrismNode serializes Prism Message objects to/from plain arrays for state storage and returns the assistant's response appended to state['messages'].
Override getPrompt() or tools() for dynamic behaviour:
class ResearchAgent extends PrismNode
{
protected function getPrompt(array $state): string
{
return 'Research: ' . $state['topic'];
}
}

Abstract base for nodes that manually execute tool calls from state['messages']. Implement toolMap() to return a map of tool names to callables:
use Cainy\Laragraph\Integrations\Prism\ToolNode;
class WeatherToolNode extends ToolNode
{
protected function toolMap(): array
{
return [
'get_weather' => fn(array $args): string =>
"Sunny, 22°C in " . ($args['city'] ?? 'unknown'),
];
}
}

Tool results are appended to state['messages'] in Prism's tool_result format.
PrismNode implements HasLoop. When a node has tools, calling ->compile() automatically injects a tool execution loop:
START → agent ──(tool calls present)──→ agent.__loop__ → agent
──(no tool calls)──────→ END
To interrupt before tool execution runs:
->interruptBefore(Workflow::toolNode('agent'))

For full control, skip HasLoop and wire edges explicitly:
$this->addNode('agent', MyAgentNode::class)
->addNode('tools', WeatherToolNode::class)
->transition(Workflow::START, 'agent')
->transition('agent', 'tools', fn($s) => ! empty($s['messages'][array_key_last($s['messages'])]['tool_calls'] ?? []))
->transition('agent', Workflow::END, fn($s) => empty($s['messages'][array_key_last($s['messages'])]['tool_calls'] ?? []))
->transition('tools', 'agent');

LaraGraph integrates with Laravel AI via the AsGraphNode trait.
composer require laravel/ai

Add AsGraphNode to a standard Laravel AI agent to make it a Laragraph node:
use Cainy\Laragraph\Contracts\Node;
use Cainy\Laragraph\Integrations\LaravelAi\AsGraphNode;
use Laravel\Ai\Contracts\Agent;
use Laravel\Ai\Promptable;
class ResearchAgent implements Agent, Node
{
use AsGraphNode, Promptable;
public function instructions(): string
{
return 'You are a research assistant.';
}
protected function getAgentPrompt(): string
{
return 'Research: ' . ($this->state['topic'] ?? 'general');
}
}

If your agent implements HasStructuredOutput, the trait maps structured response keys directly to state mutation keys:
use Laravel\Ai\Contracts\HasStructuredOutput;
use Illuminate\Contracts\JsonSchema\JsonSchema;
class ClassifierAgent implements Agent, Node, HasStructuredOutput
{
use AsGraphNode, Promptable;
public function instructions(): string
{
return 'Classify the input into a category and confidence score.';
}
public function schema(JsonSchema $schema): array
{
return [
'category' => $schema->string()->required(),
'confidence' => $schema->number()->min(0)->max(1)->required(),
];
}
}

After execution, state['category'] and state['confidence'] are set directly.
Laravel AI agents implementing HasTools are automatically detected by the compiler — tool loop injection works exactly as with PrismNode:
use Laravel\Ai\Contracts\HasTools;
class WeatherAgent implements Agent, Node, HasTools
{
use AsGraphNode, Promptable;
public function tools(): array { return [new GetWeather]; }
}

Any Workflow subclass implements Node and can be embedded inside another workflow. The sub-graph is identified by its class name — no snapshot serialization required.
class ResearchSubgraph extends Workflow
{
public function definition(): void
{
$this->addNode('search', SearchNode::class)
->addNode('extract', ExtractNode::class)
->transition(Workflow::START, 'search')
->transition('search', 'extract')
->transition('extract', Workflow::END);
}
}
class ParentPipeline extends Workflow
{
public function definition(): void
{
$this->addNode('research', ResearchSubgraph::class)
->addNode('write', WriteNode::class)
->transition(Workflow::START, 'research')
->transition('research', 'write')
->transition('write', Workflow::END);
}
}When the engine executes a sub-graph node:
- A child WorkflowRun is created and linked via parent_run_id / parent_node_name.
- The child workflow starts normally — its nodes run as independent queue jobs.
- The parent run pauses at the sub-graph node.
- When the child completes, the engine resumes the parent automatically.
- The parent node returns the state delta from the child's final state as a mutation.
$run->parent; // ?WorkflowRun
$run->children; // Collection<WorkflowRun>

The engine tracks total node executions per run and throws RecursionLimitExceeded if the limit is hit.
The default limit is config('laragraph.recursion_limit', 25). Override it per workflow:
class MyPipeline extends Workflow
{
public function definition(): void
{
$this->withRecursionLimit(100);
// ...
}
}

LaraGraph fires events throughout the workflow lifecycle. All events implement ShouldBroadcast and are broadcast on the workflow channel when broadcasting is enabled.
| Event | Payload |
|---|---|
| WorkflowStarted | runId, workflowKey |
| NodeExecuting | runId, nodeName |
| NodeCompleted | runId, nodeName, mutation, tags |
| NodeFailed | runId, nodeName, exception |
| WorkflowCompleted | runId, workflowKey |
| WorkflowFailed | runId, exception, workflowKey |
| WorkflowResumed | runId, workflowKey |
Enable broadcasting in your .env:
LARAGRAPH_BROADCASTING_ENABLED=true
LARAGRAPH_CHANNEL_TYPE=private # public | private | presence
LARAGRAPH_CHANNEL_PREFIX=workflow.

Each run broadcasts on channel {prefix}{runId} (e.g. workflow.42). Authorize the channel in routes/channels.php as needed.
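With the default private channel and prefix, authorization in routes/channels.php might look like this. The WorkflowRun model namespace and the ownership check via metadata are assumptions; adapt them to your own access model:

```php
use Cainy\Laragraph\Models\WorkflowRun; // namespace assumed
use Illuminate\Support\Facades\Broadcast;

Broadcast::channel('workflow.{runId}', function ($user, int $runId) {
    $run = WorkflowRun::find($runId);

    // Hypothetical check: only the user recorded in the run's metadata may listen.
    return $run !== null && ($run->metadata['user_id'] ?? null) === $user->id;
});
```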
// config/laragraph.php
return [
// Queue name for ExecuteNode jobs (overridden per-node via HasQueue)
'queue' => env('LARAGRAPH_QUEUE', 'default'),
// Queue connection (null = default connection)
'connection' => env('LARAGRAPH_QUEUE_CONNECTION'),
// Hold jobs until the wrapping transaction commits (enable if you call
// Laragraph::run() inside your own DB transactions)
'after_commit' => env('LARAGRAPH_AFTER_COMMIT', false),
// Default max attempts per node (overridden per-node via HasRetryPolicy)
'max_node_attempts' => 3,
// Default node timeout in seconds
'node_timeout' => 60,
// Maximum node executions per run before RecursionLimitExceeded is thrown
'recursion_limit' => 25,
// Prune completed/failed runs older than this many days
'prunable_after_days' => 30,
// Default retry backoff settings (overridden per-node via HasRetryPolicy)
'retry' => [
'initial_interval' => 0.5,
'backoff_factor' => 2.0,
'max_interval' => 128.0,
'jitter' => true,
],
'broadcasting' => [
'enabled' => env('LARAGRAPH_BROADCASTING_ENABLED', false),
'channel_type' => env('LARAGRAPH_CHANNEL_TYPE', 'private'),
'channel_prefix' => env('LARAGRAPH_CHANNEL_PREFIX', 'workflow.'),
],
];

composer test

LaraGraph works with the sync queue driver in tests — set QUEUE_CONNECTION=sync in your phpunit.xml and runs execute synchronously, making assertions straightforward:
use Cainy\Laragraph\Facades\Laragraph;
use Cainy\Laragraph\Enums\RunStatus;
it('completes the pipeline', function () {
$run = Laragraph::run(MyPipeline::class, ['input' => 'hello']);
expect($run->fresh())
->status->toBe(RunStatus::Completed)
->state->toHaveKey('output');
});

For unit-testing individual nodes, use the makeContext() test helper:
use function Cainy\Laragraph\Tests\makeContext;
it('returns a summary mutation', function () {
$node = new SummarizeNode();
$mutation = $node->handle(
makeContext(nodeName: 'summarize'),
['text' => 'Long article...'],
);
expect($mutation)->toHaveKey('summary');
});

The MIT License (MIT). Please see License File for more information.