Implement GetAllHistoryTreeBranches for SQL persistence backends by MichaelSnowden · Pull Request #3438 · temporalio/temporal · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Implement GetAllHistoryTreeBranches for SQL persistence backends #3438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,6 @@ func (s *HistoryV2PersistenceSuite) TestGenUUIDs() {

// TestScanAllTrees test
func (s *HistoryV2PersistenceSuite) TestScanAllTrees() {
// TODO https://go.temporal.io/server/issues/2458
if s.ExecutionManager.GetName() != "cassandra" {
return
}

resp, err := s.ExecutionManager.GetAllHistoryTreeBranches(s.ctx, &p.GetAllHistoryTreeBranchesRequest{
PageSize: 1,
})
Expand Down
4 changes: 3 additions & 1 deletion common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ type (
DeleteHistoryBranch(ctx context.Context, request *InternalDeleteHistoryBranchRequest) error
// GetHistoryTree returns all branch information of a tree
GetHistoryTree(ctx context.Context, request *GetHistoryTreeRequest) (*InternalGetHistoryTreeResponse, error)
// GetAllHistoryTreeBranches returns all branches of all trees
// GetAllHistoryTreeBranches returns all branches of all trees.
// Note that branches may be skipped or duplicated across pages if there are branches created or deleted while
// paginating through results.
GetAllHistoryTreeBranches(ctx context.Context, request *GetAllHistoryTreeBranchesRequest) (*InternalGetAllHistoryTreeBranchesResponse, error)
}

Expand Down
67 changes: 63 additions & 4 deletions common/persistence/sql/history_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package sql
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"math"

Expand Down Expand Up @@ -425,14 +426,72 @@ func (m *sqlExecutionStore) DeleteHistoryBranch(
})
}

// getAllHistoryTreeBranchesPaginationToken represents the primary key of the last row in the history_tree table that
// we returned. GetAllHistoryTreeBranches serializes it with encoding/json to produce NextPageToken, and decodes it
// from the request's NextPageToken to resume from that row.
type getAllHistoryTreeBranchesPaginationToken struct {
// ShardID is the ShardID of the last row returned on the previous page.
ShardID int32
// TreeID is the TreeID of the last row returned on the previous page.
TreeID primitives.UUID
// BranchID is the BranchID of the last row returned on the previous page.
BranchID primitives.UUID
}

// GetAllHistoryTreeBranches returns one page of history tree branches, in the order produced by the SQL plugin's
// PaginateBranchesFromHistoryTree.
//
// request.PageSize must be greater than 0; otherwise an error is returned. When request.NextPageToken is non-empty
// it is decoded as a JSON-encoded getAllHistoryTreeBranchesPaginationToken and the page resumes from that key.
//
// The response carries a NextPageToken only when the page came back full (len(rows) >= PageSize); a short page means
// there are no more results.
func (m *sqlExecutionStore) GetAllHistoryTreeBranches(
	ctx context.Context,
	request *p.GetAllHistoryTreeBranchesRequest,
) (*p.InternalGetAllHistoryTreeBranchesResponse, error) {
	pageSize := request.PageSize
	if pageSize <= 0 {
		return nil, fmt.Errorf("PageSize must be greater than 0, but was %d", pageSize)
	}

	// Without a token the page keeps its zero-value key (ShardID 0, nil TreeID and BranchID), i.e. the first page.
	page := sqlplugin.HistoryTreeBranchPage{
		Limit: pageSize,
	}
	if len(request.NextPageToken) != 0 {
		var token getAllHistoryTreeBranchesPaginationToken
		if err := json.Unmarshal(request.NextPageToken, &token); err != nil {
			return nil, err
		}
		page.ShardID = token.ShardID
		page.TreeID = token.TreeID
		page.BranchID = token.BranchID
	}

	rows, err := m.Db.PaginateBranchesFromHistoryTree(ctx, page)
	if err != nil {
		return nil, err
	}

	// Size the result by what was actually read rather than by the caller-supplied page size.
	branches := make([]p.InternalHistoryBranchDetail, 0, len(rows))
	for _, row := range rows {
		branches = append(branches, p.InternalHistoryBranchDetail{
			TreeID:   row.TreeID.String(),
			BranchID: row.BranchID.String(),
			Data:     row.Data,
			Encoding: row.DataEncoding,
		})
	}

	response := &p.InternalGetAllHistoryTreeBranchesResponse{
		Branches: branches,
	}
	if len(rows) < pageSize {
		// no next page token because there are no more results
		return response, nil
	}

	// The page is full, so there may be more rows: hand back the key of the last row as the resume point.
	// len(rows) >= pageSize > 0 here, so indexing the last element is safe.
	lastRow := rows[len(rows)-1]
	tokenBytes, err := json.Marshal(getAllHistoryTreeBranchesPaginationToken{
		ShardID:  lastRow.ShardID,
		TreeID:   lastRow.TreeID,
		BranchID: lastRow.BranchID,
	})
	if err != nil {
		return nil, err
	}
	response.NextPageToken = tokenBytes
	return response, nil
}

