diff --git a/.changelog/unreleased/breaking-changes/3314-mempool-preupdate.md b/.changelog/unreleased/breaking-changes/3314-mempool-preupdate.md new file mode 100644 index 00000000000..4c2528939f7 --- /dev/null +++ b/.changelog/unreleased/breaking-changes/3314-mempool-preupdate.md @@ -0,0 +1,4 @@ +- `[mempool]` Add a new method `PreUpdate()` to the `Mempool` interface. This method should be + called before acquiring the mempool lock to signal that an update is coming. Also add a new + field `RecheckFull` to `ErrMempoolIsFull`. + ([\#3314](https://github.com/cometbft/cometbft/pull/3314)) diff --git a/.changelog/unreleased/improvements/3314-mempool-update-consider-full-when-rechecking.md b/.changelog/unreleased/improvements/3314-mempool-update-consider-full-when-rechecking.md new file mode 100644 index 00000000000..1e308ec6040 --- /dev/null +++ b/.changelog/unreleased/improvements/3314-mempool-update-consider-full-when-rechecking.md @@ -0,0 +1,3 @@ +- `[mempool]` If rechecking is still in progress when a new mempool update is about to start, consider the mempool full. + The mempool then stops accepting new transactions when the node can't keep up with re-CheckTx.
+ ([\#3314](https://github.com/cometbft/cometbft/pull/3314)) diff --git a/blocksync/reactor_test.go b/blocksync/reactor_test.go index cd6e640ab75..6158adc64e1 100644 --- a/blocksync/reactor_test.go +++ b/blocksync/reactor_test.go @@ -91,6 +91,7 @@ func newReactor( mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, diff --git a/consensus/replay_stubs.go b/consensus/replay_stubs.go index 0c55552f036..501f000f8aa 100644 --- a/consensus/replay_stubs.go +++ b/consensus/replay_stubs.go @@ -18,6 +18,7 @@ var _ mempl.Mempool = emptyMempool{} func (emptyMempool) Lock() {} func (emptyMempool) Unlock() {} +func (emptyMempool) PreUpdate() {} func (emptyMempool) Size() int { return 0 } func (emptyMempool) SizeBytes() int64 { return 0 } func (emptyMempool) CheckTx(types.Tx, func(*abci.ResponseCheckTx), mempl.TxInfo) error { diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index e89cd33eb83..16992ef4389 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -160,6 +160,13 @@ func (mem *CListMempool) Unlock() { mem.updateMtx.Unlock() } +// Safe for concurrent use by multiple goroutines. +func (mem *CListMempool) PreUpdate() { + if mem.recheck.setRecheckFull() { + mem.logger.Debug("the state of recheckFull has flipped") + } +} + // Safe for concurrent use by multiple goroutines. 
func (mem *CListMempool) Size() int { return mem.txs.Len() @@ -372,17 +379,17 @@ func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { } func (mem *CListMempool) isFull(txSize int) error { - var ( - memSize = mem.Size() - txsBytes = mem.SizeBytes() - ) + memSize := mem.Size() + txsBytes := mem.SizeBytes() + recheckFull := mem.recheck.consideredFull() - if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes { + if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes || recheckFull { return ErrMempoolIsFull{ NumTxs: memSize, MaxTxs: mem.config.Size, TxsBytes: txsBytes, MaxTxsBytes: mem.config.MaxTxsBytes, + RecheckFull: recheckFull, } } @@ -690,6 +697,8 @@ type recheck struct { end *clist.CElement // last entry in the mempool to recheck doneCh chan struct{} // to signal that rechecking has finished successfully (for async app connections) numPendingTxs atomic.Int32 // number of transactions still pending to recheck + isRechecking atomic.Bool // true iff the rechecking process has begun and is not yet finished + recheckFull atomic.Bool // whether rechecking TXs cannot be completed before a new block is decided } func newRecheck() *recheck { @@ -705,16 +714,20 @@ func (rc *recheck) init(first, last *clist.CElement) { rc.cursor = first rc.end = last rc.numPendingTxs.Store(0) + rc.isRechecking.Store(true) } // done returns true when there is no recheck response to process. +// Safe for concurrent use by multiple goroutines. func (rc *recheck) done() bool { - return rc.cursor == nil + return !rc.isRechecking.Load() } // setDone registers that rechecking has finished. func (rc *recheck) setDone() { rc.cursor = nil + rc.recheckFull.Store(false) + rc.isRechecking.Store(false) } // setNextEntry sets cursor to the next entry in the list. If there is no next, cursor will be nil. 
@@ -770,3 +783,17 @@ func (rc *recheck) findNextEntryMatching(tx *types.Tx) bool { func (rc *recheck) doneRechecking() <-chan struct{} { return rc.doneCh } + +// setRecheckFull sets recheckFull to true if rechecking is still in progress. It returns true iff +// the value of recheckFull has changed. +func (rc *recheck) setRecheckFull() bool { + rechecking := !rc.done() + recheckFull := rc.recheckFull.Swap(rechecking) + return rechecking != recheckFull +} + +// consideredFull returns true iff the mempool should be considered as full while rechecking is in +// progress. +func (rc *recheck) consideredFull() bool { + return rc.recheckFull.Load() +} diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 5da8472036f..7a7a4acb28b 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -734,7 +734,12 @@ func TestMempoolConcurrentUpdateAndReceiveCheckTxResponse(t *testing.T) { go func(h int) { defer wg.Done() - err := mp.Update(int64(h), []types.Tx{tx}, abciResponses(1, abci.CodeTypeOK), nil, nil) + mp.PreUpdate() + mp.Lock() + err := mp.FlushAppConn() + require.NoError(t, err) + err = mp.Update(int64(h), []types.Tx{tx}, abciResponses(1, abci.CodeTypeOK), nil, nil) + mp.Unlock() require.NoError(t, err) require.Equal(t, int64(h), mp.height.Load(), "height mismatch") }(h) @@ -884,6 +889,7 @@ func TestMempoolAsyncRecheckTxReturnError(t *testing.T) { require.True(t, mp.recheck.done()) require.Nil(t, mp.recheck.cursor) require.Nil(t, mp.recheck.end) + require.False(t, mp.recheck.isRechecking.Load()) mockClient.AssertExpectations(t) // One call to CheckTxAsync per tx, for rechecking. @@ -905,6 +911,7 @@ func TestMempoolAsyncRecheckTxReturnError(t *testing.T) { // mp.recheck.done() should be true only before and after calling recheckTxs. 
mp.recheckTxs() require.True(t, mp.recheck.done()) + require.False(t, mp.recheck.isRechecking.Load()) require.Nil(t, mp.recheck.cursor) require.NotNil(t, mp.recheck.end) require.Equal(t, mp.recheck.end, mp.txs.Back()) @@ -928,6 +935,7 @@ func TestMempoolRecheckRace(t *testing.T) { } // Update one transaction to force rechecking the rest. + mp.PreUpdate() mp.Lock() err = mp.FlushAppConn() require.NoError(t, err) @@ -967,6 +975,7 @@ func TestMempoolConcurrentCheckTxAndUpdate(t *testing.T) { break } txs := mp.ReapMaxBytesMaxGas(100, -1) + mp.PreUpdate() mp.Lock() err := mp.FlushAppConn() // needed to process the pending CheckTx requests and their callbacks require.NoError(t, err) diff --git a/mempool/errors.go b/mempool/errors.go index caf9c02b8f5..7ca6fdcefdf 100644 --- a/mempool/errors.go +++ b/mempool/errors.go @@ -29,6 +29,7 @@ type ErrMempoolIsFull struct { MaxTxs int TxsBytes int64 MaxTxsBytes int64 + RecheckFull bool } func (e ErrMempoolIsFull) Error() string { diff --git a/mempool/mempool.go b/mempool/mempool.go index 25f9253ba5f..74cd68e6e5e 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -57,6 +57,10 @@ type Mempool interface { // Unlock unlocks the mempool. Unlock() + // PreUpdate signals that a new update is coming, before acquiring the mempool lock. + // If the mempool is still rechecking at this point, it should be considered full. + PreUpdate() + // Update informs the mempool that the given txs were committed and can be // discarded. 
// diff --git a/mempool/mocks/mempool.go b/mempool/mocks/mempool.go index 7573c58e978..73e667ceedc 100644 --- a/mempool/mocks/mempool.go +++ b/mempool/mocks/mempool.go @@ -59,6 +59,11 @@ func (_m *Mempool) Lock() { _m.Called() } +// PreUpdate provides a mock function with given fields: +func (_m *Mempool) PreUpdate() { + _m.Called() +} + // ReapMaxBytesMaxGas provides a mock function with given fields: maxBytes, maxGas func (_m *Mempool) ReapMaxBytesMaxGas(maxBytes int64, maxGas int64) types.Txs { ret := _m.Called(maxBytes, maxGas) diff --git a/mempool/nop_mempool.go b/mempool/nop_mempool.go index 6bfff3b04d4..90eddd3d6d5 100644 --- a/mempool/nop_mempool.go +++ b/mempool/nop_mempool.go @@ -40,6 +40,8 @@ func (*NopMempool) Lock() {} // Unlock does nothing. func (*NopMempool) Unlock() {} +func (*NopMempool) PreUpdate() {} + // Update does nothing. func (*NopMempool) Update( int64, diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 454012f8854..146ca7c6698 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -93,6 +93,7 @@ func TestReactorConcurrency(t *testing.T) { go func() { defer wg.Done() + reactors[0].mempool.PreUpdate() reactors[0].mempool.Lock() defer reactors[0].mempool.Unlock() @@ -110,6 +111,7 @@ func TestReactorConcurrency(t *testing.T) { go func() { defer wg.Done() + reactors[1].mempool.PreUpdate() reactors[1].mempool.Lock() defer reactors[1].mempool.Unlock() err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ExecTxResult, 0), nil, nil) diff --git a/state/execution.go b/state/execution.go index 4accf1639d6..0d69f3bf778 100644 --- a/state/execution.go +++ b/state/execution.go @@ -389,6 +389,7 @@ func (blockExec *BlockExecutor) Commit( block *types.Block, abciResponse *abci.ResponseFinalizeBlock, ) (int64, error) { + blockExec.mempool.PreUpdate() blockExec.mempool.Lock() defer blockExec.mempool.Unlock() diff --git a/state/execution_test.go b/state/execution_test.go index 797bd8a5778..17469c93d5b 100644 --- 
a/state/execution_test.go +++ b/state/execution_test.go @@ -56,6 +56,7 @@ func TestApplyBlock(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -117,6 +118,7 @@ func TestFinalizeBlockDecidedLastCommit(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -332,6 +334,7 @@ func TestFinalizeBlockMisbehavior(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -590,6 +593,7 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -722,6 +726,7 @@ func TestEmptyPrepareProposal(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, diff --git a/state/validation_test.go b/state/validation_test.go index b4efcd75989..a6e82f88c5a 100644 --- a/state/validation_test.go +++ b/state/validation_test.go @@ -38,6 +38,7 @@ func TestValidateBlockHeader(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -135,6 +136,7 @@ func TestValidateBlockCommit(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -287,6 +289,7 @@ func 
TestValidateBlockEvidence(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything,