Skip to content

[CELEBORN-2221][CIP-14] Support writing with compression in C++ client#3575

Open
afterincomparableyum wants to merge 5 commits intoapache:mainfrom
afterincomparableyum:cpp-client/celeborn-2221
Open

[CELEBORN-2221][CIP-14] Support writing with compression in C++ client#3575
afterincomparableyum wants to merge 5 commits intoapache:mainfrom
afterincomparableyum:cpp-client/celeborn-2221

Conversation

@afterincomparableyum
Copy link

@afterincomparableyum afterincomparableyum commented Dec 28, 2025

Integrate existing compression infrastructure (LZ4 and ZSTD) into the C++ client write path. This enables compression during pushData operations, matching the functionality available in the Java client.

Changes:

  • Add compression support to ShuffleClientImpl:

    • Add shuffleCompressionEnabled_ flag and compressor_ member
    • Initialize compressor from CelebornConf in constructor
    • Compress data in pushData() when compression is enabled
    • Use compressed size for batchBytesSize tracking
  • Configuration integration:

    • Read compression codec from celeborn.client.shuffle.compression.codec
    • Read ZSTD compression level from celeborn.client.shuffle.compression.zstd.level
    • Default to NONE (compression disabled)
  • Retry/revive support:

    • Retry path correctly uses pre-compressed body buffer
    • No re-compression needed during retries
  • Testing:

    • Add CompressorFactoryTest for factory pattern and config integration
    • Add compression config tests to CelebornConfTest
    • Test offset compression support for both LZ4 and ZSTD

How was this patch tested?

Unit Tests, as well as compiling code

@afterincomparableyum
Copy link
Author

afterincomparableyum commented Dec 28, 2025

@HolyLow this PR is a WIP, I will rebase it off of main after #3568 gets merged.

This is the commit for write compression: 6abae43

Integrate existing compression infrastructure (LZ4 and ZSTD) into the C++ client write path. This enables compression during pushData operations, matching the functionality available in the Java client.

Changes:
- Add compression support to ShuffleClientImpl:
  * Add shuffleCompressionEnabled_ flag and compressor_ member
  * Initialize compressor from CelebornConf in constructor
  * Compress data in pushData() when compression is enabled
  * Use compressed size for batchBytesSize tracking

- Configuration integration:
  * Read compression codec from celeborn.client.shuffle.compression.codec
  * Read ZSTD compression level from celeborn.client.shuffle.compression.zstd.level
  * Default to NONE (compression disabled)

- Retry/revive support:
  * Retry path correctly uses pre-compressed body buffer
  * No re-compression needed during retries

- Testing:
  * Add CompressorFactoryTest for factory pattern and config integration
  * Add compression config tests to CelebornConfTest
  * Test offset compression support for both LZ4 and ZSTD
@afterincomparableyum afterincomparableyum changed the title [WIP][CELEBORN-2221][CIP-14] Add support for write compression in CppClient [CELEBORN-2221][CIP-14] Support writing with compression in C++ client Jan 17, 2026
@afterincomparableyum afterincomparableyum marked this pull request as ready for review January 17, 2026 23:22
@afterincomparableyum
Copy link
Author

@HolyLow @SteNicholas @FMX @RexXiong Could you please help review this PR? Appreciate your help in improving this as needed!

@SteNicholas
Copy link
Member

@afterincomparableyum, please fix the failure of CI which is caused by format.

@afterincomparableyum
Copy link
Author

@SteNicholas I have fixed the clang format issues.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Integrates existing compression (LZ4/ZSTD) into the C++ client shuffle write path so pushData() can send compressed payloads driven by CelebornConf.

Changes:

  • Add optional compression in ShuffleClientImpl::pushData() and initialize a compressor from config.
  • Extend C++ conf tests for shuffle compression codec and ZSTD level settings.
  • Add unit tests for compressor factory/config integration and wire them into the C++ test target.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
cpp/celeborn/conf/tests/CelebornConfTest.cpp Adds default/override assertions for shuffle compression codec and ZSTD level.
cpp/celeborn/client/tests/CompressorFactoryTest.cpp Adds tests for compressor creation from conf and “offset” compression behavior.
cpp/celeborn/client/tests/CMakeLists.txt Registers the new compressor factory test in the test executable.
cpp/celeborn/client/ShuffleClient.h Adds compressor-related members to ShuffleClientImpl.
cpp/celeborn/client/ShuffleClient.cpp Initializes compressor from conf and compresses payload in pushData().

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@SteNicholas
Copy link
Member

@afterincomparableyum, could you take a look at the review comment of Copilot?

@afterincomparableyum
Copy link
Author

@SteNicholas I took a look at the comments of Copilot and made changes as needed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants