8000 ADR 101: Add GetLatest method to block service by thanethomson · Pull Request #1209 · cometbft/cometbft · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

ADR 101: Add GetLatest method to block service #1209

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
379 changes: 369 additions & 10 deletions proto/tendermint/services/block/v1/block.pb.go
10000

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions proto/tendermint/services/block/v1/block.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ message GetByHeightResponse {
tendermint.types.Block block = 2;
}

// GetLatestHeightRequest - empty message since no parameter is required
message GetLatestHeightRequest {
message GetLatestRequest {}

message GetLatestResponse {
tendermint.types.BlockID block_id = 1;
tendermint.types.Block block = 2;
}

// GetLatestHeightRequest - empty message since no parameter is required
message GetLatestHeightRequest {}

// GetLatestHeightResponse provides the height of the latest committed block.
message GetLatestHeightResponse {
// The height of the latest committed block. Will be 0 if no data has been
Expand Down
69 changes: 53 additions & 16 deletions proto/tendermint/services/block/v1/block_service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions proto/tendermint/services/block/v1/block_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import "tendermint/services/block/v1/block.proto";

// BlockService provides information about blocks
service BlockService {
// GetBlock retrieves the block information at a particular height,
// if height '0' (zero) is specified it returns the latest height
// GetByHeight retrieves the block information at a particular height.
rpc GetByHeight(GetByHeightRequest) returns (GetByHeightResponse);

// GetLatest retrieves the latest block.
rpc GetLatest(GetLatestRequest) returns (GetLatestResponse);

// GetLatestHeight returns a stream of the latest block heights committed by
// the network. This is a long-lived stream that is only terminated by the
// server if an error occurs. The caller is expected to handle such
Expand Down
115 changes: 79 additions & 36 deletions rpc/grpc/client/block_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,32 @@ import (
"fmt"

blocksvc "github.com/cometbft/cometbft/proto/tendermint/services/block/v1"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
"github.com/cometbft/cometbft/types"
"github.com/cosmos/gogoproto/grpc"
)

// Block data returned by the CometBFT BlockService gRPC API.
type Block struct {
BlockID types.BlockID `json:"block_id"`
Block *types.Block `json:"block"`
BlockID *types.BlockID `json:"block_id"`
Block *types.Block `json:"block"`
}

// blockFromProto converts a protobuf block ID and protobuf block into their
// core-type counterparts and bundles them into a Block.
//
// The block ID is converted first, then the block; the first conversion error
// encountered is returned as-is together with a nil result.
func blockFromProto(pblockID *cmtproto.BlockID, pblock *cmtproto.Block) (*Block, error) {
	id, err := types.BlockIDFromProto(pblockID)
	if err != nil {
		return nil, err
	}

	blk, err := types.BlockFromProto(pblock)
	if err != nil {
		return nil, err
	}

	result := new(Block)
	result.BlockID = id
	result.Block = blk
	return result, nil
}

// LatestHeightResult is the type produced by GetLatestHeight and sent to the client
Expand All @@ -22,12 +40,32 @@ type LatestHeightResult struct {
Error error
}

// getLatestHeightConfig collects the optional settings for a GetLatestHeight
// call. It starts out zero-valued and is mutated by each GetLatestHeightOption
// supplied by the caller.
type getLatestHeightConfig struct {
	// chSize is the capacity used when creating the result channel; 0 yields
	// an unbuffered channel.
	chSize uint
}

// GetLatestHeightOption is a functional option that adjusts the configuration
// of a GetLatestHeight call.
type GetLatestHeightOption func(*getLatestHeightConfig)

// GetLatestHeightChannelSize returns an option that sets the capacity of the
// result channel. When the option is omitted, or sz is 0, the channel is
// unbuffered.
func GetLatestHeightChannelSize(sz uint) GetLatestHeightOption {
	setSize := func(cfg *getLatestHeightConfig) {
		cfg.chSize = sz
	}
	return setSize
}

// BlockServiceClient provides block information
type BlockServiceClient interface {
// GetBlockByHeight attempts to retrieve the block associated with the
// given height.
GetBlockByHeight(ctx context.Context, height int64) (*Block, error)
// GetLatestHeight provides sends the latest committed block height to the given output
// channel as blocks are committed.
GetLatestHeight(ctx context.Context, resultCh chan<- LatestHeightResult)

// GetLatestBlock attempts to retrieve the latest committed block.
GetLatestBlock(ctx context.Context) (*Block, error)

// GetLatestHeight sends the latest committed block height to the
// returned output channel as blocks are committed.
GetLatestHeight(ctx context.Context, opts ...GetLatestHeightOption) (<-chan LatestHeightResult, error)
Copy link
Contributor Author
@thanethomson thanethomson Aug 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andynog I've gone back on my prior recommendation of supplying the channel to the function here because it turns out that the goroutine this function spawns could, non-deterministically, attempt to write to the channel after it's been closed (even if the context is cancelled beforehand), causing the client to panic.

This new approach causes the GetLatestHeight method to manage the whole lifecycle of the channel, but still provides the caller control over the channel size (and whether it's buffered or unbuffered).

}

type blockServiceClient struct {
Expand All @@ -42,67 +80,67 @@ func newBlockServiceClient(conn grpc.ClientConn) BlockServiceClient {

// GetBlockByHeight implements BlockServiceClient GetBlockByHeight
func (c *blockServiceClient) GetBlockByHeight(ctx context.Context, height int64) (*Block, error) {
req := blocksvc.GetByHeightRequest{
res, err := c.client.GetByHeight(ctx, &blocksvc.GetByHeightRequest{
Height: height,
}
res, err := c.client.GetByHeight(ctx, &req)
})
if err != nil {
return nil, err
}

// convert Block from proto to core type
block, err := types.BlockFromProto(res.Block)
if err != nil {
return nil, err
}
return blockFromProto(res.BlockId, res.Block)
}

// convert BlockID from proto to core type
blockID, err := types.BlockIDFromProto(res.BlockId)
// GetLatestBlock implements BlockServiceClient.
func (c *blockServiceClient) GetLatestBlock(ctx context.Context) (*Block, error) {
res, err := c.client.GetLatest(ctx, &blocksvc.GetLatestRequest{})
if err != nil {
return nil, err
}

response := Block{
BlockID: *blockID,
Block: block,
}
return &response, nil
return blockFromProto(res.BlockId, res.Block)
}

// GetLatestHeight implements BlockServiceClient GetLatestHeight
// This method provides an out channel of LatestHeightResult values that streams the latest height.
// The out channel might yield non-contiguous heights if the channel becomes full,
// because a result that cannot be sent immediately is skipped.
func (c *blockServiceClient) GetLatestHeight(ctx context.Context, resultCh chan<- LatestHeightResult) {
func (c *blockServiceClient) GetLatestHeight(ctx context.Context, opts ...GetLatestHeightOption) (<-chan LatestHeightResult, error) {
req := blocksvc.GetLatestHeightRequest{}

latestHeightClient, err := c.client.GetLatestHeight(ctx, &req)
if err != nil {
resultCh <- LatestHeightResult{
Height: 0,
Error: fmt.Errorf("error getting a stream for the latest height"),
}
return nil, fmt.Errorf("error getting a stream for the latest height: %w", err)
}

cfg := &getLatestHeightConfig{}
for _, opt := range opts {
opt(cfg)
}
resultCh := make(chan LatestHeightResult, cfg.chSize)

go func(client blocksvc.BlockService_GetLatestHeightClient) {
defer close(resultCh)
for {
response, err := client.Recv()
if err != nil {
resultCh <- LatestHeightResult{
Height: 0,
Error: fmt.Errorf("error receiving the latest height from a stream"),
res := LatestHeightResult{Error: fmt.Errorf("error receiving the latest height from a stream: %w", err)}
select {
case <-ctx.Done():
case resultCh <- res:
}
break
return
}
res := LatestHeightResult{Height: response.Height}
select {
case resultCh <- LatestHeightResult{
Height: response.Height,
Error: fmt.Errorf("error receiving the latest height from a stream"),
}:
case <-ctx.Done():
return
case resultCh <- res:
default:
// Skip sending this result because the channel is full - the
// client will get the next one once the channel opens up again
}

}
}(latestHeightClient)

return resultCh, nil
}

type disabledBlockServiceClient struct{}
Expand All @@ -116,7 +154,12 @@ func (*disabledBlockServiceClient) GetBlockByHeight(context.Context, int64) (*Bl
panic("block service client is disabled")
}

// GetLatestBlock implements BlockServiceClient for the disabled client. It
// never returns: every call panics with a message stating that the block
// service client is disabled.
func (*disabledBlockServiceClient) GetLatestBlock(context.Context) (*Block, error) {
	panic("block service client is disabled")
}

// GetLatestHeight implements BlockServiceClient GetLatestHeight - disabled client
func (*disabledBlockServiceClient) GetLatestHeight(context.Context, chan<- LatestHeightResult) {
func (*disabledBlockServiceClient) GetLatestHeight(context.Context, ...GetLatestHeightOption) (<-chan LatestHeightResult, error) {
panic("block service client is disabled")
}
Loading
0