// GetHistoryTree returns all branch information of a tree
Expand Down
11 changes: 10 additions & 1 deletion common/persistence/sql/sqlplugin/history_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,19 @@ type (
BranchID primitives.UUID
}

// HistoryNode is the SQL persistence interface for history trees
// HistoryTreeBranchPage is a struct which represents a page of history tree branches to query.
// ShardID, TreeID and BranchID together form the key that the paginate queries compare each row's
// (shard_id, tree_id, branch_id) against with ">", so the row with exactly this key is not returned again.
HistoryTreeBranchPage struct {
// ShardID is the shard_id component of the key to resume after.
ShardID int32
// TreeID is the tree_id component of the key to resume after.
TreeID primitives.UUID
// BranchID is the branch_id component of the key to resume after.
BranchID primitives.UUID
// Limit is bound to the query's LIMIT clause, i.e. the maximum number of rows to read.
Limit int
}

// HistoryTree is the SQL persistence interface for history trees
HistoryTree interface {
	InsertIntoHistoryTree(ctx context.Context, row *HistoryTreeRow) (sql.Result, error)
	SelectFromHistoryTree(ctx context.Context, filter HistoryTreeSelectFilter) ([]HistoryTreeRow, error)
	// DeleteFromHistoryTree deletes one or more rows from history_tree table
	DeleteFromHistoryTree(ctx context.Context, filter HistoryTreeDeleteFilter) (sql.Result, error)
	// PaginateBranchesFromHistoryTree reads up to page.Limit rows from the history_tree table sorted by their
	// primary key, returning only rows whose key comes after (page.ShardID, page.TreeID, page.BranchID).
	PaginateBranchesFromHistoryTree(ctx context.Context, page HistoryTreeBranchPage) ([]HistoryTreeRow, error)
}
)
25 changes: 25 additions & 0 deletions common/persistence/sql/sqlplugin/mysql/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ const (

getHistoryTreeQuery = `SELECT branch_id, data, data_encoding FROM history_tree WHERE shard_id = ? AND tree_id = ? `

paginateBranchesQuery = `SELECT shard_id, tree_id, branch_id, data, data_encoding
FROM history_tree
WHERE (shard_id, tree_id, branch_id) > (?, ?, ?)
ORDER BY shard_id, tree_id, branch_id
LIMIT ?`

deleteHistoryTreeQuery = `DELETE FROM history_tree WHERE shard_id = ? AND tree_id = ? AND branch_id = ? `
)

Expand Down Expand Up @@ -190,6 +196,25 @@ func (mdb *db) SelectFromHistoryTree(
return rows, err
}

// PaginateBranchesFromHistoryTree runs paginateBranchesQuery against the history_tree table. It returns up to
// page.Limit rows ordered by primary key (shard_id, tree_id, branch_id), restricted to rows whose key is strictly
// greater than (page.ShardID, page.TreeID, page.BranchID).
func (mdb *db) PaginateBranchesFromHistoryTree(
	ctx context.Context,
	page sqlplugin.HistoryTreeBranchPage,
) ([]sqlplugin.HistoryTreeRow, error) {
	var branches []sqlplugin.HistoryTreeRow
	// Arguments follow the placeholder order: the three key columns first, then the row limit.
	err := mdb.conn.SelectContext(ctx, &branches, paginateBranchesQuery,
		page.ShardID, page.TreeID, page.BranchID, page.Limit)
	return branches, err
}

