Version
5.71.1
Platform
NodeJS
What happened?
This is regarding 2 separate issues, one of which I discovered while reproducing the first one.
1. In manual processing, job stalling does not work as expected or as described in the docs
The documentation explicitly states:
"This checker is needed to move stalled jobs (whose lock has expired) back to the wait status (or failed if they have exhausted the maximum number of stalled attempts)."
In reality, the checker moves stalled jobs back to the waiting status and only sets deferredFailure on them. They are moved to the failed status only by automatic job processing, so with manual processing that never happens. The documentation is therefore incorrect, and arguably the functionality is too.
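To make the gap concrete, here is a toy model of the two behaviours, written as plain TypeScript with no BullMQ dependency. It is only a sketch of my understanding: the ModelJob type, both function names, the "greater than maxStalledCount" comparison, and the deferredFailure text are assumptions for illustration, not BullMQ's actual implementation.

```typescript
// Toy model of the stalled-job transition; NOT BullMQ's actual code.
type JobState = 'active' | 'wait' | 'failed';

interface ModelJob {
  state: JobState;
  stalledCount: number;
  deferredFailure?: string;
}

// Behaviour as the documentation describes it: a stalled job goes back to
// wait, or to failed once it has exhausted its stalled attempts.
// Assumption: "exhausted" means stalledCount exceeds maxStalledCount.
function documentedStall(job: ModelJob, maxStalledCount: number): ModelJob {
  const stalledCount = job.stalledCount + 1;
  const state: JobState = stalledCount > maxStalledCount ? 'failed' : 'wait';
  return { ...job, state, stalledCount };
}

// Behaviour as observed in this report: the job always goes back to wait and
// is only marked with a deferred failure once the limit is exceeded.
function observedStall(job: ModelJob, maxStalledCount: number): ModelJob {
  const stalledCount = job.stalledCount + 1;
  const exhausted = stalledCount > maxStalledCount;
  return {
    ...job,
    state: 'wait',
    stalledCount,
    // Placeholder text; the real value stored by the library may differ.
    ...(exhausted ? { deferredFailure: 'stalled limit exceeded' } : {}),
  };
}

const activeJob: ModelJob = { state: 'active', stalledCount: 0 };
console.log(documentedStall(activeJob, 0).state); // failed
console.log(observedStall(activeJob, 0).state); // wait
```

With maxStalledCount: 0, as in my reproduction, the documented behaviour would put the job in failed after the first stall, while what I observe matches the second function: the job sits in waiting with a pending failure that manual processing never applies.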
2. Certain Queue and Worker events never get triggered when they should
- Queue
waiting event - is triggered only on queue.add(), not when a job moves back to waiting after being stalled, as in my case.
- Worker
active event - again, it seems to be triggered only in automatic processing mode, so it does not reflect the job's actual state transition: worker.getNextJob() moves the job to active but doesn't trigger the event.
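As a possible stopgap for the missing active event, one could announce the transition from the calling code. The sketch below is hypothetical: getNextJobAndAnnounce, JobLike and ManualWorkerLike are names I made up, and it uses a stand-in worker plus Node's EventEmitter so that it runs without Redis. The only thing it assumes about BullMQ is that worker.getNextJob(token) resolves to a job or undefined.

```typescript
import { EventEmitter } from 'node:events';

// Minimal shapes for this sketch; a real BullMQ Worker and Job have many more members.
interface JobLike {
  id?: string;
}
interface ManualWorkerLike {
  getNextJob(token: string): Promise<JobLike | undefined>;
}

// Hypothetical helper: fetch the next job and emit 'active' ourselves,
// since getNextJob() was not observed to emit it in manual mode.
async function getNextJobAndAnnounce(
  worker: ManualWorkerLike,
  events: EventEmitter,
  token: string,
): Promise<JobLike | undefined> {
  const job = await worker.getNextJob(token);
  if (job) {
    events.emit('active', job);
  }
  return job;
}

// Usage with a stand-in worker that serves jobs from an in-memory array.
async function demo(): Promise<string[]> {
  const seen: string[] = [];
  const events = new EventEmitter();
  events.on('active', (job: JobLike) => {
    seen.push(`active:${job.id}`);
  });

  const pending: JobLike[] = [{ id: '1' }];
  const fakeWorker: ManualWorkerLike = {
    getNextJob: async () => pending.shift(),
  };

  await getNextJobAndAnnounce(fakeWorker, events, 'token-1'); // emits 'active'
  await getNextJobAndAnnounce(fakeWorker, events, 'token-2'); // nothing pending, no event
  return seen;
}

demo().then((seen) => console.log(seen));
```

This only papers over the problem on the caller's side. It does nothing for the waiting event after a stall, which happens inside the stalled checker rather than in my own code.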
Everything can be reproduced by running the snippet below.
How to reproduce.
import { randomUUID } from 'node:crypto';
import { Queue, Worker } from 'bullmq';
import type Redis from 'ioredis';

// Implement this to connect to Redis
declare function createConnection(): Redis;

const sleep = (interval: number = 60_000) =>
  new Promise((resolve) =>
    setTimeout(async () => {
      resolve(null);
    }, interval),
  );

async function main() {
  const conn1 = createConnection();
  const conn2 = createConnection();

  const queue = new Queue('test-queue', { connection: conn1 });

  queue.on('error', (err) => {
    console.log('QUEUE ERROR', { message: err.message });
  });

  queue.on('waiting', async (job) => {
    // NOTE: gets only called on queue.add, not status transition.
    console.log('QUEUE EVENT: waiting', { jobId: job?.id });
    await sleep(2_000);
    console.log('Attempting to fetch waiting job.');
    const fetchedJob = await worker.getNextJob(randomUUID());
    console.log('Fetched successfully', { jobId: fetchedJob.id });
  });

  const cleaned = await queue.clean(0, 100, 'wait');
  console.log(`Cleaned ${cleaned.length} jobs.`);

  const worker = new Worker('test-queue', async (_job) => {}, {
    connection: conn2,
    autorun: false,
    lockDuration: 5_000,
    stalledInterval: 2_000,
    maxStalledCount: 0,
  });

  worker.on('error', (err) => {
    console.log('WORKER EVENT: error', { message: err.message });
  });

  worker.on('stalled', async (jobId, prev) => {
    console.log('WORKER EVENT: stalled', { jobId, prev });
    await sleep(2_000);
    const counts = await queue.getJobCounts();
    console.log('Job counts', counts);
  });

  worker.on('failed', (job, err) => {
    console.log('WORKER EVENT: failed', {
      jobId: job?.id,
      attemptsMade: job?.attemptsMade,
      message: err?.message,
    });
  });

  worker.on('completed', (job) => {
    console.log('WORKER EVENT: completed', { jobId: job?.id });
  });

  worker.on('active', (job) => {
    // NOTE: never gets called
    console.log('WORKER EVENT: active', { jobId: job?.id });
  });

  worker.on('ready', () => {
    console.log('WORKER EVENT: ready');
  });

  // Start stall checker; this is the mechanism under test
  await worker.startStalledCheckTimer();
  await sleep(2_000);

  // Add job
  const job = await queue.add('test-job', {
    callId: 'test-123',
    payload: 'hello',
  });
  console.log('Job added to queue', { jobId: job.id });

  // Stop after 60 seconds
  await sleep(60_000);
  console.log('Exiting');
  process.exit();
}

main().catch((err) => {
  console.log('Error', err);
});
Relevant log output
Cleaned 0 jobs.
WORKER EVENT: ready
QUEUE EVENT: waiting { jobId: '1' }
Job added to queue { jobId: '1' }
Attempting to fetch waiting job.
Fetched successfully { jobId: '1' }
WORKER EVENT: stalled { jobId: '1', prev: 'active' }
Job counts {
  active: 0,
  completed: 0,
  delayed: 0,
  failed: 0,
  paused: 0,
  prioritized: 0,
  waiting: 1,
  'waiting-children': 0
}
Exiting