8000 node/state:bootstrap state api (backport #1057) by mergify[bot] · Pull Request #1311 · cometbft/cometbft · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

node/state:bootstrap state api (backport #1057) #1311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[node/state]` Add Go API to bootstrap block store and state store to a height. Make sure block sync starts syncing from bootstrapped height.
([\#1057](https://github.com/cometbft/cometbft/pull/1057)) (@yihuang)
- `[state/store]` Added Go functions to save height at which offline state sync is performed.
([\#1057](https://github.com/cometbft/cometbft/pull/1057)) (@jmalicevic)
2 changes: 2 additions & 0 deletions .changelog/unreleased/features/1057-bootstrap-state-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[node/state]` Add Go API to bootstrap block store and state store to a height
([\#1057](https://github.com/cometbft/cometbft/pull/1057)) (@yihuang)
22 changes: 16 additions & 6 deletions blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,29 @@ type Reactor struct {

// NewReactor returns new reactor instance.
func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
blockSync bool, metrics *Metrics,
blockSync bool, metrics *Metrics, offlineStateSyncHeight int64,
) *Reactor {
if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
store.Height()))
}

storeHeight := store.Height()
if storeHeight == 0 {
// If state sync was performed offline and the stores were bootstrapped to height H
// the state store's lastHeight will be H while blockstore's Height and Base are still 0
// 1. This scenario should not lead to a panic, which is indicated by
// having an OfflineStateSyncHeight > 0
// 2. We need to instruct the blocksync reactor to start fetching blocks from H+1
// instead of 0.
storeHeight = offlineStateSyncHeight
}
if state.LastBlockHeight != storeHeight {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch, stores were left in an inconsistent state", state.LastBlockHeight,
storeHeight))
}
requestsCh := make(chan BlockRequest, maxTotalRequesters)

const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock

startHeight := store.Height() + 1
startHeight := storeHeight + 1
if startHeight == 1 {
startHeight = state.InitialHeight
}
Expand Down
2 changes: 1 addition & 1 deletion blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func newReactor(
blockStore.SaveBlockWithExtendedCommit(thisBlock, thisParts, seenExtCommit)
}

bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync, NopMetrics())
bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync, NopMetrics(), 0)
bcReactor.SetLogger(logger.With("module", "blocksync"))

return ReactorPair{bcReactor, proxyApp}
Expand Down
46 changes: 35 additions & 11 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ type State struct {

// for reporting metrics
metrics *Metrics

// offline state sync height indicating to which height the node synced offline
offlineStateSyncHeight int64
}

// StateOption sets an optional parameter on the State.
Expand Down Expand Up @@ -172,25 +175,33 @@ func NewState(
evsw: cmtevents.NewEventSwitch(),
metrics: NopMetrics(),
}

for _, option := range options {
option(cs)
}
// set function defaults (may be overwritten before calling Start)
cs.decideProposal = cs.defaultDecideProposal
cs.doPrevote = cs.defaultDoPrevote
cs.setProposal = cs.defaultSetProposal

// We have no votes, so reconstruct LastCommit from SeenCommit.
if state.LastBlockHeight > 0 {
cs.reconstructLastCommit(state)
// In case of out of band performed statesync, the state store
// will have a state but no extended commit (as no block has been downloaded).
// If the height at which the vote extensions are enabled is lower
// than the height at which we statesync, consensus will panic because
// it will try to reconstruct the extended commit here.
if cs.offlineStateSyncHeight != 0 {
cs.reconstructSeenCommit(state)
} else {
cs.reconstructLastCommit(state)
}
}

cs.updateToState(state)

// NOTE: we do not call scheduleRound0 yet, we do that upon Start()

cs.BaseService = *service.NewBaseService(nil, "State", cs)
for _, option := range options {
option(cs)
}

return cs
}
Expand All @@ -212,6 +223,12 @@ func StateMetrics(metrics *Metrics) StateOption {
return func(cs *State) { cs.metrics = metrics }
}

// OfflineStateSyncHeight returns a StateOption that records the height at
// which the node performed state sync offline, i.e. before it was booted.
func OfflineStateSyncHeight(height int64) StateOption {
	return func(cs *State) {
		cs.offlineStateSyncHeight = height
	}
}

// String returns a string.
func (cs *State) String() string {
// better not to access shared variables
Expand Down Expand Up @@ -560,21 +577,28 @@ func (cs *State) sendInternalMessage(mi msgInfo) {
}
}

// reconstructSeenCommit sets cs.LastCommit to the votes obtained from
// votesFromSeenCommit for the given state. It is meant to be called after a
// state sync that was performed offline, so that vote extensions are not
// required. It panics if the votes cannot be obtained.
func (cs *State) reconstructSeenCommit(state sm.State) {
	seenVotes, err := cs.votesFromSeenCommit(state)
	if err == nil {
		cs.LastCommit = seenVotes
		return
	}
	panic(fmt.Sprintf("failed to reconstruct last commit; %s", err))
}

// Reconstruct the LastCommit from either SeenCommit or the ExtendedCommit. SeenCommit
// and ExtendedCommit are saved along with the block. If VoteExtensions are required
// the method will panic on an absent ExtendedCommit or an ExtendedCommit without
// extension data.
func (cs *State) reconstructLastCommit(state sm.State) {
extensionsEnabled := state.ConsensusParams.ABCI.VoteExtensionsEnabled(state.LastBlockHeight)
if !extensionsEnabled {
votes, err := cs.votesFromSeenCommit(state)
if err != nil {
panic(fmt.Sprintf("failed to reconstruct last commit; %s", err))
}
cs.LastCommit = votes
cs.reconstructSeenCommit(state)
return
}

votes, err := cs.votesFromExtendedCommit(state)
if err != nil {
panic(fmt.Sprintf("failed to reconstruct last extended commit; %s", err))
Expand Down
136 changes: 134 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package node

import (
"bytes"
"context"
"fmt"
"net"
"net/http"
"os"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -15,6 +17,7 @@ import (
cfg "github.com/cometbft/cometbft/config"
cs "github.com/cometbft/cometbft/consensus"
"github.com/cometbft/cometbft/evidence"
"github.com/cometbft/cometbft/light"

"github.com/cometbft/cometbft/libs/log"
cmtpubsub "github.com/cometbft/cometbft/libs/pubsub"
Expand Down Expand Up @@ -133,6 +136,124 @@ func StateProvider(stateProvider statesync.StateProvider) Option {
}
}

// BootstrapState synchronizes the stores with the application after state sync
// has been performed offline. It is expected that the block store and state
// store are empty at the time the function is called.
//
// The state and commit for height are fetched through a light client state
// provider configured from config.StateSync. If appHash is non-nil it is
// compared with the app hash of the fetched state and a mismatch is an error;
// a nil appHash skips that check.
//
// A nil ctx is replaced by context.Background(), a nil config by
// cfg.DefaultConfig() and a nil dbProvider by cfg.DefaultDBProvider.
//
// If the block store or the state store is not empty, the function returns an
// error. The stores opened here are closed before returning; an error from
// closing a store is returned only if no earlier error occurred.
func BootstrapState(ctx context.Context, config *cfg.Config, dbProvider cfg.DBProvider, height uint64, appHash []byte) (err error) {
	logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
	if ctx == nil {
		ctx = context.Background()
	}

	if config == nil {
		logger.Info("no config provided, using default configuration")
		config = cfg.DefaultConfig()
	}

	if dbProvider == nil {
		dbProvider = cfg.DefaultDBProvider
	}

	blockStore, stateDB, err := initDBs(config, dbProvider)

	// The block store is closed even when initDBs reports an error, because it
	// may have been opened before the failure. It may also be nil in that
	// case, so guard against dereferencing it.
	defer func() {
		if blockStore == nil {
			return
		}
		if derr := blockStore.Close(); derr != nil {
			logger.Error("Failed to close blockstore", "err", derr)
			// Surface the close error only if nothing else went wrong, so the
			// original failure is not hidden from the caller.
			if err == nil {
				err = derr
			}
		}
	}()

	if err != nil {
		return err
	}

	// Wrap the state DB before any further early return so that the deferred
	// Close below releases it on every path from here on.
	stateStore := sm.NewStore(stateDB, sm.StoreOptions{
		DiscardABCIResponses: config.Storage.DiscardABCIResponses,
	})

	defer func() {
		if derr := stateStore.Close(); derr != nil {
			logger.Error("Failed to close statestore", "err", derr)
			// Same rule as for the block store: keep the first error.
			if err == nil {
				err = derr
			}
		}
	}()

	if !blockStore.IsEmpty() {
		return fmt.Errorf("blockstore not empty, trying to initialize non empty state")
	}

	state, err := stateStore.Load()
	if err != nil {
		return err
	}

	if !state.IsEmpty() {
		return fmt.Errorf("state not empty, trying to initialize non empty state")
	}

	genState, _, err := LoadStateFromDBOrGenesisDocProvider(stateDB, DefaultGenesisDocProviderFunc(config))
	if err != nil {
		return err
	}

	stateProvider, err := statesync.NewLightClientStateProvider(
		ctx,
		genState.ChainID, genState.Version, genState.InitialHeight,
		config.StateSync.RPCServers, light.TrustOptions{
			Period: config.StateSync.TrustPeriod,
			Height: config.StateSync.TrustHeight,
			Hash:   config.StateSync.TrustHashBytes(),
		}, logger.With("module", "light"))
	if err != nil {
		return fmt.Errorf("failed to set up light client state provider: %w", err)
	}

	state, err = stateProvider.State(ctx, height)
	if err != nil {
		return err
	}

	if appHash == nil {
		logger.Info("warning: cannot verify appHash. Verification will happen when node boots up!")
	} else if !bytes.Equal(appHash, state.AppHash) {
		// The deferred closes above release both stores; closing them here as
		// well would close each store twice.
		return fmt.Errorf("the app hash returned by the light client does not match the provided appHash, expected %X, got %X", state.AppHash, appHash)
	}

	commit, err := stateProvider.Commit(ctx, height)
	if err != nil {
		return err
	}

	if err = stateStore.Bootstrap(state); err != nil {
		return err
	}

	err = blockStore.SaveSeenCommit(state.LastBlockHeight, commit)
	if err != nil {
		return err
	}

	// Once the stores are bootstrapped, we need to set the height at which the node has finished
	// statesyncing. This will allow the blocksync reactor to fetch blocks at a proper height.
	// In case this operation fails, it is equivalent to a failure in online state sync where the operator
	// needs to manually delete the state and blockstores and rerun the bootstrapping process.
	err = stateStore.SetOfflineStateSyncHeight(state.LastBlockHeight)
	if err != nil {
		return fmt.Errorf("failed to set synced height: %w", err)
	}

	return nil
}

//------------------------------------------------------------------------------

// NewNode returns a new, ready to go, CometBFT Node.
Expand Down Expand Up @@ -265,18 +386,29 @@ func NewNodeWithContext(ctx context.Context,
sm.BlockExecutorWithMetrics(smMetrics),
)

offlineStateSyncHeight := int64(0)
if blockStore.Height() == 0 {
offlineStateSyncHeight, err = blockExec.Store().GetOfflineStateSyncHeight()
if err != nil && err.Error() != "value empty" {
panic(fmt.Sprintf("failed to retrieve statesynced height from store %s; expected state store height to be %v", err, state.LastBlockHeight))
}
}
// Make BlocksyncReactor. Don't start block sync if we're doing a state sync first.
bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger, bsMetrics)
bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger, bsMetrics, offlineStateSyncHeight)
if err != nil {
return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
}

// Make ConsensusReactor
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
privValidator, csMetrics, stateSync || blockSync, eventBus, consensusLogger,
privValidator, csMetrics, stateSync || blockSync, eventBus, consensusLogger, offlineStateSyncHeight,
)

err = stateStore.SetOfflineStateSyncHeight(0)
if err != nil {
panic(fmt.Sprintf("failed to reset the offline state sync height %s", err))
}
// Set up state sync reactor, and schedule a sync if requested.
// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
// we should clean this whole thing up. See:
Expand Down
5 changes: 4 additions & 1 deletion node/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,11 @@ func createBlocksyncReactor(config *cfg.Config,
blockSync bool,
logger log.Logger,
metrics *blocksync.Metrics,
offlineStateSyncHeight int64,
) (bcReactor p2p.Reactor, err error) {
switch config.BlockSync.Version {
case "v0":
bcReactor = blocksync.NewReactor(state.Copy(), blockExec, blockStore, blockSync, metrics)
bcReactor = blocksync.NewReactor(state.Copy(), blockExec, blockStore, blockSync, metrics, offlineStateSyncHeight)
case "v1", "v2":
return nil, fmt.Errorf("block sync version %s has been deprecated. Please use v0", config.BlockSync.Version)
default:
Expand All @@ -300,6 +301,7 @@ func createConsensusReactor(config *cfg.Config,
waitSync bool,
eventBus *types.EventBus,
consensusLogger log.Logger,
offlineStateSyncHeight int64,
) (*cs.Reactor, *cs.State) {
consensusState := cs.NewState(
config.Consensus,
Expand All @@ -309,6 +311,7 @@ func createConsensusReactor(config *cfg.Config,
mempool,
evidencePool,
cs.StateMetrics(csMetrics),
cs.OfflineStateSyncHeight(offlineStateSyncHeight),
)
consensusState.SetLogger(consensusLogger)
if privValidator != nil {
Expand Down
8 changes: 8 additions & 0 deletions state/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,11 @@ func SaveValidatorsInfo(db dbm.DB, height, lastHeightChanged int64, valSet *type
stateStore := dbStore{db, StoreOptions{DiscardABCIResponses: false}}
return stateStore.saveValidatorsInfo(height, lastHeightChanged, valSet)
}

// Int64ToBytes exposes the package-private int64ToBytes helper to tests and
// returns its result unchanged.
func Int64ToBytes(val int64) []byte {
	encoded := int64ToBytes(val)
	return encoded
}

// Int64FromBytes exposes the package-private int64FromBytes helper to tests
// and returns its result unchanged.
func Int64FromBytes(val []byte) int64 {
	decoded := int64FromBytes(val)
	return decoded
}
Loading
0