mempool: Handle concurrent requests in recheck callback by hvanz · Pull Request #895 · cometbft/cometbft · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

mempool: Handle concurrent requests in recheck callback #895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[mempool]` Allow recheck requests to be handled concurrently instead of
sequentially. ([\#895](https://github.com/cometbft/cometbft/pull/895))
133 changes: 62 additions & 71 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mempool

import (
"bytes"
"context"
"errors"
"sync"
Expand Down Expand Up @@ -42,20 +41,19 @@ type CListMempool struct {
txs *clist.CList // concurrent linked-list of good txs
proxyAppConn proxy.AppConnMempool

// Track whether we're rechecking txs.
// These are not protected by a mutex and are expected to be mutated in
// serial (ie. by abci responses which are called in serial).
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here

// Map for quick access to txs to record sender in CheckTx.
// Map for quick access to txs elements, for
// - recording sender in CheckTx, and
// - finding tx when processing re-CheckTx.
// txsMap: txKey -> CElement
txsMap sync.Map

// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
cache TxCache

// The number of (re-)CheckTx requests for which we await a response.
numRecheckingRequests uint32

logger log.Logger
metrics *Metrics
}
Expand All @@ -74,14 +72,12 @@ func NewCListMempool(
options ...CListMempoolOption,
) *CListMempool {
mp := &CListMempool{
config: cfg,
proxyAppConn: proxyAppConn,
txs: clist.New(),
height: height,
recheckCursor: nil,
recheckEnd: nil,
logger: log.NewNopLogger(),
metrics: NopMetrics(),
config: cfg,
proxyAppConn: proxyAppConn,
txs: clist.New(),
height: height,
logger: log.NewNopLogger(),
metrics: NopMetrics(),
}

if cfg.CacheSize > 0 {
Expand Down Expand Up @@ -113,6 +109,8 @@ func (mem *CListMempool) getMemTx(txKey types.TxKey) *mempoolTx {
return nil
}

// Called from:
// - Flush (lock not held)
func (mem *CListMempool) removeAllTxs() {
for e := mem.txs.Front(); e != nil; e = e.Next() {
mem.txs.Remove(e)
Expand Down Expand Up @@ -180,6 +178,7 @@ func (mem *CListMempool) FlushAppConn() error {
}

// XXX: Unsafe! Calling Flush may leave mempool in inconsistent state.
// Only called from unsafe_flush_mempool RPC endpoint, which is disabled by default.
func (mem *CListMempool) Flush() {
mem.updateMtx.RLock()
defer mem.updateMtx.RUnlock()
Expand Down Expand Up @@ -285,15 +284,7 @@ func (mem *CListMempool) CheckTx(
// When rechecking, we don't need the peerID, so the recheck callback happens
// here.
func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {
if mem.recheckCursor == nil {
return
}

mem.metrics.RecheckTimes.Add(1)
mem.resCbRecheck(req, res)

// update metrics
mem.metrics.Size.Set(float64(mem.Size()))
}

// Request specific callback that should be set on individual reqRes objects
Expand All @@ -311,20 +302,15 @@ func (mem *CListMempool) reqResCb(
externalCb func(*abci.ResponseCheckTx),
) func(res *abci.Response) {
return func(res *abci.Response) {
if mem.recheckCursor != nil {
// this should never happen
panic("recheck cursor is not nil in reqResCb")
}

mem.resCbFirstTime(tx, txInfo, res)

// update metrics
mem.metrics.Size.Set(float64(mem.Size()))

// passed in by the caller of CheckTx, eg. the RPC
if externalCb != nil {
externalCb(res.GetCheckTx())
}

// update metrics
mem.metrics.Size.Set(float64(mem.Size()))
}
}

Expand All @@ -346,8 +332,10 @@ func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error {
mem.txs.Remove(elem)
elem.DetachPrev()
mem.txsMap.Delete(txKey)

tx := elem.Value.(*mempoolTx).tx
atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))

return nil
}
return errors.New("transaction not found in mempool")
Expand Down Expand Up @@ -452,37 +440,22 @@ func (mem *CListMempool) resCbFirstTime(
// The case where the app checks the tx for the first time is handled by the
// resCbFirstTime callback.
func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
tx := req.GetCheckTx().Tx
memTx := mem.recheckCursor.Value.(*mempoolTx)

// Search through the remaining list of tx to recheck for a transaction that matches
// the one we received from the ABCI application.
for {
if bytes.Equal(tx, memTx.tx) {
// We've found a tx in the recheck list that matches the tx that we
// received from the ABCI application.
// Break, and use this transaction for further checks.
break
}
if req.GetCheckTx().GetType() != abci.CheckTxType_Recheck {
return
}

mem.logger.Error(
"re-CheckTx transaction mismatch",
"got", types.Tx(tx),
"expected", memTx.tx,
)
if mem.noRecheckingRequests() {
panic("Trying to process a re-CheckTx response with counter numRecheckingRequests = 0")
}

if mem.recheckCursor == mem.recheckEnd {
// we reached the end of the recheckTx list without finding a tx
// matching the one we received from the ABCI application.
// Return without processing any tx.
mem.recheckCursor = nil
return
}
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
mem.metrics.RecheckTimes.Add(1)

mem.recheckCursor = mem.recheckCursor.Next()
memTx = mem.recheckCursor.Value.(*mempoolTx)
tx := types.Tx(req.GetCheckTx().Tx)
memTx := mem.getMemTx(tx.Key())
if memTx == nil {
return
}

var postCheckErr error
Expand All @@ -492,7 +465,7 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {

if (r.CheckTx.Code != abci.CodeTypeOK) || postCheckErr != nil {
// Tx became invalidated due to newly committed block.
mem.logger.Debug("tx is no longer valid", "tx", types.Tx(tx).Hash(), "res", r, "err", postCheckErr)
mem.logger.Debug("tx is no longer valid", "tx", tx.Hash(), "res", r, "err", postCheckErr)
if err := mem.RemoveTxByKey(memTx.tx.Key()); err != nil {
mem.logger.Debug("Transaction could not be removed from mempool", "err", err)
}
Expand All @@ -501,20 +474,21 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
mem.cache.Remove(tx)
}
}
if mem.recheckCursor == mem.recheckEnd {
mem.recheckCursor = nil
} else {
mem.recheckCursor = mem.recheckCursor.Next()
}
if mem.recheckCursor == nil {
// Done!

// update metrics
mem.metrics.Size.Set(float64(mem.Size()))

mem.decreaseNumRecheckingRequests()

if mem.noRecheckingRequests() {
mem.logger.Debug("done rechecking txs")

// incase the recheck removed all txs
if mem.Size() > 0 {
mem.notifyTxsAvailable()
}
}

default:
// ignore other messages
}
Expand Down Expand Up @@ -667,12 +641,14 @@ func (mem *CListMempool) recheckTxs() {
panic("recheckTxs is called, but the mempool is empty")
}

mem.recheckCursor = mem.txs.Front()
mem.recheckEnd = mem.txs.Back()

// Push txs to proxyAppConn
// NOTE: globalCb may be called concurrently.
for e := mem.txs.Front(); e != nil; e = e.Next() {
// We need to increase this counter before sending the re-CheckTx
// request. Otherwise, when processing the response, the counter at zero
// can be decreased.
mem.increaseNumRecheckingRequests()

memTx := e.Value.(*mempoolTx)
_, err := mem.proxyAppConn.CheckTxAsync(context.TODO(), &abci.RequestCheckTx{
Tx: memTx.tx,
Expand All @@ -688,3 +664,18 @@ func (mem *CListMempool) recheckTxs() {
// all pending messages to the app. There doesn't seem to be any need here as the buffer
// will get flushed regularly or when filled.
}

// noRecheckingRequests reports whether the counter of re-CheckTx requests
// still awaiting a response from the application is currently zero.
// The counter is read atomically, so this may be called from concurrent
// response callbacks.
func (mem *CListMempool) noRecheckingRequests() bool {
	pending := atomic.LoadUint32(&mem.numRecheckingRequests)
	return pending == 0
}

func (mem *CListMempool) increaseNumRecheckingRequests() uint32 {
return atomic.AddUint32(&mem.numRecheckingRequests, 1)
}

// decreaseNumRecheckingRequests atomically decrements the number of re-CheckTx
// requests awaiting a response and returns the new count.
//
// It panics if the counter is already zero. A decrement at zero would mean a
// response was processed without a matching outstanding request. Letting the
// unsigned counter wrap to MaxUint32 would make noRecheckingRequests report
// false forever.
//
// The zero check and the decrement happen in a single compare-and-swap.
// A separate load followed by an add would let two concurrent callers both
// observe a counter of 1 and both decrement it, wrapping the counter instead
// of panicking. Recheck responses may be handled concurrently, so the check
// must be atomic with the update.
func (mem *CListMempool) decreaseNumRecheckingRequests() uint32 {
	for {
		current := atomic.LoadUint32(&mem.numRecheckingRequests)
		if current == 0 {
			panic("Cannot decrease 0 rechecking requests")
		}
		// Retry if another goroutine changed the counter since the load.
		if atomic.CompareAndSwapUint32(&mem.numRecheckingRequests, current, current-1) {
			return current - 1
		}
	}
}
30 changes: 25 additions & 5 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,24 +268,44 @@ func TestMempoolUpdateDoesNotPanicWhenApplicationMissedTx(t *testing.T) {
// ensure that the callback that the mempool sets on the ReqRes is run.
reqRes.InvokeCallback()
}
assert.Equal(t, 4, mp.Size())

// Calling update to remove the first transaction from the mempool.
// This call also triggers the mempool to recheck its remaining transactions.
err = mp.Update(0, []types.Tx{txs[0]}, abciResponses(1, abci.CodeTypeOK), nil, nil)
require.Nil(t, err)
assert.Equal(t, 3, mp.Size())

// The mempool has now sent its requests off to the client to be rechecked
// and is waiting for the corresponding callbacks to be called.
// We now call the mempool-supplied callback on the first and third transaction.
// This simulates the client dropping the second request.
// We now call the mempool-supplied callback on the second and fourth transactions.
// This simulates the client dropping the third request.
// Previous versions of this code panicked when the ABCI application missed
// a recheck-tx request.
resp := &abci.ResponseCheckTx{Code: abci.CodeTypeOK}
req := &abci.RequestCheckTx{Tx: txs[1]}
// Also note that responses can be processed in a non-sequential order.

// Process response for fourth tx, which is invalid: remove it from mempool.
req := &abci.RequestCheckTx{Tx: txs[3], Type: abci.CheckTxType_Recheck}
resp := &abci.ResponseCheckTx{Code: 1}
callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp))
assert.Equal(t, 2, mp.Size())

// Receive same response as before: mempool does not change.
callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp))
assert.Equal(t, 2, mp.Size())

req = &abci.RequestCheckTx{Tx: txs[3]}
// Process response for second tx, which is valid: keep it in mempool.
req = &abci.RequestCheckTx{Tx: txs[1], Type: abci.CheckTxType_Recheck}
resp = &abci.ResponseCheckTx{Code: abci.CodeTypeOK}
callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp))
assert.Equal(t, 2, mp.Size())

// Process response for second tx, which is now invalid: remove it from mempool.
req = &abci.RequestCheckTx{Tx: txs[1], Type: abci.CheckTxType_Recheck}
resp = &abci.ResponseCheckTx{Code: 1}
callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp))
assert.Equal(t, 1, mp.Size())

mockClient.AssertExpectations(t)
}

Expand Down
0