Express-Queued is an independent queue system for Express.js that enables asynchronous execution of tasks in multiple concurrent queues. This system is ideal for time-consuming operations such as image processing, email sending, payment processing, and other operations that shouldn't block the immediate response to user requests.
- Support for N queues: Create as many queues as needed, each with its own configuration.
- Configurable concurrency: Control how many tasks execute simultaneously per queue.
- Custom hooks: Customize processing and fallback logic for each queue using the hooks system from express-hooked.
- Retry system: Automatic failure handling with configurable retries.
- Priority management: Assign priorities to tasks to determine their execution order.
- Fallback system: Handle tasks that definitively fail.
- Fully asynchronous: Does not block the main execution thread.
- Express integration: Middleware to inject the queue system into Express requests.
npm install express-queued express-hooked
const express = require('express');
const ExpressQueueIntegration = require('express-queued');
const app = express();
const expressQueue = new ExpressQueueIntegration();

// Settings for the 'general-processing' queue.
const generalProcessingOptions = {
  concurrency: 2,   // at most 2 tasks run at the same time
  retryAttempts: 3, // a failing task is retried up to 3 times
  retryDelay: 1000  // wait 1 second between retries
};

// Register the queue under its name with the settings above.
expressQueue.createQueue('general-processing', generalProcessingOptions);
// Function representing a task
/**
 * Example task: simulates a slow image-processing job.
 *
 * Waits two seconds, logs the task data, then resolves.
 *
 * @param {*} data - Data attached to the task; it is only logged here.
 * @param {*} taskObj - Second argument passed to the task; unused in this example.
 * @returns {Promise<string>} Resolves with 'Image processing completed'.
 */
function processImage(data, taskObj) {
  const simulatedWorkMs = 2000;

  return new Promise((resolve) => {
    // Simulate processing: the work "finishes" when the timer fires.
    const finish = () => {
      console.log(`Processing image:`, data);
      resolve('Image processing completed');
    };
    setTimeout(finish, simulatedWorkMs);
  });
}
// Add a task to the queue
const imageData = { id: 1, filename: 'image1.jpg' }; // Data for the task
const highestPriority = 0;                           // Priority (0 is highest)

// Arguments: queue name, task function, task data, priority.
const taskId = expressQueue.addTask('general-processing', processImage, imageData, highestPriority);

// Start task execution
expressQueue.start();
// Middleware to inject the queue system into requests
app.use(expressQueue.queueMiddleware());
// Parse JSON request bodies. Without a body-parsing middleware Express leaves
// req.body undefined, and the destructuring in the route below would throw.
app.use(express.json());

// Route to add a task
app.post('/process-image', (req, res) => {
  // Fall back to an empty object so a missing or unparsed body cannot crash the handler.
  const { filename, size } = req.body || {};

  // Reject the request instead of queueing a task that has no file to work on.
  if (!filename) {
    res.status(400).json({ message: 'filename is required' });
    return;
  }

  // req.addTask is the helper injected into the request by the queue middleware.
  const taskId = req.addTask('general-processing', processImage, {
    filename,
    size
  });

  res.json({
    message: 'Image processing task added to queue',
    taskId
  });
});
// Route to get queue system status
app.get('/queue-status', (req, res) => {
const status = req.getQueueStatus();
res.json(status);
});
The queue system integrates with the hooks system from express-hooked, allowing customization of system behavior through extension points.
- queue_execute_task: Executes before processing a task. Allows modifying task logic.
- queue_task_completed: Executes when a task completes successfully.
- queue_task_failed: Executes when a task fails.
- queue_task_retry: Executes when a task is retried.
- queue_task_fallback: Executes when a task definitively fails after all retries.
- queue_task_added: Executes when a task is added to the queue.
- queue_system_started: Executes when the queue system starts.
- queue_system_stopped: Executes when the queue system stops.
const { HookSystem } = require('express-hooked');
// Dedicated hook system for a single queue.
const imageHooks = new HookSystem();

/**
 * Builds a replacement for `task` that logs around the original call:
 * one line before it runs, one on success, and one on failure.
 * A failure is re-thrown unchanged after it is logged.
 *
 * @param {Function} task - Original task function, called as task(data, taskObj).
 * @returns {Function} Async function with the same (data, taskObj) signature.
 */
function withImageLogging(task) {
  const wrappedTask = async (data, runningTask) => {
    console.log(`Starting image processing: ${runningTask.id}`);
    try {
      const outcome = await task(data, runningTask);
      console.log(`Image processed successfully: ${runningTask.id}`);
      return outcome;
    } catch (error) {
      console.log(`Error processing image: ${runningTask.id} - ${error.message}`);
      throw error;
    }
  };
  return wrappedTask;
}

