feat: add buffering layer to BulkWriter #1470
| } else { | ||
| // Increment the _lastOp pointer to ensure that the promise returned by | ||
| // flush() includes this operation. | ||
| this._lastOp = this._lastOp.then(() => { |
I think we should do this in both cases and remove the duplication between here and _sendFn.
| } | ||
| ); | ||
| this._bufferedOperations.push(() => | ||
| this._sendFn(enqueueOnBatchCallback, bulkWriterOp) |
Do we need to increment pendingOpsCount? (did not check)
Codecov Report
@@ Coverage Diff @@
## master #1470 +/- ##
=======================================
Coverage 98.21% 98.21%
=======================================
Files 32 32
Lines 19567 19651 +84
Branches 1373 1379 +6
=======================================
+ Hits 19217 19301 +84
Misses 346 346
Partials 4 4
1b236d8 to 57a90b6
schmidt-sebastian left a comment
This looks great! Some optional suggestions.
| * BulkWriter instance. BulkWriter buffers additional writes after this many | ||
| * pending operations in order to avoid going OOM. | ||
| */ | ||
| const DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT = 10000; |
I don't think there is any actual benefit to having 500 RPCs outstanding. Do you see any differences in throughput? Ideally, the number of outstanding RPCs should be well below 100, so that we don't have to initialize new clients.
There was a problem hiding this comment.
Changed to 500, after running some throughput tests. I don't see any drastic changes, but the throughput usually has pretty high variance unless I run tests for > 1 hour.
| * The number of pending operations enqueued on this BulkWriter instance. | ||
| * @private | ||
| */ | ||
| private _pendingOpsCount = 0; |
'Pending' might not be the right term, since it does not necessarily indicate that the count only includes operations that have been sent. I wonder if there is a better term.
There was a problem hiding this comment.
Couldn't think of a better term. It seemed like pending was the closest thing, but I added a separate comment to describe what it means.
| this._lastOp = this._lastOp.then(() => { | ||
| return silencePromise(bulkWriterOp.promise); | ||
| }); |
| this._lastOp = this._lastOp.then(() => { | |
| return silencePromise(bulkWriterOp.promise); | |
| }); | |
| this._lastOp = this._lastOp.then(() => silencePromise(bulkWriterOp.promise)); |
Optional
| // buffer. | ||
| if (this._pendingOpsCount < this._maxPendingOpCount) { | ||
| this._pendingOpsCount++; | ||
| this._sendFn(enqueueOnBatchCallback, bulkWriterOp); |
You might be able to simplify this further if you move the incrementing and decrementing into _sendFn. You only have to increment once and you can move the call to _processBufferedOps() to:
this._lastOp = this._lastOp.then(() => silencePromise(op.promise));
Optional.
There was a problem hiding this comment.
I moved the incrementing call into _sendFn. However, _processBufferedOps() has to be part of the returned promise in order for it to get run. Any promise chained off of _lastOp doesn't get run by Node, which then creates a race condition with bulkCommit() in _sendBatch.
| .catch(err => { | ||
| this._processBufferedOps(); | ||
| throw err; | ||
| }); |
You can probably return this._lastOp if you move this code to either line 862 above or _sendFn.
There was a problem hiding this comment.
I can't return lastOp since it's Promise<void>, but the return type is Promise<WriteResult>.
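The type mismatch described in this reply can be illustrated with a minimal sketch. Everything here is hypothetical: `WriteResultLike` and `OpTracker` are stand-ins, not the library's `WriteResult` or BulkWriter, and swallowing errors on the tracking chain is only an assumption about what `silencePromise` does.

```typescript
// Hypothetical stand-in for the real WriteResult type.
interface WriteResultLike {
  writeTime: number;
}

class OpTracker {
  // Tracks completion only, so it carries no value: Promise<void>.
  private lastOp: Promise<void> = Promise.resolve();

  track(opPromise: Promise<WriteResultLike>): Promise<WriteResultLike> {
    // Extend the completion chain. Errors are swallowed on this chain only
    // (assumed behavior), so one failed write does not break flush().
    this.lastOp = this.lastOp.then(() =>
      opPromise.then(
        () => {},
        () => {}
      )
    );
    // The caller still needs the write's result, so the operation's own
    // promise is returned rather than the void-typed lastOp.
    return opPromise;
  }

  // Resolves once every tracked operation has settled.
  flush(): Promise<void> {
    return this.lastOp;
  }
}
```

Returning `this.lastOp` from `track()` would not type-check in this sketch, because `Promise<void>` is not assignable to `Promise<WriteResultLike>`.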
| }, | ||
| ]); | ||
| // eslint-disable-next-line no-console | ||
| console.log('BCHEN start buffer test'); |
1a163fc to fc262b5
This PR adds a buffering layer to BulkWriter that limits the number of outstanding RPCs. This helps prevent BulkWriter from running out of memory (OOM) when multiple RPCs start failing due to hotspotting or contention.
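The general idea of capping in-flight work and buffering the rest can be sketched as follows. This is a simplified illustration, not the PR's implementation: `BoundedSender`, `enqueue`, and `processBuffered` are hypothetical names, and the real BulkWriter batches writes and tracks more state than this.

```typescript
// Hypothetical sketch of a bounded "pending operations" buffer.
type Op<T> = () => Promise<T>;

class BoundedSender {
  private pendingCount = 0;
  private readonly buffered: Array<() => void> = [];

  constructor(private readonly maxPending: number) {}

  get pending(): number {
    return this.pendingCount;
  }

  get bufferedLength(): number {
    return this.buffered.length;
  }

  // Starts `op` right away if below the limit; otherwise buffers it.
  enqueue<T>(op: Op<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const start = (): void => {
        this.pendingCount++;
        // Wrapping the call in a promise also captures synchronous throws.
        new Promise<T>(r => r(op()))
          .then(resolve, reject)
          .then(() => {
            // Runs on success and on failure, so the slot is always freed.
            this.pendingCount--;
            this.processBuffered();
          });
      };
      if (this.pendingCount < this.maxPending) {
        start();
      } else {
        this.buffered.push(start);
      }
    });
  }

  // Starts buffered operations while capacity is available.
  private processBuffered(): void {
    while (this.pendingCount < this.maxPending && this.buffered.length > 0) {
      const next = this.buffered.shift()!;
      next();
    }
  }
}

// Usage: at most two operations are in flight; the third waits in the buffer.
const sender = new BoundedSender(2);
for (const id of [1, 2, 3]) {
  void sender.enqueue(() => Promise.resolve(id));
}
```

Because the slot is released in a handler that runs after both success and failure, a burst of failing operations cannot permanently exhaust the limit in this sketch.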