Skip to content

[SPARK-48518][CORE] Make LZF compression be able to run in parallel#46858

Closed
yaooqinn wants to merge 3 commits intoapache:masterfrom
yaooqinn:SPARK-48518
Closed

[SPARK-48518][CORE] Make LZF compression be able to run in parallel#46858
yaooqinn wants to merge 3 commits intoapache:masterfrom
yaooqinn:SPARK-48518

Conversation

@yaooqinn
Copy link
Member

@yaooqinn yaooqinn commented Jun 4, 2024

What changes were proposed in this pull request?

This PR introduced a config that turns on LZF compression to parallel mode via using PLZFOutputStream.

FYI, https://github.com/ning/compress?tab=readme-ov-file#parallel-processing

Why are the changes needed?

Improve performance

[info] OpenJDK 64-Bit Server VM 17.0.10+0 on Mac OS X 14.5
[info] Apple M2 Max
[info] Compress large objects:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] -----------------------------------------------------------------------------------------------------------------------------
[info] Compression 1024 array values in 7 threads                12             13           1          0.1       11788.2       1.0X
[info] Compression 1024 array values single-threaded             23             23           0          0.0       22512.7       0.5X

Does this PR introduce any user-facing change?

no

How was this patch tested?

benchmark

Was this patch authored or co-authored using generative AI tooling?

no

@github-actions github-actions bot added the CORE label Jun 4, 2024
AMD EPYC 7763 64-Core Processor
Compress large objects: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
Compression 1024 array values in 1 threads 39 45 5 0.0 38475.4 1.0X
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With GitHub standard action runners, it seems that we only get 1 thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm...

[info] OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Mac OS X 14.5
[info] Apple M2 Max
[info] Compress small objects:                           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] --------------------------------------------------------------------------------------------------------------------------------
[info] Compression 256000000 int values in parallel                548            550           2        467.0           2.1       1.0X
[info] Compression 256000000 int values single-threaded            522            523           1        490.5           2.0       1.1X
[info] Running benchmark: Compress large objects
[info]   Running case: Compression 1024 array values in 8 threads
[info]   Stopped after 123 iterations, 2009 ms
[info]   Running case: Compression 1024 array values single-threaded
[info]   Stopped after 83 iterations, 2003 ms
[info] OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Mac OS X 14.5
[info] Apple M2 Max
[info] Compress large objects:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] -----------------------------------------------------------------------------------------------------------------------------
[info] Compression 1024 array values in 8 threads                12             16          13          0.1       11546.1       1.0X
[info] Compression 1024 array values single-threaded             23             24           1          0.0       22767.9       0.5X

I ran this benchmark locally, and it seems that the performance of Compression 256000000 int values single-threaded and Compression 256000000 int values in parallel is almost the same

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the rate is limited by the producer

.intConf
.createWithDefault(1)

private[spark] val IO_COMPRESSION_LZF_PARALLEL =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add an explanation of this configuration in configuration.md

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, addressed

@github-actions github-actions bot added the DOCS label Jun 4, 2024
Copy link
Contributor

@LuciferYang LuciferYang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@yaooqinn yaooqinn closed this in 90ee299 Jun 4, 2024
@yaooqinn yaooqinn deleted the SPARK-48518 branch June 4, 2024 10:59
@yaooqinn
Copy link
Member Author

yaooqinn commented Jun 4, 2024

Thank you @LuciferYang

Merged to master

override def compressedOutputStream(s: OutputStream): OutputStream = {
new LZFOutputStream(s).setFinishBlockOnFlush(true)
if (parallelCompression) {
new PLZFOutputStream(s)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is creating a threadpool per compressedOutputStream - which can end up being quite expensive (num thread is num processors + some 'interesting' logic which tries to modulate it) .

Did you get a chance to try this on some nontrivial jobs ? Very curious about the experience.
Given this is turned off by default, dont see any concerns with the change itself though ! Would be a good way to understand the impact.

dongjoon-hyun added a commit that referenced this pull request Oct 14, 2025
…` by default

### What changes were proposed in this pull request?

This PR aims to enable `spark.io.compression.lzf.parallel.enabled` by default at Apache Spark 4.1.0.

### Why are the changes needed?

`spark.io.compression.lzf.parallel.enabled` was introduced at Apache Spark 4.0.0 and has been used stably so far. We can enable this by default.
- #46858

### Does this PR introduce _any_ user-facing change?

Yes for `LZF` users. The migration guide is updated.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52603 from dongjoon-hyun/SPARK-53896.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
…` by default

### What changes were proposed in this pull request?

This PR aims to enable `spark.io.compression.lzf.parallel.enabled` by default at Apache Spark 4.1.0.

### Why are the changes needed?

`spark.io.compression.lzf.parallel.enabled` was introduced at Apache Spark 4.0.0 and has been used stably so far. We can enable this by default.
- apache#46858

### Does this PR introduce _any_ user-facing change?

Yes for `LZF` users. The migration guide is updated.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52603 from dongjoon-hyun/SPARK-53896.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants