From 22d5880786286728d5cb33cba73bedf462bd02f4 Mon Sep 17 00:00:00 2001 From: Yashwanth Ranjan Singaravel Date: Thu, 9 Apr 2026 13:07:23 -0500 Subject: [PATCH] Added cache size in MB --- go/common/lambdaConfig.go | 1 + go/worker/event/cachedKafkaClient.go | 37 +++++++++++++++++++--------- go/worker/event/kafkaServer.go | 6 ++++- go/worker/event/kafkaServer_test.go | 12 +++++---- 4 files changed, 38 insertions(+), 18 deletions(-) diff --git a/go/common/lambdaConfig.go b/go/common/lambdaConfig.go index 4ce23bd93..fa45ff670 100644 --- a/go/common/lambdaConfig.go +++ b/go/common/lambdaConfig.go @@ -39,6 +39,7 @@ type KafkaTrigger struct { Topics []string `yaml:"topics" json:"topics"` // e.g., ["events", "logs"] GroupId string `yaml:"-" json:"-"` // Auto-generated based on lambda name AutoOffsetReset string `yaml:"auto_offset_reset" json:"auto_offset_reset"` // "earliest" or "latest" + CacheSizeMB int `yaml:"cache_size_mb" json:"cache_size_mb"` // Max cache size in MB (default: 16) } // LambdaConfig defines the overall configuration for the lambda function. diff --git a/go/worker/event/cachedKafkaClient.go b/go/worker/event/cachedKafkaClient.go index 61d72ffc6..b9801a718 100644 --- a/go/worker/event/cachedKafkaClient.go +++ b/go/worker/event/cachedKafkaClient.go @@ -26,28 +26,38 @@ type seekRequest struct { offset int64 } -const defaultCacheSize = 1024 +const defaultCacheSizeMB = 16 // cachedKafkaClient wraps a KafkaClient and caches records in an LRU map keyed // by {topic, partition, offset}. When a seek is active, PollFetches serves // records from the cache. On cache miss, it calls Seek on the underlying // client so the next poll fetches from the right position. 
type cachedKafkaClient struct { - underlying KafkaClient - cache map[cacheKey]*kgo.Record - evictOrder []cacheKey // front = least recently used - maxSize int - seekTarget *seekState + underlying KafkaClient + cache map[cacheKey]*kgo.Record + evictOrder []cacheKey // front = least recently used + maxSizeBytes int64 + currentSize int64 + seekTarget *seekState } -func newCachedKafkaClient(underlying KafkaClient, maxSize int) *cachedKafkaClient { return &cachedKafkaClient{ - underlying: underlying, - cache: make(map[cacheKey]*kgo.Record), - maxSize: maxSize, +func newCachedKafkaClient(underlying KafkaClient, maxSizeMB int) *cachedKafkaClient { return &cachedKafkaClient{ + underlying: underlying, + cache: make(map[cacheKey]*kgo.Record), + maxSizeBytes: int64(maxSizeMB) * 1024 * 1024, } } +// recordSize returns the approximate in-memory size of a cached record in bytes. +func recordSize(r *kgo.Record) int64 { + size := int64(len(r.Key) + len(r.Value) + len(r.Topic)) + for _, h := range r.Headers { + size += int64(len(h.Key) + len(h.Value)) + } + return size +} + // Seek sets the seek target so that subsequent PollFetches calls serve from cache. func (c *cachedKafkaClient) Seek(topic string, partition int32, offset int64) { c.seekTarget = &seekState{topic: topic, partition: partition, offset: offset} @@ -88,19 +98,22 @@ func (c *cachedKafkaClient) Close() { c.underlying.Close() } -// put adds a record to the cache, evicting the LRU entry if at capacity. +// put adds a record to the cache, evicting LRU entries until there is room; a record larger than the entire cache budget is still stored after the cache is emptied. 
func (c *cachedKafkaClient) put(key cacheKey, record *kgo.Record) { if _, exists := c.cache[key]; exists { c.touchLRU(key) return } - if len(c.cache) >= c.maxSize { + newSize := recordSize(record) + for c.currentSize+newSize > c.maxSizeBytes && len(c.evictOrder) > 0 { evictKey := c.evictOrder[0] c.evictOrder = c.evictOrder[1:] + c.currentSize -= recordSize(c.cache[evictKey]) delete(c.cache, evictKey) } c.cache[key] = record c.evictOrder = append(c.evictOrder, key) + c.currentSize += newSize } // touchLRU moves a key to the back of the eviction order (most recently used). diff --git a/go/worker/event/kafkaServer.go b/go/worker/event/kafkaServer.go index 22d9c8c7a..55916903b 100644 --- a/go/worker/event/kafkaServer.go +++ b/go/worker/event/kafkaServer.go @@ -111,7 +111,11 @@ func (km *KafkaManager) newLambdaKafkaConsumer(consumerName string, lambdaName s return nil, fmt.Errorf("failed to create Kafka client for lambda %s: %w", lambdaName, err) } - cached := newCachedKafkaClient(&kgoClientWrapper{client: client}, defaultCacheSize) + cacheSizeMB := defaultCacheSizeMB + if trigger.CacheSizeMB > 0 { + cacheSizeMB = trigger.CacheSizeMB + } + cached := newCachedKafkaClient(&kgoClientWrapper{client: client}, cacheSizeMB) return &LambdaKafkaConsumer{ consumerName: consumerName, lambdaName: lambdaName, diff --git a/go/worker/event/kafkaServer_test.go b/go/worker/event/kafkaServer_test.go index 0b0a414cc..6c01027bf 100644 --- a/go/worker/event/kafkaServer_test.go +++ b/go/worker/event/kafkaServer_test.go @@ -354,7 +354,7 @@ func TestCachedClient_CachesRecords(t *testing.T) { &kgo.Record{Topic: "t", Partition: 0, Offset: 1, Value: []byte("b")}, ) - cached := newCachedKafkaClient(mock, 100) + cached := newCachedKafkaClient(mock, 1) // 1 MB — plenty for tiny records cached.PollFetches(context.Background()) // Both records should now be in the cache @@ -374,7 +374,7 @@ func TestCachedClient_SeekCacheHit(t *testing.T) { &kgo.Record{Topic: "t", Partition: 0, Offset: 12, Value: 
[]byte("twelve")}, ) - cached := newCachedKafkaClient(mock, 100) + cached := newCachedKafkaClient(mock, 1) // 1 MB — plenty for tiny records // Populate the cache cached.PollFetches(context.Background()) @@ -411,7 +411,7 @@ func TestCachedClient_SeekCacheMiss(t *testing.T) { &kgo.Record{Topic: "t", Partition: 0, Offset: 99, Value: []byte("ninety-nine")}, ) - cached := newCachedKafkaClient(mock, 100) + cached := newCachedKafkaClient(mock, 1) // 1 MB — plenty for tiny records // Populate cache with offset 5 cached.PollFetches(context.Background()) @@ -439,14 +439,16 @@ func TestCachedClient_SeekCacheMiss(t *testing.T) { func TestCachedClient_LRUEviction(t *testing.T) { mock := &MockKafkaClient{Drained: make(chan struct{})} + // Each record: topic "t" (1 byte) + value (1 byte) = 2 bytes via recordSize mock.Send( &kgo.Record{Topic: "t", Partition: 0, Offset: 0, Value: []byte("a")}, &kgo.Record{Topic: "t", Partition: 0, Offset: 1, Value: []byte("b")}, &kgo.Record{Topic: "t", Partition: 0, Offset: 2, Value: []byte("c")}, ) - // Cache can only hold 2 records - cached := newCachedKafkaClient(mock, 2) + // Set maxSizeBytes directly to 4 bytes — fits 2 records (2 bytes each) but not 3 + cached := newCachedKafkaClient(mock, 1) + cached.maxSizeBytes = 4 cached.PollFetches(context.Background()) // Offset 0 should have been evicted (LRU), offsets 1 and 2 should remain