Runtime
The FlowRuntime is the engine that executes workflows.
FlowRuntime Class
constructor(container, options?) or constructor(options)
Creates a new runtime instance using a Dependency Injection (DI) container or legacy options for backward compatibility.
DI Constructor (Recommended)
containerDIContainer: A pre-configured dependency injection container that provides all runtime services.options?RuntimeOptions<TDependencies>: Optional legacy configuration (for backward compatibility).
Legacy Constructor (Backward Compatible)
optionsRuntimeOptions<TDependencies>: Configuration for the runtime.registry?: A record of globally available node implementations.blueprints?: A record of all available blueprints, required for subflow execution.dependencies?: Shared dependencies to be injected into every node's context.logger?: A pluggable logger instance (defaults toNullLogger).eventBus?: A pluggable event bus for observability. See Event Bus for details on available events.evaluator?: A pluggable expression evaluator (defaults toPropertyEvaluator).middleware?: An array of middleware to wrap node execution.serializer?: A pluggable serializer (defaults toJsonSerializer).strict?: Iftrue, the runtime will throw an error if a workflow contains cycles.
Note: The legacy constructor is maintained for backward compatibility. For new code, use the DI container approach for better modularity and testability.
Event Bus
The runtime emits structured events through the IEventBus interface for observability and debugging. These events provide detailed information about workflow execution, including data flow, decision logic, and error conditions.
FlowcraftEvent Type
All events follow this structured format:
export type FlowcraftEvent =
| { type: 'workflow:start'; payload: { blueprintId: string; executionId: string } }
| {
type: 'workflow:finish'
payload: {
blueprintId: string
executionId: string
status: string
errors?: WorkflowError[]
}
}
| {
type: 'workflow:stall'
payload: { blueprintId: string; executionId: string; remainingNodes: number }
}
| { type: 'workflow:pause'; payload: { blueprintId: string; executionId: string } }
| { type: 'workflow:resume'; payload: { blueprintId: string; executionId: string } }
| {
type: 'node:start'
payload: { nodeId: string; executionId: string; input: any; blueprintId: string }
}
| {
type: 'node:finish'
payload: {
nodeId: string
result: NodeResult
executionId: string
blueprintId: string
}
}
| {
type: 'node:error'
payload: {
nodeId: string
error: FlowcraftError
executionId: string
blueprintId: string
}
}
| {
type: 'node:fallback'
payload: { nodeId: string; executionId: string; fallback: string; blueprintId: string }
}
| {
type: 'node:retry'
payload: { nodeId: string; attempt: number; executionId: string; blueprintId: string }
}
| {
type: 'node:skipped'
payload: {
nodeId: string
edge: EdgeDefinition
executionId: string
blueprintId: string
}
}
| {
type: 'edge:evaluate'
payload: { source: string; target: string; condition?: string; result: boolean }
}
| {
type: 'context:change'
payload: { sourceNode: string; key: string; op: 'set' | 'delete'; value?: any }
}
| {
type: 'batch:start'
payload: { batchId: string; scatterNodeId: string; workerNodeIds: string[] }
}
| { type: 'batch:finish'; payload: { batchId: string; gatherNodeId: string; results: any[] } }
| {
type: 'job:enqueued'
payload: { runId: string; blueprintId: string; nodeId: string; queueName?: string }
}
| {
type: 'job:processed'
payload: { runId: string; blueprintId: string; nodeId: string; result: NodeResult }
}
| {
type: 'job:failed'
payload: { runId: string; blueprintId: string; nodeId: string; error: FlowcraftError }
}IEventBus Interface
export interface IEventBus {
emit: (event: FlowcraftEvent) => void | Promise<void>
}Event Descriptions
workflow:start: Emitted when a workflow execution begins.workflow:finish: Emitted when a workflow completes, fails, or is cancelled.workflow:stall: Emitted when a workflow cannot proceed (e.g., due to unresolved dependencies).workflow:pause: Emitted when a workflow is paused (e.g., due to cancellation or stalling).workflow:resume: Emitted when a workflow resumes execution.node:start: Emitted when a node begins execution, including the resolved input.node:finish: Emitted when a node completes successfully.node:error: Emitted when a node fails.node:fallback: Emitted when a fallback node is executed.node:retry: Emitted when a node execution is retried.node:skipped: Emitted when a conditional edge is not taken.edge:evaluate: Emitted when an edge condition is evaluated, showing the condition and result.context:change: Emitted when data is written to or deleted from the workflow context.batch:start: Emitted when a batch operation begins.batch:finish: Emitted when a batch operation completes.job:enqueued: Emitted when a job is enqueued for distributed processing.job:processed: Emitted when a distributed job completes successfully.job:failed: Emitted when a distributed job fails.
.run(blueprint, initialState?, options?)
Executes a workflow using the DefaultOrchestrator, which can handle both standard and Human-in-the-Loop (HITL) workflows.
blueprintWorkflowBlueprint: The workflow to execute.initialState?Partial<TContext> | string: The initial state for the workflow's context. Can be an object or a serialized string.options?:functionRegistry?: AMapof node implementations, typically fromflow.getFunctionRegistry().strict?: Overrides the runtime's strict mode setting for this run.signal?: AnAbortSignalto gracefully cancel the workflow execution.concurrency?: Limits the number of nodes that can execute simultaneously.
- Returns:
Promise<WorkflowResult<TContext>>
.resume(blueprint, serializedContext, resumeData, options?)
Resumes an awaiting workflow from its pause point.
blueprintWorkflowBlueprint: The workflow blueprint.serializedContextstring: The serialized context from an awaiting workflow result.resumeData{ output?: any; action?: string }: Data to provide to the awaiting node.options?: Same as for.run().- Returns:
Promise<WorkflowResult<TContext>>
.replay(blueprint, events, executionId?)
Replays a workflow execution from a pre-recorded event history, reconstructing the final workflow state without re-executing node logic. This enables time-travel debugging and post-mortem analysis.
blueprintWorkflowBlueprint: The workflow blueprint.eventsFlowcraftEvent[]: The recorded event history for the execution.executionId?string: Optional execution ID to filter events (if events contain multiple executions).- Returns:
Promise<WorkflowResult<TContext>>
The replay system processes these event types to reconstruct state:
node:finish: Applies completed node outputs to contextcontext:change: Applies context modifications (including usercontext.set()andcontext.delete()calls)node:error: Records errors in the workflow stateworkflow:finish: Marks workflow completion
Replay always produces a "completed" status since it reconstructs the final state without re-executing logic.
.startScheduler(checkIntervalMs?)
Starts the internal WorkflowScheduler that monitors awaiting workflows and automatically resumes them when their timers expire. Required for sleep nodes to function in in-memory workflows.
checkIntervalMs?number: How often (in ms) to check for expired timers. Defaults to1000.
const runtime = new FlowRuntime()
runtime.startScheduler()
// sleep nodes will now auto-resume.stopScheduler()
Stops the internal scheduler. Call this when shutting down to clean up the polling interval.
scheduler
The runtime's WorkflowScheduler instance. Use it to inspect active workflows and retrieve auto-resumed results.
WorkflowScheduler
Manages awaiting workflows that have timer-based pauses (sleep nodes). The scheduler polls at a configurable interval and calls runtime.resume() automatically when a workflow's timer expires.
.getActiveWorkflows()
Returns a list of currently awaiting workflows being tracked by the scheduler.
- Returns:
AwaitingWorkflow[]
.getResumeResult(executionId)
Retrieves the WorkflowResult from a workflow that was automatically resumed by the scheduler. Results are stored after each auto-resume and can be looked up by execution ID.
executionIdstring: The execution ID, available fromresult.context._executionIdafter the initialrun().- Returns:
WorkflowResult | undefined
const result = await flow.run(runtime)
// result.status === 'awaiting'
// ... scheduler auto-resumes when timer expires ...
const executionId = result.context._executionId as string
const resumed = runtime.scheduler.getResumeResult(executionId)
// resumed.status === 'completed'.executeNode(...)
A lower-level method to execute a single node within a workflow's state. This is primarily used internally by the GraphTraverser and BaseDistributedAdapter.
.determineNextNodes(blueprint, nodeId, result, context, executionId?)
Determines which nodes should run next based on the result of a completed node and the graph's structure.
blueprintWorkflowBlueprint: The workflow blueprint.nodeIdstring: The ID of the completed node.resultNodeResult: The result of the completed node.contextContextImplementation: The current context.executionId?string: Optional execution ID for observability events.- Returns:
Promise<{ node: NodeDefinition; edge: EdgeDefinition }[]>
.applyEdgeTransform(...)
Applies an edge's transform expression to the data flowing between two nodes.