Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/common/lambdaConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type KafkaTrigger struct {
Topics []string `yaml:"topics" json:"topics"` // e.g., ["events", "logs"]
GroupId string `yaml:"-" json:"-"` // Auto-generated based on lambda name
AutoOffsetReset string `yaml:"auto_offset_reset" json:"auto_offset_reset"` // "earliest" or "latest"
CacheSizeMB int `yaml:"cache_size_mb" json:"cache_size_mb"` // Max cache size in MB (default: 16)
}

// LambdaConfig defines the overall configuration for the lambda function.
Expand Down
37 changes: 25 additions & 12 deletions go/worker/event/cachedKafkaClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,38 @@ type seekRequest struct {
offset int64
}

const defaultCacheSize = 1024
const defaultCacheSizeMB = 16

// cachedKafkaClient wraps a KafkaClient and caches records in an LRU map keyed
// by {topic, partition, offset}. When a seek is active, PollFetches serves
// records from the cache. On cache miss, it calls Seek on the underlying
// client so the next poll fetches from the right position.
type cachedKafkaClient struct {
	underlying   KafkaClient
	cache        map[cacheKey]*kgo.Record
	evictOrder   []cacheKey // front = least recently used
	maxSizeBytes int64      // cache capacity in bytes (maxSizeMB * 1024 * 1024)
	currentSize  int64      // running sum of recordSize over all cached entries
	seekTarget   *seekState // non-nil while a seek is being served from cache
}