// DeleteFromHistoryTree deletes one or more rows from history_tree table
func (mdb *db) DeleteFromHistoryTree(
ctx context.Context,
Expand Down
24 changes: 24 additions & 0 deletions common/persistence/sql/sqlplugin/postgresql/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ const (

getHistoryTreeQuery = `SELECT branch_id, data, data_encoding FROM history_tree WHERE shard_id = $1 AND tree_id = $2 `

paginateBranchesQuery = `SELECT shard_id, tree_id, branch_id, data, data_encoding
FROM history_tree
WHERE (shard_id, tree_id, branch_id) > ($1, $2, $3)
ORDER BY shard_id, tree_id, branch_id
LIMIT $4`

deleteHistoryTreeQuery = `DELETE FROM history_tree WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 `
)

Expand Down Expand Up @@ -192,6 +198,24 @@ func (pdb *db) SelectFromHistoryTree(
return rows, err
}

// PaginateBranchesFromHistoryTree runs paginateBranchesQuery against the history_tree table. It returns up to
// page.Limit rows ordered by primary key (shard_id, tree_id, branch_id), restricted to rows whose key is strictly
// greater than (page.ShardID, page.TreeID, page.BranchID).
func (pdb *db) PaginateBranchesFromHistoryTree(
	ctx context.Context,
	page sqlplugin.HistoryTreeBranchPage,
) ([]sqlplugin.HistoryTreeRow, error) {
	var branches []sqlplugin.HistoryTreeRow
	// Arguments map to $1..$3 (the key columns) and $4 (the row limit).
	err := pdb.conn.SelectContext(ctx, &branches, paginateBranchesQuery,
		page.ShardID, page.TreeID, page.BranchID, page.Limit)
	return branches, err
}

// DeleteFromHistoryTree deletes one or more rows from history_tree table
func (pdb *db) DeleteFromHistoryTree(
ctx context.Context,
Expand Down
24 changes: 24 additions & 0 deletions common/persistence/sql/sqlplugin/sqlite/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ const (

getHistoryTreeQuery = `SELECT branch_id, data, data_encoding FROM history_tree WHERE shard_id = ? AND tree_id = ? `

paginateBranchesQuery = `SELECT shard_id, tree_id, branch_id, data, data_encoding
FROM history_tree
WHERE (shard_id, tree_id, branch_id) > ($1, $2, $3)
ORDER BY shard_id, tree_id, branch_id
LIMIT $4`

deleteHistoryTreeQuery = `DELETE FROM history_tree WHERE shard_id = ? AND tree_id = ? AND branch_id = ? `
)

Expand Down Expand Up @@ -190,6 +196,24 @@ func (mdb *db) SelectFromHistoryTree(
return rows, err
}

// PaginateBranchesFromHistoryTree runs paginateBranchesQuery against the history_tree table. It returns up to
// page.Limit rows ordered by primary key (shard_id, tree_id, branch_id), restricted to rows whose key is strictly
// greater than (page.ShardID, page.TreeID, page.BranchID).
func (mdb *db) PaginateBranchesFromHistoryTree(
	ctx context.Context,
	page sqlplugin.HistoryTreeBranchPage,
) ([]sqlplugin.HistoryTreeRow, error) {
	var branches []sqlplugin.HistoryTreeRow
	// Arguments map to $1..$3 (the key columns) and $4 (the row limit).
	err := mdb.conn.SelectContext(ctx, &branches, paginateBranchesQuery,
		page.ShardID, page.TreeID, page.BranchID, page.Limit)
	return branches, err
}

// DeleteFromHistoryTree deletes one or more rows from history_tree table
func (mdb *db) DeleteFromHistoryTree(
ctx context.Context,
Expand Down
0