diff --git a/go/common/config.go b/go/common/config.go index de83d93c1..fe3b4caf6 100644 --- a/go/common/config.go +++ b/go/common/config.go @@ -62,6 +62,11 @@ type Config struct { // pass through to sandbox envirenment variable Sandbox_config any `json:"sandbox_config"` + // KAFKA CACHE OPTIONS + Kafka_cache_size_mb int `json:"kafka_cache_size_mb"` // max message cache size in MB (default: 256) + Kafka_max_concurrent_fetches int `json:"kafka_max_concurrent_fetches"` // max simultaneous Kafka consumers (default: 10) + Kafka_prefetch_count int `json:"kafka_prefetch_count"` // messages to prefetch on cache miss (default: 5) + Docker DockerConfig `json:"docker"` Limits LimitsConfig `json:"limits"` InstallerLimits LimitsConfig `json:"installer_limits"` // limits profile for installers @@ -291,9 +296,12 @@ func getDefaultConfigForPatching(olPath string) (*Config, error) { Pkgs_dir: packagesDir, Sandbox_config: map[string]any{}, SOCK_base_path: baseImgDir, - Registry_cache_ms: 5000, // 5 seconds - Mem_pool_mb: memPoolMb, - Import_cache_tree: zygoteTreePath, + Registry_cache_ms: 5000, // 5 seconds + Mem_pool_mb: memPoolMb, + Import_cache_tree: zygoteTreePath, + Kafka_cache_size_mb: 256, + Kafka_max_concurrent_fetches: 10, + Kafka_prefetch_count: 5, Docker: DockerConfig{ Base_image: "ol-min", }, diff --git a/go/worker/event/kafkaFetcher.go b/go/worker/event/kafkaFetcher.go new file mode 100644 index 000000000..0402497b4 --- /dev/null +++ b/go/worker/event/kafkaFetcher.go @@ -0,0 +1,145 @@ +package event + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/twmb/franz-go/pkg/kgo" +) + +// KafkaFetcher retrieves Kafka messages, using a shared MessageCache as a +// read-through layer. On cache hit the message is returned immediately. +// On miss it prefetches multiple messages from Kafka, caches them all, +// and returns the requested one. +type KafkaFetcher struct { + cache *MessageCache + sem chan struct{} + prefetchCount int +} + +// NewKafkaFetcher creates a KafkaFetcher backed by the given cache. +func NewKafkaFetcher(cache *MessageCache, maxConcurrent int, prefetchCount int) *KafkaFetcher { + if maxConcurrent <= 0 { + maxConcurrent = 10 + } + if prefetchCount <= 0 { + prefetchCount = 5 + } + return &KafkaFetcher{ + cache: cache, + sem: make(chan struct{}, maxConcurrent), + prefetchCount: prefetchCount, + } +} + +// Get returns the message at the given topic/partition/offset. +// It checks the cache first; on miss it fetches prefetchCount messages +// starting at offset, caches them all, and returns the requested one. +// Returns nil if no message is available. +func (kf *KafkaFetcher) Get(ctx context.Context, brokers []string, topic string, partition int32, offset int64) (*CachedMessage, error) { + key := CacheKey{Topic: topic, Partition: partition, Offset: offset} + + if msg, hit := kf.cache.Get(key); hit { + return msg, nil + } + + // Cache miss — prefetch from Kafka + records, err := kf.fetchFromKafka(ctx, brokers, topic, partition, offset, kf.prefetchCount) + if err != nil { + return nil, err + } + + // Cache all fetched records + var result *CachedMessage + for _, r := range records { + headers := make(map[string]string) + for _, h := range r.Headers { + headers[h.Key] = string(h.Value) + } + msg := &CachedMessage{ + Key: r.Key, + Value: r.Value, + Headers: headers, + Timestamp: r.Timestamp, + size: int64(len(r.Key) + len(r.Value) + 64), + } + kf.cache.Put(CacheKey{ + Topic: r.Topic, + Partition: r.Partition, + Offset: r.Offset, + }, msg) + + if r.Offset == offset { + result = msg + } + } + + return result, nil +} + +// fetchFromKafka creates a short-lived consumer, reads up to count records +// starting at offset, and closes the consumer. Blocks if the semaphore is full. +func (kf *KafkaFetcher) fetchFromKafka(ctx context.Context, brokers []string, topic string, partition int32, offset int64, count int) ([]*kgo.Record, error) { + select { + case kf.sem <- struct{}{}: + case <-ctx.Done(): + return nil, ctx.Err() + } + defer func() { <-kf.sem }() + + client, err := kgo.NewClient( + kgo.SeedBrokers(brokers...), + kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{ + topic: { + partition: kgo.NewOffset().At(offset), + }, + }), + ) + if err != nil { + return nil, fmt.Errorf("failed to create consumer for %s partition %d: %w", topic, partition, err) + } + defer client.Close() + + var records []*kgo.Record + deadline := time.After(10 * time.Second) + + for len(records) < count { + pollCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + fetches := client.PollFetches(pollCtx) + cancel() + + if errs := fetches.Errors(); len(errs) > 0 { + for _, fe := range errs { + if fe.Err == context.DeadlineExceeded { + continue + } + slog.Warn("KafkaFetcher fetch error", + "topic", topic, + "partition", partition, + "error", fe.Err) + } + } + + fetches.EachRecord(func(r *kgo.Record) { + if r.Partition == partition && r.Offset >= offset { + records = append(records, r) + } + }) + + if len(records) >= count { + break + } + + select { + case <-deadline: + return records, nil + case <-ctx.Done(): + return records, ctx.Err() + default: + } + } + + return records, nil +} diff --git a/go/worker/event/kafkaFetcher_test.go b/go/worker/event/kafkaFetcher_test.go new file mode 100644 index 000000000..7c6198032 --- /dev/null +++ b/go/worker/event/kafkaFetcher_test.go @@ -0,0 +1,90 @@ +package event + +import ( + "context" + "testing" + "time" +) + +func newTestFetcher(maxConcurrent int) *KafkaFetcher { + cache := NewMessageCache(1024 * 1024) + return NewKafkaFetcher(cache, maxConcurrent, 5) +} + +func TestNewKafkaFetcher_DefaultConcurrency(t *testing.T) { + kf := newTestFetcher(0) + if cap(kf.sem) != 10 { + t.Fatalf("expected default capacity 10, got %d", cap(kf.sem)) + } +} + +func TestNewKafkaFetcher_CustomConcurrency(t *testing.T) { + kf := newTestFetcher(5) + if cap(kf.sem) != 5 { + t.Fatalf("expected capacity 5, got %d", cap(kf.sem)) + } +} + +func TestKafkaFetcher_CacheHit(t *testing.T) { + cache := NewMessageCache(1024 * 1024) + kf := NewKafkaFetcher(cache, 1, 5) + + // Pre-populate the cache + key := CacheKey{Topic: "t", Partition: 0, Offset: 42} + cache.Put(key, &CachedMessage{ + Value: []byte("cached-value"), + size: 100, + }) + + // Get should return the cached message without hitting Kafka + msg, err := kf.Get(context.Background(), []string{"localhost:9092"}, "t", 0, 42) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if msg == nil { + t.Fatal("expected cache hit, got nil") + } + if string(msg.Value) != "cached-value" { + t.Fatalf("expected 'cached-value', got %q", string(msg.Value)) + } +} + +func TestKafkaFetcher_SemaphoreBlocksAtCapacity(t *testing.T) { + kf := newTestFetcher(1) + + // Fill the semaphore slot + kf.sem <- struct{}{} + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + // Get should block on semaphore and then fail with context deadline + _, err := kf.Get(ctx, []string{"localhost:9092"}, "test-topic", 0, 0) + if err == nil { + t.Fatal("expected error when semaphore is full and context expires") + } + if err != context.DeadlineExceeded { + t.Fatalf("expected DeadlineExceeded, got %v", err) + } + + // Release the slot + <-kf.sem +} + +func TestKafkaFetcher_SemaphoreReleasedAfterFetch(t *testing.T) { + kf := newTestFetcher(1) + + // Get will fail (no real broker) but should still release the semaphore slot + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + kf.Get(ctx, []string{"localhost:19092"}, "nonexistent", 0, 0) + + // Verify the semaphore slot was released by acquiring it without blocking + select { + case kf.sem <- struct{}{}: + <-kf.sem + default: + t.Fatal("semaphore slot was not released after Get") + } +} diff --git a/go/worker/event/kafkaServer.go b/go/worker/event/kafkaServer.go index 793c421ad..f3308bf7e 100644 --- a/go/worker/event/kafkaServer.go +++ b/go/worker/event/kafkaServer.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "log/slog" "net/http" @@ -16,290 +15,254 @@ import ( "github.com/open-lambda/open-lambda/go/boss/lambdastore" "github.com/open-lambda/open-lambda/go/common" "github.com/open-lambda/open-lambda/go/worker/lambda" - "github.com/twmb/franz-go/pkg/kgo" ) -type KafkaClient interface { - PollFetches(context.Context) kgo.Fetches - Close() -} - -// LambdaKafkaConsumer manages Kafka consumption for a specific lambda function -type LambdaKafkaConsumer struct { - consumerName string // Unique name for this consumer - lambdaName string // lambda function name - kafkaTrigger *common.KafkaTrigger - client KafkaClient // kgo.client implements the KafkaClient interface - lambdaManager *lambda.LambdaMgr // Reference to lambda manager for direct calls - stopChan chan struct{} // Shutdown signal for this consumer - // When this channel is closed, the goroutine for the consumer exits -} - -// KafkaManager manages multiple lambda-specific Kafka consumers +// KafkaManager manages Kafka consumption for all lambdas on a worker. +// It maintains a shared message cache and a KafkaFetcher. When triggers are +// registered for a lambda, a background loop automatically consumes messages +// and invokes the lambda. type KafkaManager struct { - lambdaConsumers map[string]*LambdaKafkaConsumer // lambdaName -> consumer - lambdaManager *lambda.LambdaMgr // Reference to lambda manager - mu sync.Mutex // Protects lambdaConsumers map + triggerConfigs map[string][]common.KafkaTrigger // lambdaName → trigger configs + lambdaManager *lambda.LambdaMgr + fetcher *KafkaFetcher + offsets map[string]map[string]map[int32]int64 // groupId → topic → partition → next offset + stopChans map[string]chan struct{} // lambdaName → stop signal for consumption loop + mu sync.Mutex } -// newLambdaKafkaConsumer creates a new Kafka consumer for a specific lambda function -func (km *KafkaManager) newLambdaKafkaConsumer(consumerName string, lambdaName string, trigger *common.KafkaTrigger) (*LambdaKafkaConsumer, error) { - // Validate that we have brokers and topics - if len(trigger.BootstrapServers) == 0 { - return nil, fmt.Errorf("no bootstrap servers configured for lambda %s", lambdaName) +// NewKafkaManager creates a KafkaManager with a shared message cache and fetcher. +func NewKafkaManager(lambdaManager *lambda.LambdaMgr) (*KafkaManager, error) { + cacheSizeMb := common.Conf.Kafka_cache_size_mb + if cacheSizeMb <= 0 { + cacheSizeMb = 256 } - if len(trigger.Topics) == 0 { - return nil, fmt.Errorf("no topics configured for lambda %s", lambdaName) + maxConcurrent := common.Conf.Kafka_max_concurrent_fetches + if maxConcurrent <= 0 { + maxConcurrent = 10 } + prefetchCount := common.Conf.Kafka_prefetch_count + if prefetchCount <= 0 { + prefetchCount = 5 + } + + cache := NewMessageCache(int64(cacheSizeMb) * 1024 * 1024) - // Setup kgo client options - opts := []kgo.Opt{ - kgo.SeedBrokers(trigger.BootstrapServers...), - kgo.ConsumerGroup(trigger.GroupId), - kgo.ConsumeTopics(trigger.Topics...), - kgo.SessionTimeout(10 * time.Second), - kgo.HeartbeatInterval(3 * time.Second), + manager := &KafkaManager{ + triggerConfigs: make(map[string][]common.KafkaTrigger), + lambdaManager: lambdaManager, + fetcher: NewKafkaFetcher(cache, maxConcurrent, prefetchCount), + offsets: make(map[string]map[string]map[int32]int64), + stopChans: make(map[string]chan struct{}), } - // Use trigger-specific offset reset or default to latest - if trigger.AutoOffsetReset == "earliest" { - opts = append(opts, kgo.ConsumeResetOffset(kgo.NewOffset().AtStart())) - } else { - opts = append(opts, kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd())) + slog.Info("Kafka manager initialized", + "cache_size_mb", cacheSizeMb, + "max_concurrent_fetches", maxConcurrent, + "prefetch_count", prefetchCount) + return manager, nil +} + +// RegisterLambdaKafkaTriggers stores Kafka trigger configs for a lambda and +// starts a background consumption loop that automatically fetches messages +// and invokes the lambda. +func (km *KafkaManager) RegisterLambdaKafkaTriggers(lambdaName string, triggers []common.KafkaTrigger) error { + km.mu.Lock() + defer km.mu.Unlock() + + if len(triggers) == 0 { + return nil + } + + // Stop any existing consumption loop for this lambda + if stopChan, ok := km.stopChans[lambdaName]; ok { + close(stopChan) + delete(km.stopChans, lambdaName) } - // Create kgo client - client, err := kgo.NewClient(opts...) - if err != nil { - return nil, fmt.Errorf("failed to create Kafka client for lambda %s: %w", lambdaName, err) + // Store trigger configs with auto-generated group IDs + stored := make([]common.KafkaTrigger, len(triggers)) + for i, trigger := range triggers { + trigger.GroupId = fmt.Sprintf("lambda-%s", lambdaName) + stored[i] = trigger } + km.triggerConfigs[lambdaName] = stored + + // Start background consumption loop + stopChan := make(chan struct{}) + km.stopChans[lambdaName] = stopChan + go km.consumeLoop(lambdaName, stopChan) + + slog.Info("Started Kafka consumption for lambda", + "lambda", lambdaName, + "trigger_count", len(triggers)) - return &LambdaKafkaConsumer{ - consumerName: consumerName, - lambdaName: lambdaName, - kafkaTrigger: trigger, - client: client, - lambdaManager: km.lambdaManager, - stopChan: make(chan struct{}), - }, nil + return nil } -// NewKafkaManager creates and configures a new Kafka manager -func NewKafkaManager(lambdaManager *lambda.LambdaMgr) (*KafkaManager, error) { - manager := &KafkaManager{ - lambdaConsumers: make(map[string]*LambdaKafkaConsumer), - lambdaManager: lambdaManager, +// getOffset returns the next offset to consume for a given group/topic/partition. +// Returns 0 if no offset has been tracked yet. +func (km *KafkaManager) getOffset(groupId, topic string, partition int32) int64 { + if gm, ok := km.offsets[groupId]; ok { + if tm, ok := gm[topic]; ok { + if off, ok := tm[partition]; ok { + return off + } + } } - - slog.Info("Kafka manager initialized") - return manager, nil + return 0 } -// StartConsuming starts consuming messages for this lambda's Kafka triggers -func (lkc *LambdaKafkaConsumer) StartConsuming() { - slog.Info("Starting Kafka consumer for lambda", - "consumer", lkc.consumerName, - "lambda", lkc.lambdaName, - "topics", lkc.kafkaTrigger.Topics, - "brokers", lkc.kafkaTrigger.BootstrapServers, - "group_id", lkc.kafkaTrigger.GroupId) - - // Start consuming loop - go lkc.consumeLoop() +// setOffset stores the next offset to consume for a given group/topic/partition. +func (km *KafkaManager) setOffset(groupId, topic string, partition int32, offset int64) { + if _, ok := km.offsets[groupId]; !ok { + km.offsets[groupId] = make(map[string]map[int32]int64) + } + if _, ok := km.offsets[groupId][topic]; !ok { + km.offsets[groupId][topic] = make(map[int32]int64) + } + km.offsets[groupId][topic][partition] = offset } -// consumeLoop handles Kafka message consumption using kgo polling -func (lkc *LambdaKafkaConsumer) consumeLoop() { +// consumeLoop continuously consumes messages for a lambda until stopped. +// On each iteration it tries the cache, falls back to Kafka on miss, +// and invokes the lambda. Backs off when no messages are available. +func (km *KafkaManager) consumeLoop(lambdaName string, stopChan chan struct{}) { for { select { - case <-lkc.stopChan: - slog.Info("Stopping Kafka consumer for lambda", "lambda", lkc.lambdaName) + case <-stopChan: + slog.Info("Stopping consumption loop", "lambda", lambdaName) return default: - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - fetches := lkc.client.PollFetches(ctx) - cancel() - - if errs := fetches.Errors(); len(errs) > 0 { - for _, err := range errs { - // The franz-go package uses context.DeadlineExceeded to signal that the - // timeout has expired with no new messages. This is expected - // behaviour during idle periods and should not be logged. - if errors.Is(err.Err, context.DeadlineExceeded) { - continue - } - - // TODO: Surface Kafka consumer errors to lambda developers by invoking an error - // handler lambda function. Could allow lambdas to specify an onError callback in - // ol.yaml that gets invoked with error details. - slog.Warn("Kafka fetch error", - "lambda", lkc.lambdaName, - "error", err) - } - continue + } + + consumed, err := km.consumeNext(lambdaName) + if err != nil { + slog.Error("Consumption error", "lambda", lambdaName, "error", err) + select { + case <-stopChan: + return + case <-time.After(100 * time.Millisecond): } + continue + } - // Process each record - fetches.EachRecord(func(record *kgo.Record) { - slog.Info("Received Kafka message for lambda", - "consumer", lkc.consumerName, - "lambda", lkc.lambdaName, - "topic", record.Topic, - "partition", record.Partition, - "offset", record.Offset, - "size", len(record.Value)) - lkc.processMessage(record) - }) + if !consumed { + select { + case <-stopChan: + return + case <-time.After(100 * time.Millisecond): + } } } } -// processMessage handles a single Kafka message by invoking the lambda function directly -func (lkc *LambdaKafkaConsumer) processMessage(record *kgo.Record) { - t := common.T0("kafka-message-processing") +// consumeNext tries to consume a single message for the lambda. It checks the +// cache first, fetches from Kafka on miss, caches the result, and invokes the +// lambda. Returns true if a message was consumed. +func (km *KafkaManager) consumeNext(lambdaName string) (bool, error) { + t := common.T0("kafka-consume-next") defer t.T1() - // Create synthetic HTTP request from Kafka message - // Path must be /run// for the Python runtime to parse correctly - requestPath := fmt.Sprintf("/run/%s/", lkc.lambdaName) - req, err := http.NewRequest("POST", requestPath, bytes.NewReader(record.Value)) - if err != nil { - slog.Error("Failed to create request for lambda invocation", - "lambda", lkc.lambdaName, - "error", err, - "topic", record.Topic) - return - } - // RequestURI must be set explicitly for synthetic requests (http.NewRequest doesn't set it) - req.RequestURI = requestPath - - // Set headers with Kafka metadata (The X- prefix indicates a custom non-standard header) - req.Header.Set("Content-Type", "application/json") - req.Header.Set("X-Kafka-Topic", record.Topic) - req.Header.Set("X-Kafka-Partition", fmt.Sprintf("%d", record.Partition)) - req.Header.Set("X-Kafka-Offset", fmt.Sprintf("%d", record.Offset)) - req.Header.Set("X-Kafka-Group-Id", lkc.kafkaTrigger.GroupId) - - // Create response recorder to capture lambda output. - // TODO: Capture and log the lambda response body using httptest's response recorder - // for kafka triggered lambda invocations. - w := httptest.NewRecorder() - - // Get lambda function and invoke directly - lambdaFunc := lkc.lambdaManager.Get(lkc.lambdaName) - lambdaFunc.Invoke(w, req) - - // Log the result - slog.Info("Kafka message processed via direct invocation", - "consumer", lkc.consumerName, - "lambda", lkc.lambdaName, - "topic", record.Topic, - "partition", record.Partition, - "offset", record.Offset, - "status", w.Code) -} - -// cleanup closes the kgo client -func (lkc *LambdaKafkaConsumer) cleanup() { - slog.Info("Shutting down Kafka consumer for lambda", "lambda", lkc.lambdaName) + km.mu.Lock() + triggers := km.triggerConfigs[lambdaName] + km.mu.Unlock() - // Signal all goroutines to stop - close(lkc.stopChan) + for _, trigger := range triggers { + groupId := trigger.GroupId - // Close kgo client - if lkc.client != nil { - lkc.client.Close() - } + for _, topic := range trigger.Topics { + // TODO: support multi-partition consumption by discovering partition count. + partition := int32(0) - slog.Info("Kafka consumer shutdown complete for lambda", "lambda", lkc.lambdaName) -} + km.mu.Lock() + offset := km.getOffset(groupId, topic, partition) + km.mu.Unlock() -// RegisterLambdaKafkaTriggers registers Kafka triggers for a lambda function -func (km *KafkaManager) RegisterLambdaKafkaTriggers(lambdaName string, triggers []common.KafkaTrigger) error { - km.mu.Lock() - defer km.mu.Unlock() + fetchCtx, fetchCancel := context.WithTimeout(context.Background(), 15*time.Second) + msg, err := km.fetcher.Get(fetchCtx, trigger.BootstrapServers, topic, partition, offset) + fetchCancel() + if err != nil { + slog.Error("Failed to get message", + "lambda", lambdaName, + "topic", topic, + "error", err) + continue + } + if msg == nil { + continue + } - if len(triggers) == 0 { - return nil // No Kafka triggers for this lambda - } + // Invoke the lambda + requestPath := fmt.Sprintf("/run/%s/", lambdaName) + req, err := http.NewRequest("POST", requestPath, bytes.NewReader(msg.Value)) + if err != nil { + return false, fmt.Errorf("failed to create request: %w", err) + } + req.RequestURI = requestPath - // If lambda already has consumers, clean them up first - for consumerName, consumer := range km.lambdaConsumers { - if strings.HasPrefix(consumerName, lambdaName+"-") { - consumer.cleanup() - delete(km.lambdaConsumers, consumerName) - slog.Info("Cleaned up existing Kafka consumer for lambda", "lambda", lambdaName, "consumer", consumerName) - } - } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Kafka-Topic", topic) + req.Header.Set("X-Kafka-Partition", fmt.Sprintf("%d", partition)) + req.Header.Set("X-Kafka-Offset", fmt.Sprintf("%d", offset)) + req.Header.Set("X-Kafka-Group-Id", groupId) - // Create consumers for each Kafka trigger - for i, trigger := range triggers { - trigger.GroupId = fmt.Sprintf("lambda-%s", lambdaName) - consumerName := fmt.Sprintf("%s-%d", lambdaName, i) - consumer, err := km.newLambdaKafkaConsumer(consumerName, lambdaName, &trigger) - if err != nil { - slog.Error("Failed to create Kafka consumer for lambda", - "lambda", lambdaName, - "trigger_index", i, - "error", err) - continue - } + w := httptest.NewRecorder() + lambdaFunc := km.lambdaManager.Get(lambdaName) + lambdaFunc.Invoke(w, req) - km.lambdaConsumers[consumerName] = consumer + km.mu.Lock() + km.setOffset(groupId, topic, partition, offset+1) + km.mu.Unlock() - // Start consuming in background - go func(c *LambdaKafkaConsumer) { - c.StartConsuming() - }(consumer) + slog.Info("Kafka message consumed and lambda invoked", + "lambda", lambdaName, + "topic", topic, + "partition", partition, + "offset", offset, + "status", w.Code) - slog.Info("Registered Kafka trigger for lambda", - "lambda", lambdaName, - "topics", trigger.Topics, - "brokers", trigger.BootstrapServers, - "group_id", trigger.GroupId) + return true, nil + } } - return nil + return false, nil } -// UnregisterLambdaKafkaTriggers removes Kafka triggers for a lambda function +// UnregisterLambdaKafkaTriggers stops consumption and removes Kafka triggers for a lambda. func (km *KafkaManager) UnregisterLambdaKafkaTriggers(lambdaName string) { km.mu.Lock() defer km.mu.Unlock() - // Find and cleanup all consumers for this lambda - for consumerName, consumer := range km.lambdaConsumers { - if strings.HasPrefix(consumerName, lambdaName+"-") { - consumer.cleanup() - delete(km.lambdaConsumers, consumerName) - slog.Info("Unregistered Kafka consumer for lambda", "lambda", lambdaName) - } + if stopChan, ok := km.stopChans[lambdaName]; ok { + close(stopChan) + delete(km.stopChans, lambdaName) } + + delete(km.triggerConfigs, lambdaName) + + slog.Info("Unregistered Kafka triggers for lambda", "lambda", lambdaName) } -// cleanup closes all lambda consumers +// cleanup shuts down all consumption loops. func (km *KafkaManager) cleanup() { slog.Info("Shutting down Kafka manager") km.mu.Lock() defer km.mu.Unlock() - // Close all lambda consumers - for lambdaName, consumer := range km.lambdaConsumers { - consumer.cleanup() - slog.Info("Cleaned up Kafka consumer for lambda", "lambda", lambdaName) + for lambdaName, stopChan := range km.stopChans { + close(stopChan) + slog.Info("Stopped consumption loop", "lambda", lambdaName) } - - // Clear the map - km.lambdaConsumers = make(map[string]*LambdaKafkaConsumer) + km.stopChans = make(map[string]chan struct{}) + km.triggerConfigs = make(map[string][]common.KafkaTrigger) slog.Info("Kafka manager shutdown complete") } -// HandleKafkaRegister handles Kafka consumer registration/unregistration for lambdas +// HandleKafkaRegister handles Kafka consumer registration/unregistration for lambdas. func HandleKafkaRegister(kafkaManager *KafkaManager, lambdaStore *lambdastore.LambdaStore) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - // Extract lambda name lambdaName := strings.TrimPrefix(r.URL.Path, "/kafka/register/") if lambdaName == "" { http.Error(w, "lambda name required", http.StatusBadRequest) @@ -314,7 +277,6 @@ func HandleKafkaRegister(kafkaManager *KafkaManager, lambdaStore *lambdastore.La switch r.Method { case "POST": - // Read lambda config from registry config, err := lambdaStore.GetConfig(lambdaName) if err != nil { slog.Error("Failed to load lambda config for Kafka registration", @@ -324,13 +286,11 @@ func HandleKafkaRegister(kafkaManager *KafkaManager, lambdaStore *lambdastore.La return } - // Check if lambda has Kafka triggers if config == nil || len(config.Triggers.Kafka) == 0 { http.Error(w, "lambda has no Kafka triggers", http.StatusBadRequest) return } - // Register Kafka triggers err = kafkaManager.RegisterLambdaKafkaTriggers(lambdaName, config.Triggers.Kafka) if err != nil { slog.Error("Failed to register Kafka triggers", @@ -340,7 +300,7 @@ func HandleKafkaRegister(kafkaManager *KafkaManager, lambdaStore *lambdastore.La return } - slog.Info("Registered Kafka consumers via API", + slog.Info("Registered Kafka triggers via API", "lambda", lambdaName, "triggers", len(config.Triggers.Kafka)) @@ -348,20 +308,19 @@ func HandleKafkaRegister(kafkaManager *KafkaManager, lambdaStore *lambdastore.La json.NewEncoder(w).Encode(Response{ Status: "success", Lambda: lambdaName, - Message: fmt.Sprintf("Kafka consumers registered for %d trigger(s)", len(config.Triggers.Kafka)), + Message: fmt.Sprintf("Kafka triggers registered for %d trigger(s)", len(config.Triggers.Kafka)), }) case "DELETE": - // Unregister Kafka triggers kafkaManager.UnregisterLambdaKafkaTriggers(lambdaName) - slog.Info("Unregistered Kafka consumers via API", "lambda", lambdaName) + slog.Info("Unregistered Kafka triggers via API", "lambda", lambdaName) w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(Response{ Status: "success", Lambda: lambdaName, - Message: "Kafka consumers unregistered", + Message: "Kafka triggers unregistered", }) default: diff --git a/go/worker/event/messageCache.go b/go/worker/event/messageCache.go new file mode 100644 index 000000000..cb9c66538 --- /dev/null +++ b/go/worker/event/messageCache.go @@ -0,0 +1,118 @@ +package event + +import ( + "container/list" + "sync" + "time" +) + +// CacheKey uniquely identifies a Kafka message by topic, partition, and offset. +type CacheKey struct { + Topic string + Partition int32 + Offset int64 +} + +// CachedMessage holds the data of a single Kafka message stored in the cache. +type CachedMessage struct { + Key []byte + Value []byte + Headers map[string]string + Timestamp time.Time + size int64 +} + +// cacheEntry pairs a CacheKey with its CachedMessage for storage in the LRU list. +type cacheEntry struct { + key CacheKey + message *CachedMessage +} + +// MessageCache is a thread-safe, LRU-evicting in-memory cache for Kafka messages. +// It is shared across all lambdas on a worker. +type MessageCache struct { + entries map[CacheKey]*list.Element + lruList *list.List // front = most recently used, back = least recently used + currentSize int64 + maxSize int64 + mu sync.RWMutex +} + +// NewMessageCache creates a MessageCache with the given maximum size in bytes. +func NewMessageCache(maxSizeBytes int64) *MessageCache { + return &MessageCache{ + entries: make(map[CacheKey]*list.Element), + lruList: list.New(), + maxSize: maxSizeBytes, + } +} + +// Get retrieves a single message from the cache. Returns the message and true +// on a hit, or nil and false on a miss. A hit promotes the entry to MRU. +func (mc *MessageCache) Get(key CacheKey) (*CachedMessage, bool) { + mc.mu.Lock() + defer mc.mu.Unlock() + + elem, ok := mc.entries[key] + if !ok { + return nil, false + } + + mc.lruList.MoveToFront(elem) + return elem.Value.(*cacheEntry).message, true +} + +// Put inserts a message into the cache. If the key already exists, the entry is +// updated and promoted to MRU. Evicts LRU entries if the cache exceeds maxSize. +func (mc *MessageCache) Put(key CacheKey, msg *CachedMessage) { + mc.mu.Lock() + defer mc.mu.Unlock() + + // Update existing entry + if elem, ok := mc.entries[key]; ok { + old := elem.Value.(*cacheEntry) + mc.currentSize -= old.message.size + old.message = msg + mc.currentSize += msg.size + mc.lruList.MoveToFront(elem) + mc.evict() + return + } + + // Insert new entry + entry := &cacheEntry{key: key, message: msg} + elem := mc.lruList.PushFront(entry) + mc.entries[key] = elem + mc.currentSize += msg.size + + mc.evict() +} + +// evict removes LRU entries until currentSize is at or below maxSize. +// Caller must hold mc.mu. +func (mc *MessageCache) evict() { + for mc.currentSize > mc.maxSize && mc.lruList.Len() > 0 { + back := mc.lruList.Back() + if back == nil { + break + } + entry := back.Value.(*cacheEntry) + mc.lruList.Remove(back) + delete(mc.entries, entry.key) + mc.currentSize -= entry.message.size + } +} + +// Size returns the current cache size in bytes. +func (mc *MessageCache) Size() int64 { + mc.mu.RLock() + defer mc.mu.RUnlock() + return mc.currentSize +} + +// Len returns the number of entries in the cache. +func (mc *MessageCache) Len() int { + mc.mu.RLock() + defer mc.mu.RUnlock() + return len(mc.entries) +} diff --git a/go/worker/event/messageCache_test.go b/go/worker/event/messageCache_test.go new file mode 100644 index 000000000..547dced9e --- /dev/null +++ b/go/worker/event/messageCache_test.go @@ -0,0 +1,176 @@ +package event + +import ( + "fmt" + "sync" + "testing" + "time" +) + +func makeMsg(value string, size int64) *CachedMessage { + return &CachedMessage{ + Value: []byte(value), + Timestamp: time.Now(), + size: size, + } +} + +func TestMessageCache_PutAndGet(t *testing.T) { + cache := NewMessageCache(1024) + + key := CacheKey{Topic: "t1", Partition: 0, Offset: 0} + msg := makeMsg("hello", 100) + + cache.Put(key, msg) + + got, ok := cache.Get(key) + if !ok { + t.Fatal("expected cache hit") + } + if string(got.Value) != "hello" { + t.Fatalf("expected 'hello', got %q", string(got.Value)) + } +} + +func TestMessageCache_Miss(t *testing.T) { + cache := NewMessageCache(1024) + + key := CacheKey{Topic: "t1", Partition: 0, Offset: 99} + _, ok := cache.Get(key) + if ok { + t.Fatal("expected cache miss") + } +} + +func TestMessageCache_LRUEviction(t *testing.T) { + // Cache can hold 200 bytes. Insert 3 x 100-byte messages → first should be evicted. + cache := NewMessageCache(200) + + k1 := CacheKey{Topic: "t", Partition: 0, Offset: 0} + k2 := CacheKey{Topic: "t", Partition: 0, Offset: 1} + k3 := CacheKey{Topic: "t", Partition: 0, Offset: 2} + + cache.Put(k1, makeMsg("a", 100)) + cache.Put(k2, makeMsg("b", 100)) + // Cache is full at 200. Insert k3 → k1 (LRU) should be evicted. + cache.Put(k3, makeMsg("c", 100)) + + if _, ok := cache.Get(k1); ok { + t.Fatal("k1 should have been evicted (LRU)") + } + if _, ok := cache.Get(k2); !ok { + t.Fatal("k2 should still be in cache") + } + if _, ok := cache.Get(k3); !ok { + t.Fatal("k3 should still be in cache") + } + if cache.Len() != 2 { + t.Fatalf("expected 2 entries, got %d", cache.Len()) + } +} + +func TestMessageCache_LRUEviction_AccessPromotes(t *testing.T) { + cache := NewMessageCache(200) + + k1 := CacheKey{Topic: "t", Partition: 0, Offset: 0} + k2 := CacheKey{Topic: "t", Partition: 0, Offset: 1} + k3 := CacheKey{Topic: "t", Partition: 0, Offset: 2} + + cache.Put(k1, makeMsg("a", 100)) + cache.Put(k2, makeMsg("b", 100)) + + // Access k1 to promote it → k2 becomes LRU + cache.Get(k1) + + // Insert k3 → k2 (now LRU) should be evicted, not k1 + cache.Put(k3, makeMsg("c", 100)) + + if _, ok := cache.Get(k1); !ok { + t.Fatal("k1 should still be in cache (was promoted by Get)") + } + if _, ok := cache.Get(k2); ok { + t.Fatal("k2 should have been evicted (was LRU)") + } + if _, ok := cache.Get(k3); !ok { + t.Fatal("k3 should still be in cache") + } +} + +func TestMessageCache_UpdateExisting(t *testing.T) { + cache := NewMessageCache(1024) + + key := CacheKey{Topic: "t", Partition: 0, Offset: 0} + cache.Put(key, makeMsg("old", 50)) + cache.Put(key, makeMsg("new", 60)) + + got, ok := cache.Get(key) + if !ok { + t.Fatal("expected cache hit") + } + if string(got.Value) != "new" { + t.Fatalf("expected 'new', got %q", string(got.Value)) + } + if cache.Size() != 60 { + t.Fatalf("expected size 60, got %d", cache.Size()) + } + if cache.Len() != 1 { + t.Fatalf("expected 1 entry, got %d", cache.Len()) + } +} + +func TestMessageCache_ConcurrentAccess(t *testing.T) { + cache := NewMessageCache(100000) + var wg sync.WaitGroup + + // 10 writers, each writing 100 entries + for w := 0; w < 10; w++ { + wg.Add(1) + go func(writer int) { + defer wg.Done() + for i := 0; i < 100; i++ { + key := CacheKey{Topic: fmt.Sprintf("t%d", writer), Partition: 0, Offset: int64(i)} + cache.Put(key, makeMsg("data", 10)) + } + }(w) + } + + // 10 readers running concurrently + for r := 0; r < 10; r++ { + wg.Add(1) + go func(reader int) { + defer wg.Done() + for i := 0; i < 100; i++ { + key := CacheKey{Topic: fmt.Sprintf("t%d", reader), Partition: 0, Offset: int64(i)} + cache.Get(key) + } + }(r) + } + + wg.Wait() + + // No panics or data races = pass + if cache.Len() == 0 { + t.Fatal("expected cache to have entries after concurrent writes") + } +} + +func TestMessageCache_SizeTracking(t *testing.T) { + cache := NewMessageCache(4096) + + cache.Put(CacheKey{Topic: "t", Partition: 0, Offset: 0}, makeMsg("a", 100)) + cache.Put(CacheKey{Topic: "t", Partition: 0, Offset: 1}, makeMsg("b", 200)) + + if cache.Size() != 300 { + t.Fatalf("expected size 300, got %d", cache.Size()) + } + + // Evict by inserting into a small cache + small := NewMessageCache(150) + small.Put(CacheKey{Topic: "t", Partition: 0, Offset: 0}, makeMsg("a", 100)) + small.Put(CacheKey{Topic: "t", Partition: 0, Offset: 1}, makeMsg("b", 100)) + + // First entry should be evicted + if small.Size() > 150 { + t.Fatalf("cache size %d exceeds max 150", small.Size()) + } +}