-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathRedisConsumer.js
More file actions
86 lines (69 loc) · 2.04 KB
/
RedisConsumer.js
File metadata and controls
86 lines (69 loc) · 2.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import redis from 'redis';
import Joi from 'joi';
import config from '../config/config';
import { Consumer } from './Consumer';
// Register the stream commands used further down (`client.xrange` and
// `client.xread`) with the redis module.
['xrange', 'xread'].forEach((commandName) => {
  redis.add_command(commandName);
});
/**
 * Flattens a raw XREAD reply into a flat list of events.
 *
 * The reply is expected to look like
 * `[[topic, [[eventId, [field, value, ...]], ...]], ...]`.
 * Every entry of every stream yields one event, in reply order, so a read
 * that returns several entries for the same topic no longer loses all but
 * the first one.
 *
 * @param {Array|null} response - Raw XREAD reply; `null`/`undefined` is
 *   treated as "no events".
 * @returns {Array<{topic: *, eventId: *, eventData: *}>} One object per
 *   stream entry. `eventData` is the value of the entry's first field/value
 *   pair; any further fields are ignored.
 */
const parseResponse = (response) => {
  const events = [];
  for (const [topic, entries] of response || []) {
    for (const [eventId, fields] of entries) {
      // fields is a flat [name, value, ...] list; index 1 is the first value.
      events.push({ topic, eventId, eventData: fields[1] });
    }
  }
  return events;
};
/**
 * Converts a raw range reply of the form `[[eventId, [field, value, ...]], ...]`
 * into a list of `{ eventId, eventData }` objects.
 *
 * @param {Array} response - Raw reply, one element per stream entry.
 * @returns {Array<{eventId: *, eventData: *}>} `eventData` is the second
 *   element of the entry's field list (the value of its first field).
 */
const parseSliceResponse = (response) => response.map((entry) => {
  const entryId = entry[0];
  const fieldList = entry[1];
  return { eventId: entryId, eventData: fieldList[1] };
});
/**
 * Validation schema for the `args` object accepted by
 * `RedisConsumer#subscribe`.
 *
 * - `topics`: required, non-empty list of stream names. It is required
 *   because `subscribe` dereferences `args.topics` unconditionally, and it
 *   must be non-empty because an XREAD without any stream is malformed.
 * - `topicOffsets`: optional list of start IDs; when present it must have
 *   exactly as many items as `topics`.
 * - `readOnce`: optional flag; when truthy `subscribe` performs one read only.
 */
const subscribeArgsSchema = Joi.object().keys({
  topics: Joi.array().items(Joi.string()).min(1).required(),
  topicOffsets: Joi.array().length(Joi.ref('topics.length')).items(Joi.string()).optional(),
  readOnce: Joi.boolean().optional()
});
/**
 * Consumer that reads events from Redis streams through a redis client
 * exposing `xread`, `xrange` and `quit`.
 */
class RedisConsumer extends Consumer {
  /**
   * @param {object} [client] - Client to use. When omitted, a new
   *   `redis.RedisClient` is created from `config.redis.host` / `config.redis.port`.
   */
  constructor(client) {
    super();
    this.client = client || new redis.RedisClient({
      host: config.redis.host,
      port: config.redis.port
    });
    // Set once disconect()/disconnect() has been called so that an active
    // subscription stops re-arming itself against a client that is going away.
    this.isClosed = false;
  }

  /**
   * Reads events from the given topics with a blocking XREAD and passes each
   * batch to `callback`. Unless `args.readOnce` is set, a new read is issued
   * after every reply (successful or not) until the consumer is disconnected.
   *
   * After each successful read the offset of every topic that delivered
   * events is advanced to the ID of the last event handed to `callback`, so
   * the next read continues from there instead of repeating the initial
   * offsets (which would re-read the same entries for explicit IDs, or skip
   * entries added between two reads for `$`).
   *
   * @param {object} args
   * @param {string[]} args.topics - Streams to read from.
   * @param {string[]} [args.topicOffsets] - Start ID per topic; defaults to
   *   `'$'` for every topic. The caller's array is not modified.
   * @param {boolean} [args.readOnce] - Perform a single read only.
   * @param {function(?Error, ?Array)} callback - Called with `(error, null)`
   *   on failure or `(null, events)` with the output of `parseResponse`.
   * @throws The validation error when `args` does not match `subscribeArgsSchema`.
   */
  subscribe(args, callback) {
    const result = Joi.validate(args, subscribeArgsSchema);
    if (result.error) {
      throw result.error;
    }

    // Private, mutable copy of the offsets; advanced as events are delivered.
    const nextOffsets = args.topicOffsets
      ? [...args.topicOffsets]
      : args.topics.map(() => '$');
    const buildParams = () => ['BLOCK', 0, 'STREAMS', ...args.topics, ...nextOffsets];

    const onMessage = (error, response) => {
      if (error) {
        callback(error, null);
      } else {
        const events = parseResponse(response);
        // Events arrive in stream order, so the last ID seen per topic wins.
        events.forEach(({ topic, eventId }) => {
          const index = args.topics.indexOf(topic);
          if (index !== -1) {
            nextOffsets[index] = eventId;
          }
        });
        callback(null, events);
      }
      if (!args.readOnce && !this.isClosed) {
        this.client.xread(buildParams(), onMessage);
      }
    };

    this.client.xread(buildParams(), onMessage);
  }

  /**
   * Fetches a range of events from a single topic via XRANGE.
   *
   * @param {object} args
   * @param {string} args.topic - Stream to read from.
   * @param {string} [args.startId] - First ID of the range; defaults to `'-'`.
   * @param {string} [args.stopId] - Last ID of the range; defaults to `'+'`.
   * @param {number} [args.count] - When truthy, appended as `COUNT <count>`.
   * @param {function(?Error, ?Array)} callback - Called with `(error, null)`
   *   on failure or `(null, events)` with the output of `parseSliceResponse`.
   */
  getSlice(args, callback) {
    const argsList = [args.topic, args.startId || '-', args.stopId || '+'];
    if (args.count) {
      argsList.push('COUNT', args.count);
    }
    this.client.xrange(argsList, (error, response) => {
      if (error) {
        callback(error, null);
      } else {
        callback(null, parseSliceResponse(response));
      }
    });
  }

  /**
   * Stops any subscription from issuing further reads and calls `quit()` on
   * the client. The misspelled name is kept because it is the existing
   * public method name.
   */
  disconect() {
    this.isClosed = true;
    this.client.quit();
  }

  /**
   * Correctly spelled alias of `disconect()`.
   */
  disconnect() {
    this.disconect();
  }
}
// The default export is an object wrapping the class, so consumers access it
// as a property of the imported default (`.RedisConsumer`).
export default { RedisConsumer };