Description
Status Quo
When producing messages, almost all of the latency comes from the sequential S3 uploads. Currently the .kfs segment is uploaded to S3 first, and only afterwards is the .index uploaded. Total processing time is roughly segment_latency + index_latency.
Proposed solution
We can parallelize the S3 uploads to reduce the e2e latency to max(segment_latency, index_latency). Since the segment is normally the larger object, this is effectively the same as segment_latency, i.e. we eliminate a whole S3 round trip of latency for the .index. For small segments or a high-throughput S3 connection, where the S3 round-trip time dominates the actual transfer time, I'd estimate that this would shave 30-45% off e2e produce latency.
Risks
Previously, because .kfs was uploaded first, we only had to consider the case where .kfs is uploaded but .index is missing.
With parallel uploads we also have to consider the case where .index is uploaded but .kfs is missing.
The processor code paths filter out incomplete pairs where either .index or .kfs is missing. RestoreFromS3 fails if the .kfs file exists and .index is missing, but that is preexisting behavior and easily fixable. Parallel uploads introduce the chance that the .index upload succeeds while the .kfs upload fails. This doesn't matter for RestoreFromS3, because its discovery starts from the .kfs listing and never sees an orphaned .index.
Analysis of the code paths reading .kfs and .index files:
| Reader | Reads .kfs? | Reads .index? | Handles missing .index? | Handles missing .kfs? | Orphan risk? |
|---|---|---|---|---|---|
| Broker: Read() pkg/storage/log.go:315 | Yes, via DownloadSegment (lines 343, 354) | No: uses in-memory l.indexEntries populated at flush time (line 324) | Yes: segmentRangeForOffset() returns false (line 455), falls through to full segment download (lines 352-364). Slower but correct. | N/A: only reads segments in l.segments list | None: failed flush never adds to l.segments |
| Broker: RestoreFromS3() pkg/storage/log.go:89 | Yes: lists .kfs files (lines 91-103), downloads footer (line 115) | Yes: downloads .index for each .kfs (line 139) | No: returns hard error if DownloadIndex fails (lines 143-144). Partition fails to initialize. | N/A: discovery starts from .kfs listing, so orphaned .index is invisible | Pre-existing bug: orphaned .kfs without .index blocks partition init on restart |
| Broker: startPrefetch() pkg/storage/log.go:395 | Yes: downloads full segment for cache (line 411) | No: only downloads .kfs | N/A | N/A: only prefetches segments in l.segments | None: same protection as Read() |
| Iceberg Processor: discovery iceberg-processor/internal/discovery/discovery.go:160 | Yes: pairs by base offset | Yes: pairs by base offset | Yes: skips if entry.indexKey == "" (line 162) | Yes: skips if entry.kfsKey == "" (line 162) | None |
| Iceberg Processor: decoder iceberg-processor/internal/decoder/decoder.go:100 | Yes: getObject (line 109) | Yes: getObject (line 101) | Returns error: processor skips segment, keeps running | Returns error: processor skips segment, keeps running | None: only called after discovery filters out incomplete pairs |
| SQL Processor: discovery sql-processor/internal/discovery/discovery.go:153 | Yes: pairs by base offset | Yes: pairs by base offset | Yes: skips if entry.indexKey == "" (line 155) | Yes: skips if entry.kfsKey == "" (line 155) | None |
| SQL Processor: decoder sql-processor/internal/decoder/decoder.go:119 | Yes: getObject (line 129) | Yes: getObject (line 120) | Returns error: processor skips segment, keeps running | Returns error: processor skips segment, keeps running | None: same as Iceberg, discovery filters first |
| SQL Processor: time index builder sql-processor/internal/discovery/time_index_builder.go:105 | Indirectly (calls Decode() at line 156) | Indirectly (calls Decode() at line 156) | Skips if SegmentKey == "" or IndexKey == "" (line 109), plus Decode() errors swallowed (lines 157-158) | Same skip logic (line 109) | None: double-protected |
| SQL Processor: time index reader sql-processor/internal/discovery/time_index.go:58 | No: reads .kfst sidecars | No: reads .kfst sidecars | N/A: different file type | N/A: different file type | None |
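The "pairs by base offset" filtering that protects the processors can be illustrated with a small sketch. This is not the actual discovery code: `segmentEntry` and `completePairs` are hypothetical, and the sketch assumes that a .kfs and its .index share the same key up to the extension.

```go
package main

import (
	"sort"
	"strings"
)

// segmentEntry mirrors the idea of the kfsKey/indexKey fields mentioned in
// the table; the struct itself is an assumption for illustration.
type segmentEntry struct {
	kfsKey   string
	indexKey string
}

// completePairs groups object keys by their shared base (key minus extension)
// and returns only entries where both the .kfs and the .index are present.
func completePairs(keys []string) []segmentEntry {
	byBase := make(map[string]*segmentEntry)
	entryFor := func(base string) *segmentEntry {
		e, ok := byBase[base]
		if !ok {
			e = &segmentEntry{}
			byBase[base] = e
		}
		return e
	}

	for _, key := range keys {
		switch {
		case strings.HasSuffix(key, ".kfs"):
			entryFor(strings.TrimSuffix(key, ".kfs")).kfsKey = key
		case strings.HasSuffix(key, ".index"):
			entryFor(strings.TrimSuffix(key, ".index")).indexKey = key
		}
		// Other objects (for example .kfst sidecars) are ignored here.
	}

	// Sort bases lexicographically for a deterministic result.
	bases := make([]string, 0, len(byBase))
	for base := range byBase {
		bases = append(bases, base)
	}
	sort.Strings(bases)

	pairs := make([]segmentEntry, 0, len(bases))
	for _, base := range bases {
		e := byBase[base]
		if e.kfsKey == "" || e.indexKey == "" {
			continue // orphan in either direction: skip it
		}
		pairs = append(pairs, *e)
	}
	return pairs
}
```

Because the check is symmetric, an orphaned .index is dropped in exactly the same way as an orphaned .kfs, which is why the new failure mode adds no risk for these readers.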
Next Steps
This would be my first contribution, so please double-check whether I overlooked something. If my analysis is correct, I'd like to implement this. It should only take a few lines of code.