diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 3a6f720b7a0..1c4242f25eb 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -7120,8 +7120,8 @@ func newMockIngesterPusherAdapter(ingester *mockIngester) *mockIngesterPusherAda
 	}
 }
 
-// PushToStorage implements ingest.Pusher.
-func (c *mockIngesterPusherAdapter) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error {
+// PushToStorageAndReleaseRequest implements ingest.Pusher.
+func (c *mockIngesterPusherAdapter) PushToStorageAndReleaseRequest(ctx context.Context, req *mimirpb.WriteRequest) error {
 	_, err := c.ingester.Push(ctx, req)
 	return err
 }
diff --git a/pkg/ingester/circuitbreaker_test.go b/pkg/ingester/circuitbreaker_test.go
index 64ac841408b..7b0bf5bb33b 100644
--- a/pkg/ingester/circuitbreaker_test.go
+++ b/pkg/ingester/circuitbreaker_test.go
@@ -545,7 +545,7 @@ func TestCircuitBreaker_FinishRequest(t *testing.T) {
 	}
 }
 
-func TestIngester_IngestStorage_PushToStorage_CircuitBreaker(t *testing.T) {
+func TestIngester_IngestStorage_PushToStorageAndReleaseRequest_CircuitBreaker(t *testing.T) {
 	pushTimeout := 100 * time.Millisecond
 	tests := map[string]struct {
 		expectedErrorWhenCircuitBreakerClosed error
@@ -632,7 +632,7 @@ func TestIngester_IngestStorage_PushToStorage_CircuitBreaker(t *testing.T) {
 				nil,
 				mimirpb.API,
 			)
-			err := i.PushToStorage(ctx, req)
+			err := i.PushToStorageAndReleaseRequest(ctx, req)
 			require.NoError(t, err)
 
 			count := 0
@@ -660,7 +660,7 @@ func TestIngester_IngestStorage_PushToStorage_CircuitBreaker(t *testing.T) {
 				ctx := user.InjectOrgID(context.Background(), userID)
 				count++
 				i.circuitBreaker.push.testRequestDelay = testCase.pushRequestDelay
-				err := i.PushToStorage(ctx, req)
+				err := i.PushToStorageAndReleaseRequest(ctx, req)
 				if initialDelayEnabled {
 					if testCase.expectedErrorWhenCircuitBreakerClosed != nil {
 						require.ErrorAs(t, err, &testCase.expectedErrorWhenCircuitBreakerClosed)
@@ -973,7 +973,7 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) {
 			)
 			ctx, err = i.StartPushRequest(ctx, int64(req.Size()))
 			require.NoError(t, err)
-			err = i.PushToStorage(ctx, req)
+			err = i.PushToStorageAndReleaseRequest(ctx, req)
 			require.NoError(t, err)
 			i.FinishPushRequest(ctx)
 
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index a34a8d0f834..9f77802928c 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -3976,8 +3976,8 @@ func (i *Ingester) checkAvailableForPush() error {
 	return newUnavailableError(ingesterState)
 }
 
-// PushToStorage implements ingest.Pusher interface for ingestion via ingest-storage.
-func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error {
+// PushToStorageAndReleaseRequest implements ingest.Pusher interface for ingestion via ingest-storage.
+func (i *Ingester) PushToStorageAndReleaseRequest(ctx context.Context, req *mimirpb.WriteRequest) error {
 	err := i.PushWithCleanup(ctx, req, func() {
 		req.FreeBuffer()
 		mimirpb.ReuseSlice(req.Timeseries)
@@ -3994,7 +3994,7 @@ func (i *Ingester) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirp
 		return nil, errPushGrpcDisabled
 	}
 
-	err := i.PushToStorage(ctx, req)
+	err := i.PushToStorageAndReleaseRequest(ctx, req)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/mimirpb/timeseries.go b/pkg/mimirpb/timeseries.go
index 21cfeb4ba3f..69bf84f4751 100644
--- a/pkg/mimirpb/timeseries.go
+++ b/pkg/mimirpb/timeseries.go
@@ -515,6 +515,13 @@ func ReuseSlice(ts []PreallocTimeseries) {
 		ReusePreallocTimeseries(&ts[i])
 	}
 
+	ReuseSliceOnly(ts)
+}
+
+// ReuseSliceOnly reuses the slice of timeseries, but not its contents.
+// Only use this if you have another means of reusing the individual timeseries contained within.
+// Most times, you want to use ReuseSlice instead.
+func ReuseSliceOnly(ts []PreallocTimeseries) {
 	preallocTimeseriesSlicePool.Put(ts[:0])
 }
 
diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go
index 3fdf5b40645..7cec3108f90 100644
--- a/pkg/storage/ingest/pusher.go
+++ b/pkg/storage/ingest/pusher.go
@@ -24,12 +24,12 @@ import (
 )
 
 type Pusher interface {
-	PushToStorage(context.Context, *mimirpb.WriteRequest) error
+	PushToStorageAndReleaseRequest(context.Context, *mimirpb.WriteRequest) error
 }
 
 type PusherCloser interface {
-	// PushToStorage pushes the write request to the storage.
-	PushToStorage(context.Context, *mimirpb.WriteRequest) error
+	// PushToStorageAndReleaseRequest pushes the write request to the storage.
+	PushToStorageAndReleaseRequest(context.Context, *mimirpb.WriteRequest) error
 	// Close tells the PusherCloser that no more records are coming and it should flush any remaining records.
 	Close() []error
 }
@@ -66,7 +66,7 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) (returnEr
 	}(time.Now())
 
 	type parsedRecord struct {
-		*mimirpb.WriteRequest
+		*mimirpb.PreallocWriteRequest
 		// ctx holds the tracing baggage for this record/request.
 		ctx      context.Context
 		tenantID string
@@ -93,10 +93,10 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) (returnEr
 		}
 
 		parsed := parsedRecord{
-			ctx:          r.ctx,
-			tenantID:     r.tenantID,
-			WriteRequest: &mimirpb.WriteRequest{},
-			index:        index,
+			ctx:                  r.ctx,
+			tenantID:             r.tenantID,
+			PreallocWriteRequest: &mimirpb.PreallocWriteRequest{},
+			index:                index,
 		}
 
 		if r.version > LatestRecordVersion {
@@ -146,7 +146,7 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) (returnEr
 		}
 
 		// If we get an error at any point, we need to stop processing the records. They will be retried at some point.
-		err := c.pushToStorage(r.ctx, r.tenantID, r.WriteRequest, writer)
+		err := c.pushToStorage(r.ctx, r.tenantID, &r.WriteRequest, writer)
 		if err != nil {
 			cancel(cancellation.NewErrorf("error while pushing to storage")) // Stop the unmarshalling goroutine.
 			return fmt.Errorf("consuming record at index %d for tenant %s: %w", r.index, r.tenantID, err)
@@ -183,7 +183,7 @@ func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req
 	// Note that the implementation of the Pusher expects the tenantID to be in the context.
 	ctx = user.InjectOrgID(ctx, tenantID)
 
-	err := writer.PushToStorage(ctx, req)
+	err := writer.PushToStorageAndReleaseRequest(ctx, req)
 
 	return err
 }
@@ -213,14 +213,14 @@ func newSequentialStoragePusherWithErrorHandler(metrics *storagePusherMetrics, p
 	}
 }
 
-// PushToStorage implements the PusherCloser interface.
-func (ssp sequentialStoragePusher) PushToStorage(ctx context.Context, wr *mimirpb.WriteRequest) error {
+// PushToStorageAndReleaseRequest implements the PusherCloser interface.
+func (ssp sequentialStoragePusher) PushToStorageAndReleaseRequest(ctx context.Context, wr *mimirpb.WriteRequest) error {
 	ssp.metrics.timeSeriesPerFlush.Observe(float64(len(wr.Timeseries)))
 	defer func(now time.Time) {
 		ssp.metrics.processingTime.WithLabelValues(requestContents(wr)).Observe(time.Since(now).Seconds())
 	}(time.Now())
 
-	if err := ssp.pusher.PushToStorage(ctx, wr); ssp.errorHandler.IsServerError(ctx, err) {
+	if err := ssp.pusher.PushToStorageAndReleaseRequest(ctx, wr); ssp.errorHandler.IsServerError(ctx, err) {
 		return err
 	}
 
@@ -270,15 +270,15 @@ func newParallelStoragePusher(metrics *storagePusherMetrics, pusher Pusher, byte
 	}
 }
 
-// PushToStorage implements the PusherCloser interface.
-func (c *parallelStoragePusher) PushToStorage(ctx context.Context, wr *mimirpb.WriteRequest) error {
+// PushToStorageAndReleaseRequest implements the PusherCloser interface.
+func (c *parallelStoragePusher) PushToStorageAndReleaseRequest(ctx context.Context, wr *mimirpb.WriteRequest) error {
 	userID, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		level.Error(c.logger).Log("msg", "failed to extract tenant ID from context", "err", err)
 	}
 
 	shards := c.shardsFor(userID, wr.Source)
-	return shards.PushToStorage(ctx, wr)
+	return shards.PushToStorageAndReleaseRequest(ctx, wr)
 }
 
 // Close implements the PusherCloser interface.
@@ -392,20 +392,26 @@ func labelAdaptersHash(b []byte, ls []mimirpb.LabelAdapter) ([]byte, uint64) {
 	return b, xxhash.Sum64(b)
 }
 
-// PushToStorage hashes each time series in the write requests and sends them to the appropriate shard which is then handled by the current batchingQueue in that shard.
-// PushToStorage ignores SkipLabelNameValidation because that field is only used in the distributor and not in the ingester.
-// PushToStorage aborts the request if it encounters an error.
-func (p *parallelStorageShards) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error {
+// PushToStorageAndReleaseRequest hashes each time series in the write requests and sends them to the appropriate shard which is then handled by the current batchingQueue in that shard.
+// PushToStorageAndReleaseRequest ignores SkipLabelNameValidation because that field is only used in the distributor and not in the ingester.
+// PushToStorageAndReleaseRequest aborts the request if it encounters an error.
+func (p *parallelStorageShards) PushToStorageAndReleaseRequest(ctx context.Context, request *mimirpb.WriteRequest) error {
 	hashBuf := make([]byte, 0, 1024)
-	for _, ts := range request.Timeseries {
+	for i := range request.Timeseries {
 		var shard uint64
-		hashBuf, shard = labelAdaptersHash(hashBuf, ts.Labels)
+		hashBuf, shard = labelAdaptersHash(hashBuf, request.Timeseries[i].Labels)
 		shard = shard % uint64(p.numShards)
 
-		if err := p.shards[shard].AddToBatch(ctx, request.Source, ts); err != nil {
+		if err := p.shards[shard].AddToBatch(ctx, request.Source, request.Timeseries[i]); err != nil {
 			return fmt.Errorf("encountered a non-client error when ingesting; this error was for a previous write request for the same tenant: %w", err)
 		}
+		// We're transferring ownership of the timeseries to the batch, clear the slice as we go so we can reuse it.
+		request.Timeseries[i] = mimirpb.PreallocTimeseries{}
 	}
+	// The slice no longer owns any timeseries, so we can re-use it.
+	// Nil-out the slice to make any use-after-free attempts fail in an obvious way.
+	mimirpb.ReuseSliceOnly(request.Timeseries)
+	request.Timeseries = nil
 
 	// Push metadata to every shard in a round-robin fashion.
 	// Start from a random shard to avoid hotspots in the first few shards when there are not many metadata pieces in each request.
@@ -460,12 +466,13 @@ func (p *parallelStorageShards) run(queue *batchingQueue) {
 		p.metrics.batchAge.Observe(time.Since(wr.startedAt).Seconds())
 		p.metrics.timeSeriesPerFlush.Observe(float64(len(wr.Timeseries)))
 		processingStart := time.Now()
+		requestContents := requestContents(wr.WriteRequest)
 
-		err := p.pusher.PushToStorage(wr.Context, wr.WriteRequest)
+		err := p.pusher.PushToStorageAndReleaseRequest(wr.Context, wr.WriteRequest)
 
 		// The error handler needs to determine if this is a server error or not.
 		// If it is, we need to stop processing as the batch will be retried. When is not (client error), it'll log it, and we can continue processing.
-		p.metrics.processingTime.WithLabelValues(requestContents(wr.WriteRequest)).Observe(time.Since(processingStart).Seconds())
+		p.metrics.processingTime.WithLabelValues(requestContents).Observe(time.Since(processingStart).Seconds())
 		if p.errorHandler.IsServerError(wr.Context, err) {
 			queue.ReportError(err)
 		}
diff --git a/pkg/storage/ingest/pusher_test.go b/pkg/storage/ingest/pusher_test.go
index a92339b42c2..ba935a11118 100644
--- a/pkg/storage/ingest/pusher_test.go
+++ b/pkg/storage/ingest/pusher_test.go
@@ -39,7 +39,7 @@ func (p pusherFunc) Close() []error {
 	return nil
 }
 
-func (p pusherFunc) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error {
+func (p pusherFunc) PushToStorageAndReleaseRequest(ctx context.Context, request *mimirpb.WriteRequest) error {
 	return p(ctx, request)
 }
 
@@ -587,7 +587,7 @@ type mockPusher struct {
 	mock.Mock
 }
 
-func (m *mockPusher) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error {
+func (m *mockPusher) PushToStorageAndReleaseRequest(ctx context.Context, request *mimirpb.WriteRequest) error {
 	args := m.Called(ctx, request)
 	return args.Error(0)
 }
@@ -927,14 +927,14 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
 			upstreamPushErrsCount := 0
 			for i, req := range tc.expectedUpstreamPushes {
 				err := tc.upstreamPushErrs[i]
-				pusher.On("PushToStorage", mock.Anything, req).Return(err)
+				pusher.On("PushToStorageAndReleaseRequest", mock.Anything, req).Return(err)
 				if err != nil {
 					upstreamPushErrsCount++
 				}
 			}
 			var actualPushErrs []error
 			for _, req := range tc.requests {
-				err := shardingP.PushToStorage(context.Background(), req)
+				err := shardingP.PushToStorageAndReleaseRequest(context.Background(), req)
 				actualPushErrs = append(actualPushErrs, err)
 			}
 
@@ -957,7 +957,7 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
 			} else {
 				require.Empty(t, closeErr)
 			}
-			pusher.AssertNumberOfCalls(t, "PushToStorage", len(tc.expectedUpstreamPushes))
+			pusher.AssertNumberOfCalls(t, "PushToStorageAndReleaseRequest", len(tc.expectedUpstreamPushes))
 			pusher.AssertExpectations(t)
 
 			require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
@@ -1094,7 +1094,7 @@ func TestParallelStoragePusher(t *testing.T) {
 			receivedPushes := make(map[string]map[mimirpb.WriteRequest_SourceEnum]int)
 			var receivedPushesMu sync.Mutex
 
-			pusher.On("PushToStorage", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
+			pusher.On("PushToStorageAndReleaseRequest", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
 				tenantID, err := tenant.TenantID(args.Get(0).(context.Context))
 				require.NoError(t, err)
 				req := args.Get(1).(*mimirpb.WriteRequest)
@@ -1119,7 +1119,7 @@ func TestParallelStoragePusher(t *testing.T) {
 			// Process requests
 			for _, req := range tc.requests {
 				ctx := user.InjectOrgID(context.Background(), req.tenantID)
-				err := psp.PushToStorage(ctx, req.WriteRequest)
+				err := psp.PushToStorageAndReleaseRequest(ctx, req.WriteRequest)
 				require.NoError(t, err)
 			}
 
@@ -1284,14 +1284,17 @@ func TestParallelStoragePusher_Fuzzy(t *testing.T) {
 
 				req := generateWriteRequest(batchID, numSeriesPerWriteRequest)
 
-				if err := psp.PushToStorage(ctx, req); err == nil {
+				// We need this for later, but psp's PushToStorage destroys the request by freeing resources.
+				requestSeriesLabelValues := []string{}
+				for _, series := range req.Timeseries {
+					requestSeriesLabelValues = append(requestSeriesLabelValues, series.Labels[0].Value)
+				}
+				if err := psp.PushToStorageAndReleaseRequest(ctx, req); err == nil {
					enqueuedTimeSeriesReqs++
 
 					// Keep track of the enqueued series. We don't keep track of it if there was an error because, in case
 					// of an error, only some series may been added to a batch (it breaks on the first failed "append to batch").
-					for _, series := range req.Timeseries {
-						enqueuedTimeSeriesPerTenant[tenantID] = append(enqueuedTimeSeriesPerTenant[tenantID], series.Labels[0].Value)
-					}
+					enqueuedTimeSeriesPerTenant[tenantID] = append(enqueuedTimeSeriesPerTenant[tenantID], requestSeriesLabelValues...)
 				} else {
 					// We received an error. Make sure a server error was reported by the upstream pusher.
 					require.Greater(t, serverErrsCount.Load(), int64(0))
@@ -1604,3 +1607,52 @@ func setupQueue(t *testing.T, capacity, batchSize int, series []mimirpb.Prealloc
 
 	return queue
 }
+
+func BenchmarkPusherConsumer(b *testing.B) {
+	pusher := pusherFunc(func(ctx context.Context, request *mimirpb.WriteRequest) error {
+		mimirpb.ReuseSlice(request.Timeseries)
+		return nil
+	})
+
+	records := make([]record, 50)
+	for i := range records {
+		wr := &mimirpb.WriteRequest{Timeseries: make([]mimirpb.PreallocTimeseries, 100)}
+		for j := range len(wr.Timeseries) {
+			wr.Timeseries[j] = mockPreallocTimeseries(fmt.Sprintf("series_%d", i))
+		}
+		content, err := wr.Marshal()
+		require.NoError(b, err)
+		records[i].content = content
+		records[i].tenantID = "user-1"
+		records[i].version = 1
+		records[i].ctx = context.Background()
+	}
+
+	b.Run("sequential pusher", func(b *testing.B) {
+		kcfg := KafkaConfig{}
+		flagext.DefaultValues(&kcfg)
+		kcfg.IngestionConcurrencyMax = 0
+		metrics := newPusherConsumerMetrics(prometheus.NewPedanticRegistry())
+		c := newPusherConsumer(pusher, kcfg, metrics, log.NewNopLogger())
+		b.ResetTimer()
+
+		for range b.N {
+			err := c.Consume(context.Background(), records)
+			require.NoError(b, err)
+		}
+	})
+
+	b.Run("parallel pusher", func(b *testing.B) {
+		kcfg := KafkaConfig{}
+		flagext.DefaultValues(&kcfg)
+		kcfg.IngestionConcurrencyMax = 2
+		metrics := newPusherConsumerMetrics(prometheus.NewPedanticRegistry())
+		c := newPusherConsumer(pusher, kcfg, metrics, log.NewNopLogger())
+		b.ResetTimer()

+		for range b.N {
+			err := c.Consume(context.Background(), records)
+			require.NoError(b, err)
+		}
+	})
+}
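For reference only (not part of the patch): a minimal, self-contained Go sketch of the slice ownership-transfer pattern the patch introduces, under the assumption that elements of a pooled slice are handed to a new owner, each slot is cleared as it is transferred, and only the backing slice is returned to the pool (the role ReuseSliceOnly plays for []PreallocTimeseries). The Item, Request, slicePool, and pushAndRelease names are hypothetical stand-ins, not Mimir APIs.

package main

import (
	"fmt"
	"sync"
)

// Item is a hypothetical stand-in for a pooled element such as mimirpb.PreallocTimeseries.
type Item struct{ Label string }

// slicePool reuses only the backing slice, never its elements, playing the role
// ReuseSliceOnly plays for the []PreallocTimeseries pool (illustrative assumption).
var slicePool = sync.Pool{
	New: func() any { return make([]Item, 0, 128) },
}

// Request is a hypothetical stand-in for a write request that owns a pooled slice.
type Request struct{ Items []Item }

// pushAndRelease hands every element to a downstream batch (which then owns it),
// clears each slot as it goes, and finally releases only the slice back to the pool,
// mirroring the ownership transfer done in PushToStorageAndReleaseRequest above.
func pushAndRelease(req *Request, addToBatch func(Item)) {
	for i := range req.Items {
		addToBatch(req.Items[i])
		// Ownership moved to the batch: zero the slot so the slice holds no
		// live references when it goes back to the pool.
		req.Items[i] = Item{}
	}
	// Return the emptied slice to the pool and nil out the field so any later
	// use of req.Items fails in an obvious way instead of corrupting pooled data.
	slicePool.Put(req.Items[:0])
	req.Items = nil
}

func main() {
	items := slicePool.Get().([]Item)
	req := &Request{Items: append(items, Item{Label: "a"}, Item{Label: "b"})}

	var batch []Item
	pushAndRelease(req, func(it Item) { batch = append(batch, it) })

	fmt.Println(len(batch), req.Items == nil) // prints: 2 true
}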