Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,9 @@ func main() {
serverOnlyFlag(a, "storage.tsdb.delayed-compaction.max-percent", "Sets the upper limit for the random compaction delay, specified as a percentage of the head chunk range. 100 means the compaction can be delayed by up to the entire head chunk range. Only effective when the delayed-compaction feature flag is enabled.").
Default("10").Hidden().IntVar(&cfg.tsdb.CompactionDelayMaxPercent)

serverOnlyFlag(a, "storage.tsdb.block-reload-interval", "Interval at which to check for new or removed blocks in storage. Users who manually backfill or drop blocks must wait up to this duration before changes become available.").
Default("1m").Hidden().SetValue(&cfg.tsdb.BlockReloadInterval)

agentOnlyFlag(a, "storage.agent.path", "Base path for metrics storage.").
Default("data-agent/").StringVar(&cfg.agentStoragePath)

Expand Down Expand Up @@ -665,6 +668,10 @@ func main() {
}
cfg.tsdb.MaxExemplars = cfgFile.StorageConfig.ExemplarsConfig.MaxExemplars
}
if cfg.tsdb.BlockReloadInterval < model.Duration(1*time.Second) {
logger.Warn("The option --storage.tsdb.block-reload-interval is set to a value less than 1s. Setting it to 1s to avoid overload.")
cfg.tsdb.BlockReloadInterval = model.Duration(1 * time.Second)
}
if cfgFile.StorageConfig.TSDBConfig != nil {
cfg.tsdb.OutOfOrderTimeWindow = cfgFile.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
}
Expand Down Expand Up @@ -1318,6 +1325,7 @@ func main() {
"RetentionDuration", cfg.tsdb.RetentionDuration,
"WALSegmentSize", cfg.tsdb.WALSegmentSize,
"WALCompressionType", cfg.tsdb.WALCompressionType,
"BlockReloadInterval", cfg.tsdb.BlockReloadInterval,
)

startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
Expand Down Expand Up @@ -1860,6 +1868,7 @@ type tsdbOptions struct {
CompactionDelayMaxPercent int
EnableOverlappingCompaction bool
UseUncachedIO bool
BlockReloadInterval model.Duration
}

func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
Expand All @@ -1884,6 +1893,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent,
EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
UseUncachedIO: opts.UseUncachedIO,
BlockReloadInterval: time.Duration(opts.BlockReloadInterval),
}
}

Expand Down
9 changes: 8 additions & 1 deletion tsdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func DefaultOptions() *Options {
CompactionDelayMaxPercent: DefaultCompactionDelayMaxPercent,
CompactionDelay: time.Duration(0),
PostingsDecoderFactory: DefaultPostingsDecoderFactory,
BlockReloadInterval: 1 * time.Minute,
}
}

Expand Down Expand Up @@ -222,6 +223,9 @@ type Options struct {

// UseUncachedIO allows bypassing the page cache when appropriate.
UseUncachedIO bool

// BlockReloadInterval is the interval at which blocks are reloaded.
BlockReloadInterval time.Duration
}

type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
Expand Down Expand Up @@ -812,6 +816,9 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
if opts.OutOfOrderTimeWindow < 0 {
opts.OutOfOrderTimeWindow = 0
}
if opts.BlockReloadInterval < 1*time.Second {
opts.BlockReloadInterval = 1 * time.Second
}

if len(rngs) == 0 {
// Start with smallest block duration and create exponential buckets until the exceed the
Expand Down Expand Up @@ -1087,7 +1094,7 @@ func (db *DB) run(ctx context.Context) {
}

select {
case <-time.After(1 * time.Minute):
case <-time.After(db.opts.BlockReloadInterval):
db.cmtx.Lock()
if err := db.reloadBlocks(); err != nil {
db.logger.Error("reloadBlocks", "err", err)
Expand Down
60 changes: 58 additions & 2 deletions tsdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3929,10 +3929,27 @@ func TestQuerierShouldNotFailIfOOOCompactionOccursAfterRetrievingIterators(t *te
require.Eventually(t, compactionComplete.Load, time.Second, 10*time.Millisecond, "compaction should complete after querier was closed")
}

func newTestDB(t *testing.T) *DB {
// testDBOptions collects the configurable knobs applied by newTestDB.
// It exists so newTestDB can accept functional options without changing
// its signature for existing callers.
type testDBOptions struct {
	// opts are the TSDB Options passed to Open; newTestDB seeds this
	// with DefaultOptions() and functional options may override it.
	opts *Options
}

type testDBOpt func(*testDBOptions)

// withOpts returns a testDBOpt that replaces the TSDB Options used
// when newTestDB opens the test database.
func withOpts(opts *Options) testDBOpt {
	return func(cfg *testDBOptions) { cfg.opts = opts }
}

func newTestDB(t *testing.T, fopts ...testDBOpt) *DB {
dir := t.TempDir()

db, err := Open(dir, nil, nil, DefaultOptions(), nil)
o := &testDBOptions{opts: DefaultOptions()}
for _, fopt := range fopts {
fopt(o)
}

db, err := Open(dir, nil, nil, o.opts, nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, db.Close())
Expand Down Expand Up @@ -9517,3 +9534,42 @@ func TestBlockClosingBlockedDuringRemoteRead(t *testing.T) {
case <-blockClosed:
}
}

// TestBlockReloadInterval verifies that the configured BlockReloadInterval
// drives the periodic reloadBlocks loop, and that values below the 1s
// floor are clamped by validateOpts.
func TestBlockReloadInterval(t *testing.T) {
	t.Parallel()

	cases := []struct {
		name            string
		reloadInterval  time.Duration
		expectedReloads float64
	}{
		{
			name:            "extremely small interval",
			reloadInterval:  1 * time.Millisecond,
			expectedReloads: 5,
		},
		{
			name:            "one second interval",
			reloadInterval:  1 * time.Second,
			expectedReloads: 5,
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			t.Parallel()
			db := newTestDB(t, withOpts(&Options{
				BlockReloadInterval: c.reloadInterval,
			}))
			if c.reloadInterval < 1*time.Second {
				require.Equal(t, 1*time.Second, db.opts.BlockReloadInterval, "interval should be clamped to minimum of 1 second")
			}
			// Open performs one synchronous reload before the background
			// loop starts ticking, so the counter begins at 1.
			require.Equal(t, float64(1), prom_testutil.ToFloat64(db.metrics.reloads), "there should be one initial reload")
			// The counter only grows, so compare with >= rather than ==:
			// a strict equality can be skipped over if a poll lands after
			// the counter has already passed the target, which would make
			// the test flake. The deadline is generous (reaching 5 reloads
			// at a 1s interval needs ~4s of ticking); Eventually returns
			// as soon as the condition holds, so the typical runtime is
			// unaffected.
			require.Eventually(t, func() bool {
				return prom_testutil.ToFloat64(db.metrics.reloads) >= c.expectedReloads
			},
				10*time.Second,
				100*time.Millisecond,
			)
		})
	}
}
Loading