diff --git a/node/node.go b/node/node.go index f2a2034f46e..5705496a693 100644 --- a/node/node.go +++ b/node/node.go @@ -303,7 +303,7 @@ func createAndStartIndexerService( blockIndexer = &blockidxnull.BlockerIndexer{} } - indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus) + indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false) indexerService.SetLogger(logger.With("module", "txindex")) if err := indexerService.Start(); err != nil { diff --git a/state/indexer/sink/psql/psql_test.go b/state/indexer/sink/psql/psql_test.go index 906fb12c2fc..5f4047bc2c2 100644 --- a/state/indexer/sink/psql/psql_test.go +++ b/state/indexer/sink/psql/psql_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" abci "github.com/cometbft/cometbft/abci/types" + "github.com/cometbft/cometbft/state/txindex" "github.com/cometbft/cometbft/types" // Register the Postgres database driver. @@ -196,6 +197,55 @@ func TestIndexing(t *testing.T) { err = indexer.IndexTxEvents([]*abci.TxResult{txResult}) require.NoError(t, err) }) + + t.Run("IndexerService", func(t *testing.T) { + indexer := &EventSink{store: testDB(), chainID: chainID} + + // event bus + eventBus := types.NewEventBus() + err := eventBus.Start() + require.NoError(t, err) + t.Cleanup(func() { + if err := eventBus.Stop(); err != nil { + t.Error(err) + } + }) + + service := txindex.NewIndexerService(indexer.TxIndexer(), indexer.BlockIndexer(), eventBus, true) + err = service.Start() + require.NoError(t, err) + t.Cleanup(func() { + if err := service.Stop(); err != nil { + t.Error(err) + } + }) + + // publish block with txs + err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{ + Header: types.Header{Height: 1}, + NumTxs: int64(2), + }) + require.NoError(t, err) + txResult1 := &abci.TxResult{ + Height: 1, + Index: uint32(0), + Tx: types.Tx("foo"), + Result: abci.ResponseDeliverTx{Code: 0}, + } + err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult1}) + require.NoError(t, err) + txResult2 := &abci.TxResult{ + Height: 1, + Index: uint32(1), + Tx: types.Tx("bar"), + Result: abci.ResponseDeliverTx{Code: 1}, + } + err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult2}) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + require.True(t, service.IsRunning()) + }) } func TestStop(t *testing.T) { diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index 765b205f95f..4b0cfb90ce0 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -3,7 +3,6 @@ package txindex import ( "context" - abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/libs/service" "github.com/cometbft/cometbft/state/indexer" "github.com/cometbft/cometbft/types" @@ -20,9 +19,10 @@ const ( type IndexerService struct { service.BaseService - txIdxr TxIndexer - blockIdxr indexer.BlockIndexer - eventBus *types.EventBus + txIdxr TxIndexer + blockIdxr indexer.BlockIndexer + eventBus *types.EventBus + terminateOnError bool } // NewIndexerService returns a new service instance. @@ -30,9 +30,10 @@ func NewIndexerService( txIdxr TxIndexer, blockIdxr indexer.BlockIndexer, eventBus *types.EventBus, + terminateOnError bool, ) *IndexerService { - is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus} + is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus, terminateOnError: terminateOnError} is.BaseService = *service.NewBaseService(nil, "IndexerService", is) return is } @@ -74,24 +75,38 @@ func (is *IndexerService) OnStart() error { "index", txResult.Index, "err", err, ) + + if is.terminateOnError { + if err := is.Stop(); err != nil { + is.Logger.Error("failed to stop", "err", err) + } + return + } } } if err := is.blockIdxr.Index(eventDataHeader); err != nil { is.Logger.Error("failed to index block", "height", height, "err", err) + if is.terminateOnError { + if err := is.Stop(); err != nil { + is.Logger.Error("failed to stop", "err", err) + } + return + } } else { - is.Logger.Info("indexed block", "height", height) - } - - batch.Ops, err = DeduplicateBatch(batch.Ops, is.txIdxr) - if err != nil { - is.Logger.Error("deduplicate batch", "height", height) + is.Logger.Info("indexed block exents", "height", height) } if err = is.txIdxr.AddBatch(batch); err != nil { is.Logger.Error("failed to index block txs", "height", height, "err", err) + if is.terminateOnError { + if err := is.Stop(); err != nil { + is.Logger.Error("failed to stop", "err", err) + } + return + } } else { - is.Logger.Debug("indexed block txs", "height", height, "num_txs", eventDataHeader.NumTxs) + is.Logger.Debug("indexed transactions", "height", height, "num_txs", eventDataHeader.NumTxs) } } }() @@ -104,45 +119,3 @@ func (is *IndexerService) OnStop() { _ = is.eventBus.UnsubscribeAll(context.Background(), subscriber) } } - -// DeduplicateBatch consider the case of duplicate txs. -// if the current one under investigation is NOT OK, then we need to check -// whether there's a previously indexed tx. -// SKIP the current tx if the previously indexed record is found and successful. -func DeduplicateBatch(ops []*abci.TxResult, txIdxr TxIndexer) ([]*abci.TxResult, error) { - result := make([]*abci.TxResult, 0, len(ops)) - - // keep track of successful txs in this block in order to suppress latter ones being indexed. - var successfulTxsInThisBlock = make(map[string]struct{}) - - for _, txResult := range ops { - hash := types.Tx(txResult.Tx).Hash() - - if txResult.Result.IsOK() { - successfulTxsInThisBlock[string(hash)] = struct{}{} - } else { - // if it already appeared in current block and was successful, skip. - if _, found := successfulTxsInThisBlock[string(hash)]; found { - continue - } - - // check if this tx hash is already indexed - old, err := txIdxr.Get(hash) - - // if db op errored - // Not found is not an error - if err != nil { - return nil, err - } - - // if it's already indexed in an older block and was successful, skip. - if old != nil && old.Result.Code == abci.CodeTypeOK { - continue - } - } - - result = append(result, txResult) - } - - return result, nil -} diff --git a/state/txindex/indexer_service_test.go b/state/txindex/indexer_service_test.go index 7cf93154982..0b509368d03 100644 --- a/state/txindex/indexer_service_test.go +++ b/state/txindex/indexer_service_test.go @@ -33,7 +33,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { txIndexer := kv.NewTxIndex(store) blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events"))) - service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus) + service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false) service.SetLogger(log.TestingLogger()) err = service.Start() require.NoError(t, err) @@ -80,164 +80,3 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { require.NoError(t, err) require.Equal(t, txResult2, res) } - -func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) { - var mockTx = types.Tx("MOCK_TX_HASH") - - testCases := []struct { - name string - tx1 abci.TxResult - tx2 abci.TxResult - expSkip bool // do we expect the second tx to be skipped by tx indexer - }{ - {"skip, previously successful", - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK, - }, - }, - abci.TxResult{ - Height: 2, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - true, - }, - {"not skip, previously unsuccessful", - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - abci.TxResult{ - Height: 2, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - false, - }, - {"not skip, both successful", - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK, - }, - }, - abci.TxResult{ - Height: 2, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK, - }, - }, - false, - }, - {"not skip, both unsuccessful", - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - abci.TxResult{ - Height: 2, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - false, - }, - {"skip, same block, previously successful", - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK, - }, - }, - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - true, - }, - {"not skip, same block, previously unsuccessful", - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK, - }, - }, - false, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - indexer := kv.NewTxIndex(db.NewMemDB()) - - if tc.tx1.Height != tc.tx2.Height { - // index the first tx - err := indexer.AddBatch(&txindex.Batch{ - Ops: []*abci.TxResult{&tc.tx1}, - }) - require.NoError(t, err) - - // check if the second one should be skipped. - ops, err := txindex.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, indexer) - require.NoError(t, err) - - if tc.expSkip { - require.Empty(t, ops) - } else { - require.Equal(t, []*abci.TxResult{&tc.tx2}, ops) - } - } else { - // same block - ops := []*abci.TxResult{&tc.tx1, &tc.tx2} - ops, err := txindex.DeduplicateBatch(ops, indexer) - require.NoError(t, err) - if tc.expSkip { - // the second one is skipped - require.Equal(t, []*abci.TxResult{&tc.tx1}, ops) - } else { - require.Equal(t, []*abci.TxResult{&tc.tx1, &tc.tx2}, ops) - } - } - }) - } -} diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index e8c3d2e4649..07db5c7a9a4 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -102,12 +102,30 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error { // that indexed from the tx's events is a composite of the event type and the // respective attribute's key delimited by a "." (eg. "account.number"). // Any event with an empty type is not indexed. +// +// If a transaction is indexed with the same hash as a previous transaction, it will +// be overwritten unless the tx result was NOT OK and the prior result was OK i.e. +// more transactions that successfully executed overwrite transactions that failed +// or successful yet older transactions. func (txi *TxIndex) Index(result *abci.TxResult) error { b := txi.store.NewBatch() defer b.Close() hash := types.Tx(result.Tx).Hash() + if !result.Result.IsOK() { + oldResult, err := txi.Get(hash) + if err != nil { + return err + } + + // if the new transaction failed and it's already indexed in an older block and was successful + // we skip it as we want users to get the older successful transaction when they query. + if oldResult != nil && oldResult.Result.Code == abci.CodeTypeOK { + return nil + } + } + // index tx by events err := txi.indexEvents(result, hash, b) if err != nil { diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index 4172f3798d3..96d765fc38c 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -258,6 +258,103 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { } } +func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) { + var mockTx = types.Tx("MOCK_TX_HASH") + + testCases := []struct { + name string + tx1 *abci.TxResult + tx2 *abci.TxResult + expOverwrite bool // do we expect the second tx to overwrite the first tx + }{ + { + "don't overwrite as a non-zero code was returned and the previous tx was successful", + &abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK, + }, + }, + &abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK + 1, + }, + }, + false, + }, + { + "overwrite as the previous tx was also unsuccessful", + &abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK + 1, + }, + }, + &abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK + 1, + }, + }, + true, + }, + { + "overwrite as the most recent tx was successful", + &abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK, + }, + }, + &abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK, + }, + }, + true, + }, + } + + hash := mockTx.Hash() + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + indexer := NewTxIndex(db.NewMemDB()) + + // index the first tx + err := indexer.Index(tc.tx1) + require.NoError(t, err) + + // index the same tx with different results + err = indexer.Index(tc.tx2) + require.NoError(t, err) + + res, err := indexer.Get(hash) + require.NoError(t, err) + + if tc.expOverwrite { + require.Equal(t, tc.tx2, res) + } else { + require.Equal(t, tc.tx1, res) + } + }) + } +} + func TestTxSearchMultipleTxs(t *testing.T) { indexer := NewTxIndex(db.NewMemDB())