Conversation
6693a0c to
695236a
Compare
4a02384 to
a585e85
Compare
* switch to `ioredis` * drop callback support BREAKING CHANGE: drop node 4 and 6, switch to ioredis, drop callback support.
|
Very exciting, I'd love to evaluate this in the @mixmaxhq wilderness. |
| // TODO: what DO | ||
| // TODO: reintroduce tests | ||
| if (cb) { | ||
| promise.then( | ||
| (value) => process.nextTick(() => cb(null, value)), | ||
| (err) => process.nextTick(() => cb(err)) | ||
| ); | ||
| } |
There was a problem hiding this comment.
So this is sad, and I would like to find another solution. The checkStalledJobs method supports a callback so that the library user may receive callbacks for all attempted stall checks - not just the first. Removing the callback doesn't permit us to propagate any useful information, as the process won't really finish until Queue#close is called.
Perhaps we use events and emit an event? How do we handle errors in that case?
| const final = Promise.all( | ||
| jobs.map((job) => { | ||
| const defer = helpers.defer(); | ||
| return job | ||
| ._save((evalArgs) => { | ||
| this._evalScriptOn(batch, evalArgs); | ||
| defers.push(defer); | ||
| return defer.promise; | ||
| }) | ||
| .catch((err) => { | ||
| // Catch both errors from defer.reject below (from the EVALSHA), and | ||
| // from any serialization failures. | ||
| errors.set(job, err); | ||
| }); | ||
| }) | ||
| ); | ||
| const results = await batch.exec(); | ||
| for (const [defer, [error, jobId]] of helpers.zip(defers, results)) { | ||
| if (error) { | ||
| defer.reject(error); | ||
| } else { | ||
| defer.resolve(jobId); | ||
| } | ||
| return helpers.callAsync((done) => batch.exec(done)).then(() => errors); | ||
| }); | ||
| } | ||
| await final; |
There was a problem hiding this comment.
ioredis has abysmal support for pipelining as the pipeline commands don't return promises but instead return the pipeline for chaining. This means we have to manually identify which of the jobs each command corresponds to :\
lib/redis.js
Outdated
| isNewClient = false; | ||
| } | ||
| } else { | ||
| client = Redis.createClient(settings); |
There was a problem hiding this comment.
This is deprecated, but we override it in tests and can't replace the bare function as easily as we can the createClient export.
| // HACK: flushQueue is, according to ioredis, private. This is unfortunate, | ||
| // because we need this in order to properly clean up during brpoplpush | ||
| // commands. |
There was a problem hiding this comment.
Rather gross. Other options?
| function errorCode(err) { | ||
| rBoundary.lastIndex = 1; | ||
| const { message } = err, | ||
| match = rBoundary.exec(err.message); | ||
| return err.message.slice(0, match ? match.index : undefined); | ||
| } |
There was a problem hiding this comment.
Probably worth submitting upstream, as this is a nice feature of NodeRedis and impacts #316
| if (name === 'Redis') return 'ioredis'; | ||
| if (name === 'RedisClient') return 'NodeRedis'; |
| // invariant: in this code path, this.running < this.concurrency, always | ||
| // after spoolup, this.running + this.queued === this.concurrency |
There was a problem hiding this comment.
Very much would like to replace this with a more maintainable pattern that specifically handles this concurrency management.
|
Has conflicts and is abandoned, closing. |
I took #112 and rebased it against the default branch - phew!
I probably preserved way more of the individual commits than I needed to 😁 (edit: still reachable from 6693a0c)
Blocking TODOs:
Might release this as a 2.0 alpha so our friends over @mixmaxhq can evaluate it in wild, if they're willing.
@LewisJEllis had some fantastic ideas about next steps in #112, and I look forward to pursuing them after we've landed the simplest possible 2.0 that prevents further breakage.
I'll post a review
in a minuteeventually that calls out some concerns.See also #104. Fixes #16 and closes #21.