Merged
Conversation
|
Important Review skippedToo many files! 150 files out of 300 files are above the max files limit of 150. You can disable this status message by setting the ✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
e2c08ea to
3ceeb46
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Introduce a resumable pipeline runtime to the core workflow engine.
This PR adds a new makeResumablePipeline API and a matching runner (initAgnosticResumableRunner) that support pause / snapshot / resume semantics while keeping the existing makePipeline behaviour intact and fully compatible.
The existing non-resumable path remains the default; resumability is opt-in via the new API.
⸻
Motivation
Some workloads need to:
• pause mid-run (e.g. for human input, external approvals, or long-running tools),
• persist a typed snapshot of pipeline state,
• and resume later from the exact point of pause, without re-running completed stages or corrupting state.
Previously, “pause” was only expressible via ad-hoc early returns and bespoke plumbing in callers. There was no standard contract for:
• what a “pause” result looks like,
• what minimal state must be serialised,
• how to restart the runner at a specific stage,
• or how helpers/extensions should behave in the presence of pauses.
This PR formalises that contract.
⸻
What’s new
New types and factory:
// core API
type ResumablePipeline<
TRunOptions,
TRunResult,
TContext,
TReporter,
TState
function makeResumablePipeline<
TRunOptions,
TRunResult,
TContext,
TReporter,
TState
Pause contracts:
type PipelinePauseKind = 'user' | 'helper' | 'extension' | 'internal';
interface PipelinePauseSnapshot {
kind: PipelinePauseKind;
state: TState;
stageIndex: number;
resumeInputSchema?: unknown; // if caller needs structured resume input
}
interface PipelinePaused {
type: 'paused';
snapshot: PipelinePauseSnapshot;
}
Key points:
• run may now return either a final result or a PipelinePaused.
• resume consumes the snapshot and a caller-provided resumeInput and returns either:
• the final result, or
• another PipelinePaused (multi-step interactions are supported).
This entire flow is strongly typed around TState.
⸻
New runner entry points:
// New resumable runner
const runner = initAgnosticResumableRunner<TRunOptions, TRunResult, TContext, TReporter, TState>(...);
const resultOrPause = await runner.executeRunWithPause(runOptions);
// ...
const resumed = await runner.executeResume(snapshot, resumeInput);
Internally, resumable execution is driven by:
• runStagesIteratively(...) – explicit stage array + stageIndex
• prepareResumeContext(...) – reconstructs context from snapshot
• explicit handling of PipelinePaused results at each stage
The existing runner remains:
const runner = initAgnosticRunner(...);
const result = await runner.executeRun(options); // unchanged behaviour
So:
• Non-resumable pipelines still use a composed program (createAgnosticProgram).
• Resumable pipelines use iterative stepping with indexed stages.
⸻
Stages that previously only short-circuited on “halt/error” are now pause-aware.
isPaused is introduced and wired into:
• makeFinalizeFragmentsStage
• makeAfterFragmentsStage
• makeCommitStage
• makeFinalizeResultStage
• helper stage factory via makeHelperStageFactory
• lifecycle stage in program.ts
• standard pipeline runner’s makeFinalizeFragmentsStage
Effect: once a stage returns PipelinePaused, downstream stages do not mutate state. A pause is a first-class terminal outcome for that run, not “just another value”.
⸻
Error handling around helpers and extensions is tightened and better observable:
• Extension rollback failures
reporter.warn('Pipeline extension rollback failed.', {
error,
extensionId,
stageName,
// additional metadata…
});
options.onExtensionRollbackError?.(error, context);
The original extension failure remains the primary error; rollback failures are logged and surfaced via the optional hook.
Similar pattern for helper rollbacks:
reporter.warn('Helper rollback failed.', { error, helperId, ... });
The helper’s original failure still drives control flow; rollback errors are diagnostic.
This matters more in a resumable world where partial commits and staged rollbacks are more likely.
⸻
The resumable runner leans fully into sync + async symmetry via:
• MaybePromise
• isPromiseLike
• maybeThen
• maybeAll
Anywhere the runner touches extensions, helpers, or stages, the contract is now explicit: values can be sync or async; the runtime normalises both.
This is particularly important for resumable helpers which may:
• synchronously declare a pause (e.g. immediate user interaction),
• or asynchronously resolve it (e.g. await external tool / network).
⸻
Usage example (simplified)
const pipeline = makeResumablePipeline({
createState: () => ({ step: 0 }),
createRunResult: (state) => ({ done: true, state }),
stages: [
makeSomeStage(),
makeApprovalStage(),
makeFinalizeStage(),
],
// helpers, extensions, reporter, etc…
});
// Initial run
const resultOrPause = await pipeline.run({ input: '...' });
if (resultOrPause.type === 'paused') {
// Persist snapshot to DB / KV, return token to client, etc.
const { snapshot } = resultOrPause;
// Later, after user action…
const resumed = await pipeline.resume(snapshot, { approval: 'yes' });
// resumed is either final result or another pause
}
⸻
Tests & coverage
New / expanded tests include (names representative):
• makeResumablePipeline.test.ts
• makeResumablePipeline.coverage.test.ts
• execution.coverage.test.ts
• resume-context.test.ts
• runner-rollback.coverage.test.ts
• stage-factories.coverage.test.ts
• standard-pipeline.coverage.test.ts
They cover:
• sync vs async stages/helpers
• pause vs non-pause flows
• single and multi-step resume
• with and without createState / createRunResult
• empty vs non-empty stage arrays
• extension hooks + rollback (success and failure)
• helper rollback failures
• standard pipeline diagnostics:
• conflicts
• missing dependencies
• unused helpers
• IR vs builder kind mismatches with/without createError
Overall coverage after this PR (approx):
• Statements: 95.9%
• Branches: 87.5%
• Functions: 96.1%
• Lines: 96.0%
⸻
Backwards compatibility
• Existing makePipeline API and its callers are unchanged.
• Existing runners (initAgnosticRunner + executeRun) behave exactly as before.
• Resumable execution is opt-in via makeResumablePipeline / initAgnosticResumableRunner.
• The added isPaused checks are conservative and only affect flows where a stage already returned PipelinePaused.
⸻
Notes for reviewers
• Focus on:
• the shape of PipelinePauseSnapshot (does it capture the right minimal state?),
• the pause contract between run and resume,
• helper/extension behaviour when pause or rollback occurs,
• whether the standard pipeline’s use of isPaused is clear enough.
• Naming is up for discussion:
• PipelinePaused / PipelinePauseSnapshot / PipelinePauseKind
• runStagesIteratively / executeRunWithPause / executeResume
• If you have existing pipelines that might want pause/resume semantics, this is a good time to sanity-check whether they can be migrated without surprising side-effects.