8000 Improve backfill performance by andrew-farries · Pull Request #389 · xataio/pgroll · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Improve backfill performance #389

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Operation interface {
// version in the database (through a view)
// update the given views to expose the new schema version
// Returns the table that requires backfilling, if any.
Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error)
Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error)

// Complete will update the database schema to match the current version
// after calling Start.
Expand Down
4 changes: 2 additions & 2 deletions pkg/migrations/op_add_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

var _ Operation = (*OpAddColumn)(nil)

func (o *OpAddColumn) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpAddColumn) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)

if err := addColumn(ctx, conn, *o, table, tr); err != nil {
Expand Down Expand Up @@ -47,9 +47,9 @@ func (o *OpAddColumn) Start(ctx context.Context, conn db.DB, stateSchema string,
Direction: TriggerDirectionUp,
Columns: s.GetTable(o.Table).Columns,
SchemaName: s.Name,
LatestSchema: latestSchema,
TableName: o.Table,
PhysicalColumn: TemporaryName(o.Column.Name),
StateSchema: stateSchema,
SQL: o.Up,
})
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/migrations/op_alter_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

var _ Operation = (*OpAlterColumn)(nil)

func (o *OpAlterColumn) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpAlterColumn) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)
column := table.GetColumn(o.Column)
ops := o.subOperations()
Expand All @@ -29,7 +29,7 @@ func (o *OpAlterColumn) Start(ctx context.Context, conn db.DB, stateSchema strin

// perform any operation specific start steps
for _, op := range ops {
if _, err := op.Start(ctx, conn, stateSchema, tr, s, cbs...); err != nil {
if _, err := op.Start(ctx, conn, latestSchema, tr, s, cbs...); err != nil {
return nil, err
}
}
Expand All @@ -42,9 +42,9 @@ func (o *OpAlterColumn) Start(ctx context.Context, conn db.DB, stateSchema strin
Direction: TriggerDirectionUp,
Columns: table.Columns,
SchemaName: s.Name,
LatestSchema: latestSchema,
TableName: o.Table,
PhysicalColumn: TemporaryName(o.Column),
StateSchema: stateSchema,
SQL: o.upSQLForOperations(ops),
})
if err != nil {
Expand Down Expand Up @@ -73,10 +73,10 @@ func (o *OpAlterColumn) Start(ctx context.Context, conn db.DB, stateSchema strin
Name: TriggerName(o.Table, TemporaryName(o.Column)),
Direction: TriggerDirectionDown,
Columns: cols,
LatestSchema: latestSchema,
SchemaName: s.Name,
TableName: o.Table,
PhysicalColumn: o.Column,
StateSchema: stateSchema,
SQL: o.downSQLForOperations(ops),
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_change_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type OpChangeType struct {

var _ Operation = (*OpChangeType)(nil)

func (o *OpChangeType) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpChangeType) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)

return table, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_create_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

var _ Operation = (*OpCreateIndex)(nil)

func (o *OpCreateIndex) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpCreateIndex) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
// create index concurrently
_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
pq.QuoteIdentifier(o.Name),
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

var _ Operation = (*OpCreateTable)(nil)

func (o *OpCreateTable) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpCreateTable) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
// Generate SQL for the columns in the table
columnsSQL, err := columnsToSQL(o.Columns, tr)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/migrations/op_drop_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ import (

var _ Operation = (*OpDropColumn)(nil)

func (o *OpDropColumn) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpDropColumn) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
if o.Down != "" {
err := createTrigger(ctx, conn, tr, triggerConfig{
Name: TriggerName(o.Table, o.Column),
Direction: TriggerDirectionDown,
Columns: s.GetTable(o.Table).Columns,
SchemaName: s.Name,
LatestSchema: latestSchema,
TableName: o.Table,
PhysicalColumn: o.Column,
StateSchema: stateSchema,
SQL: o.Down,
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/migrations/op_drop_constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var _ Operation = (*OpDropConstraint)(nil)

func (o *OpDropConstraint) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpDropConstraint) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)
column := table.GetColumn(o.Column)

Expand All @@ -29,9 +29,9 @@ func (o *OpDropConstraint) Start(ctx context.Context, conn db.DB, stateSchema st
Direction: TriggerDirectionUp,
Columns: table.Columns,
SchemaName: s.Name,
LatestSchema: latestSchema,
TableName: o.Table,
PhysicalColumn: TemporaryName(o.Column),
StateSchema: stateSchema,
SQL: o.upSQL(),
})
if err != nil {
Expand All @@ -51,9 +51,9 @@ func (o *OpDropConstraint) Start(ctx context.Context, conn db.DB, stateSchema st
Direction: TriggerDirectionDown,
Columns: table.Columns,
SchemaName: s.Name,
LatestSchema: latestSchema,
TableName: o.Table,
PhysicalColumn: o.Column,
StateSchema: stateSchema,
SQL: o.Down,
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_drop_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var _ Operation = (*OpDropIndex)(nil)

func (o *OpDropIndex) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpDropIndex) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
// no-op
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_drop_not_null.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type OpDropNotNull struct {

var _ Operation = (*OpDropNotNull)(nil)

func (o *OpDropNotNull) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpDropNotNull) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)

