Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,18 @@ func main() {
startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
localStorage.Set(db, startTimeMargin)
db.SetWriteNotified(remoteStorage)

if cfg.tsdb.EnableFastStartup {
go func() {
// Wait for queries to become enabled.
<-db.Head().WaitForWALReplay()
localStorage.SetQueryReady()
logger.Info("WAL replay finished. Queries are now enabled.")
}()
} else {
localStorage.SetQueryReady()
}

close(dbOpen)
<-cancel
logger.Info("TSDB stopped")
Expand Down Expand Up @@ -1507,6 +1519,7 @@ func main() {
)

localStorage.Set(db, 0)
localStorage.SetQueryReady()
db.SetWriteNotified(remoteStorage)
close(dbOpen)
<-cancel
Expand Down Expand Up @@ -1739,6 +1752,7 @@ type readyStorage struct {
db storage.Storage
startTimeMargin int64
stats *tsdb.DBStats
queryReady atomic.Bool
}

func (s *readyStorage) ApplyConfig(conf *config.Config) error {
Expand Down Expand Up @@ -1772,6 +1786,10 @@ func (s *readyStorage) getStats() *tsdb.DBStats {
return x
}

// SetQueryReady marks the storage as ready to serve queries. Once set, the
// query-path methods (Querier, ChunkQuerier, ExemplarQuerier) and the TSDB
// admin methods stop returning tsdb.ErrNotReady on account of this gate.
// Safe for concurrent use; backed by an atomic flag.
func (s *readyStorage) SetQueryReady() {
s.queryReady.Store(true)
}

// StartTime implements the Storage interface.
func (s *readyStorage) StartTime() (int64, error) {
if x := s.get(); x != nil {
Expand All @@ -1797,6 +1815,9 @@ func (s *readyStorage) StartTime() (int64, error) {

// Querier implements the Storage interface.
func (s *readyStorage) Querier(mint, maxt int64) (storage.Querier, error) {
if !s.queryReady.Load() {
return nil, tsdb.ErrNotReady
}
if x := s.get(); x != nil {
return x.Querier(mint, maxt)
}
Expand All @@ -1805,13 +1826,19 @@ func (s *readyStorage) Querier(mint, maxt int64) (storage.Querier, error) {

// ChunkQuerier implements the Storage interface.
func (s *readyStorage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
// Queries are gated until SetQueryReady has been called (i.e. until the
// WAL replay has finished when fast startup is enabled).
if !s.queryReady.Load() {
return nil, tsdb.ErrNotReady
}
db := s.get()
if db == nil {
return nil, tsdb.ErrNotReady
}
return db.ChunkQuerier(mint, maxt)
}

func (s *readyStorage) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
if !s.queryReady.Load() {
return nil, tsdb.ErrNotReady
}
if x := s.get(); x != nil {
switch db := x.(type) {
case *tsdb.DB:
Expand Down Expand Up @@ -1893,6 +1920,9 @@ func (s *readyStorage) Close() error {

// CleanTombstones implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) CleanTombstones() error {
if !s.queryReady.Load() {
return tsdb.ErrNotReady
}
if x := s.get(); x != nil {
switch db := x.(type) {
case *tsdb.DB:
Expand All @@ -1908,6 +1938,9 @@ func (s *readyStorage) CleanTombstones() error {

// BlockMetas implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) BlockMetas() ([]tsdb.BlockMeta, error) {
if !s.queryReady.Load() {
return nil, tsdb.ErrNotReady
}
if x := s.get(); x != nil {
switch db := x.(type) {
case *tsdb.DB:
Expand All @@ -1923,6 +1956,9 @@ func (s *readyStorage) BlockMetas() ([]tsdb.BlockMeta, error) {

// Delete implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
if !s.queryReady.Load() {
return tsdb.ErrNotReady
}
if x := s.get(); x != nil {
switch db := x.(type) {
case *tsdb.DB:
Expand All @@ -1938,6 +1974,9 @@ func (s *readyStorage) Delete(ctx context.Context, mint, maxt int64, ms ...*labe

// Snapshot implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) Snapshot(dir string, withHead bool) error {
if !s.queryReady.Load() {
return tsdb.ErrNotReady
}
if x := s.get(); x != nil {
switch db := x.(type) {
case *tsdb.DB:
Expand All @@ -1953,6 +1992,9 @@ func (s *readyStorage) Snapshot(dir string, withHead bool) error {

// Stats implements the api_v1.TSDBAdminStats interface.
func (s *readyStorage) Stats(statsByLabelName string, limit int) (*tsdb.Stats, error) {
if !s.queryReady.Load() {
return nil, tsdb.ErrNotReady
}
if x := s.get(); x != nil {
switch db := x.(type) {
case *tsdb.DB:
Expand Down
94 changes: 66 additions & 28 deletions tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ type Head struct {
seriesStateQuit chan struct{}
seriesStateWg sync.WaitGroup

// For running the background WAL replay.
walReplayWg sync.WaitGroup

// For signalling when the background WAL replay finishes.
walReplayDone chan struct{}

stats *HeadStats
reg prometheus.Registerer

Expand Down Expand Up @@ -305,6 +311,7 @@ func NewHead(r prometheus.Registerer, l *slog.Logger, wal, wbl *wlog.WL, opts *H
stats: stats,
reg: r,
seriesStateQuit: make(chan struct{}),
walReplayDone: make(chan struct{}),
}
if err := h.resetInMemoryState(); err != nil {
return nil, err
Expand Down Expand Up @@ -678,11 +685,7 @@ func (s *WALReplayStatus) GetWALReplayStatus() WALReplayStatus {

const cardinalityCacheExpirationTime = time.Duration(30) * time.Second

// Init loads data from the write ahead log and prepares the head for writes.
// It should be called before using an appender so that it
// limits the ingested samples to the head min valid time.
func (h *Head) Init(minValidTime int64) error {
h.minValidTime.Store(minValidTime)
func (h *Head) replayDiskChunksAndWAL() error {
defer h.resetWLReplayResources()
defer func() {
h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
Expand Down Expand Up @@ -805,28 +808,6 @@ func (h *Head) Init(minValidTime int64) error {
return fmt.Errorf("finding WAL segments: %w", e)
}

if h.opts.EnableFastStartup {
state, err := h.readSeriesStateFile()
if err != nil && !os.IsNotExist(err) {
h.logger.Warn("Failed to read series state file, skipping the fast startup", "err", err)
}
if err == nil {
if state.CleanShutdown {
h.lastSeriesID.Store(state.LastSeriesID)
h.logger.Info("Fast startup: clean shutdown detected, restored last series ID", "last_series_id", state.LastSeriesID)
} else {
h.logger.Info("Fast startup: unclean shutdown detected, performing WAL scan", "from_segment", state.LastWALSegment, "to_segment", endAt)
id, scanErr := h.findLastSeriesID(state, endAt)
if scanErr != nil {
h.logger.Error("Fast startup: WAL scan failed, skipping fast startup", "err", scanErr)
} else {
h.lastSeriesID.Store(id)
h.logger.Info("Fast startup: WAL scan completed", "last_series_id", id)
}
}
}
}

h.startWALReplayStatus(startFrom, endAt)

syms := labels.NewSymbolTable() // One table for the whole WAL.
Expand Down Expand Up @@ -932,14 +913,68 @@ func (h *Head) Init(minValidTime int64) error {
"total_replay_duration", totalReplayDuration.String(),
)

return nil
}

// Init loads data from the write ahead log and prepares the head for writes.
// It should be called before using an appender so that it
// limits the ingested samples to the head min valid time.
//
// When the fast-startup feature is enabled, the on-disk chunk and WAL replay
// runs in a background goroutine and Init returns immediately; callers can use
// WaitForWALReplay to learn when the replay has completed. Otherwise the
// replay runs synchronously and its error is returned.
func (h *Head) Init(minValidTime int64) error {
h.minValidTime.Store(minValidTime)

// TODO(RushabhMehta2005): Remove this 'if' block and always run the series state ticker when the feature is fully implemented.
if h.opts.EnableFastStartup {
if h.wal != nil {
// Find the last WAL segment.
_, endAt, e := wlog.Segments(h.wal.Dir())
if e != nil {
return fmt.Errorf("finding WAL segments: %w", e)
}

state, err := h.readSeriesStateFile()
if err != nil && !os.IsNotExist(err) {
h.logger.Warn("Failed to read series state file, skipping the fast startup", "err", err)
}
if err == nil {
if state.CleanShutdown {
h.lastSeriesID.Store(state.LastSeriesID)
h.logger.Info("Fast startup: clean shutdown detected, restored last series ID", "last_series_id", state.LastSeriesID)
} else {
// Unclean shutdown: the recorded last series ID may be stale, so
// scan the WAL from the last recorded segment to recover it.
h.logger.Info("Fast startup: unclean shutdown detected, performing WAL scan", "from_segment", state.LastWALSegment, "to_segment", endAt)
id, scanErr := h.findLastSeriesID(state, endAt)
if scanErr != nil {
h.logger.Error("Fast startup: WAL scan failed, skipping fast startup", "err", scanErr)
} else {
h.lastSeriesID.Store(id)
h.logger.Info("Fast startup: WAL scan completed", "last_series_id", id)
}
}
}
}

// Start the background goroutine that writes to series_state.json.
h.seriesStateWg.Add(1)
go h.runSeriesStateTicker()

// Run the replay in the background. walReplayDone is closed whether the
// replay succeeds or fails, so WaitForWALReplay callers are always
// unblocked.
h.walReplayWg.Go(func() {
defer close(h.walReplayDone)
if err := h.replayDiskChunksAndWAL(); err != nil {
h.logger.Error("Fast startup: Background WAL replay failed", "err", err)
}
})

return nil
}

// When feature is not enabled, blocking call.
err := h.replayDiskChunksAndWAL()
close(h.walReplayDone)
return err
}

// WaitForWALReplay returns a channel that is closed when the WAL replay has
// finished. The channel is closed regardless of whether the replay succeeded
// or failed (Init closes it in both paths), so it signals completion only,
// not success.
func (h *Head) WaitForWALReplay() <-chan struct{} {
return h.walReplayDone
}

func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries) (map[chunks.HeadSeriesRef][]*mmappedChunk, map[chunks.HeadSeriesRef][]*mmappedChunk, chunks.ChunkDiskMapperRef, error) {
Expand Down Expand Up @@ -1851,6 +1886,9 @@ func (h *Head) compactable() bool {
// Close flushes the WAL and closes the head.
// It also takes a snapshot of in-memory chunks if enabled.
func (h *Head) Close() error {
// Wait for background WAL replay to finish before shutdown.
h.walReplayWg.Wait()
Comment on lines +1889 to +1890
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that a good idea? WAL replay can take ten minutes; if someone requested to shut down, do we want to wait that long?

Could it be cancelable?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am waiting here for it to complete as it is the simplest thing to do. What if we kill that goroutine when Close is called? Could it lead to WAL file corruption or mess up some other state? I am not sure of the answer to these questions.


h.closedMtx.Lock()
defer h.closedMtx.Unlock()
h.closed = true
Expand Down
1 change: 1 addition & 0 deletions tsdb/head_wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
errChan <- err
return
}

localRefSeries[csr.ref] = series
for {
seriesID := uint64(series.ref)
Expand Down
Loading