remote_write: implement restart from segment-based savepoint #18485
x1unix wants to merge 18 commits into prometheus:main
Conversation
Signed-off-by: x1unix <[email protected]>
kgeckhart left a comment:
You're on the right track; there are a few things to consider with the implementation.
```go
if startSegment >= 0 {
	t.watcher.SetStartSegment(startSegment)
}
```
Is it necessary to do this as a separate check vs passing the param and letting the watcher do it?
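Pushing the check into the watcher could look like the following sketch. The `Watcher` type here is a minimal stand-in, not the real one; only the idea of `SetStartSegment` validating its own input is being illustrated.

```go
package main

import "fmt"

// Watcher is a minimal stand-in for the WAL watcher; startSegment < 0
// means "no savepoint, start from the usual position".
type Watcher struct {
	startSegment int
}

// SetStartSegment accepts the raw value and performs the validity check
// itself, so callers don't need a separate `if startSegment >= 0` guard.
func (w *Watcher) SetStartSegment(segment int) {
	if segment < 0 {
		return // no savepoint; keep the default behavior
	}
	w.startSegment = segment
}

func main() {
	w := &Watcher{startSegment: -1}
	w.SetStartSegment(-1) // ignored, default kept
	fmt.Println(w.startSegment)
	w.SetStartSegment(12)
	fmt.Println(w.startSegment)
}
```

This keeps the "is there a savepoint at all?" decision in one place instead of spreading it across callers.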
```diff
@@ -513,6 +539,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
 // Also used with readCheckpoint - implements segmentReadFn.
 // TODO(bwplotka): Rename tail to !onlySeries; extremely confusing and easy to miss.
```
I think we should do this TODO. I was also burned by this when reading this code in the past. I renamed it during my hacking (grafana/prometheus@staleness_disabling_v3.4.2...kgeckhart:prometheus:kgeckhart/replay-hacking#diff-0ab108a20060eb76d034c4fd1a9c5112cf981141f870d0211b1e73faead7f888L335) for very similar reasons.
```go
// Also used with readCheckpoint - implements segmentReadFn.
// TODO(bwplotka): Rename tail to !onlySeries; extremely confusing and easy to miss.
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
	replay := w.startSegment >= 0
```
Is this true? We would only be replaying if startSegment >= 0 and currentSegment < startSegment.
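The condition the reviewer is describing could be sketched as below. The function and its parameters are illustrative, not the actual watcher fields.

```go
package main

import "fmt"

// isReplaying mirrors the reviewer's point: a segment belongs to the
// replay phase only when a savepoint exists (startSegment >= 0) AND the
// segment currently being read is still behind it.
func isReplaying(startSegment, currentSegment int) bool {
	return startSegment >= 0 && currentSegment < startSegment
}

func main() {
	fmt.Println(isReplaying(-1, 3)) // no savepoint at all
	fmt.Println(isReplaying(5, 3))  // still behind the savepoint
	fmt.Println(isReplaying(5, 7))  // already past it
}
```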
```go
var currentSegment int
if w.startSegment < 0 {
	currentSegment, err = w.findSegmentForIndex(checkpointIndex)
} else {
	// Respect checkpoint if it's ahead of the savepoint
	// (segments before the checkpoint have been compacted away).
	startIdx := max(w.startSegment, checkpointIndex)
	currentSegment, err = w.findSegmentForIndex(startIdx)
}
```
IIUC this is going to move the current segment ahead based on the starting segment, which we don't want to do. When we start up, we need to read everything to load the queue manager caches. We need to control the cut-over point between reading to load the cache and reading to send data.
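The two-phase behavior the reviewer describes could be sketched like this. All names here are hypothetical; the real watcher works record-by-record, not via a helper like this one.

```go
package main

import "fmt"

// processSegments sketches the suggested cut-over: on startup, read every
// segment from the checkpoint onward so the queue manager's series cache
// is rebuilt, but only start forwarding samples once the reader crosses
// the savepoint segment.
func processSegments(checkpointSeg, savepointSeg, lastSeg int) []string {
	var actions []string
	for seg := checkpointSeg; seg <= lastSeg; seg++ {
		if seg < savepointSeg {
			// Cache-loading phase: decode records to repopulate
			// series refs, but drop the samples themselves.
			actions = append(actions, fmt.Sprintf("segment %d: load cache only", seg))
		} else {
			actions = append(actions, fmt.Sprintf("segment %d: load cache and send", seg))
		}
	}
	return actions
}

func main() {
	for _, a := range processSegments(2, 5, 6) {
		fmt.Println(a)
	}
}
```

The key point is that the current segment is never advanced past the checkpoint; only the "send vs. skip" decision changes at the savepoint.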
```go
func (rws *WriteStorage) persistSavepoint() {
	rws.mtx.Lock()
	sp := rws.collectSavepoint()
	rws.mtx.Unlock()

	if err := sp.Save(rws.dir); err != nil {
		rws.logger.Error("Failed to persist remote write savepoint", "err", err)
	}
}

// persistSavepointLocked persists the savepoint while the mutex is already held.
func (rws *WriteStorage) persistSavepointLocked() {
	sp := rws.collectSavepoint()
	if err := sp.Save(rws.dir); err != nil {
		rws.logger.Error("Failed to persist remote write savepoint", "err", err)
	}
}
```
Probably worth a comment about why these both exist. IIUC persistSavepointLocked is used for shutdown, where the mutex is already held for the length of shutdown, and persistSavepoint is more narrowly scoped lock-wise.
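The split the comment describes is a common Go locking pattern; a minimal stand-in (hypothetical `store` type, not the real `WriteStorage`) illustrating the two variants:

```go
package main

import (
	"fmt"
	"sync"
)

// store is an illustrative type: the "Locked" variant assumes the caller
// already holds the mutex (e.g. for an entire shutdown sequence), while
// the plain variant takes it only as long as needed.
type store struct {
	mtx   sync.Mutex
	value int
}

// persist acquires the lock narrowly, just to snapshot the state;
// any slow I/O would then happen outside the lock.
func (s *store) persist() int {
	s.mtx.Lock()
	v := s.value
	s.mtx.Unlock()
	return v
}

// persistLocked must only be called with s.mtx already held,
// e.g. from a shutdown path that holds it for the whole teardown.
func (s *store) persistLocked() int {
	return s.value
}

func main() {
	s := &store{value: 42}
	fmt.Println(s.persist())

	s.mtx.Lock()
	fmt.Println(s.persistLocked())
	s.mtx.Unlock()
}
```

Documenting the locking contract on each method (as the reviewer suggests) is what prevents accidental double-locking or unlocked access.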
```go
// collectSavepoint updates the savepoint from current queue positions and returns a copy.
// Must be called with rws.mtx held.
func (rws *WriteStorage) collectSavepoint() Savepoint {
```
I'm not a fan of the side effect that we update rws and return the new Savepoint. I think we should decouple these: persistSavepointLocked doesn't need to update the rws value because it's being closed, and while persistSavepoint does need to, if that's the only place the update happens, it can be done without holding the lock, since the field is only touched by persistSavepoint, which won't be called in parallel.
Addressed together with a previous item.
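One possible decoupling along the reviewer's lines (illustrative types, not the actual implementation): `collectSavepoint` becomes a pure snapshot with no side effects, and each caller decides whether to store the result back.

```go
package main

import "fmt"

// savepoint maps a remote endpoint name to its last segment (illustrative).
type savepoint map[string]int

type writeStorage struct {
	queues    map[string]int
	savepoint savepoint
}

// collectSavepoint only reads the queue positions and returns an
// independent copy; it no longer mutates writeStorage.
func (rws *writeStorage) collectSavepoint() savepoint {
	sp := make(savepoint, len(rws.queues))
	for name, seg := range rws.queues {
		sp[name] = seg
	}
	return sp
}

func main() {
	rws := &writeStorage{queues: map[string]int{"remote-a": 12}}

	// Shutdown path: snapshot and save; no need to update rws,
	// since the storage is being closed anyway.
	sp := rws.collectSavepoint()
	fmt.Println(sp["remote-a"])

	// Periodic path: the single caller stores the snapshot itself.
	rws.savepoint = sp
}
```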
Force-pushed e7f9a72 to eabd9de.
Force-pushed eabd9de to fe0e5e1.
Which issue(s) does the PR fix:
This PR implements a mechanism for remote write to track and resume from the last WAL segment.
The PR is based on the proposal prometheus/proposals#72.