diff --git a/apps/docs/content/docs/en/blocks/human-in-the-loop.mdx b/apps/docs/content/docs/en/blocks/human-in-the-loop.mdx index d4e4705e22b..04f210ad17b 100644 --- a/apps/docs/content/docs/en/blocks/human-in-the-loop.mdx +++ b/apps/docs/content/docs/en/blocks/human-in-the-loop.mdx @@ -78,7 +78,7 @@ Defines the fields approvers fill in when responding. This data becomes availabl } ``` -Access resume data in downstream blocks using ``. +Access resume data in downstream blocks using ``. ## Approval Methods @@ -93,11 +93,12 @@ Access resume data in downstream blocks using ``. ### REST API - Programmatically resume workflows using the resume endpoint. The `contextId` is available from the block's `resumeEndpoint` output or from the paused execution detail. + Programmatically resume workflows using the resume endpoint. The `contextId` is available from the block's `resumeEndpoint` output or from the `_resume` object in the paused execution response. ```bash POST /api/resume/{workflowId}/{executionId}/{contextId} Content-Type: application/json + X-API-Key: your-api-key { "input": { @@ -107,23 +108,44 @@ Access resume data in downstream blocks using ``. } ``` - The response includes a new `executionId` for the resumed execution: + The resume endpoint automatically respects the execution mode used in the original execute call: + + - **Sync mode** (default) — The response waits for the remaining workflow to complete and returns the full result: + + ```json + { + "success": true, + "status": "completed", + "executionId": "", + "output": { ... }, + "metadata": { "duration": 1234, "startTime": "...", "endTime": "..." } + } + ``` + + If the resumed workflow hits another HITL block, the response returns `"status": "paused"` with new `_resume` URLs in the output. + + - **Stream mode** (`stream: true` on the original execute call) — The resume response streams SSE events with `selectedOutputs` chunks, just like the initial execution. + + - **Async mode** (`X-Execution-Mode: async` on the original execute call) — The resume dispatches execution to a background worker and returns immediately with `202`: ```json { "status": "started", "executionId": "", - "message": "Resume execution started." + "message": "Resume execution started asynchronously." } ``` - To poll execution progress after resuming, connect to the SSE stream: + #### Polling execution status + + To check on a paused execution or poll for completion after an async resume: ```bash - GET /api/workflows/{workflowId}/executions/{resumeExecutionId}/stream + GET /api/resume/{workflowId}/{executionId} + X-API-Key: your-api-key ``` - Build custom approval UIs or integrate with existing systems. + Returns the full paused execution detail with all pause points, their statuses, and resume links. Returns `404` when the execution has completed and is no longer paused. ### Webhook @@ -132,6 +154,53 @@ Access resume data in downstream blocks using ``. +## API Execute Behavior + +When triggering a workflow via the execute API (`POST /api/workflows/{id}/execute`), HITL blocks cause the execution to pause and return the `_resume` data in the response: + + + + The response includes the full pause data with resume URLs: + + ```json + { + "success": true, + "executionId": "", + "output": { + "data": { + "operation": "human", + "_resume": { + "apiUrl": "/api/resume/{workflowId}/{executionId}/{contextId}", + "uiUrl": "/resume/{workflowId}/{executionId}", + "contextId": "", + "executionId": "", + "workflowId": "" + } + } + } + } + ``` + + + Blocks before the HITL stream their `selectedOutputs` normally. When execution pauses, the final SSE event includes `status: "paused"` and the `_resume` data: + + ``` + data: {"blockId":"agent1","chunk":"streamed content..."} + data: {"event":"final","data":{"success":true,"output":{...,"_resume":{...}},"status":"paused"}} + data: "[DONE]" + ``` + + On resume, blocks after the HITL stream their `selectedOutputs` the same way. + + + HITL blocks are automatically excluded from the `selectedOutputs` dropdown since their data is always included in the pause response. + + + + Returns `202` immediately. Use the polling endpoint to check when the execution pauses. + + + ## Common Use Cases **Content Approval** - Review AI-generated content before publishing @@ -161,9 +230,9 @@ Agent (Generate) → Human in the Loop (QA) → Gmail (Send) **`response`** - Display data shown to the approver (json) **`submission`** - Form submission data from the approver (json) **`submittedAt`** - ISO timestamp when the workflow was resumed -**`resumeInput.*`** - All fields defined in Resume Form become available after the workflow resumes +**``** - All fields defined in Resume Form become available at the top level after the workflow resumes -Access using ``. +Access using ``. ## Example @@ -187,7 +256,7 @@ Access using ``. **Downstream Usage:** ```javascript // Condition block - === true + === true ``` The example below shows an approval portal as seen by an approver after the workflow is paused. Approvers can review the data and provide inputs as a part of the workflow resumption. The approval portal can be accessed directly via the unique URL, ``. @@ -204,7 +273,7 @@ The example below shows an approval portal as seen by an approver after the work to reference specific fields from the resume form. For example, if your block ID is 'approval1' and the form has an 'approved' field, use ." }, + { question: "How do I access the approver's input in downstream blocks?", answer: "Use the syntax to reference specific fields from the resume form. For example, if your block name is 'approval1' and the form has an 'approved' field, use ." }, { question: "Can I chain multiple Human in the Loop blocks for multi-stage approvals?", answer: "Yes. You can place multiple Human in the Loop blocks in sequence to create multi-stage approval workflows. Each block pauses independently and can have its own notification configuration and resume form fields." }, { question: "Can I resume the workflow programmatically without the portal?", answer: "Yes. Each block exposes a resume API endpoint that you can call with a POST request containing the form data as JSON. This lets you build custom approval UIs or integrate with existing systems like Jira or ServiceNow." }, { question: "What outputs are available after the workflow resumes?", answer: "The block outputs include the approval portal URL, the resume API endpoint URL, the display data shown to the approver, the form submission data, the raw resume input, and an ISO timestamp of when the workflow was resumed." }, diff --git a/apps/sim/app/api/chat/[identifier]/route.test.ts b/apps/sim/app/api/chat/[identifier]/route.test.ts index 31d3a0bfde4..a754356c451 100644 --- a/apps/sim/app/api/chat/[identifier]/route.test.ts +++ b/apps/sim/app/api/chat/[identifier]/route.test.ts @@ -410,14 +410,7 @@ describe('Chat Identifier API Route', () => { expect(createStreamingResponse).toHaveBeenCalledWith( expect.objectContaining({ - workflow: expect.objectContaining({ - id: 'workflow-id', - userId: 'user-id', - }), - input: expect.objectContaining({ - input: 'Hello world', - conversationId: 'conv-123', - }), + executeFn: expect.any(Function), streamConfig: expect.objectContaining({ isSecureMode: true, workflowTriggerType: 'chat', @@ -494,9 +487,9 @@ describe('Chat Identifier API Route', () => { expect(createStreamingResponse).toHaveBeenCalledWith( expect.objectContaining({ - input: expect.objectContaining({ - input: 'Hello world', - conversationId: 'test-conversation-123', + executeFn: expect.any(Function), + streamConfig: expect.objectContaining({ + workflowTriggerType: 'chat', }), }) ) @@ -510,9 +503,7 @@ describe('Chat Identifier API Route', () => { expect(createStreamingResponse).toHaveBeenCalledWith( expect.objectContaining({ - input: expect.objectContaining({ - input: 'Hello world', - }), + executeFn: expect.any(Function), }) ) }) diff --git a/apps/sim/app/api/chat/[identifier]/route.ts b/apps/sim/app/api/chat/[identifier]/route.ts index 3f6e14a41f7..de826d8c7e9 100644 --- a/apps/sim/app/api/chat/[identifier]/route.ts +++ b/apps/sim/app/api/chat/[identifier]/route.ts @@ -199,6 +199,7 @@ export async function POST( } const { createStreamingResponse } = await import('@/lib/workflows/streaming/streaming') + const { executeWorkflow } = await import('@/lib/workflows/executor/execute-workflow') const { SSE_HEADERS } = await import('@/lib/core/utils/sse') const workflowInput: any = { input, conversationId } @@ -252,15 +253,31 @@ export async function POST( const stream = await createStreamingResponse({ requestId, - workflow: workflowForExecution, - input: workflowInput, - executingUserId: workspaceOwnerId, streamConfig: { selectedOutputs, isSecureMode: true, workflowTriggerType: 'chat', }, executionId, + executeFn: async ({ onStream, onBlockComplete, abortSignal }) => + executeWorkflow( + workflowForExecution, + requestId, + workflowInput, + workspaceOwnerId, + { + enabled: true, + selectedOutputs, + isSecureMode: true, + workflowTriggerType: 'chat', + onStream, + onBlockComplete, + skipLoggingComplete: true, + abortSignal, + executionMode: 'stream', + }, + executionId + ), }) const streamResponse = new NextResponse(stream, { diff --git a/apps/sim/app/api/form/[identifier]/route.ts b/apps/sim/app/api/form/[identifier]/route.ts index f69ab9e1886..beee4876b03 100644 --- a/apps/sim/app/api/form/[identifier]/route.ts +++ b/apps/sim/app/api/form/[identifier]/route.ts @@ -9,6 +9,7 @@ import { generateRequestId } from '@/lib/core/utils/request' import { generateId } from '@/lib/core/utils/uuid' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow' import { normalizeInputFormatValue } from '@/lib/workflows/input-format' import { createStreamingResponse } from '@/lib/workflows/streaming/streaming' import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers' @@ -216,18 +217,33 @@ export async function POST( ...formData, // Spread form fields at top level for convenience } - // Execute workflow using streaming (for consistency with chat) const stream = await createStreamingResponse({ requestId, - workflow: workflowForExecution, - input: workflowInput, - executingUserId: workspaceOwnerId, streamConfig: { selectedOutputs: [], isSecureMode: true, - workflowTriggerType: 'api', // Use 'api' type since form is similar + workflowTriggerType: 'api', }, executionId, + executeFn: async ({ onStream, onBlockComplete, abortSignal }) => + executeWorkflow( + workflowForExecution, + requestId, + workflowInput, + workspaceOwnerId, + { + enabled: true, + selectedOutputs: [], + isSecureMode: true, + workflowTriggerType: 'api', + onStream, + onBlockComplete, + skipLoggingComplete: true, + abortSignal, + executionMode: 'sync', + }, + executionId + ), }) // For forms, we don't stream back - we wait for completion and return success diff --git a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts index 1a75d8aa598..7a407480e3e 100644 --- a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts +++ b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts @@ -1,19 +1,43 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { AuthType } from '@/lib/auth/hybrid' +import { getJobQueue, shouldUseBullMQ } from '@/lib/core/async-jobs' +import { createBullMQJobData } from '@/lib/core/bullmq' import { generateRequestId } from '@/lib/core/utils/request' +import { SSE_HEADERS } from '@/lib/core/utils/sse' import { generateId } from '@/lib/core/utils/uuid' +import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch' import { setExecutionMeta } from '@/lib/execution/event-buffer' import { preprocessExecution } from '@/lib/execution/preprocessing' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' +import { createStreamingResponse } from '@/lib/workflows/streaming/streaming' import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' import { validateWorkflowAccess } from '@/app/api/workflows/middleware' +import type { ResumeExecutionPayload } from '@/background/resume-execution' +import { ExecutionSnapshot } from '@/executor/execution/snapshot' +import type { SerializedSnapshot } from '@/executor/types' const logger = createLogger('WorkflowResumeAPI') export const runtime = 'nodejs' export const dynamic = 'force-dynamic' +function getStoredSnapshotConfig(pausedExecution: { executionSnapshot: unknown }): { + executionMode?: 'sync' | 'stream' | 'async' + selectedOutputs?: string[] +} { + try { + const serialized = pausedExecution.executionSnapshot as SerializedSnapshot + const snapshot = ExecutionSnapshot.fromJSON(serialized.snapshot) + return { + executionMode: snapshot.metadata.executionMode, + selectedOutputs: snapshot.selectedOutputs, + } + } catch { + return {} + } +} + export async function POST( request: NextRequest, { @@ -24,7 +48,6 @@ export async function POST( ) { const { workflowId, executionId, contextId } = await params - // Allow resume from dashboard without requiring deployment const access = await validateWorkflowAccess(request, workflowId, false) if (access.error) { return NextResponse.json({ error: access.error.message }, { status: access.error.status }) @@ -74,12 +97,12 @@ export async function POST( const preprocessResult = await preprocessExecution({ workflowId, userId, - triggerType: 'manual', // Resume is a manual trigger + triggerType: 'manual', executionId: resumeExecutionId, requestId, - checkRateLimit: false, // Manual triggers bypass rate limits - checkDeployment: false, // Resuming existing execution, deployment already checked - skipUsageLimits: true, // Resume is continuation of authorized execution - don't recheck limits + checkRateLimit: false, + checkDeployment: false, + skipUsageLimits: true, useAuthenticatedUserAsActor: isPersonalApiKeyCaller, workspaceId: workflow.workspaceId || undefined, }) @@ -142,8 +165,35 @@ export async function POST( } const isApiCaller = access.auth?.authType === AuthType.API_KEY + const snapshotConfig = isApiCaller ? getStoredSnapshotConfig(enqueueResult.pausedExecution) : {} + const executionMode = isApiCaller ? (snapshotConfig.executionMode ?? 'sync') : undefined - if (isApiCaller) { + if (isApiCaller && executionMode === 'stream') { + const stream = await createStreamingResponse({ + requestId, + streamConfig: { + selectedOutputs: snapshotConfig.selectedOutputs, + timeoutMs: preprocessResult.executionTimeout?.sync, + }, + executionId: enqueueResult.resumeExecutionId, + executeFn: async ({ onStream, onBlockComplete, abortSignal }) => + PauseResumeManager.startResumeExecution({ + ...resumeArgs, + onStream, + onBlockComplete, + abortSignal, + }), + }) + + return new NextResponse(stream, { + headers: { + ...SSE_HEADERS, + 'X-Execution-Id': enqueueResult.resumeExecutionId, + }, + }) + } + + if (isApiCaller && executionMode !== 'async') { const result = await PauseResumeManager.startResumeExecution(resumeArgs) return NextResponse.json({ @@ -162,6 +212,62 @@ export async function POST( }) } + if (isApiCaller && executionMode === 'async') { + const resumePayload: ResumeExecutionPayload = { + resumeEntryId: enqueueResult.resumeEntryId, + resumeExecutionId: enqueueResult.resumeExecutionId, + pausedExecutionId: enqueueResult.pausedExecution.id, + contextId: enqueueResult.contextId, + resumeInput: enqueueResult.resumeInput, + userId: enqueueResult.userId, + workflowId, + parentExecutionId: executionId, + } + + try { + const useBullMQ = shouldUseBullMQ() + if (useBullMQ) { + await enqueueWorkspaceDispatch({ + id: enqueueResult.resumeExecutionId, + workspaceId: workflow.workspaceId, + lane: 'runtime', + queueName: 'resume-execution', + bullmqJobName: 'resume-execution', + bullmqPayload: createBullMQJobData(resumePayload, { + workflowId, + userId, + }), + metadata: { workflowId, userId }, + }) + } else { + const jobQueue = await getJobQueue() + const jobId = await jobQueue.enqueue('resume-execution', resumePayload, { + metadata: { workflowId, workspaceId: workflow.workspaceId, userId }, + }) + logger.info('Enqueued resume execution job', { + jobId, + resumeExecutionId: enqueueResult.resumeExecutionId, + }) + } + } catch (dispatchError) { + logger.error('Failed to dispatch async resume, falling back to in-process', { + error: dispatchError instanceof Error ? dispatchError.message : String(dispatchError), + }) + PauseResumeManager.startResumeExecution(resumeArgs).catch((error) => { + logger.error('Fallback resume execution also failed', { error }) + }) + } + + return NextResponse.json( + { + status: 'started', + executionId: enqueueResult.resumeExecutionId, + message: 'Resume execution started asynchronously.', + }, + { status: 202 } + ) + } + PauseResumeManager.startResumeExecution(resumeArgs).catch((error) => { logger.error('Failed to start resume execution', { workflowId, @@ -200,7 +306,6 @@ export async function GET( ) { const { workflowId, executionId, contextId } = await params - // Allow access without API key for browser-based UI (same as parent execution endpoint) const access = await validateWorkflowAccess(request, workflowId, false) if (access.error) { return NextResponse.json({ error: access.error.message }, { status: access.error.status }) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 86a4a722eb8..c715ca2a8bc 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -39,6 +39,7 @@ import { cleanupExecutionBase64Cache, hydrateUserFilesWithBase64, } from '@/lib/uploads/utils/user-file-base64.server' +import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events' import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence' @@ -213,6 +214,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise + executeWorkflow( + streamWorkflow, + requestId, + processedInput, + actorUserId, + { + enabled: true, + selectedOutputs: resolvedSelectedOutputs, + isSecureMode: false, + workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api', + onStream, + onBlockComplete, + skipLoggingComplete: true, + includeFileBase64, + base64MaxBytes, + abortSignal, + executionMode: 'stream', + }, + executionId + ), }) return new NextResponse(stream, { @@ -1310,6 +1333,7 @@ async function handleExecutePost( enforceCredentialAccess: useAuthenticatedUserAsActor, workflowStateOverride: effectiveWorkflowStateOverride, callChain, + executionMode: 'sync', } const sseExecutionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {} diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx index c126816e09d..213fe3d4199 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx @@ -38,6 +38,8 @@ const TagIcon: React.FC<{ ) +const EXCLUDED_OUTPUT_TYPES = new Set(['starter', 'start_trigger', 'human_in_the_loop'] as const) + /** * Props for the OutputSelect component */ @@ -121,7 +123,7 @@ export function OutputSelect({ if (blockArray.length === 0) return outputs blockArray.forEach((block: any) => { - if (block.type === 'starter' || !block?.id || !block?.type) return + if (EXCLUDED_OUTPUT_TYPES.has(block.type) || !block?.id || !block?.type) return const blockName = block.name && typeof block.name === 'string' diff --git a/apps/sim/background/resume-execution.ts b/apps/sim/background/resume-execution.ts new file mode 100644 index 00000000000..ce4488af7aa --- /dev/null +++ b/apps/sim/background/resume-execution.ts @@ -0,0 +1,75 @@ +import { createLogger } from '@sim/logger' +import { task } from '@trigger.dev/sdk' +import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' + +const logger = createLogger('TriggerResumeExecution') + +export type ResumeExecutionPayload = { + resumeEntryId: string + resumeExecutionId: string + pausedExecutionId: string + contextId: string + resumeInput: unknown + userId: string + workflowId: string + parentExecutionId: string +} + +export async function executeResumeJob(payload: ResumeExecutionPayload) { + const { resumeExecutionId, pausedExecutionId, contextId, workflowId, parentExecutionId } = payload + + logger.info('Starting background resume execution', { + resumeExecutionId, + pausedExecutionId, + contextId, + workflowId, + parentExecutionId, + }) + + try { + const pausedExecution = await PauseResumeManager.getPausedExecutionById(pausedExecutionId) + if (!pausedExecution) { + throw new Error(`Paused execution not found: ${pausedExecutionId}`) + } + + const result = await PauseResumeManager.startResumeExecution({ + resumeEntryId: payload.resumeEntryId, + resumeExecutionId: payload.resumeExecutionId, + pausedExecution, + contextId: payload.contextId, + resumeInput: payload.resumeInput, + userId: payload.userId, + }) + + logger.info('Background resume execution completed', { + resumeExecutionId, + workflowId, + success: result.success, + status: result.status, + }) + + return { + success: result.success, + workflowId, + executionId: resumeExecutionId, + parentExecutionId, + status: result.status, + } + } catch (error) { + logger.error('Background resume execution failed', { + resumeExecutionId, + workflowId, + error: error instanceof Error ? error.message : String(error), + }) + throw error + } +} + +export const resumeExecutionTask = task({ + id: 'resume-execution', + machine: 'medium-1x', + retry: { + maxAttempts: 1, + }, + run: executeResumeJob, +}) diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts index 0990879ba69..794b12d0195 100644 --- a/apps/sim/background/workflow-execution.ts +++ b/apps/sim/background/workflow-execution.ts @@ -44,6 +44,7 @@ export type WorkflowExecutionPayload = { correlation?: AsyncExecutionCorrelation metadata?: Record callChain?: string[] + executionMode?: 'sync' | 'stream' | 'async' } /** @@ -112,6 +113,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { isClientSession: false, callChain: payload.callChain, correlation, + executionMode: payload.executionMode ?? 'async', } const snapshot = new ExecutionSnapshot( diff --git a/apps/sim/executor/execution/snapshot-serializer.ts b/apps/sim/executor/execution/snapshot-serializer.ts index 6674a88ab6e..052d4b284b3 100644 --- a/apps/sim/executor/execution/snapshot-serializer.ts +++ b/apps/sim/executor/execution/snapshot-serializer.ts @@ -112,6 +112,7 @@ export function serializePauseSnapshot( useDraftState, startTime: metadataFromContext?.startTime ?? new Date().toISOString(), isClientSession: metadataFromContext?.isClientSession, + executionMode: metadataFromContext?.executionMode, } const snapshot = new ExecutionSnapshot( diff --git a/apps/sim/executor/execution/types.ts b/apps/sim/executor/execution/types.ts index b343be9f977..3c5130d8220 100644 --- a/apps/sim/executor/execution/types.ts +++ b/apps/sim/executor/execution/types.ts @@ -36,6 +36,7 @@ export interface ExecutionMetadata { } callChain?: string[] correlation?: AsyncExecutionCorrelation + executionMode?: 'sync' | 'stream' | 'async' } export interface SerializableExecutionState { diff --git a/apps/sim/lib/core/async-jobs/backends/bullmq.ts b/apps/sim/lib/core/async-jobs/backends/bullmq.ts index a7bb4647ef4..f67aead4ce0 100644 --- a/apps/sim/lib/core/async-jobs/backends/bullmq.ts +++ b/apps/sim/lib/core/async-jobs/backends/bullmq.ts @@ -95,6 +95,11 @@ export class BullMQJobQueue implements JobQueueBackend { return toJob('schedule-execution', scheduleJob) } + const resumeJob = await getBullMQQueue('resume-execution').getJob(jobId) + if (resumeJob) { + return toJob('resume-execution', resumeJob) + } + return null } diff --git a/apps/sim/lib/core/async-jobs/backends/trigger-dev.ts b/apps/sim/lib/core/async-jobs/backends/trigger-dev.ts index 2a682ee7d1f..c3863d418a8 100644 --- a/apps/sim/lib/core/async-jobs/backends/trigger-dev.ts +++ b/apps/sim/lib/core/async-jobs/backends/trigger-dev.ts @@ -19,6 +19,7 @@ const JOB_TYPE_TO_TASK_ID: Record = { 'workflow-execution': 'workflow-execution', 'schedule-execution': 'schedule-execution', 'webhook-execution': 'webhook-execution', + 'resume-execution': 'resume-execution', } /** diff --git a/apps/sim/lib/core/async-jobs/types.ts b/apps/sim/lib/core/async-jobs/types.ts index 6888ab10723..7b09bdb7b0d 100644 --- a/apps/sim/lib/core/async-jobs/types.ts +++ b/apps/sim/lib/core/async-jobs/types.ts @@ -20,7 +20,11 @@ export const JOB_STATUS = { export type JobStatus = (typeof JOB_STATUS)[keyof typeof JOB_STATUS] -export type JobType = 'workflow-execution' | 'schedule-execution' | 'webhook-execution' +export type JobType = + | 'workflow-execution' + | 'schedule-execution' + | 'webhook-execution' + | 'resume-execution' export type AsyncExecutionCorrelationSource = 'workflow' | 'schedule' | 'webhook' diff --git a/apps/sim/lib/core/bullmq/queues.ts b/apps/sim/lib/core/bullmq/queues.ts index 2278a309d95..f90c2e90c85 100644 --- a/apps/sim/lib/core/bullmq/queues.ts +++ b/apps/sim/lib/core/bullmq/queues.ts @@ -16,6 +16,7 @@ export interface BullMQJobData { let workflowQueueInstance: Queue | null = null let webhookQueueInstance: Queue | null = null let scheduleQueueInstance: Queue | null = null +let resumeQueueInstance: Queue | null = null let knowledgeConnectorSyncQueueInstance: Queue | null = null let knowledgeDocumentProcessingQueueInstance: Queue | null = null let mothershipJobExecutionQueueInstance: Queue | null = null @@ -45,6 +46,12 @@ function getQueueDefaultOptions(type: JobType) { removeOnComplete: { age: 24 * 60 * 60 }, removeOnFail: { age: 3 * 24 * 60 * 60 }, } + case 'resume-execution': + return { + attempts: 1, + removeOnComplete: { age: 24 * 60 * 60 }, + removeOnFail: { age: 3 * 24 * 60 * 60 }, + } } } @@ -121,6 +128,11 @@ export function getBullMQQueue(type: JobType): Queue { scheduleQueueInstance = createQueue(type) } return scheduleQueueInstance + case 'resume-execution': + if (!resumeQueueInstance) { + resumeQueueInstance = createQueue(type) + } + return resumeQueueInstance } } @@ -129,6 +141,7 @@ export function getBullMQQueueByName(queueName: WorkspaceDispatchQueueName): Que case 'workflow-execution': case 'webhook-execution': case 'schedule-execution': + case 'resume-execution': return getBullMQQueue(queueName) case KNOWLEDGE_CONNECTOR_SYNC_QUEUE: return getKnowledgeConnectorSyncQueue() diff --git a/apps/sim/lib/workflows/executor/execute-workflow.ts b/apps/sim/lib/workflows/executor/execute-workflow.ts index 18d42472a6f..fccded08c78 100644 --- a/apps/sim/lib/workflows/executor/execute-workflow.ts +++ b/apps/sim/lib/workflows/executor/execute-workflow.ts @@ -30,6 +30,7 @@ export interface ExecuteWorkflowOptions { startBlockId: string sourceSnapshot: SerializableExecutionState } + executionMode?: 'sync' | 'stream' | 'async' } export interface WorkflowInfo { @@ -70,6 +71,7 @@ export async function executeWorkflow( useDraftState: streamConfig?.useDraftState ?? false, startTime: new Date().toISOString(), isClientSession: false, + executionMode: streamConfig?.executionMode, } const snapshot = new ExecutionSnapshot( diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 2c0d00fcbb7..7a1c8cfb4f4 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -111,6 +111,10 @@ interface StartResumeExecutionArgs { contextId: string resumeInput: unknown userId: string + sendEvent?: (event: ExecutionEvent) => void + onStream?: (streamingExec: StreamingExecution) => Promise + onBlockComplete?: (blockId: string, output: unknown) => Promise + abortSignal?: AbortSignal } export class PauseResumeManager { @@ -293,8 +297,18 @@ export class PauseResumeManager { } static async startResumeExecution(args: StartResumeExecutionArgs): Promise { - const { resumeEntryId, resumeExecutionId, pausedExecution, contextId, resumeInput, userId } = - args + const { + resumeEntryId, + resumeExecutionId, + pausedExecution, + contextId, + resumeInput, + userId, + sendEvent, + onStream, + onBlockComplete, + abortSignal, + } = args const pausePointsRecord = pausedExecution.pausePoints as Record const pausePointForContext = pausePointsRecord?.[contextId] @@ -309,6 +323,10 @@ export class PauseResumeManager { contextId, resumeInput, userId, + sendEvent, + onStream, + onBlockComplete, + abortSignal, }) if (result.status === 'paused') { @@ -384,8 +402,22 @@ export class PauseResumeManager { contextId: string resumeInput: unknown userId: string + sendEvent?: (event: ExecutionEvent) => void + onStream?: (streamingExec: StreamingExecution) => Promise + onBlockComplete?: (blockId: string, output: unknown) => Promise + abortSignal?: AbortSignal }): Promise { - const { resumeExecutionId, pausedExecution, contextId, resumeInput, userId } = args + const { + resumeExecutionId, + pausedExecution, + contextId, + resumeInput, + userId, + sendEvent, + onStream: externalOnStream, + onBlockComplete: externalOnBlockComplete, + abortSignal: externalAbortSignal, + } = args const parentExecutionId = pausedExecution.executionId await db @@ -798,6 +830,7 @@ export class PauseResumeManager { localEventSeq++ event.eventId = localEventSeq eventWriter.write(event).catch(() => {}) + sendEvent?.(event) } writeBufferedEvent({ @@ -887,6 +920,10 @@ export class PauseResumeManager { workflowId, data: hasError ? { ...sharedData, error: output?.error } : { ...sharedData, output }, } as ExecutionEvent) + + if (externalOnBlockComplete) { + await externalOnBlockComplete(blockId, callbackData.output) + } }, onChildWorkflowInstanceReady: ( blockId: string, @@ -911,6 +948,11 @@ export class PauseResumeManager { } as ExecutionEvent) }, onStream: async (streamingExec: StreamingExecution) => { + if (externalOnStream) { + await externalOnStream(streamingExec) + return + } + const blockId = (streamingExec.execution as unknown as Record) .blockId as string const reader = streamingExec.stream.getReader() @@ -949,9 +991,9 @@ export class PauseResumeManager { }, } - const timeoutController = createTimeoutAbortController( - preprocessingResult.executionTimeout?.async - ) + const timeoutController = externalAbortSignal + ? null + : createTimeoutAbortController(preprocessingResult.executionTimeout?.async) let result: ExecutionResult let finalMetaStatus: 'complete' | 'error' | 'cancelled' = 'complete' @@ -963,15 +1005,15 @@ export class PauseResumeManager { skipLogCreation: true, includeFileBase64: true, base64MaxBytes: undefined, - abortSignal: timeoutController.signal, + abortSignal: externalAbortSignal ?? timeoutController?.signal, }) if ( result.status === 'cancelled' && - timeoutController.isTimedOut() && - timeoutController.timeoutMs + timeoutController?.isTimedOut() && + timeoutController?.timeoutMs ) { - const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController!.timeoutMs) logger.info('Resume execution timed out', { resumeExecutionId, timeoutMs: timeoutController.timeoutMs, @@ -1042,7 +1084,7 @@ export class PauseResumeManager { finalMetaStatus = 'error' throw execError } finally { - timeoutController.cleanup() + timeoutController?.cleanup() try { await eventWriter.close() } catch (closeError) { @@ -1246,6 +1288,17 @@ export class PauseResumeManager { ) } + static async getPausedExecutionById( + id: string + ): Promise { + const rows = await db + .select() + .from(pausedExecutions) + .where(eq(pausedExecutions.id, id)) + .limit(1) + return rows[0] ?? null + } + static async getPausedExecutionDetail(options: { workflowId: string executionId: string diff --git a/apps/sim/lib/workflows/streaming/streaming.ts b/apps/sim/lib/workflows/streaming/streaming.ts index 081c497b8ee..759bf4d28e5 100644 --- a/apps/sim/lib/workflows/streaming/streaming.ts +++ b/apps/sim/lib/workflows/streaming/streaming.ts @@ -12,7 +12,6 @@ import { cleanupExecutionBase64Cache, hydrateUserFilesWithBase64, } from '@/lib/uploads/utils/user-file-base64.server' -import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow' import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types' /** @@ -36,25 +35,24 @@ export interface StreamingConfig { timeoutMs?: number } +export type StreamingExecutorFn = (callbacks: { + onStream: (streamingExec: StreamingExecution) => Promise + onBlockComplete: (blockId: string, output: unknown) => Promise + abortSignal: AbortSignal +}) => Promise + export interface StreamingResponseOptions { requestId: string - workflow: { - id: string - userId: string - workspaceId?: string | null - isDeployed?: boolean - variables?: Record - } - input: unknown - executingUserId: string streamConfig: StreamingConfig executionId?: string + executeFn: StreamingExecutorFn } interface StreamingState { streamedChunks: Map processedOutputs: Set streamCompletionTimes: Map + completedBlockIds: Set } function resolveStreamedContent(state: StreamingState): Map { @@ -77,6 +75,7 @@ async function buildMinimalResult( result: ExecutionResult, selectedOutputs: string[] | undefined, streamedContent: Map, + completedBlockIds: Set, requestId: string, includeFileBase64: boolean, base64MaxBytes: number | undefined @@ -87,6 +86,11 @@ async function buildMinimalResult( output: {} as Record, } + if (result.status === 'paused') { + minimalResult.output = result.output || {} + return minimalResult + } + if (!selectedOutputs?.length) { minimalResult.output = result.output || {} return minimalResult @@ -103,6 +107,10 @@ async function buildMinimalResult( continue } + if (!completedBlockIds.has(blockId)) { + continue + } + if (isDangerousKey(blockId)) { logger.warn(`[${requestId}] Blocked dangerous blockId: ${blockId}`) continue @@ -182,7 +190,7 @@ async function completeLoggingSession(result: ExecutionResult): Promise { export async function createStreamingResponse( options: StreamingResponseOptions ): Promise { - const { requestId, workflow, input, executingUserId, streamConfig, executionId } = options + const { requestId, streamConfig, executionId, executeFn } = options const timeoutController = createTimeoutAbortController(streamConfig.timeoutMs) return new ReadableStream({ @@ -191,6 +199,7 @@ export async function createStreamingResponse( streamedChunks: new Map(), processedOutputs: new Set(), streamCompletionTimes: new Map(), + completedBlockIds: new Set(), } const sendChunk = (blockId: string, content: string) => { @@ -250,6 +259,8 @@ export async function createStreamingResponse( const base64MaxBytes = streamConfig.base64MaxBytes const onBlockCompleteCallback = async (blockId: string, output: unknown) => { + state.completedBlockIds.add(blockId) + if (!streamConfig.selectedOutputs?.length) { return } @@ -284,25 +295,11 @@ export async function createStreamingResponse( } try { - const result = await executeWorkflow( - workflow, - requestId, - input, - executingUserId, - { - enabled: true, - selectedOutputs: streamConfig.selectedOutputs, - isSecureMode: streamConfig.isSecureMode, - workflowTriggerType: streamConfig.workflowTriggerType, - onStream: onStreamCallback, - onBlockComplete: onBlockCompleteCallback, - skipLoggingComplete: true, - includeFileBase64: streamConfig.includeFileBase64, - base64MaxBytes: streamConfig.base64MaxBytes, - abortSignal: timeoutController.signal, - }, - executionId - ) + const result = await executeFn({ + onStream: onStreamCallback, + onBlockComplete: onBlockCompleteCallback, + abortSignal: timeoutController.signal, + }) const streamedContent = state.streamedChunks.size > 0 ? resolveStreamedContent(state) : new Map() @@ -336,12 +333,21 @@ export async function createStreamingResponse( result, streamConfig.selectedOutputs, streamedContent, + state.completedBlockIds, requestId, streamConfig.includeFileBase64 ?? true, streamConfig.base64MaxBytes ) - controller.enqueue(encodeSSE({ event: 'final', data: minimalResult })) + controller.enqueue( + encodeSSE({ + event: 'final', + data: { + ...minimalResult, + ...(result.status === 'paused' && { status: 'paused' }), + }, + }) + ) } controller.enqueue(encodeSSE('[DONE]')) diff --git a/apps/sim/worker/index.ts b/apps/sim/worker/index.ts index aaf71dd5aab..e5459af3f37 100644 --- a/apps/sim/worker/index.ts +++ b/apps/sim/worker/index.ts @@ -14,6 +14,7 @@ import { startWorkerHealthServer, updateWorkerHealthState } from '@/worker/healt import { processKnowledgeConnectorSync } from '@/worker/processors/knowledge-connector-sync' import { processKnowledgeDocument } from '@/worker/processors/knowledge-document-processing' import { processMothershipJobExecution } from '@/worker/processors/mothership-job-execution' +import { processResume } from '@/worker/processors/resume' import { processSchedule } from '@/worker/processors/schedule' import { processWebhook } from '@/worker/processors/webhook' import { processWorkflow } from '@/worker/processors/workflow' @@ -25,6 +26,7 @@ const DEFAULT_WORKER_PORT = 3001 const DEFAULT_WORKFLOW_CONCURRENCY = 50 const DEFAULT_WEBHOOK_CONCURRENCY = 30 const DEFAULT_SCHEDULE_CONCURRENCY = 20 +const DEFAULT_RESUME_CONCURRENCY = 20 const DEFAULT_MOTHERSHIP_JOB_CONCURRENCY = 10 const DEFAULT_CONNECTOR_SYNC_CONCURRENCY = 5 const DEFAULT_DOCUMENT_PROCESSING_CONCURRENCY = 20 @@ -80,6 +82,14 @@ async function main() { ), }) + const resumeWorker = new Worker('resume-execution', processResume, { + connection, + concurrency: parseWorkerNumber( + process.env.WORKER_CONCURRENCY_RESUME, + DEFAULT_RESUME_CONCURRENCY + ), + }) + const mothershipJobWorker = new Worker( MOTHERSHIP_JOB_EXECUTION_QUEUE, processMothershipJobExecution, @@ -132,6 +142,7 @@ async function main() { workflowWorker, webhookWorker, scheduleWorker, + resumeWorker, mothershipJobWorker, connectorSyncWorker, documentProcessingWorker, diff --git a/apps/sim/worker/processors/resume.ts b/apps/sim/worker/processors/resume.ts new file mode 100644 index 00000000000..d5dba569bb9 --- /dev/null +++ b/apps/sim/worker/processors/resume.ts @@ -0,0 +1,22 @@ +import { createLogger } from '@sim/logger' +import type { Job } from 'bullmq' +import type { BullMQJobData } from '@/lib/core/bullmq' +import { runDispatchedJob } from '@/lib/core/workspace-dispatch' +import { executeResumeJob, type ResumeExecutionPayload } from '@/background/resume-execution' + +const logger = createLogger('BullMQResumeProcessor') + +export async function processResume(job: Job>) { + const { payload } = job.data + const isFinalAttempt = job.attemptsMade + 1 >= (job.opts.attempts ?? 1) + + logger.info('Processing resume execution job', { + jobId: job.id, + resumeExecutionId: payload.resumeExecutionId, + workflowId: payload.workflowId, + }) + + return runDispatchedJob(job.data.metadata, () => executeResumeJob(payload), { + isFinalAttempt, + }) +} diff --git a/bun.lock b/bun.lock index e05bc532f5e..f8bde9a6cf3 100644 --- a/bun.lock +++ b/bun.lock @@ -1,6 +1,5 @@ { "lockfileVersion": 1, - "configVersion": 0, "workspaces": { "": { "name": "simstudio",