perf(internal/state): avoid double-saving FinalizeBlockResponse by melekes · Pull Request #2017 · cometbft/cometbft · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

perf(internal/state): avoid double-saving FinalizeBlockResponse #2017

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[state]` avoid double-saving `FinalizeBlockResponse` for performance reasons
([\#2017](https://github.com/cometbft/cometbft/pull/2017))
96 changes: 47 additions & 49 deletions internal/state/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func calcABCIResponsesKey(height int64) []byte {
//----------------------

var (
lastABCIResponseKey = []byte("lastABCIResponseKey")
lastABCIResponseKey = []byte("lastABCIResponseKey") // DEPRECATED
lastABCIResponsesRetainHeightKey = []byte("lastABCIResponsesRetainHeight")
offlineStateSyncHeight = []byte("offlineStateSyncHeightKey")
)
Expand Down Expand Up @@ -535,7 +535,7 @@ func TxResultsHash(txResults []*abci.ExecTxResult) []byte {
}

// LoadFinalizeBlockResponse loads the DiscardABCIResponses for the given height from the
// database. If the node has D set to true, ErrABCIResponsesNotPersisted
// database. If the node has D set to true, ErrFinalizeBlockResponsesNotPersisted
// is persisted. If not found, ErrNoABCIResponsesForHeight is returned.
func (store dbStore) LoadFinalizeBlockResponse(height int64) (*abci.FinalizeBlockResponse, error) {
if store.DiscardABCIResponses {
Expand All @@ -562,7 +562,6 @@ func (store dbStore) LoadFinalizeBlockResponse(height int64) (*abci.FinalizeBloc
legacyResp := new(cmtstate.LegacyABCIResponses)
rerr := legacyResp.Unmarshal(buf)
if rerr != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmtos.Exit(fmt.Sprintf(`LoadFinalizeBlockResponse: Data has been corrupted or its spec has
changed: %v\n`, err))
}
Expand All @@ -585,39 +584,44 @@ func (store dbStore) LoadFinalizeBlockResponse(height int64) (*abci.FinalizeBloc
// method on the application but crashed before persisting the results.
func (store dbStore) LoadLastFinalizeBlockResponse(height int64) (*abci.FinalizeBlockResponse, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand the crash scenario fully. And this is the first time I am looking at this code. With the new change if height is smaller than the height of the lastABCIResponseKey it will return stored results (or not found if DiscardABCIResponses was true). Maybe this is not possible in the crash scenario. If that's the case, could you maybe document this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the new change if height is smaller than the height of the lastABCIResponseKey

This is not possible IMHO. I.e. if we know we crashed at height 5, then we'll try to restore last responses for this height.

start := time.Now()
bz, err := store.db.Get(lastABCIResponseKey)
buf, err := store.db.Get(calcABCIResponsesKey(height))
if err != nil {
return nil, err
}
addTimeSample(store.StoreOptions.Metrics.StoreAccessDurationSeconds.With("method", "load_last_abci_response"), start)()
if len(bz) == 0 {
return nil, errors.New("no last ABCI response has been persisted")
if len(buf) == 0 {
// DEPRECATED lastABCIResponseKey
// It is possible if this is called directly after an upgrade that
// `lastABCIResponseKey` contains the last ABCI responses.
bz, err := store.db.Get(lastABCIResponseKey)
if err == nil && len(bz) > 0 {
info := new(cmtstate.ABCIResponsesInfo)
err = info.Unmarshal(bz)
if err != nil {
cmtos.Exit(fmt.Sprintf(`LoadLastFinalizeBlockResponse: Data has been corrupted or its spec has changed: %v\n`, err))
}
// Here we validate the result by comparing its height to the expected height.
if height != info.GetHeight() {
return nil, fmt.Errorf("expected height %d but last stored abci responses was at height %d", height, info.GetHeight())
}
if info.FinalizeBlock == nil {
// sanity check
if info.LegacyAbciResponses == nil {
panic("state store contains last abci response but it is empty")
}
return responseFinalizeBlockFromLegacy(info.LegacyAbciResponses), nil
}
return info.FinalizeBlock, nil
}
// END OF DEPRECATED lastABCIResponseKey
return nil, fmt.Errorf("expected last ABCI responses at height %d, but none are found", height)
}

info := new(cmtstate.ABCIResponsesInfo)
err = info.Unmarshal(bz)
resp := new(abci.FinalizeBlockResponse)
err = resp.Unmarshal(buf)
if err != nil {
cmtos.Exit(fmt.Sprintf(`LoadLastFinalizeBlockResponse: Data has been corrupted or its spec has
changed: %v\n`, err))
}

// Here we validate the result by comparing its height to the expected height.
if height != info.GetHeight() {
return nil, fmt.Errorf("expected height %d but last stored abci responses was at height %d", height, info.GetHeight())
}

// It is possible if this is called directly after an upgrade that
// FinalizeBlockResponse is nil. In which case we use the legacy
// ABCI responses
if info.FinalizeBlock == nil {
// sanity check
if info.LegacyAbciResponses == nil {
panic("state store contains last abci response but it is empty")
}
return responseFinalizeBlockFromLegacy(info.LegacyAbciResponses), nil
cmtos.Exit(fmt.Sprintf(`LoadLastFinalizeBlockResponse: Data has been corrupted or its spec has changed: %v\n`, err))
}

return info.FinalizeBlock, nil
return resp, nil
}

// SaveFinalizeBlockResponse persists the FinalizeBlockResponse to the database.
Expand All @@ -636,32 +640,26 @@ func (store dbStore) SaveFinalizeBlockResponse(height int64, resp *abci.Finalize
}
resp.TxResults = dtxs

// If the flag is false then we save the ABCIResponse. This can be used for the /BlockResults
// query or to reindex an event using the command line.
if !store.DiscardABCIResponses {
bz, err := resp.Marshal()
if err != nil {
return err
}
start := time.Now()
if err := store.db.Set(calcABCIResponsesKey(height), bz); err != nil {
return err
}
addTimeSample(store.StoreOptions.Metrics.StoreAccessDurationSeconds.With("method", "save_abci_responses"), start)()
bz, err := resp.Marshal()
if err != nil {
return err
}

// Save the ABCI response.
//
// We always save the last ABCI response for crash recovery.
// This overwrites the previous saved ABCI Response.
response := &cmtstate.ABCIResponsesInfo{
FinalizeBlock: resp,
Height: height,
// If `store.DiscardABCIResponses` is true, then we delete the previous ABCI response.
start := time.Now()
if store.DiscardABCIResponses && height > 1 {
if err := store.db.Delete(calcABCIResponsesKey(height - 1)); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we do not double save a lot of data, we do have 2 DB operations here. Note that if compaction does not work as expected, this might still be blowing up state.db even though we are deleting data.

If DiscardABCIResponses is true then we could potentially just save everything under LastABCIResponsesKey.
So we have only one key in the DB and don't risk having potentially a big DB due to lack of compaction kicking in.

The changes could look something like this:

func  (store dbStore) LoadLastFinalizeBlockResponse(height int64) (*abci.FinalizeBlockResponse, error) {

.....
var bz
if store.DiscardABCIResponses == true {

    bz, err := store.db.Get(lastABCIResponseKey)
    
 } else {

    bz, err := store.db.Get(calcABCIResponsesKey(height))
}
// Perform checks on height

....

Then in

func (store dbStore) SaveFinalizeBlockResponse(height int64, resp*abci.FinalizeBlockResponse) error {

...

var resp 
if store.DiscardABCIResponses == true {
      store.db.SetSync(lastABCIResponseKey, bz)
} else {

  store.db.SetSync(calcABCIResponsesKey(height), bz)
}

The only potential problem I see is if this setting (DiscardABCIResponses) can be changed between node boot-ups so there might be some additional checks to be added.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we do not double save a lot of data, we do have 2 DB operations here. Note that if compaction does not work as expected, this might still be blowing up state.db even though we are deleting data.

Good point 👍 Under what conditions does compaction not work as expected?

The only potential problem I see is if this setting (DiscardABCIResponses) can be changed between node boot-ups so there might be some additional checks to be added.

This is a valid concern.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we do not double save a lot of data, we do have 2 DB operations here. Note that if compaction does not work as expected, this might still be blowing up state.db even though we are deleting data.

Good point 👍 Under what conditions does compaction not work as expected?

At the moment (without the changes we plan to introduce with in-process compaction), there is no guarantee that GoLevelDB will actually compact the storage, the same way it has not been doing so previously for blocks. This is worth testing out in this case though. If the problem exists, then calling CompactRange after a couple of these delete operations would solve the storage footprint problem.

The only potential problem I see is if this setting (DiscardABCIResponses) can be changed between node boot-ups so there might be some additional checks to be added.

This is a valid concern.

I am not sure this is a problem in your version of the code, only in what I propose.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@melekes, it could be worth running a small testnet on this branch and monitoring the storage usage, even on just one node.

return err
}
}
bz, err := response.Marshal()
if err != nil {
if err := store.db.SetSync(calcABCIResponsesKey(height), bz); err != nil {
return err
}

return store.db.SetSync(lastABCIResponseKey, bz)
addTimeSample(store.StoreOptions.Metrics.StoreAccessDurationSeconds.With("method", "save_abci_responses"), start)()
return nil
}

func (store dbStore) getValue(key []byte) ([]byte, error) {
Expand Down
26 changes: 16 additions & 10 deletions internal/state/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,63 +504,69 @@ func TestFinalizeBlockResponsePruning(t *testing.T) {
}

func TestLastFinalizeBlockResponses(t *testing.T) {
// create an empty state store.
t.Run("Not persisting responses", func(t *testing.T) {
t.Run("persisting responses", func(t *testing.T) {
stateDB := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: false,
})

responses, err := stateStore.LoadFinalizeBlockResponse(1)
require.Error(t, err)
require.Nil(t, responses)
// stub the abciresponses.

response1 := &abci.FinalizeBlockResponse{
TxResults: []*abci.ExecTxResult{
{Code: 32, Data: []byte("Hello"), Log: "Huh?"},
},
}
// create new db and state store and set discard abciresponses to false.

stateDB = dbm.NewMemDB()
stateStore = sm.NewStore(stateDB, sm.StoreOptions{DiscardABCIResponses: false})
height := int64(10)

// save the last abci response.
err = stateStore.SaveFinalizeBlockResponse(height, response1)
require.NoError(t, err)

// search for the last finalize block response and check if it has saved.
lastResponse, err := stateStore.LoadLastFinalizeBlockResponse(height)
require.NoError(t, err)
// check to see if the saved response height is the same as the loaded height.

assert.Equal(t, lastResponse, response1)

// use an incorrect height to make sure the state store errors.
_, err = stateStore.LoadLastFinalizeBlockResponse(height + 1)
require.Error(t, err)

// check if the abci response didn't save in the abciresponses.
responses, err = stateStore.LoadFinalizeBlockResponse(height)
require.NoError(t, err, responses)
require.Equal(t, response1, responses)
})

t.Run("persisting responses", func(t *testing.T) {
t.Run("not persisting responses", func(t *testing.T) {
stateDB := dbm.NewMemDB()
height := int64(10)
// stub the second abciresponse.

response2 := &abci.FinalizeBlockResponse{
TxResults: []*abci.ExecTxResult{
{Code: 44, Data: []byte("Hello again"), Log: "????"},
},
}
// create a new statestore with the responses on.

stateStore := sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: true,
})
// save an additional response.

err := stateStore.SaveFinalizeBlockResponse(height+1, response2)
require.NoError(t, err)

// check to see if the response saved by calling the last response.
lastResponse2, err := stateStore.LoadLastFinalizeBlockResponse(height + 1)
require.NoError(t, err)
// check to see if the saved response height is the same as the loaded height.

assert.Equal(t, response2, lastResponse2)

// should error as we are no longer saving the response.
_, err = stateStore.LoadFinalizeBlockResponse(height + 1)
assert.Equal(t, sm.ErrFinalizeBlockResponsesNotPersisted, err)
Expand Down
0