[Bug]: Manual job processing not working as described regarding events and stalled handling #3911

@vykintazo

Version

5.71.1

Platform

NodeJS

What happened?

This report covers two separate issues; I discovered the second while reproducing the first.

1. In manual processing, job stalling does not work as expected or as described in the docs

The documentation explicitly states:

This checker is needed to move stalled jobs (whose lock has expired) back to the wait status (or failed if they have exhausted the maximum number of stalled attempts

In practice, the checker moves stalled jobs back to waiting status and only sets deferredFailure on them. Only automatic job processing moves them to failed status, so in manual processing that step never happens. The documentation is therefore incorrect, and arguably the functionality is too.
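
To make the gap concrete, here is a minimal model of the outcome the docs describe next to what I observe in manual mode. It is only an illustration, not BullMQ's implementation: `expectedStateAfterStall`, `observedStateAfterStall`, and the `stalledCount` parameter are hypothetical names, and the rule that a job fails once its stall count exceeds `maxStalledCount` is my reading of the documented behavior.

```typescript
type JobState = 'wait' | 'failed';

// Hypothetical model of the documented rule: a stalled job goes back to
// 'wait' unless it has exhausted its allowed number of stalls.
// `stalledCount` counts the stalls so far, including the current one
// (an assumption of this sketch, not a BullMQ field).
function expectedStateAfterStall(
  stalledCount: number,
  maxStalledCount: number,
): JobState {
  return stalledCount > maxStalledCount ? 'failed' : 'wait';
}

// What the reproduction shows in manual mode: the job ends up in 'wait'
// regardless of maxStalledCount.
function observedStateAfterStall(): JobState {
  return 'wait';
}

// With maxStalledCount: 0, as in the reproduction, the first stall should fail the job.
console.log(expectedStateAfterStall(1, 0)); // failed
console.log(observedStateAfterStall()); // wait
```

With `maxStalledCount: 0` the two functions disagree on the very first stall, which is the mismatch reported here.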

2. Certain Queue and Worker events never get triggered when they should

  1. Queue waiting event - gets triggered only on queue.add(), not when a job moves back to waiting after stalling, as in my case.
  2. Worker active event - again, it seems to be triggered only in automatic processing mode, so it does not actually reflect the job's state transition: worker.getNextJob() moves the job to active but doesn't trigger the event.
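
One possible stopgap for the missing active event is to emit it by hand right after fetching a job. The sketch below assumes only that the worker is a Node EventEmitter with a getNextJob(token) method; `ManualWorkerLike`, `getNextJobWithActiveEvent`, and `FakeWorker` are hypothetical names, and the fake stands in for a real Worker so the snippet runs without Redis. I have not verified that emitting the event manually is free of side effects inside BullMQ.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical minimal shape: anything that emits events and can fetch a job.
interface ManualWorkerLike<J> extends EventEmitter {
  getNextJob(token: string): Promise<J | undefined>;
}

// Fetch the next job and, only if one was returned, emit 'active' manually.
async function getNextJobWithActiveEvent<J>(
  worker: ManualWorkerLike<J>,
  token: string,
): Promise<J | undefined> {
  const job = await worker.getNextJob(token);
  if (job !== undefined) {
    worker.emit('active', job, 'waiting');
  }
  return job;
}

// Stand-in for a real worker so the sketch runs without Redis.
class FakeWorker extends EventEmitter {
  private jobs = [{ id: '1' }];

  async getNextJob(_token: string): Promise<{ id: string } | undefined> {
    return this.jobs.shift();
  }
}

const fakeWorker = new FakeWorker();
fakeWorker.on('active', (job: { id: string }, prev: string) => {
  console.log('WORKER EVENT: active', { jobId: job.id, prev });
});
void getNextJobWithActiveEvent(fakeWorker, 'token-1');
```

This only papers over the symptom on the caller's side; it does nothing for the queue waiting event, which would still need to come from the stalled checker.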

Everything can be reproduced by running the snippet below.

How to reproduce.

import { randomUUID } from 'node:crypto';
import { Queue, Worker } from 'bullmq';
import type Redis from 'ioredis';

// Implement this to connect to Redis
declare function createConnection(): Redis;

const sleep = (interval: number = 60_000) =>
  new Promise<void>((resolve) => setTimeout(resolve, interval));

async function main() {
  const conn1 = createConnection();
  const conn2 = createConnection();

  const queue = new Queue('test-queue', { connection: conn1 });

  queue.on('error', (err) => {
    console.log('QUEUE ERROR', { message: err.message });
  });
  queue.on('waiting', async (job) => {
    // NOTE: only gets called on queue.add(), not when a stalled job moves back to waiting.
    console.log('QUEUE EVENT: waiting', { jobId: job?.id });
    await sleep(2_000);
    console.log('Attempting to fetch waiting job.');
    const fetchedJob = await worker.getNextJob(randomUUID());
    // getNextJob() may resolve to undefined when no job is available.
    console.log('Fetched successfully', { jobId: fetchedJob?.id });
  });

  const cleaned = await queue.clean(0, 100, 'wait');
  console.log(`Cleaned ${cleaned.length} jobs.`);
  const worker = new Worker('test-queue', async (_job) => {}, {
    connection: conn2,
    autorun: false,
    lockDuration: 5_000,
    stalledInterval: 2_000,
    maxStalledCount: 0,
  });

  worker.on('error', (err) => {
    console.log('WORKER EVENT: error', { message: err.message });
  });
  worker.on('stalled', async (jobId, prev) => {
    console.log('WORKER EVENT: stalled', { jobId, prev });
    await sleep(2_000);
    const counts = await queue.getJobCounts();
    console.log('Job counts', counts);
  });
  worker.on('failed', (job, err) => {
    console.log('WORKER EVENT: failed', {
      jobId: job?.id,
      attemptsMade: job?.attemptsMade,
      message: err?.message,
    });
  });
  worker.on('completed', (job) => {
    console.log('WORKER EVENT: completed', { jobId: job?.id });
  });
  worker.on('active', (job) => {
    // NOTE: never gets called
    console.log('WORKER EVENT: active', { jobId: job?.id });
  });
  worker.on('ready', () => {
    console.log('WORKER EVENT: ready');
  });

  // Start stall checker — this is the mechanism under test
  await worker.startStalledCheckTimer();
  await sleep(2_000);

  // Add job
  const job = await queue.add('test-job', {
    callId: 'test-123',
    payload: 'hello',
  });
  console.log('Job added to queue', { jobId: job.id });

  // Stop after 60 seconds
  await sleep(60_000);
  console.log('Exiting');
  process.exit();
}

main().catch((err) => {
  console.log('Error', err);
});

Relevant log output

Cleaned 0 jobs.
WORKER EVENT: ready
QUEUE EVENT: waiting { jobId: '1' }
Job added to queue { jobId: '1' }
Attempting to fetch waiting job.
Fetched successfully { jobId: '1' }
WORKER EVENT: stalled { jobId: '1', prev: 'active' }
Job counts {
  active: 0,
  completed: 0,
  delayed: 0,
  failed: 0,
  paused: 0,
  prioritized: 0,
  waiting: 1,
  'waiting-children': 0
}
Exiting
