
parallelize S3 uploads for .index and .kfs to shave off s3 round trip latency #109

@klaudworks

Description


Status Quo

When producing messages, essentially all of the latency comes from the sequential S3 uploads: currently .kfs is uploaded to S3 first and .index only afterwards, so total processing time is roughly segment_latency + index_latency.

Proposed solution

We can parallelize the S3 uploads to reduce the end-to-end latency to max(segment_latency, index_latency), which is effectively just segment_latency. In other words, we eliminate a whole S3 round trip for the .index. For small segments or a high-throughput S3 connection, where the S3 round-trip time dominates the actual upload time, I'd estimate this would shave 30-45% off end-to-end produce latency.

Risks

Prior to this change, .kfs was always uploaded first, so we only had to consider the case where .kfs exists in S3 but .index is missing.
With parallel uploads we must also handle the reverse: .index exists but .kfs is missing.

The code paths already filter out incomplete pairs where either .index or .kfs is missing. RestoreFromS3 fails if a .kfs file exists without a matching .index, but that is an easily fixable, pre-existing behavior. Parallel uploads additionally introduce the chance that the .index upload succeeds while the .kfs upload fails, which doesn't matter for RestoreFromS3 because its discovery starts from the .kfs listing.

Analysis of the code paths reading .kfs and .index files:

| Reader | Reads .kfs? | Reads .index? | Handles missing .index? | Handles missing .kfs? | Orphan risk? |
|---|---|---|---|---|---|
| Broker: `Read()` (`pkg/storage/log.go:315`) | Yes, via `DownloadSegment` (lines 343, 354) | No; uses in-memory `l.indexEntries` populated at flush time (line 324) | Yes: `segmentRangeForOffset()` returns false (line 455), falls through to full segment download (lines 352-364). Slower but correct. | N/A; only reads segments in the `l.segments` list | None; a failed flush never adds to `l.segments` |
| Broker: `RestoreFromS3()` (`pkg/storage/log.go:89`) | Yes: lists .kfs files (lines 91-103), downloads footer (line 115) | Yes: downloads .index for each .kfs (line 139) | No: returns a hard error if `DownloadIndex` fails (lines 143-144); the partition fails to initialize | N/A; discovery starts from the .kfs listing, so an orphaned .index is invisible | Pre-existing bug: an orphaned .kfs without .index blocks partition init on restart |
| Broker: `startPrefetch()` (`pkg/storage/log.go:395`) | Yes: downloads the full segment for the cache (line 411) | No: only downloads .kfs | N/A | N/A; only prefetches segments in `l.segments` | None; same protection as `Read()` |
| Iceberg Processor: discovery (`iceberg-processor/internal/discovery/discovery.go:160`) | Yes: pairs by base offset | Yes: pairs by base offset | Yes: skips if `entry.indexKey == ""` (line 162) | Yes: skips if `entry.kfsKey == ""` (line 162) | None |
| Iceberg Processor: decoder (`iceberg-processor/internal/decoder/decoder.go:100`) | Yes: `getObject` (line 109) | Yes: `getObject` (line 101) | Returns an error; the processor skips the segment and keeps running | Returns an error; the processor skips the segment and keeps running | None; only called after discovery filters out incomplete pairs |
| SQL Processor: discovery (`sql-processor/internal/discovery/discovery.go:153`) | Yes: pairs by base offset | Yes: pairs by base offset | Yes: skips if `entry.indexKey == ""` (line 155) | Yes: skips if `entry.kfsKey == ""` (line 155) | None |
| SQL Processor: decoder (`sql-processor/internal/decoder/decoder.go:119`) | Yes: `getObject` (line 129) | Yes: `getObject` (line 120) | Returns an error; the processor skips the segment and keeps running | Returns an error; the processor skips the segment and keeps running | None; same as Iceberg: discovery filters first |
| SQL Processor: time index builder (`sql-processor/internal/discovery/time_index_builder.go:105`) | Indirectly (calls `Decode()` at line 156) | Indirectly (calls `Decode()` at line 156) | Skips if `SegmentKey == ""` or `IndexKey == ""` (line 109), plus `Decode()` errors are swallowed (lines 157-158) | Same skip logic (line 109) | None; double-protected |
| SQL Processor: time index reader (`sql-processor/internal/discovery/time_index.go:58`) | No; reads .kfst sidecars | No; reads .kfst sidecars | N/A; different file type | N/A; different file type | None |
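The pair-by-base-offset filtering that the discovery rows above rely on can be sketched as follows. The names (`segmentPair`, `completePairs`, the key layout) are illustrative, not the actual structs in `discovery.go`:

```go
package main

import (
	"sort"
	"strings"
)

// segmentPair mirrors a discovery entry that holds the matching object
// keys for one base offset; field names are illustrative only.
type segmentPair struct {
	kfsKey   string
	indexKey string
}

// completePairs groups listed S3 keys by base offset (the key minus its
// extension) and keeps only pairs where both .kfs and .index exist, so an
// orphan left by a half-failed parallel upload is silently skipped.
func completePairs(keys []string) []segmentPair {
	byBase := map[string]*segmentPair{}
	for _, k := range keys {
		switch {
		case strings.HasSuffix(k, ".kfs"):
			base := strings.TrimSuffix(k, ".kfs")
			if byBase[base] == nil {
				byBase[base] = &segmentPair{}
			}
			byBase[base].kfsKey = k
		case strings.HasSuffix(k, ".index"):
			base := strings.TrimSuffix(k, ".index")
			if byBase[base] == nil {
				byBase[base] = &segmentPair{}
			}
			byBase[base].indexKey = k
		}
	}
	var out []segmentPair
	for _, p := range byBase {
		if p.kfsKey != "" && p.indexKey != "" { // drop incomplete pairs
			out = append(out, *p)
		}
	}
	// Sort for a deterministic scan order over base offsets.
	sort.Slice(out, func(i, j int) bool { return out[i].kfsKey < out[j].kfsKey })
	return out
}
```

Because this filter already tolerates a missing key on either side, the new orphan direction introduced by parallel uploads (.index present, .kfs missing) is handled for free by both processors.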

Next Steps

Please double-check whether I overlooked something, since this is my first contribution. If my analysis is correct, I'd like to implement this; it should only be a few lines of code.