// Hook to customize processing logic: announce the task, then hand back the
// wrapped version of it.
// NOTE(review): confirm that express-hooked uses the value returned from an action.
imageHooks.addAction('queue_execute_task', (task, queueName, taskObj) => {
  console.log(`Preparing to process image: ${taskObj.id}`);
  return withImageLogging(task);
});
// Create the queue with custom hooks
expressQueue.createQueue('images', {
concurrency: 3,
retryAttempts: 2,
retryDelay: 1000
}, imageHooks);
The queue system includes a robust mechanism for handling errors:
expressQueue.createQueue('reliable-processing', {
retryAttempts: 5, // Maximum number of retries
retryDelay: 2000 // Milliseconds to wait between retries
});
When a task fails after all retries, the fallback logic executes:
const hooks = new HookSystem();

/**
 * Fallback handler: logs the id of the task that failed and the error message.
 * Replace the logging with custom logic for failed tasks, for example saving
 * the task to a database for manual processing or alerting the support team.
 *
 * @param {string} queueName - Name of the queue (unused in this example).
 * @param {object} task - The failed task; only `task.id` is read.
 * @param {Error} error - The error; only `error.message` is read.
 */
function reportFailedTask(queueName, task, error) {
  console.log(`Definitively failed task:`, task.id);
  console.log(`Error:`, error.message);
}

// To use these hooks for a queue, pass `hooks` as the third argument of
// createQueue, as the 'images' queue example does with imageHooks.
hooks.addAction('queue_task_fallback', reportFailedTask);
// Image processing queue
expressQueue.createQueue('image-processing', {
concurrency: 3,
retryAttempts: 2,
retryDelay: 1000
});
// Email sending queue
expressQueue.createQueue('email-sending', {
concurrency: 2,
retryAttempts: 3,
retryDelay: 2000
});
// Payment processing queue
expressQueue.createQueue('payment-processing', {
concurrency: 1,
retryAttempts: 5,
retryDelay: 3000
});
Tasks can be assigned different priority levels:
// One entry per task, listed from high to low priority
// (a lower number means a higher priority).
const prioritizedImages = [
  { filename: 'urgent.jpg', priority: 0 },    // High priority task
  { filename: 'normal.jpg', priority: 5 },    // Medium priority task
  { filename: 'optional.jpg', priority: 10 }  // Low priority task
];

// Add each image to the 'image-processing' queue with its priority.
for (const { filename, priority } of prioritizedImages) {
  expressQueue.addTask('image-processing', processImage, { filename }, priority);
}
// Pause a queue
app.post('/pause-queue/:queueName', queueAction(
  (queueName) => expressQueue.pauseQueue(queueName),
  (queueName) => `Queue ${queueName} paused`
));
// Resume a queue
app.post('/resume-queue/:queueName', queueAction(
  (queueName) => expressQueue.resumeQueue(queueName),
  (queueName) => `Queue ${queueName} resumed`
));
// Clear a queue
app.post('/clear-queue/:queueName', queueAction(
  (queueName) => expressQueue.clearQueue(queueName),
  (queueName) => `Queue ${queueName} cleared`
));

/**
 * Builds the Express handler shared by the queue-control routes above.
 * The handler reads the queue name from the URL, runs the queue operation,
 * and replies with a JSON confirmation message.
 * Declared as a function so it is hoisted and usable by the routes above.
 *
 * @param {Function} run - Called with the queue name to perform the operation.
 * @param {Function} describe - Called with the queue name; returns the reply message.
 * @returns {Function} Express route handler (req, res).
 */
function queueAction(run, describe) {
  return (req, res) => {
    const { queueName } = req.params;
    run(queueName);
    res.json({ message: describe(queueName) });
  };
}
// Retry failed tasks
app.post('/retry-failed/:queueName', (req, res) => {
const { queueName } = req.params;
expressQueue.retryFailedTasks(queueName);
res.json({ message: `Retrying failed tasks in queue ${queueName}` });
});
- options.concurrency: Default number of concurrent workers (default: 1)
- options.retryAttempts: Default number of retries (default: 3)
- options.retryDelay: Default delay between retries in ms (default: 1000)
Creates a queue with the specified configuration.
- queueName: Name of the queue
- options.concurrency: Number of concurrent workers
- options.retryAttempts: Number of retries
- options.retryDelay: Delay between retries in ms
- hooks: Hook system for this queue
Adds a task to the specified queue.
- queueName: Name of the queue
- task: Function representing the task
- data: Data for the task
- priority: Task priority (lower number = higher priority)
Starts task execution in all queues.
Stops task execution in all queues.
Gets the current status of the queue system.
Retries all failed tasks in the specified queue.
Middleware to inject the queue system into Express requests.
Pauses the specified queue.
Resumes the specified queue.
Clears pending tasks from the specified queue.
Source code available at: https://gitlab.com/bytedogssyndicate1/express-queued
Apache 2.0