func newCachedKafkaClient(underlying KafkaClient, maxSize int) *cachedKafkaClient {
func newCachedKafkaClient(underlying KafkaClient, maxSizeMB int) *cachedKafkaClient {
return &cachedKafkaClient{
underlying: underlying,
cache: make(map[cacheKey]*kgo.Record),
maxSize: maxSize,
underlying: underlying,
cache: make(map[cacheKey]*kgo.Record),
maxSizeBytes: int64(maxSizeMB) * 1024 * 1024,
}
}

// recordSize returns the approximate in-memory size of a cached record in
// bytes: the combined lengths of the key, value, topic name, and every
// header's key/value pair. Fixed per-record struct overhead is not counted.
func recordSize(r *kgo.Record) int64 {
	total := len(r.Key) + len(r.Value) + len(r.Topic)
	for _, hdr := range r.Headers {
		total += len(hdr.Key) + len(hdr.Value)
	}
	return int64(total)
}

// Seek sets the seek target so that subsequent PollFetches calls serve from cache.
func (c *cachedKafkaClient) Seek(topic string, partition int32, offset int64) {
c.seekTarget = &seekState{topic: topic, partition: partition, offset: offset}
Expand Down Expand Up @@ -88,19 +98,22 @@ func (c *cachedKafkaClient) Close() {
c.underlying.Close()
}

// put adds a record to the cache, evicting the LRU entry if at capacity.
// put adds a record to the cache, evicting LRU entries until there is room.
func (c *cachedKafkaClient) put(key cacheKey, record *kgo.Record) {
if _, exists := c.cache[key]; exists {
c.touchLRU(key)
return
}
if len(c.cache) >= c.maxSize {
newSize := recordSize(record)
for c.currentSize+newSize > c.maxSizeBytes && len(c.evictOrder) > 0 {
evictKey := c.evictOrder[0]
c.evictOrder = c.evictOrder[1:]
c.currentSize -= recordSize(c.cache[evictKey])
delete(c.cache, evictKey)
}
c.cache[key] = record
c.evictOrder = append(c.evictOrder, key)
c.currentSize += newSize
}

// touchLRU moves a key to the back of the eviction order (most recently used).
Expand Down
6 changes: 5 additions & 1 deletion go/worker/event/kafkaServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,11 @@ func (km *KafkaManager) newLambdaKafkaConsumer(consumerName string, lambdaName s
return nil, fmt.Errorf("failed to create Kafka client for lambda %s: %w", lambdaName, err)
}

cached := newCachedKafkaClient(&kgoClientWrapper{client: client}, defaultCacheSize)
cacheSizeMB := defaultCacheSizeMB
if trigger.CacheSizeMB > 0 {
cacheSizeMB = trigger.CacheSizeMB
}
cached := newCachedKafkaClient(&kgoClientWrapper{client: client}, cacheSizeMB)
return &LambdaKafkaConsumer{
consumerName: consumerName,
lambdaName: lambdaName,
Expand Down
12 changes: 7 additions & 5 deletions go/worker/event/kafkaServer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
// PollFetches returns the next queued fetch, or empty fetches if the queue is
// drained. When the queue is empty and Drained is set, it closes Drained to
// signal that all prior records have been processed.
func (m *MockKafkaClient) PollFetches(ctx context.Context) kgo.Fetches {

Check failure on line 63 in go/worker/event/kafkaServer_test.go

View workflow job for this annotation

GitHub Actions / continuous-integration

parameter 'ctx' seems to be unused, consider removing or renaming it as _
m.mu.Lock()
defer m.mu.Unlock()
if m.callCount < len(m.queue) {
Expand All @@ -75,7 +75,7 @@
return kgo.Fetches{}
}

func (m *MockKafkaClient) Seek(topic string, partition int32, offset int64) {

Check failure on line 78 in go/worker/event/kafkaServer_test.go

View workflow job for this annotation

GitHub Actions / continuous-integration

parameter 'offset' seems to be unused, consider removing or renaming it as _

Check failure on line 78 in go/worker/event/kafkaServer_test.go

View workflow job for this annotation

GitHub Actions / continuous-integration

parameter 'partition' seems to be unused, consider removing or renaming it as _

Check failure on line 78 in go/worker/event/kafkaServer_test.go

View workflow job for this annotation

GitHub Actions / continuous-integration

parameter 'topic' seems to be unused, consider removing or renaming it as _
// No-op for mock — tests control what PollFetches returns via Send/SendError
}

Expand Down Expand Up @@ -354,7 +354,7 @@
&kgo.Record{Topic: "t", Partition: 0, Offset: 1, Value: []byte("b")},
)

cached := newCachedKafkaClient(mock, 100)
cached := newCachedKafkaClient(mock, 1) // 1 MB — plenty for tiny records
cached.PollFetches(context.Background())

// Both records should now be in the cache
Expand All @@ -374,7 +374,7 @@
&kgo.Record{Topic: "t", Partition: 0, Offset: 12, Value: []byte("twelve")},
)

cached := newCachedKafkaClient(mock, 100)
cached := newCachedKafkaClient(mock, 1) // 1 MB — plenty for tiny records
// Populate the cache
cached.PollFetches(context.Background())

Expand Down Expand Up @@ -411,7 +411,7 @@
&kgo.Record{Topic: "t", Partition: 0, Offset: 99, Value: []byte("ninety-nine")},
)

cached := newCachedKafkaClient(mock, 100)
cached := newCachedKafkaClient(mock, 1) // 1 MB — plenty for tiny records
// Populate cache with offset 5
cached.PollFetches(context.Background())

Expand Down Expand Up @@ -439,14 +439,16 @@

func TestCachedClient_LRUEviction(t *testing.T) {
mock := &MockKafkaClient{Drained: make(chan struct{})}
// Each record: topic "t" (1 byte) + value (1 byte) = 2 bytes via recordSize
mock.Send(
&kgo.Record{Topic: "t", Partition: 0, Offset: 0, Value: []byte("a")},
&kgo.Record{Topic: "t", Partition: 0, Offset: 1, Value: []byte("b")},
&kgo.Record{Topic: "t", Partition: 0, Offset: 2, Value: []byte("c")},
)

// Cache can only hold 2 records
cached := newCachedKafkaClient(mock, 2)
// Set maxSizeBytes directly to 4 bytes — fits 2 records (2 bytes each) but not 3
cached := newCachedKafkaClient(mock, 1)
cached.maxSizeBytes = 4
cached.PollFetches(context.Background())

// Offset 0 should have been evicted (LRU), offsets 1 and 2 should remain
Expand Down
Loading