Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions tsdb/chunkenc/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ type Appender interface {
// The Appender app that can be used for the next append is always returned.
AppendHistogram(prev *HistogramAppender, st, t int64, h *histogram.Histogram, appendOnly bool) (c Chunk, isRecoded bool, app Appender, err error)
AppendFloatHistogram(prev *FloatHistogramAppender, st, t int64, h *histogram.FloatHistogram, appendOnly bool) (c Chunk, isRecoded bool, app Appender, err error)

// LastValue returns the most recently appended sample value.
// For float samples, v is set and h, fh are nil.
// For integer histogram samples, h is set and v, fh are zero/nil.
// For float histogram samples, fh is set and v, h are zero/nil.
// If no sample has been appended yet, all return values are zero/nil.
LastValue() (v float64, h *histogram.Histogram, fh *histogram.FloatHistogram)
}

// Iterator is a simple iterator that can only get the next value.
Expand Down
21 changes: 21 additions & 0 deletions tsdb/chunkenc/float_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,32 @@ type FloatHistogramAppender struct {
t, tDelta int64
sum, cnt, zCnt xorValue
pBuckets, nBuckets []xorValue

// lastValue is the most recently appended float histogram, retained for
// duplicate detection and stale-marker tracking.
lastValue *histogram.FloatHistogram
}

func (a *FloatHistogramAppender) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(a.b.bytes()[histogramFlagPos] & CounterResetHeaderMask)
}

// LastFloatHistogram returns the most recently appended float histogram, or nil
// if none has been appended yet.
func (a *FloatHistogramAppender) LastFloatHistogram() *histogram.FloatHistogram {
return a.lastValue
}

// SetLastFloatHistogram sets the last float histogram value. Used to restore
// state after loading the appender from a snapshot.
func (a *FloatHistogramAppender) SetLastFloatHistogram(fh *histogram.FloatHistogram) {
a.lastValue = fh
}

func (a *FloatHistogramAppender) LastValue() (float64, *histogram.Histogram, *histogram.FloatHistogram) {
return 0, nil, a.lastValue
}

func (a *FloatHistogramAppender) setCounterResetHeader(cr CounterResetHeader) {
a.b.bytes()[histogramFlagPos] = (a.b.bytes()[histogramFlagPos] & (^CounterResetHeaderMask)) | (byte(cr) & CounterResetHeaderMask)
}
Expand Down Expand Up @@ -606,6 +626,7 @@ func (a *FloatHistogramAppender) appendFloatHistogram(t int64, h *histogram.Floa

a.t = t
a.tDelta = tDelta
a.lastValue = h
}

func (a *FloatHistogramAppender) writeXorValue(old *xorValue, v float64) {
Expand Down
21 changes: 21 additions & 0 deletions tsdb/chunkenc/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,32 @@ type HistogramAppender struct {
sum float64
leading uint8
trailing uint8

// lastValue is the most recently appended histogram, retained for
// duplicate detection and stale-marker tracking.
lastValue *histogram.Histogram
}

func (a *HistogramAppender) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(a.b.bytes()[histogramFlagPos] & CounterResetHeaderMask)
}

// LastHistogram returns the most recently appended histogram, or nil if none
// has been appended yet.
func (a *HistogramAppender) LastHistogram() *histogram.Histogram {
return a.lastValue
}

// SetLastHistogram sets the last histogram value. Used to restore state after
// loading the appender from a snapshot.
func (a *HistogramAppender) SetLastHistogram(h *histogram.Histogram) {
a.lastValue = h
}

func (a *HistogramAppender) LastValue() (float64, *histogram.Histogram, *histogram.FloatHistogram) {
return 0, a.lastValue, nil
}

func (a *HistogramAppender) setCounterResetHeader(cr CounterResetHeader) {
a.b.bytes()[histogramFlagPos] = (a.b.bytes()[histogramFlagPos] & (^CounterResetHeaderMask)) | (byte(cr) & CounterResetHeaderMask)
}
Expand Down Expand Up @@ -659,6 +679,7 @@ func (a *HistogramAppender) appendHistogram(t int64, h *histogram.Histogram) {
copy(a.nBuckets, h.NegativeBuckets)
// Note that the bucket deltas were already updated above.
a.sum = h.Sum
a.lastValue = h
}

// recode converts the current chunk to accommodate an expansion of the set of
Expand Down
4 changes: 4 additions & 0 deletions tsdb/chunkenc/xor.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ func (*xorAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64,
panic("appended a float histogram sample to a float chunk")
}