return table, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var _ Operation = (*OpDropTable)(nil)

func (o *OpDropTable) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpDropTable) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
s.RemoveTable(o.Name)
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_raw_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

var _ Operation = (*OpRawSQL)(nil)

func (o *OpRawSQL) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpRawSQL) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
if o.OnComplete {
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_rename_constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var _ Operation = (*OpRenameConstraint)(nil)

func (o *OpRenameConstraint) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpRenameConstraint) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
// no-op
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_rename_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var _ Operation = (*OpRenameTable)(nil)

func (o *OpRenameTable) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpRenameTable) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
return nil, s.RenameTable(o.From, o.To)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_set_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type OpSetCheckConstraint struct {

var _ Operation = (*OpSetCheckConstraint)(nil)

func (o *OpSetCheckConstraint) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpSetCheckConstraint) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)

// Add the check constraint to the new column as NOT VALID.
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_set_comment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type OpSetComment struct {

var _ Operation = (*OpSetComment)(nil)

func (o *OpSetComment) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpSetComment) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
tbl := s.GetTable(o.Table)

return tbl, addCommentToColumn(ctx, conn, o.Table, TemporaryName(o.Column), o.Comment)
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_set_default.go
F438
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type OpSetDefault struct {

var _ Operation = (*OpSetDefault)(nil)

func (o *OpSetDefault) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpSetDefault) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
tbl := s.GetTable(o.Table)

