The use of object pooling significantly complicates the unsubscribe behavior and makes it hard to determine when a pooled object can safely be released. This issue tracks research into how we can eliminate pooling to improve or fix correctness while still maintaining good performance (including memory allocation and GC behavior).
The merge and observeOn use cases are good candidates for tracking performance, and both already have JMH benchmarks in place.
Here is how to run the tests:
./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorMergePerf.*'
./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorObserveOnPerf.*'
Here are results for merge and observeOn comparing the pooled SpmcArrayQueue (RingBuffer) against SynchronizedQueue (a synchronized LinkedList). Values are throughput in operations per second (-bm thrpt, -tu s), so higher is better.
The ** markers indicate where performance degraded.
Merge
Benchmark (size) Mode Samples RingBuffer || LinkedList LinkedList
r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5448592.507 || 5332264.735 5358807.217
r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 54425.611 || 52385.763 52409.485
r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 52.477 || 53.928 54.128
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 92792.488 || 103554.112 104711.192
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 4.957 || 4.194 4.143
r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4608856.070 || 4294087.613 4688656.691
r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 431920.261 || 451007.165 333391.274**
r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 52309.410 || 49952.664 50057.346
r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5815289.623 || 5558593.305 5575812.189
r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 52.525 || 49.788 49.567
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 77640.706 || 79530.128 79472.088
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 3000.748 || 2358.716** 2397.035**
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 5397752.619 || 5282943.171 5361848.221
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 29257.005 || 34150.793 35196.066
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 35.506 || 25.827** 26.517**
ObserveOn
Benchmark (size) Mode Samples RingBuffer || LinkedList LinkedList
r.o.OperatorObserveOnPerf.observeOnComputation 1 thrpt 5 109192.288 || 111037.202 110112.152
r.o.OperatorObserveOnPerf.observeOnComputation 1000 thrpt 5 6952.955 || 2846.331** 2821.400**
r.o.OperatorObserveOnPerf.observeOnComputation 1000000 thrpt 5 12.267 || 9.988** 10.063**
r.o.OperatorObserveOnPerf.observeOnImmediate 1 thrpt 5 16430666.170 || 16284869.881 16504292.796
r.o.OperatorObserveOnPerf.observeOnImmediate 1000 thrpt 5 153431.778 || 158892.599 157288.399
r.o.OperatorObserveOnPerf.observeOnImmediate 1000000 thrpt 5 150.061 || 159.481 149.546
r.o.OperatorObserveOnPerf.observeOnNewThread 1 thrpt 5 16899.056 || 16111.396 16532.411
r.o.OperatorObserveOnPerf.observeOnNewThread 1000 thrpt 5 7262.566 || 5742.547** 5504.293**
r.o.OperatorObserveOnPerf.observeOnNewThread 1000000 thrpt 5 13.520 || 10.364** 9.954**
The SynchronizedQueue was only ever intended for environments without sun.misc.Unsafe (such as Android), so it is worth exploring alternatives that do not involve a ring buffer (and its allocation overhead) but are still thread-safe for the single-producer-multi-consumer use cases the RxRingBuffer is used in. The RxRingBuffer could then be renamed to RxQueue or something like that.
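For reference, the "synchronized LinkedList" approach measured in the LinkedList columns can be sketched roughly as follows. This is a minimal illustration and not RxJava's actual SynchronizedQueue: the class name SimpleSynchronizedQueue, the capacity parameter, and the choice to reject offers when full are all assumptions made for the example.

```java
import java.util.LinkedList;
import java.util.Queue;

// Hypothetical sketch of a bounded queue backed by a LinkedList, with every
// operation guarded by the same monitor. Not the RxJava implementation.
final class SimpleSynchronizedQueue<T> {
    private final Queue<T> items = new LinkedList<>();
    private final int capacity; // assumed bound, chosen by the caller

    SimpleSynchronizedQueue(int capacity) {
        this.capacity = capacity;
    }

    // Returns false instead of blocking when the queue is already full.
    synchronized boolean offer(T item) {
        if (item == null) {
            throw new NullPointerException("item");
        }
        if (items.size() >= capacity) {
            return false;
        }
        return items.offer(item); // LinkedList allocates one node per element
    }

    // Returns the head element, or null when the queue is empty.
    synchronized T poll() {
        return items.poll();
    }

    synchronized T peek() {
        return items.peek();
    }

    synchronized int size() {
        return items.size();
    }

    synchronized boolean isEmpty() {
        return items.isEmpty();
    }
}
```

Because producer and consumers all contend on one lock and each element costs a node allocation, a structure like this needs no pooling and no Unsafe, but it plausibly pays for that under concurrent load, which would be consistent with the ** rows in the async benchmarks above.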