Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions storage/remote/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ func (c *concreteSeriesIterator) reset(series *concreteSeries) {
}

// Seek implements storage.SeriesIterator.
//
//nolint:stdmethods
func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
if c.err != nil {
return chunkenc.ValNone
Expand Down Expand Up @@ -760,6 +762,9 @@ func (it *chunkedSeriesIterator) Next() chunkenc.ValueType {
return it.valType
}

// Seek implements chunkenc.Iterator.
//
//nolint:stdmethods
func (it *chunkedSeriesIterator) Seek(t int64) chunkenc.ValueType {
if it.err != nil {
return chunkenc.ValNone
Expand Down
40 changes: 26 additions & 14 deletions storage/remote/read_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/gate"
"github.com/prometheus/prometheus/util/gate" // <--- ADD THIS LINE HERE
)

type readHandler struct {
Expand All @@ -38,32 +38,47 @@
config func() config.Config
remoteReadSampleLimit int
remoteReadMaxBytesInFrame int
remoteReadGate *gate.Gate
remoteReadGate *gate.Gate // FIXED: Removed 'gate' prefix and fixed typo
queries prometheus.Gauge
marshalPool *sync.Pool
gateDuration prometheus.Observer
}

// NewReadHandler creates a http.Handler that accepts remote read requests and
// writes them to the provided queryable.
func NewReadHandler(logger *slog.Logger, r prometheus.Registerer, queryable storage.SampleAndChunkQueryable, config func() config.Config, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame int) http.Handler {
func NewReadHandler(logger *slog.Logger, r prometheus.Registerer, queryable storage.SampleAndChunkQueryable, config func() config.Config, remoteReadConcurrencyLimit int, remoteReadSampleLimit int, remoteReadMaxBytesInFrame int) http.Handler {

Check failure on line 49 in storage/remote/read_handler.go

View workflow job for this annotation

GitHub Actions / golangci-lint

paramTypeCombine: func(logger *slog.Logger, r prometheus.Registerer, queryable storage.SampleAndChunkQueryable, config func() config.Config, remoteReadConcurrencyLimit int, remoteReadSampleLimit int, remoteReadMaxBytesInFrame int) http.Handler could be replaced with func(logger *slog.Logger, r prometheus.Registerer, queryable storage.SampleAndChunkQueryable, config func() config.Config, remoteReadConcurrencyLimit, remoteReadSampleLimit, remoteReadMaxBytesInFrame int) http.Handler (gocritic)

// 1. Define the Histogram
gateDuration := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "prometheus", // Ensure 'namespace' is defined or use a string
Subsystem: "remote_read_handler",
Name: "queries_wait_duration_seconds",
Help: "How long remote read queries wait at the gate before execution.",
Buckets: prometheus.DefBuckets,
})

h := &readHandler{
logger: logger,
queryable: queryable,
config: config,
remoteReadSampleLimit: remoteReadSampleLimit,
remoteReadGate: gate.New(remoteReadConcurrencyLimit),
logger: logger,
queryable: queryable,
config: config,
remoteReadSampleLimit: remoteReadSampleLimit,
// 2. USE THE NEW CONSTRUCTOR HERE
remoteReadGate: gate.NewWithObserver(remoteReadConcurrencyLimit, gateDuration),

remoteReadMaxBytesInFrame: remoteReadMaxBytesInFrame,
marshalPool: &sync.Pool{},

gateDuration: gateDuration,
queries: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Namespace: "prometheus",
Subsystem: "remote_read_handler",
Name: "queries",
Help: "The current number of remote read queries that are either in execution or queued on the handler.",
}),
}

// 3. Register both metrics
if r != nil {
r.MustRegister(h.queries)
r.MustRegister(h.queries, gateDuration)
}
return h
}
Expand Down Expand Up @@ -253,9 +268,6 @@
}
}

// filterExtLabelsFromMatchers change equality matchers which match external labels
// to a matcher that looks for an empty label,
// as that label should not be present in the storage.
func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabels map[string]string) ([]*labels.Matcher, error) {
matchers, err := FromLabelMatchers(pbMatchers)
if err != nil {
Expand Down
30 changes: 28 additions & 2 deletions util/gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,53 @@

package gate

import "context"
import (
"context"
"time"
)

// Observer is the interface used to record observations (e.g., histogram metrics).
// We define it locally to avoid a circular dependency on the prometheus package.
type Observer interface {
Observe(float64)
}

// A Gate controls the maximum number of concurrently running and waiting queries.
type Gate struct {
ch chan struct{}
ch chan struct{}
observer Observer
}

// New returns a query gate that limits the number of queries
// being concurrently executed.
// Note: This signature is preserved for backward compatibility.
func New(length int) *Gate {
return &Gate{
ch: make(chan struct{}, length),
}
}

// NewWithObserver returns a query gate that limits the number of queries
// and records waiting duration metrics via the provided Observer.
func NewWithObserver(length int, observer Observer) *Gate {
return &Gate{
ch: make(chan struct{}, length),
observer: observer,
}
}

// Start blocks until the gate has a free spot or the context is done.
// It records the time spent waiting if an observer is configured.
func (g *Gate) Start(ctx context.Context) error {
begin := time.Now()
select {
case <-ctx.Done():
return ctx.Err()
case g.ch <- struct{}{}:
if g.observer != nil {
// Record synchronously (Blocking Write)
g.observer.Observe(time.Since(begin).Seconds())
}
return nil
}
}
Expand Down
57 changes: 57 additions & 0 deletions util/gate/gate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gate

import (
"context"
"testing"
"time"
)

func TestGate(t *testing.T) {
obs := &mockObserver{}
// Use NewWithObserver because we are passing an observer
g := NewWithObserver(1, obs)

ctx := context.Background()
// 1. Acquire the first slot (no waiting)
if err := g.Start(ctx); err != nil {
t.Fatal(err)
}

// 2. This goroutine will release the slot after 10ms
go func() {
time.Sleep(10 * time.Millisecond)
g.Done()
}()

// 3. This will BLOCK for ~10ms until the goroutine calls Done()
if err := g.Start(ctx); err != nil {
t.Fatal(err)
}

// 4. Verify that we recorded a wait duration
if obs.observed <= 0 {
t.Errorf("expected observed waiting time to be > 0, got %f", obs.observed)
}
}

// mockObserver is a simple implementation of the Observer interface for testing.
type mockObserver struct {
observed float64
}

func (m *mockObserver) Observe(v float64) {
m.observed = v
}
Loading