8000 fix(mempool)!: stop accepting TXs in the mempool if we can't keep up with reCheckTX (backport #3314) by mergify[bot] · Pull Request #3337 · cometbft/cometbft · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix(mempool)!: stop accepting TXs in the mempool if we can't keep up with reCheckTX (backport #3314) #3337

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[mempool]` Add to the `Mempool` interface a new method `PreUpdate()`. This method should be
called before acquiring the mempool lock, to signal that a new update is coming. Also add to
`ErrMempoolIsFull` a new field `RecheckFull`.
([\#3314](https://github.com/cometbft/cometbft/pull/3314))
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[mempool]` Before updating the mempool, consider it as full if rechecking is still in progress.
This will stop accepting transactions in the mempool if the node can't keep up with re-CheckTx.
([\#3314](https://github.com/cometbft/cometbft/pull/3314))
1 change: 1 addition & 0 deletions blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func newReactor(
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down
10000
1 change: 1 addition & 0 deletions consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var _ mempl.Mempool = emptyMempool{}

func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) PreUpdate() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) SizeBytes() int64 { return 0 }
func (emptyMempool) CheckTx(types.Tx, func(*abci.ResponseCheckTx), mempl.TxInfo) error {
Expand Down
39 changes: 33 additions & 6 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ func (mem *CListMempool) Unlock() {
mem.updateMtx.Unlock()
}

// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) PreUpdate() {
if mem.recheck.setRecheckFull() {
mem.logger.Debug("the state of recheckFull has flipped")
}
}

// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) Size() int {
return mem.txs.Len()
Expand Down Expand Up @@ -372,17 +379,17 @@ func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error {
}

func (mem *CListMempool) isFull(txSize int) error {
var (
memSize = mem.Size()
txsBytes = mem.SizeBytes()
)
memSize := mem.Size()
txsBytes := mem.SizeBytes()
recheckFull := mem.recheck.consideredFull()

if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes {
if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes || recheckFull {
return ErrMempoolIsFull{
NumTxs: memSize,
MaxTxs: mem.config.Size,
TxsBytes: txsBytes,
MaxTxsBytes: mem.config.MaxTxsBytes,
RecheckFull: recheckFull,
}
}

Expand Down Expand Up @@ -690,6 +697,8 @@ type recheck struct {
end *clist.CElement // last entry in the mempool to recheck
doneCh chan struct{} // to signal that rechecking has finished successfully (for async app connections)
numPendingTxs atomic.Int32 // number of transactions still pending to recheck
isRechecking atomic.Bool // true iff the rechecking process has begun and is not yet finished
recheckFull atomic.Bool // whether rechecking TXs cannot be completed before a new block is decided
}

func newRecheck() *recheck {
Expand All @@ -705,16 +714,20 @@ func (rc *recheck) init(first, last *clist.CElement) {
rc.cursor = first
rc.end = last
rc.numPendingTxs.Store(0)
rc.isRechecking.Store(true)
}

// done returns true when there is no recheck response to process.
// Safe for concurrent use by multiple goroutines.
func (rc *recheck) done() bool {
return rc.cursor == nil
return !rc.isRechecking.Load()
}

// setDone registers that rechecking has finished.
func (rc *recheck) setDone() {
rc.cursor = nil
rc.recheckFull.Store(false)
rc.isRechecking.Store(false)
}

// setNextEntry sets cursor to the next entry in the list. If there is no next, cursor will be nil.
Expand Down Expand Up @@ -770,3 +783,17 @@ func (rc *recheck) findNextEntryMatching(tx *types.Tx) bool {
func (rc *recheck) doneRechecking() <-chan struct{} {
return rc.doneCh
}

// setRecheckFull sets recheckFull to true if rechecking is still in progress. It returns true iff
// the value of recheckFull has changed.
func (rc *recheck) setRecheckFull() bool {
rechecking := !rc.done()
recheckFull := rc.recheckFull.Swap(rechecking)
return rechecking != recheckFull
}

// consideredFull returns true iff the mempool should be considered as full while rechecking is in
// progress.
func (rc *recheck) consideredFull() bool {
return rc.recheckFull.Load()
}
11 changes: 10 additions & 1 deletion mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,12 @@ func TestMempoolConcurrentUpdateAndReceiveCheckTxResponse(t *testing.T) {
go func(h int) {
defer wg.Done()

err := mp.Update(int64(h), []types.Tx{tx}, abciResponses(1, abci.CodeTypeOK), nil, nil)
mp.PreUpdate()
mp.Lock()
err := mp.FlushAppConn()
require.NoError(t, err)
err = mp.Update(int64(h), []types.Tx{tx}, abciResponses(1, abci.CodeTypeOK), nil, nil)
mp.Unlock()
require.NoError(t, err)
require.Equal(t, int64(h), mp.height.Load(), "height mismatch")
}(h)
Expand Down Expand Up @@ -884,6 +889,7 @@ func TestMempoolAsyncRecheckTxReturnError(t *testing.T) {
require.True(t, mp.recheck.done())
require.Nil(t, mp.recheck.cursor)
require.Nil(t, mp.recheck.end)
require.False(t, mp.recheck.isRechecking.Load())
mockClient.AssertExpectations(t)

// One call to CheckTxAsync per tx, for rechecking.
Expand All @@ -905,6 +911,7 @@ func TestMempoolAsyncRecheckTxReturnError(t *testing.T) {
// mp.recheck.done() should be true only before and after calling recheckTxs.
mp.recheckTxs()
require.True(t, mp.recheck.done())
require.False(t, mp.recheck.isRechecking.Load())
require.Nil(t, mp.recheck.cursor)
require.NotNil(t, mp.recheck.end)
require.Equal(t, mp.recheck.end, mp.txs.Back())
Expand All @@ -928,6 +935,7 @@ func TestMempoolRecheckRace(t *testing.T) {
}

// Update one transaction to force rechecking the rest.
mp.PreUpdate()
mp.Lock()
err = mp.FlushAppConn()
require.NoError(t, err)
Expand Down Expand Up @@ -967,6 +975,7 @@ func TestMempoolConcurrentCheckTxAndUpdate(t *testing.T) {
break
}
txs := mp.ReapMaxBytesMaxGas(100, -1)
mp.PreUpdate()
mp.Lock()
err := mp.FlushAppConn() // needed to process the pending CheckTx requests and their callbacks
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions mempool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type ErrMempoolIsFull struct {
MaxTxs int
TxsBytes int64
MaxTxsBytes int64
RecheckFull bool
}

func (e ErrMempoolIsFull) Error() string {
Expand Down
4 changes: 4 additions & 0 deletions mempool/mempool.go
10000
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type Mempool interface {
// Unlock unlocks the mempool.
Unlock()

// PreUpdate signals that a new update is coming, before acquiring the mempool lock.
// If the mempool is still rechecking at this point, it should be considered full.
PreUpdate()

// Update informs the mempool that the given txs were committed and can be
// discarded.
//
Expand Down
5 changes: 5 additions & 0 deletions mempool/mocks/mempool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions mempool/nop_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func (*NopMempool) Lock() {}
// Unlock does nothing.
func (*NopMempool) Unlock() {}

func (*NopMempool) PreUpdate() {}

// Update does nothing.
func (*NopMempool) Update(
int64,
Expand Down
2 changes: 2 additions & 0 deletions mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func TestReactorConcurrency(t *testing.T) {
go func() {
defer wg.Done()

reactors[0].mempool.PreUpdate()
reactors[0].mempool.Lock()
defer reactors[0].mempool.Unlock()

Expand All @@ -110,6 +111,7 @@ func TestReactorConcurrency(t *testing.T) {
go func() {
defer wg.Done()

reactors[1].mempool.PreUpdate()
reactors[1].mempool.Lock()
defer reactors[1].mempool.Unlock()
err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ExecTxResult, 0), nil, nil)
Expand Down
1 change: 1 addition & 0 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func (blockExec *BlockExecutor) Commit(
block *types.Block,
abciResponse *abci.ResponseFinalizeBlock,
) (int64, error) {
blockExec.mempool.PreUpdate()
blockExec.mempool.Lock()
defer blockExec.mempool.Unlock()

Expand Down
5 changes: 5 additions & 0 deletions state/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestApplyBlock(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -117,6 +118,7 @@ func TestFinalizeBlockDecidedLastCommit(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -332,6 +334,7 @@ func TestFinalizeBlockMisbehavior(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -590,6 +593,7 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -722,6 +726,7 @@ func TestEmptyPrepareProposal(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down
3 changes: 3 additions & 0 deletions state/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestValidateBlockHeader(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -135,6 +136,7 @@ func TestValidateBlockCommit(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -287,6 +289,7 @@ func TestValidateBlockEvidence(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down
Loading
0