func (a *xorAppender) LastValue() (float64, *histogram.Histogram, *histogram.FloatHistogram) {
return a.v, nil, nil
}

type xorIterator struct {
br bstreamReader
numTotal uint16
Expand Down
4 changes: 4 additions & 0 deletions tsdb/chunkenc/xor2.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,10 @@ func (*xor2Appender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64,
panic("appended a float histogram sample to a float chunk")
}

func (a *xor2Appender) LastValue() (float64, *histogram.Histogram, *histogram.FloatHistogram) {
return a.v, nil, nil
}

// xor2Iterator decodes XOR2 chunks.
type xor2Iterator struct {
br bstreamReader
Expand Down
29 changes: 13 additions & 16 deletions tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -2141,9 +2141,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
defer s.locks[refShard].Unlock()
}

if value.IsStaleNaN(series.lastValue) ||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
if series.isStaleLastValue() {
staleSeriesDeleted++
}

Expand Down Expand Up @@ -2232,9 +2230,7 @@ func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) {
h.series.hashes[hashShard].del(hash, series.ref)
h.series.locks[hashShard].Unlock()

if value.IsStaleNaN(series.lastValue) ||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
if series.isStaleLastValue() {
staleSeriesDeleted++
}

Expand Down Expand Up @@ -2292,9 +2288,7 @@ func (s *stripeSeries) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64)
}

// Check if the series is still stale.
isStale := value.IsStaleNaN(series.lastValue) ||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum))
isStale := series.isStaleLastValue()

