ingest-storage: Use pooled timeseries slices when deserializing by alexweav · Pull Request #11393 · grafana/mimir

ingest-storage: Use pooled timeseries slices when deserializing #11393


Open · wants to merge 7 commits into main
pkg/mimirpb/timeseries.go (7 additions, 0 deletions)
@@ -515,6 +515,13 @@ func ReuseSlice(ts []PreallocTimeseries) {
		ReusePreallocTimeseries(&ts[i])
	}

	ReuseTimeseriesSliceDangerous(ts)
}

// ReuseTimeseriesSliceDangerous reuses the slice of timeseries, but not its contents.
// Only use this if you have another means of reusing the individual timeseries contained within.
// Most times, you want to use ReuseSlice instead.
func ReuseTimeseriesSliceDangerous(ts []PreallocTimeseries) {
	preallocTimeseriesSlicePool.Put(ts[:0])
}

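The new helper splits recycling of the slice from recycling of its elements. A minimal sketch of the calling pattern it is meant for, mirroring what pusher.go does below; handOff and addToBatch are hypothetical names used only for illustration:

	package example

	import "github.com/grafana/mimir/pkg/mimirpb"

	// handOff transfers every series to another owner (here an addToBatch callback),
	// then recycles only the slice itself via the new helper.
	func handOff(ts []mimirpb.PreallocTimeseries, addToBatch func(mimirpb.PreallocTimeseries)) {
		for i := range ts {
			addToBatch(ts[i])                    // the new owner is now responsible for recycling ts[i]
			ts[i] = mimirpb.PreallocTimeseries{} // drop our reference so nothing is recycled twice
		}
		// Safe only because every element has been handed off above.
		mimirpb.ReuseTimeseriesSliceDangerous(ts)
	}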
pkg/storage/ingest/pusher.go (13 additions, 7 deletions)
@@ -93,10 +93,12 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) (returnEr
 	}
 
 	parsed := parsedRecord{
-		ctx:          r.ctx,
-		tenantID:     r.tenantID,
-		WriteRequest: &mimirpb.WriteRequest{},
-		index:        index,
+		ctx:      r.ctx,
+		tenantID: r.tenantID,
+		WriteRequest: &mimirpb.WriteRequest{
+			Timeseries: mimirpb.PreallocTimeseriesSliceFromPool(),
+		},
+		index: index,
 	}
 
 	if r.version > LatestRecordVersion {
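PreallocTimeseriesSliceFromPool is an existing mimirpb helper (it is not added in this diff); the change here is only that the deserialized series now land in a pooled slice instead of a freshly allocated one. A rough sketch of how such a pool is commonly backed by sync.Pool, purely as an assumption since the real mimirpb implementation is not shown in this PR:

	package example

	import "sync"

	type PreallocTimeseries struct{ /* fields omitted */ }

	// Assumed shape of a slice pool like mimirpb's preallocTimeseriesSlicePool;
	// the initial capacity below is purely illustrative.
	var preallocTimeseriesSlicePool = sync.Pool{
		New: func() any {
			return make([]PreallocTimeseries, 0, 128)
		},
	}

	// PreallocTimeseriesSliceFromPool returns an empty slice taken from the pool;
	// callers append into it and later hand it back with Put(slice[:0]).
	func PreallocTimeseriesSliceFromPool() []PreallocTimeseries {
		return preallocTimeseriesSlicePool.Get().([]PreallocTimeseries)
	}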
@@ -397,15 +399,19 @@ func labelAdaptersHash(b []byte, ls []mimirpb.LabelAdapter) ([]byte, uint64) {
 // PushToStorage aborts the request if it encounters an error.
 func (p *parallelStorageShards) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error {
 	hashBuf := make([]byte, 0, 1024)
-	for _, ts := range request.Timeseries {
+	for i := range request.Timeseries {
 		var shard uint64
-		hashBuf, shard = labelAdaptersHash(hashBuf, ts.Labels)
+		hashBuf, shard = labelAdaptersHash(hashBuf, request.Timeseries[i].Labels)
 		shard = shard % uint64(p.numShards)
 
-		if err := p.shards[shard].AddToBatch(ctx, request.Source, ts); err != nil {
+		if err := p.shards[shard].AddToBatch(ctx, request.Source, request.Timeseries[i]); err != nil {
 			return fmt.Errorf("encountered a non-client error when ingesting; this error was for a previous write request for the same tenant: %w", err)
 		}
+		// We're transferring ownership of the timeseries to the batch; clear the slice entry as we go so we can reuse the slice.
+		request.Timeseries[i] = mimirpb.PreallocTimeseries{}
 	}
+	// The slice no longer owns any timeseries, so we can reuse it.
+	mimirpb.ReuseTimeseriesSliceDangerous(request.Timeseries)
 
 	// Push metadata to every shard in a round-robin fashion.
 	// Start from a random shard to avoid hotspots in the first few shards when there are not many metadata pieces in each request.
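For the full round trip: the series handed to AddToBatch are expected to be recycled downstream by whatever ultimately pushes the batch, typically by calling mimirpb.ReuseSlice on its own WriteRequest, as the benchmark's stub pusher below does. A brief sketch of that receiving side (the storage write itself is omitted):

	// The receiving pusher recycles both the individual series and its own slice
	// once it is done with them, returning everything to the mimirpb pools.
	pusher := pusherFunc(func(ctx context.Context, req *mimirpb.WriteRequest) error {
		defer mimirpb.ReuseSlice(req.Timeseries)
		// ... write req to storage (omitted) ...
		return nil
	})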
pkg/storage/ingest/pusher_test.go (55 additions, 3 deletions)
@@ -1284,14 +1284,17 @@ func TestParallelStoragePusher_Fuzzy(t *testing.T) {
 
 		req := generateWriteRequest(batchID, numSeriesPerWriteRequest)
 
+		// We need this for later, but psp's PushToStorage destroys the request by freeing resources.
+		requestSeriesLabelValues := []string{}
+		for _, series := range req.Timeseries {
+			requestSeriesLabelValues = append(requestSeriesLabelValues, series.Labels[0].Value)
+		}
 		if err := psp.PushToStorage(ctx, req); err == nil {
 			enqueuedTimeSeriesReqs++
 
 			// Keep track of the enqueued series. We don't keep track of them if there was an error because, in case
 			// of an error, only some series may have been added to a batch (it breaks on the first failed "append to batch").
-			for _, series := range req.Timeseries {
-				enqueuedTimeSeriesPerTenant[tenantID] = append(enqueuedTimeSeriesPerTenant[tenantID], series.Labels[0].Value)
-			}
+			enqueuedTimeSeriesPerTenant[tenantID] = append(enqueuedTimeSeriesPerTenant[tenantID], requestSeriesLabelValues...)
 		} else {
 			// We received an error. Make sure a server error was reported by the upstream pusher.
 			require.Greater(t, serverErrsCount.Load(), int64(0))
@@ -1604,3 +1607,52 @@ func setupQueue(t *testing.T, capacity, batchSize int, series []mimirpb.Prealloc

	return queue
}

func BenchmarkPusherConsumer(b *testing.B) {
	pusher := pusherFunc(func(ctx context.Context, request *mimirpb.WriteRequest) error {
		mimirpb.ReuseSlice(request.Timeseries)
		return nil
	})

	records := make([]record, 50)
	for i := range records {
		wr := &mimirpb.WriteRequest{Timeseries: make([]mimirpb.PreallocTimeseries, 100)}
		for j := range len(wr.Timeseries) {
			wr.Timeseries[j] = mockPreallocTimeseries(fmt.Sprintf("series_%d", i))
		}
		content, err := wr.Marshal()
		require.NoError(b, err)
		records[i].content = content
		records[i].tenantID = "user-1"
		records[i].version = 1
		records[i].ctx = context.Background()
	}

	b.Run("sequential pusher", func(b *testing.B) {
		kcfg := KafkaConfig{}
		flagext.DefaultValues(&kcfg)
		kcfg.IngestionConcurrencyMax = 0
		metrics := newPusherConsumerMetrics(prometheus.NewPedanticRegistry())
		c := newPusherConsumer(pusher, kcfg, metrics, log.NewNopLogger())
		b.ResetTimer()

		for range b.N {
			err := c.Consume(context.Background(), records)
			require.NoError(b, err)
		}
	})

	b.Run("parallel pusher", func(b *testing.B) {
		kcfg := KafkaConfig{}
		flagext.DefaultValues(&kcfg)
		kcfg.IngestionConcurrencyMax = 2
		metrics := newPusherConsumerMetrics(prometheus.NewPedanticRegistry())
		c := newPusherConsumer(pusher, kcfg, metrics, log.NewNopLogger())
		b.ResetTimer()

		for range b.N {
			err := c.Consume(context.Background(), records)
			require.NoError(b, err)
		}
	})
}
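Usage note, not part of the diff: the new benchmark can be run with the standard Go tooling, for example

	go test -bench=BenchmarkPusherConsumer -benchmem ./pkg/storage/ingest/

where -benchmem reports bytes and allocations per operation, the numbers the pooled slices in this change are meant to bring down.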