_, err := conn.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN %s SET DEFAULT %s`,
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_set_fk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type OpSetForeignKey struct {

var _ Operation = (*OpSetForeignKey)(nil)

func (o *OpSetForeignKey) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpSetForeignKey) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)

// Create a NOT VALID foreign key constraint on the new column.
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_set_notnull.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type OpSetNotNull struct {

var _ Operation = (*OpSetNotNull)(nil)

func (o *OpSetNotNull) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpSetNotNull) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)

// Add an unchecked NOT NULL constraint to the new column.
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_set_replica_identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

var _ Operation = (*OpSetReplicaIdentity)(nil)

func (o *OpSetReplicaIdentity) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpSetReplicaIdentity) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
// build the correct form of the `SET REPLICA IDENTITY` statement based on the`identity type
identitySQL := strings.ToUpper(o.Identity.Type)
if identitySQL == "INDEX" {
Expand Down
2 changes: 1 addition & 1 deletion pkg/migrations/op_set_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type OpSetUnique struct {

var _ Operation = (*OpSetUnique)(nil)

func (o *OpSetUnique) Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
func (o *OpSetUnique) Start(ctx context.Context, conn db.DB, latestSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error) {
table := s.GetTable(o.Table)

// Add a unique index to the new column
Expand Down
6 changes: 1 addition & 5 deletions pkg/migrations/templates/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,11 @@ const Function = `CREATE OR REPLACE FUNCTION {{ .Name | qi }}()
latest_schema text;
search_path text;
BEGIN
SELECT {{ .SchemaName | ql }} || '_' || latest_version
INTO latest_schema
FROM {{ .StateSchema | qi }}.latest_version({{ .SchemaName | ql }});

SELECT current_setting
INTO search_path
FROM current_setting('search_path');

IF search_path {{- if eq .Direction "up" }} != {{- else }} = {{- end }} latest_schema {{ if .TestExpr -}} AND {{ .TestExpr }} {{ end -}} THEN
IF search_path {{- if eq .Direction "up" }} != {{- else }} = {{- end }} {{ .LatestSchema | ql }} {{ if .TestExpr -}} AND {{ .TestExpr }} {{ end -}} THEN
NEW.{{ .PhysicalColumn | qi }} = {{ .SQL }};
{{- if .ElseExpr }}
ELSE
Expand Down
10000
2 changes: 1 addition & 1 deletion pkg/migrations/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type triggerConfig struct {
SchemaName string
TableName string
PhysicalColumn string
StateSchema string
LatestSchema string
TestExpr string
ElseExpr string
SQL string
Expand Down
32 changes: 8 additions & 24 deletions pkg/migrations/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func TestBuildFunction(t *testing.T) {
"review": {Name: "review", Type: "text"},
},
SchemaName: "public",
LatestSchema: "public_01_migration_name",
TableName: "reviews",
StateSchema: "pgroll",
PhysicalColumn: "_pgroll_new_review",
SQL: "product || 'is good'",
},
Expand All @@ -44,15 +44,11 @@ func TestBuildFunction(t *testing.T) {
latest_schema text;
search_path text;
BEGIN
SELECT 'public' || '_' || latest_version
INTO latest_schema
FROM "pgroll".latest_version('public');

SELECT current_setting
INTO search_path
FROM current_setting('search_path');

IF search_path != latest_schema THEN
IF search_path != 'public_01_migration_name' THEN
NEW."_pgroll_new_review" = product || 'is good';
END IF;

Expand All @@ -72,8 +68,8 @@ func TestBuildFunction(t *testing.T) {
"review": {Name: "review", Type: "text"},
},
SchemaName: "public",
LatestSchema: "public_01_migration_name",
TableName: "reviews",
StateSchema: "pgroll",
TestExpr: `NEW."review" IS NULL`,
PhysicalColumn: "_pgroll_new_review",
ElseExpr: `NEW."_pgroll_new_review" = NEW."review"`,
Expand All @@ -91,15 +87,11 @@ func TestBuildFunction(t *testing.T) {
latest_schema text;
search_path text;
BEGIN
SELECT 'public' || '_' || latest_version
INTO latest_schema
FROM "pgroll".latest_version('public');

SELECT current_setting
INTO search_path
FROM current_setting('search_path');

IF search_path != latest_schema AND NEW."review" IS NULL THEN
IF search_path != 'public_01_migration_name' AND NEW."review" IS NULL THEN
NEW."_pgroll_new_review" = product || 'is good';
ELSE
NEW."_pgroll_new_review" = NEW."review";
Expand All @@ -121,8 +113,8 @@ func TestBuildFunction(t *testing.T) {
"review": {Name: "review", Type: "text"},
},
SchemaName: "public",
LatestSchema: "public_01_migration_name",
TableName: "reviews",
StateSchema: "pgroll",
PhysicalColumn: "review",
SQL: `NEW."_pgroll_new_review"`,
},
Expand All @@ -138,15 +130,11 @@ func TestBuildFunction(t *testing.T) {
latest_schema text;
search_path text;
BEGIN
SELECT 'public' || '_' || latest_version
INTO latest_schema
FROM "pgroll".latest_version('public');

SELECT current_setting
INTO search_path
FROM current_setting('search_path');

IF search_path = latest_schema THEN
IF search_path = 'public_01_migration_name' THEN
NEW."review" = NEW."_pgroll_new_review";
END IF;

Expand All @@ -167,8 +155,8 @@ func TestBuildFunction(t *testing.T) {
"rating": {Name: "_pgroll_new_rating", Type: "integer"},
},
SchemaName: "public",
LatestSchema: "public_01_migration_name",
TableName: "reviews",
StateSchema: "pgroll",
PhysicalColumn: "rating",
SQL: `CAST(rating as text)`,
},
Expand All @@ -185,15 +173,11 @@ func TestBuildFunction(t *testing.T) {
latest_schema text;
search_path text;
BEGIN
SELECT 'public' || '_' || latest_version
INTO latest_schema
FROM "pgroll".latest_version('public');

SELECT current_setting
INTO search_path
FROM current_setting('search_path');

IF search_path = latest_schema THEN
IF search_path = 'public_01_migration_name' THEN
NEW."rating" = CAST(rating as text);
END IF;

Expand Down
Loading
0