kvindexer: porting event sequences and slash parsing to main by jmalicevic · Pull Request #423 · cometbft/cometbft · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

kvindexer: porting event sequences and slash parsing to main #423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[kvindexer]` Forward porting the fixes done to the kvindexer in 0.37 in PR \#77
([\#423](https://github.com/cometbft/cometbft/pull/423))
81 changes: 80 additions & 1 deletion abci/example/kvstore/kvstore.go
8000
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,20 @@ type Application struct {
state State
RetainBlocks int64 // blocks to retain after commit (via ResponseCommit.RetainHeight)
txToRemove map[string]struct{}
// If true, the app will generate block events in BeginBlock. Used to test the event indexer
// Should be false by default to avoid generating too much data.
genBlockEvents bool
}

// NewApplication builds an Application whose state is loaded from a newly
// created dbm.NewMemDB() database. All other fields keep their zero values,
// so block-event generation is off until SetGenBlockEvents is called.
func NewApplication() *Application {
	app := new(Application)
	app.state = loadState(dbm.NewMemDB())
	return app
}

// SetGenBlockEvents switches on block-event generation by setting the
// genBlockEvents flag to true.
func (app *Application) SetGenBlockEvents() {
app.genBlockEvents = true
}

func (app *Application) Info(req types.RequestInfo) (resInfo types.ResponseInfo) {
return types.ResponseInfo{
Data: fmt.Sprintf("{\"size\":%v}", app.state.Size),
Expand Down Expand Up @@ -116,6 +123,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
{Key: "noindex_key", Value: "index is working", Index: false},
},
},
{
Type: "app",
Attributes: []types.EventAttribute{
{Key: "creator", Value: "Cosmoshi", Index: true},
{Key: "key", Value: value, Index: true},
{Key: "index_key", Value: "index is working", Index: true},
{Key: "noindex_key", Value: "index is working", Index: false},
},
},
}

return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events}
Expand Down Expand Up @@ -189,7 +205,70 @@ func (app *Application) Query(reqQuery types.RequestQuery) (resQuery types.Respo

// BeginBlock resets the set of transactions marked for removal and, when
// block-event generation has been enabled (see genBlockEvents), returns
// "begin_event" events that are used to test the event indexer.
//
// With generation disabled an empty response is returned. Otherwise the
// events depend on the parity of app.state.Height:
//   - even: two events, (foo=100, bar=200) and (foo=200, bar=300);
//   - odd:  one event,  (foo=400, bar=300).
//
// The values are chosen so that attributes overlap across events and heights.
func (app *Application) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock {
	app.txToRemove = map[string]struct{}{}

	// Do not return unconditionally here: the event generation below must
	// stay reachable when genBlockEvents is set.
	if !app.genBlockEvents {
		return types.ResponseBeginBlock{}
	}

	if app.state.Height%2 == 0 {
		return types.ResponseBeginBlock{
			Events: []types.Event{
				newBeginEvent("100", "200"),
				newBeginEvent("200", "300"),
			},
		}
	}

	return types.ResponseBeginBlock{
		Events: []types.Event{
			newBeginEvent("400", "300"),
		},
	}
}

// newBeginEvent builds a "begin_event" event carrying the indexed attributes
// "foo" and "bar" with the given values.
func newBeginEvent(foo, bar string) types.Event {
	return types.Event{
		Type: "begin_event",
		Attributes: []types.EventAttribute{
			{Key: "foo", Value: foo, Index: true},
			{Key: "bar", Value: bar, Index: true},
		},
	}
}

func (app *Application) ProcessProposal(
Expand Down
4 changes: 4 additions & 0 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func NewPersistentKVStoreApplication(dbDir string) *PersistentKVStoreApplication
}
}

// SetGenBlockEvents enables block-event generation on the wrapped
// application by delegating to its own SetGenBlockEvents setter.
func (app *PersistentKVStoreApplication) SetGenBlockEvents() {
	app.app.SetGenBlockEvents()
}

