Feat/resumable pipelines #396

Merged
pipewrk merged 7 commits into wpkernel:main from theGeekist:feat/resumable-pipelines
Dec 29, 2025

@pipewrk pipewrk commented Dec 29, 2025

Introduce a resumable pipeline runtime to the core workflow engine.

This PR adds a new makeResumablePipeline API and a matching runner (initAgnosticResumableRunner) that support pause / snapshot / resume semantics while keeping the existing makePipeline behaviour intact and fully compatible.

The existing non-resumable path remains the default; resumability is opt-in via the new API.

Motivation

Some workloads need to:
• pause mid-run (e.g. for human input, external approvals, or long-running tools),
• persist a typed snapshot of pipeline state,
• and resume later from the exact point of pause, without re-running completed stages or corrupting state.

Previously, “pause” was only expressible via ad-hoc early returns and bespoke plumbing in callers. There was no standard contract for:
• what a “pause” result looks like,
• what minimal state must be serialised,
• how to restart the runner at a specific stage,
• or how helpers/extensions should behave in the presence of pauses.

This PR formalises that contract.

What’s new

  1. Resumable pipeline API
    New types and factory:

// core API
type ResumablePipeline<
  TRunOptions,
  TRunResult,
  TContext,
  TReporter,
  TState
> = {
  run: (options: TRunOptions) => Promise<TRunResult | PipelinePaused>;
  resume: (
    snapshot: PipelinePauseSnapshot,
    input: ResumeInput
  ) => Promise<TRunResult | PipelinePaused>;
};

function makeResumablePipeline<
  TRunOptions,
  TRunResult,
  TContext,
  TReporter,
  TState
>(options: MakeResumablePipelineOptions<...>): ResumablePipeline<...>;

Pause contracts:

type PipelinePauseKind = 'user' | 'helper' | 'extension' | 'internal';

interface PipelinePauseSnapshot {
  kind: PipelinePauseKind;
  state: TState;
  stageIndex: number;
  resumeInputSchema?: unknown; // if caller needs structured resume input
}

interface PipelinePaused {
  type: 'paused';
  snapshot: PipelinePauseSnapshot;
}

Key points:
• run may now return either a final result or a PipelinePaused.
• resume consumes the snapshot and a caller-provided resumeInput and returns either:
    • the final result, or
    • another PipelinePaused (multi-step interactions are supported).

This entire flow is strongly typed around TState.
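To make the run/resume contract concrete, here is a minimal sketch of how a caller might drive a pipeline through several pauses. The types are local stand-ins that mirror the shapes above, and `isPausedResult`, `runToCompletion`, and `nextInput` are hypothetical names invented for illustration; none of them are claimed to be exports of this PR.

```typescript
// Local stand-ins mirroring the shapes described above; not the library's exports.
type PauseSnapshot<TState> = { kind: string; state: TState; stageIndex: number };
type Paused<TState> = { type: 'paused'; snapshot: PauseSnapshot<TState> };

// Hypothetical guard: narrows a run/resume outcome to the paused branch.
function isPausedResult<TState>(value: unknown): value is Paused<TState> {
  return (
    typeof value === 'object' &&
    value !== null &&
    (value as { type?: unknown }).type === 'paused'
  );
}

// Hypothetical driver: keeps resuming until a final result arrives.
// `nextInput` stands in for whatever supplies resume input (a user, a queue, ...).
async function runToCompletion<TResult, TState, TInput>(
  start: () => Promise<TResult | Paused<TState>>,
  resume: (
    snapshot: PauseSnapshot<TState>,
    input: TInput
  ) => Promise<TResult | Paused<TState>>,
  nextInput: (snapshot: PauseSnapshot<TState>) => TInput | Promise<TInput>
): Promise<TResult> {
  let outcome: TResult | Paused<TState> = await start();
  while (isPausedResult<TState>(outcome)) {
    const input = await nextInput(outcome.snapshot);
    outcome = await resume(outcome.snapshot, input);
  }
  return outcome as TResult;
}
```

In a real deployment the loop would usually be split across requests, with the snapshot persisted between iterations; the in-memory loop only illustrates that each resume may yield another pause.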

  2. Resumable runner
    New runner entry points:

// New resumable runner
const runner = initAgnosticResumableRunner<TRunOptions, TRunResult, TContext, TReporter, TState>(...);

const resultOrPause = await runner.executeRunWithPause(runOptions);
// ...
const resumed = await runner.executeResume(snapshot, resumeInput);

Internally, resumable execution is driven by:
• runStagesIteratively(...) – explicit stage array + stageIndex
• prepareResumeContext(...) – reconstructs context from snapshot
• explicit handling of PipelinePaused results at each stage
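The indexed stepping described above can be sketched roughly as follows. This is a synchronous simplification, not the PR's runStagesIteratively: `runFrom`, `Stage`, and `StageOutcome` are hypothetical, and it assumes that stageIndex records the stage that paused and that resuming re-enters that stage with the resume input.

```typescript
// Illustrative shapes only; the real stage and snapshot types live in the PR.
type Snapshot<S> = { kind: 'helper'; state: S; stageIndex: number };
type Paused<S> = { type: 'paused'; snapshot: Snapshot<S> };
type StageOutcome<S> =
  | { type: 'continue'; state: S }
  | { type: 'pause'; state: S };
type Stage<S> = (state: S, resumeInput?: unknown) => StageOutcome<S>;

// Hypothetical stepping loop: walks `stages` from `startIndex` and stops at the first pause.
function runFrom<S>(
  stages: Stage<S>[],
  state: S,
  startIndex = 0,
  resumeInput?: unknown
): S | Paused<S> {
  let current = state;
  for (let i = startIndex; i < stages.length; i++) {
    // Assumption: resume input is only offered to the stage being re-entered.
    const outcome = stages[i](current, i === startIndex ? resumeInput : undefined);
    if (outcome.type === 'pause') {
      return {
        type: 'paused',
        snapshot: { kind: 'helper', state: outcome.state, stageIndex: i },
      };
    }
    current = outcome.state;
  }
  return current;
}
```

Because the loop starts at an explicit index, stages before the pause point are never re-run on resume, which is the property a composed program cannot offer as directly.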

The existing runner remains:

const runner = initAgnosticRunner(...);
const result = await runner.executeRun(options); // unchanged behaviour

So:
• Non-resumable pipelines still use a composed program (createAgnosticProgram).
• Resumable pipelines use iterative stepping with indexed stages.

  3. Pause awareness in stages
    Stages that previously only short-circuited on “halt/error” are now pause-aware.

isPaused is introduced and wired into:
• makeFinalizeFragmentsStage
• makeAfterFragmentsStage
• makeCommitStage
• makeFinalizeResultStage
• helper stage factory via makeHelperStageFactory
• lifecycle stage in program.ts
• standard pipeline runner’s makeFinalizeFragmentsStage

Effect: once a stage returns PipelinePaused, downstream stages do not mutate state. A pause is a first-class terminal outcome for that run, not “just another value”.
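One way to picture the guard is as a wrapper that passes a paused value through untouched. The sketch below is an assumption about the pattern, not the PR's code: `isPausedValue` and `pauseAware` are hypothetical names, and the real isPaused signature is not shown in this description.

```typescript
type Paused = { type: 'paused'; snapshot: unknown };

// Hypothetical stand-in for the PR's isPaused guard.
function isPausedValue(value: unknown): value is Paused {
  return (
    typeof value === 'object' &&
    value !== null &&
    (value as { type?: unknown }).type === 'paused'
  );
}

// Hypothetical wrapper: a paused input skips the stage, so downstream stages never touch state.
function pauseAware<S>(
  stage: (state: S) => S | Paused
): (input: S | Paused) => S | Paused {
  return (input) => (isPausedValue(input) ? input : stage(input as S));
}
```

Chaining wrapped stages means the first pause short-circuits everything after it while still flowing out as the run's outcome.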

  4. Helper & extension rollback semantics
    Error handling around helpers and extensions is tightened and made more observable:
    • Extension rollback failures

reporter.warn('Pipeline extension rollback failed.', {
  error,
  extensionId,
  stageName,
  // additional metadata…
});

options.onExtensionRollbackError?.(error, context);

The original extension failure remains the primary error; rollback failures are logged and surfaced via the optional hook.

    • Helper rollback failures

Similar pattern for helper rollbacks:

reporter.warn('Helper rollback failed.', { error, helperId, ... });

The helper’s original failure still drives control flow; rollback errors are diagnostic.

This matters more in a resumable world where partial commits and staged rollbacks are more likely.
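The "original failure stays primary, rollback failure is diagnostic" rule can be sketched as below. This is a synchronous simplification under stated assumptions: `runWithRollback`, its parameters, and the `Reporter` shape are hypothetical, and only the warning message text is taken from the description above.

```typescript
type Reporter = {
  warn: (message: string, meta?: Record<string, unknown>) => void;
};

// Hypothetical helper: runs `action`; if it throws, attempts `rollback`.
// A failing rollback is reported and passed to the optional hook,
// but the original error is always the one rethrown.
function runWithRollback<T>(
  action: () => T,
  rollback: () => void,
  reporter: Reporter,
  warnMessage = 'Pipeline extension rollback failed.',
  onRollbackError?: (error: unknown) => void
): T {
  try {
    return action();
  } catch (original) {
    try {
      rollback();
    } catch (rollbackError) {
      reporter.warn(warnMessage, { error: rollbackError });
      onRollbackError?.(rollbackError);
    }
    throw original;
  }
}
```

Keeping the original error as the thrown value means callers see a single, stable failure cause, while operators can still find the rollback problem in the reporter output.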

  5. MaybePromise-driven execution
    The resumable runner leans fully into sync + async symmetry via:
    • MaybePromise
    • isPromiseLike
    • maybeThen
    • maybeAll

Anywhere the runner touches extensions, helpers, or stages, the contract is now explicit: values can be sync or async; the runtime normalises both.

This is particularly important for resumable helpers which may:
• synchronously declare a pause (e.g. immediate user interaction),
• or asynchronously resolve it (e.g. await external tool / network).
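For readers unfamiliar with the pattern, here is one plausible shape for these utilities. The names match the list above, but the bodies and signatures are assumptions written for illustration and may differ from the PR's actual implementations.

```typescript
type MaybePromise<T> = T | PromiseLike<T>;

// True for any thenable, including native promises.
function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
  return (
    (typeof value === 'object' || typeof value === 'function') &&
    value !== null &&
    typeof (value as { then?: unknown }).then === 'function'
  );
}

// Applies `next` immediately for plain values, or after resolution for thenables,
// so a fully synchronous chain never touches the microtask queue.
function maybeThen<T, R>(
  value: MaybePromise<T>,
  next: (resolved: T) => MaybePromise<R>
): MaybePromise<R> {
  if (isPromiseLike(value)) {
    return (value as PromiseLike<T>).then(next);
  }
  return next(value as T);
}

// Collects values in order; the result is a plain array unless some input is thenable.
function maybeAll<T>(values: MaybePromise<T>[]): MaybePromise<T[]> {
  return values.reduce<MaybePromise<T[]>>(
    (acc, value) =>
      maybeThen(acc, (items) => maybeThen(value, (item) => [...items, item])),
    []
  );
}
```

The design choice being illustrated is that synchronous pipelines stay synchronous end to end, and only become asynchronous at the first stage or helper that actually returns a thenable.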

Usage example (simplified)

const pipeline = makeResumablePipeline({
  createState: () => ({ step: 0 }),
  createRunResult: (state) => ({ done: true, state }),
  stages: [
    makeSomeStage(),
    makeApprovalStage(),
    makeFinalizeStage(),
  ],
  // helpers, extensions, reporter, etc…
});

// Initial run
const resultOrPause = await pipeline.run({ input: '...' });

if (resultOrPause.type === 'paused') {
  // Persist snapshot to DB / KV, return token to client, etc.
  const { snapshot } = resultOrPause;

  // Later, after user action…
  const resumed = await pipeline.resume(snapshot, { approval: 'yes' });

  // resumed is either final result or another pause
}
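The "persist snapshot" step in the example above could look something like the following round trip. This is a sketch that assumes the pipeline state is JSON-serialisable; `encodeSnapshot`, `decodeSnapshot`, and the `isState` validator are hypothetical helpers, not part of the PR.

```typescript
type PauseKind = 'user' | 'helper' | 'extension' | 'internal';
type Snapshot<S> = { kind: PauseKind; state: S; stageIndex: number };

const PAUSE_KINDS: readonly string[] = ['user', 'helper', 'extension', 'internal'];

// Assumes `state` contains only JSON-safe values (no functions, Dates, Maps, ...).
function encodeSnapshot<S>(snapshot: Snapshot<S>): string {
  return JSON.stringify(snapshot);
}

// Validates the stored payload before handing it back to resume();
// `isState` is a caller-supplied check for the state shape.
function decodeSnapshot<S>(
  raw: string,
  isState: (value: unknown) => value is S
): Snapshot<S> {
  const parsed: unknown = JSON.parse(raw);
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error('Snapshot must be an object.');
  }
  const { kind, state, stageIndex } = parsed as Record<string, unknown>;
  if (typeof kind !== 'string' || PAUSE_KINDS.indexOf(kind) === -1) {
    throw new Error('Snapshot has an unknown pause kind.');
  }
  if (typeof stageIndex !== 'number' || !Number.isInteger(stageIndex) || stageIndex < 0) {
    throw new Error('Snapshot has an invalid stageIndex.');
  }
  if (!isState(state)) {
    throw new Error('Snapshot state failed validation.');
  }
  return { kind: kind as PauseKind, state, stageIndex };
}
```

Validating on the way back in matters because a stored snapshot may outlive the code version that wrote it.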

Tests & coverage

New / expanded tests include (names representative):
• makeResumablePipeline.test.ts
• makeResumablePipeline.coverage.test.ts
• execution.coverage.test.ts
• resume-context.test.ts
• runner-rollback.coverage.test.ts
• stage-factories.coverage.test.ts
• standard-pipeline.coverage.test.ts

They cover:
• sync vs async stages/helpers
• pause vs non-pause flows
• single and multi-step resume
• with and without createState / createRunResult
• empty vs non-empty stage arrays
• extension hooks + rollback (success and failure)
• helper rollback failures
• standard pipeline diagnostics:
    • conflicts
    • missing dependencies
    • unused helpers
    • IR vs builder kind mismatches with/without createError

Overall coverage after this PR (approx):
• Statements: 95.9%
• Branches: 87.5%
• Functions: 96.1%
• Lines: 96.0%

Backwards compatibility
• Existing makePipeline API and its callers are unchanged.
• Existing runners (initAgnosticRunner + executeRun) behave exactly as before.
• Resumable execution is opt-in via makeResumablePipeline / initAgnosticResumableRunner.
• The added isPaused checks are conservative and only affect flows where a stage already returned PipelinePaused.

Notes for reviewers
• Focus on:
    • the shape of PipelinePauseSnapshot (does it capture the right minimal state?),
    • the pause contract between run and resume,
    • helper/extension behaviour when pause or rollback occurs,
    • whether the standard pipeline’s use of isPaused is clear enough.
• Naming is up for discussion:
    • PipelinePaused / PipelinePauseSnapshot / PipelinePauseKind
    • runStagesIteratively / executeRunWithPause / executeResume
• If you have existing pipelines that might want pause/resume semantics, this is a good time to sanity-check whether they can be migrated without surprising side-effects.

coderabbitai bot commented Dec 29, 2025

Important

Review skipped

Too many files!

150 files out of 300 files are above the max files limit of 150.

@pipewrk pipewrk added the codex label Dec 29, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.

@pipewrk pipewrk force-pushed the feat/resumable-pipelines branch from e2c08ea to 3ceeb46 Compare December 29, 2025 07:32
@pipewrk pipewrk merged commit 4735df7 into wpkernel:main Dec 29, 2025
8 checks passed