[SPARK-48518][CORE] Make LZF compression be able to run in parallel#46858
[SPARK-48518][CORE] Make LZF compression be able to run in parallel#46858yaooqinn wants to merge 3 commits intoapache:masterfrom
Conversation
| AMD EPYC 7763 64-Core Processor | ||
| Compress large objects: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative | ||
| ----------------------------------------------------------------------------------------------------------------------------- | ||
| Compression 1024 array values in 1 threads 39 45 5 0.0 38475.4 1.0X |
There was a problem hiding this comment.
With GitHub standard action runners, it seems that we only get 1 thread.
There was a problem hiding this comment.
hmm...
[info] OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Mac OS X 14.5
[info] Apple M2 Max
[info] Compress small objects: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] --------------------------------------------------------------------------------------------------------------------------------
[info] Compression 256000000 int values in parallel 548 550 2 467.0 2.1 1.0X
[info] Compression 256000000 int values single-threaded 522 523 1 490.5 2.0 1.1X
[info] Running benchmark: Compress large objects
[info] Running case: Compression 1024 array values in 8 threads
[info] Stopped after 123 iterations, 2009 ms
[info] Running case: Compression 1024 array values single-threaded
[info] Stopped after 83 iterations, 2003 ms
[info] OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Mac OS X 14.5
[info] Apple M2 Max
[info] Compress large objects: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] -----------------------------------------------------------------------------------------------------------------------------
[info] Compression 1024 array values in 8 threads 12 16 13 0.1 11546.1 1.0X
[info] Compression 1024 array values single-threaded 23 24 1 0.0 22767.9 0.5X
I ran this benchmark locally, and it seems that the performance of Compression 256000000 int values single-threaded and Compression 256000000 int values in parallel is almost the same
There was a problem hiding this comment.
I guess the rate is limited by the producer
| .intConf | ||
| .createWithDefault(1) | ||
|
|
||
| private[spark] val IO_COMPRESSION_LZF_PARALLEL = |
There was a problem hiding this comment.
Should we add an explanation of this configuration in configuration.md
|
Thank you @LuciferYang Merged to master |
| override def compressedOutputStream(s: OutputStream): OutputStream = { | ||
| new LZFOutputStream(s).setFinishBlockOnFlush(true) | ||
| if (parallelCompression) { | ||
| new PLZFOutputStream(s) |
There was a problem hiding this comment.
This is creating a threadpool per compressedOutputStream - which can end up being quite expensive (num thread is num processors + some 'interesting' logic which tries to modulate it) .
Did you get a chance to try this on some nontrivial jobs ? Very curious about the experience.
Given this is turned off by default, dont see any concerns with the change itself though ! Would be a good way to understand the impact.
…` by default ### What changes were proposed in this pull request? This PR aims to enable `spark.io.compression.lzf.parallel.enabled` by default at Apache Spark 4.1.0. ### Why are the changes needed? `spark.io.compression.lzf.parallel.enabled` was introduced at Apache Spark 4.0.0 and has been used stably so far. We can enable this by default. - #46858 ### Does this PR introduce _any_ user-facing change? Yes for `LZF` users. The migration guide is updated. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52603 from dongjoon-hyun/SPARK-53896. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…` by default ### What changes were proposed in this pull request? This PR aims to enable `spark.io.compression.lzf.parallel.enabled` by default at Apache Spark 4.1.0. ### Why are the changes needed? `spark.io.compression.lzf.parallel.enabled` was introduced at Apache Spark 4.0.0 and has been used stably so far. We can enable this by default. - apache#46858 ### Does this PR introduce _any_ user-facing change? Yes for `LZF` users. The migration guide is updated. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52603 from dongjoon-hyun/SPARK-53896. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
This PR introduced a config that turns on LZF compression to parallel mode via using PLZFOutputStream.
FYI, https://github.com/ning/compress?tab=readme-ov-file#parallel-processing
Why are the changes needed?
Improve performance
Does this PR introduce any user-facing change?
no
How was this patch tested?
benchmark
Was this patch authored or co-authored using generative AI tooling?
no