// SetLogger stores l in the application's logger field.
func (app *PersistentKVStoreApplication) SetLogger(l log.Logger) {
app.logger = l
}
Expand Down
80 changes: 73 additions & 7 deletions docs/app-dev/indexing-transactions.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ the block itself is never stored.
Each event contains a type and a list of attributes, which are key-value pairs
denoting something about what happened during the method's execution. For more
details on `Events`, see the
[ABCI](https://github.com/cometbft/cometbft/blob/main/spec/abci/abci++_basic_concepts.md#events)

documentation.

An `Event` has a composite key associated with it. A `compositeKey` is
Expand All @@ -34,6 +36,9 @@ would be equal to the composite key of `jack.account.number`.
By default, CometBFT will index all transactions by their respective hashes
and height and blocks by their height.

CometBFT allows for different events within the same height to have
equal attributes.

## Configuration

Operators can configure indexing via the `[tx_index]` section. The `indexer`
Expand Down Expand Up @@ -67,6 +72,60 @@ for block and transaction events directly against CometBFT's RPC. However, the
query syntax is limited and so this indexer type might be deprecated or removed
entirely in the future.

**Implementation and data layout**

The kv indexer stores each attribute of an event individually, by creating a composite key
with

- event type,
- attribute key,
- attribute value,
- event generator (e.g. `EndBlock` or `BeginBlock`),
- the height, and
- event counter.

For example, the following events:

```
Type: "transfer",
Attributes: []abci.EventAttribute{
{Key: "sender", Value: "Bob", Index: true},
{Key: "recipient", Value: "Alice", Index: true},
{Key: "balance", Value: "100", Index: true},
{Key: "note", Value: "nothing", Index: true},
},

```

```
Type: "transfer",
Attributes: []abci.EventAttribute{
{Key: "sender", Value: "Tom", Index: true},
{Key: "recipient", Value: "Alice", Index: true},
{Key: "balance", Value: "200", Index: true},
{Key: "note", Value: "nothing", Index: true},
},
```

will be represented as follows in the store, assuming these events result from the EndBlock call for height 1:

```
Key value
---- event1 ------
transferSenderBobEndBlock11 1
transferRecipientAliceEndBlock11 1
transferBalance100EndBlock11 1
transferNoteNothingEndBlock11 1
---- event2 ------
transferSenderTomEndBlock12 1
transferRecipientAliceEndBlock12 1
transferBalance200EndBlock12 1
transferNoteNothingEndBlock12 1

```
The event counter is a local variable kept by the indexer and incremented when a new event is processed.
It is an `int64` variable and has no other semantics besides being used to associate attributes belonging to the same event within a height.
This variable is not atomically incremented as event indexing is deterministic. **Should this ever change**, the event counter generation
will be broken.

#### PostgreSQL

The `psql` indexer type allows an operator to enable block and transaction event
Expand Down Expand Up @@ -122,10 +181,10 @@ func (app *KVStoreApplication) DeliverTx(req types.RequestDeliverTx) types.Resul
{
Type: "transfer",
Attributes: []abci.EventAttribute{
{Key: []byte("sender"), Value: []byte("Bob"), Index: true},
{Key: []byte("recipient"), Value: []byte("Alice"), Index: true},
{Key: []byte("balance"), Value: []byte("100"), Index: true},
{Key: []byte("note"), Value: []byte("nothing"), Index: true},
{Key: "sender", Value: "Bob", Index: true},
{Key: "recipient", Value: "Alice", Index: true},
{Key: "balance", Value: "100", Index: true},
{Key: "note", Value: "nothing", Index: true},
},
},
}
Expand Down Expand Up @@ -168,7 +227,7 @@ a query to `/subscribe` RPC endpoint.
Check out [API docs](https://docs.cometbft.com/main/rpc/#subscribe) for more information
on query syntax and other options.

## Querying Blocks Events
## Querying Block Events

You can query for a paginated set of blocks by their events by calling the
`/block_search` RPC endpoint:
Expand All @@ -177,5 +236,12 @@ You can query for a paginated set of blocks by their events by calling the
curl "localhost:26657/block_search?query=\"block.height > 10 AND val_set.num_changed > 0\""
```

Check out [API docs](https://docs.cometbft.com/main/rpc/#/Info/block_search)
for more information on query syntax and other options.

Storing the event sequence was introduced in CometBFT 0.34.26. Before that, up until Tendermint Core 0.34.26,
the event sequence was not stored in the kvstore and events were stored only by height. That means that queries
returned blocks and transactions whose event attributes matched within the height, even when the matching
attributes belonged to different events at that height.
This behavior was fixed with CometBFT 0.34.26+. However, if the data was indexed with earlier versions of
Tendermint Core and not re-indexed, that data will be queried as if all the attributes within a height
occurred within the same event.

2 changes: 2 additions & 0 deletions rpc/client/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ func TestMain(m *testing.M) {
}

app := kvstore.NewPersistentKVStoreApplication(dir)
// If testing block event generation
// app.SetGenBlockEvents() needs to be called here
node = rpctest.StartTendermint(app)

code := m.Run()
Expand Down
30 changes: 26 additions & 4 deletions rpc/client/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,27 @@ func TestTxSearchWithTimeout(t *testing.T) {
require.Greater(t, len(result.Txs), 0, "expected a lot of transactions")
}

// TestBlockSearch broadcasts a few transactions, waits for height 5 and then
// asserts that a block search for "begin_event.foo = 100 AND
// begin_event.bar = 300" returns no blocks.
//
// Block events are only generated if app.SetGenBlockEvents() is called in
// main_test.go. Without that call no block events exist, so the test passes
// without actually testing anything.
func TestBlockSearch(t *testing.T) {
	c := getHTTPClient()

	// First broadcast a few txs.
	for i := 0; i < 10; i++ {
		_, _, tx := MakeTxKV()

		_, err := c.BroadcastTxCommit(context.Background(), tx)
		require.NoError(t, err)
	}
	require.NoError(t, client.WaitForHeight(c, 5, nil))

	// This cannot test match_events as it calls the client BlockSearch function directly.
	// It is the RPC request handler that processes the match_event.
	result, err := c.BlockSearch(context.Background(), "begin_event.foo = 100 AND begin_event.bar = 300", nil, nil, "asc")
	require.NoError(t, err)
	// require.Len reports the slice contents on failure and avoids the
	// expected/actual argument-order pitfall of require.Equal.
	require.Len(t, result.Blocks, 0)
}
func TestTxSearch(t *testing.T) {
c := getHTTPClient()

Expand All @@ -536,8 +557,7 @@ func TestTxSearch(t *testing.T) {
find := result.Txs[len(result.Txs)-1]
anotherTxHash := types.Tx("a different tx").Hash()

for i, c := range GetClients() {
t.Logf("client %d", i)
for _, c := range GetClients() {

// now we query for the tx.
result, err := c.TxSearch(context.Background(), fmt.Sprintf("tx.hash='%v'", find.Hash), true, nil, nil, "asc")
Expand Down Expand Up @@ -616,16 +636,17 @@ func TestTxSearch(t *testing.T) {
pages = int(math.Ceil(float64(txCount) / float64(perPage)))
)

totalTx := 0
for page := 1; page <= pages; page++ {
page := page
result, err := c.TxSearch(context.Background(), "tx.height >= 1", false, &page, &perPage, "asc")
result, err := c.TxSearch(context.Background(), "tx.height >= 1", true, &page, &perPage, "asc")
require.NoError(t, err)
if page < pages {
require.Len(t, result.Txs, perPage)
} else {
require.LessOrEqual(t, len(result.Txs), perPage)
}
require.Equal(t, txCount, result.TotalCount)
totalTx = totalTx + len(result.Txs)
for _, tx := range result.Txs {
require.False(t, seen[tx.Height],
"Found duplicate height %v in page %v", tx.Height, page)
Expand All @@ -635,6 +656,7 @@ func TestTxSearch(t *testing.T) {
maxHeight = tx.Height
}
}
require.Equal(t, txCount, totalTx)
require.Len(t, seen, txCount)
}
}
Expand Down
Loading
0