This repository was archived by the owner on Mar 4, 2026. It is now read-only.

feat: add buffering layer to BulkWriter #1470

Merged
thebrianchen merged 6 commits into master from bc/bulk-buffer on Apr 13, 2021

thebrianchen commented Apr 12, 2021

This PR adds a buffering layer to BulkWriter that limits the number of outstanding RPCs. This helps prevent BulkWriter from running out of memory (OOM) when multiple RPCs start failing due to hotspotting or contention.
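The core idea (cap the number of in-flight operations and park the rest until a slot frees up) can be sketched independently of Firestore. This is a minimal, hypothetical sketch: `BufferedSender`, `send`, and `maxPending` are illustrative names, not BulkWriter's actual internals.

```typescript
// Hypothetical sketch of a buffering layer that caps in-flight operations.
// Not the BulkWriter implementation; names are illustrative only.
type Task<T> = () => Promise<T>;

class BufferedSender {
  private pendingCount = 0;
  // Operations waiting for a free slot, stored as "start" callbacks.
  private readonly buffer: Array<() => void> = [];

  constructor(private readonly maxPending: number) {}

  /** Starts `task` immediately if below the cap; otherwise buffers it. */
  send<T>(task: Task<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const start = (): void => {
        this.pendingCount++;
        // Free the slot and start buffered work, whether the task succeeds or fails.
        const done = (): void => {
          this.pendingCount--;
          this.drain();
        };
        task().then(
          value => {
            resolve(value);
            done();
          },
          (err: unknown) => {
            reject(err);
            done();
          }
        );
      };
      if (this.pendingCount < this.maxPending) {
        start();
      } else {
        this.buffer.push(start);
      }
    });
  }

  /** Number of tasks currently running. */
  get inFlight(): number {
    return this.pendingCount;
  }

  /** Number of tasks waiting in the buffer. */
  get buffered(): number {
    return this.buffer.length;
  }

  private drain(): void {
    while (this.pendingCount < this.maxPending && this.buffer.length > 0) {
      const next = this.buffer.shift()!;
      next();
    }
  }
}
```

With `maxPending` set to 2 and five tasks sent, two start immediately and three wait in the buffer; each completion, successful or not, frees a slot and starts the next buffered task. Buffered writes are held only as small closures rather than as outstanding RPCs.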

thebrianchen self-assigned this Apr 12, 2021
thebrianchen requested review from a team April 12, 2021 16:03
product-auto-label bot added the api: firestore label (Issues related to the googleapis/nodejs-firestore API.) Apr 12, 2021
google-cla bot added the cla: yes label (This human has signed the Contributor License Agreement.) Apr 12, 2021
Comment thread dev/src/bulk-writer.ts Outdated
```ts
} else {
  // Increment the _lastOp pointer to ensure that the promise returned by
  // flush() includes this operation.
  this._lastOp = this._lastOp.then(() => {
```
Contributor

I think we should do this in both cases and remove the duplication between here and _sendFn.

Comment thread dev/src/bulk-writer.ts Outdated
```ts
  }
);
this._bufferedOperations.push(() =>
  this._sendFn(enqueueOnBatchCallback, bulkWriterOp)
```
Contributor

Do we need to increment pendingOpsCount? (did not check)

codecov bot commented Apr 12, 2021

Codecov Report

Merging #1470 (740882f) into master (cf1949f) will increase coverage by 0.00%.
The diff coverage is 100.00%.


```diff
@@           Coverage Diff           @@
##           master    #1470   +/-   ##
=======================================
  Coverage   98.21%   98.21%
=======================================
  Files          32       32
  Lines       19567    19651   +84
  Branches     1373     1379    +6
=======================================
+ Hits        19217    19301   +84
  Misses        346      346
  Partials        4        4
```
| Impacted Files | Coverage Δ |
|---|---|
| dev/src/bulk-writer.ts | 100.00% <100.00%> (ø) |

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update cf1949f...740882f.

Contributor

schmidt-sebastian left a comment

This looks great! Some optional suggestions.

Comment thread dev/src/bulk-writer.ts Outdated
```ts
 * BulkWriter instance. BulkWriter buffers additional writes after this many
 * pending operations in order to avoid going OOM.
 */
const DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT = 10000;
```
Contributor

I don't think there is any actual benefit to having 500 RPCs outstanding. Do you see any differences in throughput? Ideally, the number of outstanding RPCs should be well below 100, so that we don't have to initialize new clients.

Author

Changed to 500 after running some throughput tests. I don't see any drastic changes, but throughput usually has pretty high variance unless I run the tests for more than an hour.
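A back-of-the-envelope check relates the operation cap to outstanding RPCs. Assuming each batch RPC carries up to 20 writes (an assumption for illustration, not stated in this thread), the original cap of 10,000 pending operations corresponds to about the 500 outstanding RPCs the reviewer mentions, while the revised cap of 500 operations corresponds to about 25, well below 100. `maxOutstandingRpcs` is a hypothetical helper.

```typescript
// Hypothetical helper: upper bound on outstanding batch RPCs when at most
// `maxPendingOps` writes are in flight and each RPC carries up to
// `opsPerBatch` writes.
function maxOutstandingRpcs(maxPendingOps: number, opsPerBatch: number): number {
  return Math.ceil(maxPendingOps / opsPerBatch);
}

// Assumed batch size, for illustration only.
const ASSUMED_OPS_PER_BATCH = 20;

console.log(maxOutstandingRpcs(10000, ASSUMED_OPS_PER_BATCH)); // 500
console.log(maxOutstandingRpcs(500, ASSUMED_OPS_PER_BATCH)); // 25
```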

Comment thread dev/src/bulk-writer.ts
```ts
 * The number of pending operations enqueued on this BulkWriter instance.
 * @private
 */
private _pendingOpsCount = 0;
```
Contributor

'pending' might not be the right term, since it doesn't make clear that the count only includes operations that have been sent. I wonder if there is a better term.

Author

Couldn't think of a better term. It seemed like pending was the closest thing, but I added a separate comment to describe what it means.

Comment thread dev/src/bulk-writer.ts Outdated
Comment on lines +861 to +866
```ts
this._lastOp = this._lastOp.then(() => {
  return silencePromise(bulkWriterOp.promise);
});
```
Contributor

Suggested change
```diff
-this._lastOp = this._lastOp.then(() => {
-  return silencePromise(bulkWriterOp.promise);
-});
+this._lastOp = this._lastOp.then(() => silencePromise(bulkWriterOp.promise));
```

Optional

Author

done.

Contributor

But is it? :)
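The suggested change is stylistic, but it relies on the arrow function's concise body returning the silenced promise. If the braces were kept without a `return`, the `_lastOp` chain would settle before the operation did, and `flush()` would no longer wait for it. A small, self-contained demonstration; `silencePromise` here is a local stand-in (assumed to settle with `void` whether its input fulfills or rejects), and `delay` and `demo` are hypothetical helpers.

```typescript
// Local stand-in for the library's silencePromise helper.
function silencePromise(p: Promise<unknown>): Promise<void> {
  return p.then(
    () => undefined,
    () => undefined
  );
}

function delay(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  const slowOp = (): Promise<void> =>
    delay(10).then(() => {
      log.push("op finished");
    });

  // Concise body: the arrow returns the silenced promise, so the chain
  // (like _lastOp) does not settle until the operation does.
  const op1 = slowOp();
  await Promise.resolve().then(() => silencePromise(op1));
  log.push("concise body settled");

  // Block body without `return`: the callback returns undefined, so the
  // chain settles right away without waiting for the operation.
  const op2 = slowOp();
  await Promise.resolve().then(() => {
    silencePromise(op2);
  });
  log.push("block body without return settled");
  await op2;

  return log;
}

demo().then(log => console.log(log));
```

In the first case the operation finishes before the chain settles; in the second, the chain settles first and the operation finishes afterwards.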

Comment thread dev/src/bulk-writer.ts
```ts
// buffer.
if (this._pendingOpsCount < this._maxPendingOpCount) {
  this._pendingOpsCount++;
  this._sendFn(enqueueOnBatchCallback, bulkWriterOp);
```
Contributor

You might be able to simplify this further if you move the incrementing and decrementing into _sendFn. You only have to increment once and you can move the call to _processBufferedOps() to:

```ts
this._lastOp = this._lastOp.then(() => silencePromise(op.promise));
```

Optional.

Author

I moved the incrementing call into _sendFn. However, _processBufferedOps() has to be part of the returned promise in order for it to get run. Any promise chained off of _lastOp doesn't get run by Node, which then creates the race condition with bulkCommit() in _sendBatch.
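The structure described here can be sketched as a small wrapper: the counter is incremented in one place, and the matching decrement plus buffer processing are attached to the promise handed back to the caller, which still receives the original result or error. `withCompletion`, `sendFn`, and `processBufferedOps` below are hypothetical names, not the library's code; the two-argument `then` is used so the bookkeeping runs exactly once even if it throws on the success path.

```typescript
// Hypothetical wrapper: runs `onSettled` when `op` settles and passes the
// original result or error through to the caller.
function withCompletion<T>(op: Promise<T>, onSettled: () => void): Promise<T> {
  return op.then(
    result => {
      onSettled();
      return result;
    },
    (err: unknown) => {
      onSettled();
      throw err;
    }
  );
}

let pendingOpsCount = 0;

// Hypothetical send function: increments once, and the decrement plus buffer
// processing are part of the promise returned to the caller.
function sendFn<T>(rpc: () => Promise<T>, processBufferedOps: () => void): Promise<T> {
  pendingOpsCount++;
  return withCompletion(rpc(), () => {
    pendingOpsCount--;
    processBufferedOps();
  });
}
```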

Comment thread dev/src/bulk-writer.ts
```ts
.catch(err => {
  this._processBufferedOps();
  throw err;
});
```
Contributor

You can probably return this._lastOp if you move this code to either line 862 above or _sendFn.

Author

I can't return lastOp since it's Promise<void>, but the return type is Promise<WriteResult>.
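The type mismatch shows up in a minimal sketch: the flush chain only records when operations finish, so it is a `Promise<void>`, while each enqueue call must return the caller's own result. `WriteResult` here is a hypothetical stand-in with a single `writeTime` field, not the library's class, and `enqueueWrite` is an illustrative name.

```typescript
// Hypothetical stand-in for the library's WriteResult, for illustration only.
interface WriteResult {
  writeTime: number;
}

// Tracks completion of every enqueued write so flush() can wait on them.
let lastOp: Promise<void> = Promise.resolve();

function enqueueWrite(op: Promise<WriteResult>): Promise<WriteResult> {
  // Silence the operation eagerly so the flush chain never rejects.
  const settled: Promise<void> = op.then(
    () => undefined,
    () => undefined
  );
  lastOp = lastOp.then(() => settled);

  // return lastOp; // Does not compile: Promise<void> is not assignable to
  //                // Promise<WriteResult>, and callers need the result.
  return op;
}

function flush(): Promise<void> {
  return lastOp;
}
```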

Comment thread dev/test/bulk-writer.ts Outdated
```ts
  },
]);
// eslint-disable-next-line no-console
console.log('BCHEN start buffer test');
```
Contributor

Remove

Author

done.

Author

thebrianchen left a comment

Thanks!


2 participants