if !isStale {
return
Expand Down Expand Up @@ -2481,13 +2475,6 @@ type memSeries struct {
histogramChunkHasComputedEndTime bool // True if nextAt has been predicted for the current histograms chunk; false otherwise.
pendingCommit bool // Whether there are samples waiting to be committed to this series.

// We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates.
lastValue float64

// We keep the last histogram value here (in addition to appending it to the chunk) so we can check for duplicates.
lastHistogramValue *histogram.Histogram
lastFloatHistogramValue *histogram.FloatHistogram

// Current appender for the head chunk. Set when a new head chunk is cut.
// It is nil only if headChunks is nil. E.g. if there was an appender that created a new series, but rolled back the commit
// (the first sample would create a headChunk, hence appender, but rollback skipped it while the Append() call would create a series).
Expand All @@ -2497,6 +2484,16 @@ type memSeries struct {
txs *txRing
}

// isStaleLastValue reports whether the most recently appended sample (of any
// type) is a stale marker. The series lock must be held.
func (s *memSeries) isStaleLastValue() bool {
if s.app == nil {
return false
}
v, h, fh := s.app.LastValue()
return value.IsStaleNaN(v) || (h != nil && value.IsStaleNaN(h.Sum)) || (fh != nil && value.IsStaleNaN(fh.Sum))
}

// memSeriesOOOFields contains the fields required by memSeries
// to handle out-of-order data.
type memSeriesOOOFields struct {
Expand Down
58 changes: 29 additions & 29 deletions tsdb/head_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,14 +660,15 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi
// like federation and erroring out at that time would be extremely noisy.
// This only checks against the latest in-order sample.
// The OOO headchunk has its own method to detect these duplicates.
if s.lastHistogramValue != nil || s.lastFloatHistogramValue != nil {
return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v)
prevV, prevH, prevFH := s.app.LastValue()
if prevH == nil && prevFH == nil && math.Float64bits(prevV) == math.Float64bits(v) {
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
return false, 0, nil
}
if math.Float64bits(s.lastValue) != math.Float64bits(v) {
return false, 0, storage.NewDuplicateFloatErr(t, s.lastValue, v)
if prevH != nil || prevFH != nil {
return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v)
}
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
return false, 0, nil
return false, 0, storage.NewDuplicateFloatErr(t, prevV, v)
}
}

Expand Down Expand Up @@ -705,7 +706,8 @@ func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram, headMax
// like federation and erroring out at that time would be extremely noisy.
// This only checks against the latest in-order sample.
// The OOO headchunk has its own method to detect these duplicates.
if !h.Equals(s.lastHistogramValue) {
_, prevH, _ := s.app.LastValue()
if !h.Equals(prevH) {
return false, 0, storage.ErrDuplicateSampleForTimestamp
}
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
Expand Down Expand Up @@ -747,7 +749,8 @@ func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogr
// like federation and erroring out at that time would be extremely noisy.
// This only checks against the latest in-order sample.
// The OOO headchunk has its own method to detect these duplicates.
if !fh.Equals(s.lastFloatHistogramValue) {
_, _, prevFH := s.app.LastValue()
if !fh.Equals(prevFH) {
return false, 0, storage.ErrDuplicateSampleForTimestamp
}
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
Expand Down Expand Up @@ -1352,8 +1355,8 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
// sample for this same series in this same batch
// (because any such sample would have triggered a new
// batch).
switch {
case series.lastHistogramValue != nil:
switch series.app.(type) {
case *chunkenc.HistogramAppender:
b.histograms = append(b.histograms, record.RefHistogramSample{
Ref: series.ref,
T: s.T,
Expand All @@ -1365,7 +1368,7 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
acc.histogramsAppended++
series.Unlock()
continue
case series.lastFloatHistogramValue != nil:
case *chunkenc.FloatHistogramAppender:
b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{
Ref: series.ref,
T: s.T,
Expand Down Expand Up @@ -1433,8 +1436,9 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
acc.floatsAppended--
}
default:
newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V)
staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V)
isLastStale := series.isStaleLastValue()
newlyStale := !isLastStale && value.IsStaleNaN(s.V)
staleToNonStale := isLastStale && !value.IsStaleNaN(s.V)
ok, chunkCreated = series.append(s.ST, s.T, s.V, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
Expand Down Expand Up @@ -1541,9 +1545,12 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC
default:
newlyStale := value.IsStaleNaN(s.H.Sum)
staleToNonStale := false
if series.lastHistogramValue != nil {
newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum)
if series.app != nil {
if _, prevH, _ := series.app.LastValue(); prevH != nil {
isLastStale := value.IsStaleNaN(prevH.Sum)
newlyStale = newlyStale && !isLastStale
staleToNonStale = isLastStale && !value.IsStaleNaN(s.H.Sum)
}
}
// TODO(krajorama,ywwg): pass ST when available in WAL.
ok, chunkCreated = series.appendHistogram(0, s.T, s.H, a.appendID, acc.appendChunkOpts)
Expand Down Expand Up @@ -1652,9 +1659,12 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo
default:
newlyStale := value.IsStaleNaN(s.FH.Sum)
staleToNonStale := false
if series.lastFloatHistogramValue != nil {
newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum)
if series.app != nil {
if _, _, prevFH := series.app.LastValue(); prevFH != nil {
isLastStale := value.IsStaleNaN(prevFH.Sum)
newlyStale = newlyStale && !isLastStale
staleToNonStale = isLastStale && !value.IsStaleNaN(s.FH.Sum)
}
}
// TODO(krajorama,ywwg): pass ST when available in WAL.
ok, chunkCreated = series.appendFloatHistogram(0, s.T, s.FH, a.appendID, acc.appendChunkOpts)
Expand Down Expand Up @@ -1852,10 +1862,6 @@ func (s *memSeries) append(st, t int64, v float64, appendID uint64, o chunkOpts)

c.maxTime = t

s.lastValue = v
s.lastHistogramValue = nil
s.lastFloatHistogramValue = nil

if appendID > 0 {
s.txs.add(appendID)
}
Expand Down Expand Up @@ -1892,9 +1898,6 @@ func (s *memSeries) appendHistogram(st, t int64, h *histogram.Histogram, appendI

newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, st, t, h, false) // false=request a new chunk if needed

s.lastHistogramValue = h
s.lastFloatHistogramValue = nil

if appendID > 0 {
s.txs.add(appendID)
}
Expand Down Expand Up @@ -1949,9 +1952,6 @@ func (s *memSeries) appendFloatHistogram(st, t int64, fh *histogram.FloatHistogr

newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, st, t, fh, false) // False means request a new chunk if needed.

s.lastHistogramValue = nil
s.lastFloatHistogramValue = fh

if appendID > 0 {
s.txs.add(appendID)
}
Expand Down
5 changes: 1 addition & 4 deletions tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
Expand Down Expand Up @@ -270,9 +269,7 @@ func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.Ser
continue
}

if value.IsStaleNaN(s.lastValue) ||
(s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum)) ||
(s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum)) {
if s.isStaleLastValue() {
series = append(series, s)
}
s.Unlock()
Expand Down
Loading
Loading