Make backfill batch selection exclude rows inserted or updated after backfill start by andrew-farries · Pull Request #652 · xataio/pgroll · GitHub

Make backfill batch selection exclude rows inserted or updated after backfill start #652


Merged: 7 commits, Feb 18, 2025
55 changes: 30 additions & 25 deletions docs/tutorial.md
@@ -159,6 +159,8 @@ After some progress updates you should see a message saying that the migration h
<summary>What's happening behind the progress updates?</summary>

In order to add the new `description` column, `pgroll` creates a temporary `_pgroll_new_description` column and copies over the data from the existing `description` column, using the `up` SQL from the migration. Because the table contains 10^5 rows, this takes some time. The process is called _backfilling_, and it is performed in batches to avoid locking all rows in the table simultaneously.

The `_pgroll_needs_backfill` column in the `users` table tracks which rows have been backfilled and which have not. It is set to `true` for all rows at the start of the migration and set to `false` once a row has been backfilled. This ensures that rows inserted or updated while the backfill is in progress aren't backfilled twice.
</details>
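The batching loop described above can be sketched as a standalone simulation. This is an illustration only, not `pgroll`'s implementation: the `row` type and `backfillBatch` function are hypothetical, and real batches run as SQL updates against Postgres. Rows start flagged for backfill, each batch clears up to `batchSize` flags, and the loop stops when a batch touches no rows.

```go
package main

import "fmt"

// row models a table row together with its needs-backfill flag.
type row struct {
	id            int
	needsBackfill bool
}

// backfillBatch mimics one batch: pick up to batchSize rows still flagged,
// "backfill" them, and clear the flag. Returns how many rows were touched.
func backfillBatch(rows []row, batchSize int) int {
	n := 0
	for i := range rows {
		if n == batchSize {
			break
		}
		if rows[i].needsBackfill {
			rows[i].needsBackfill = false // row has now been backfilled
			n++
		}
	}
	return n
}

func main() {
	// 25 rows, all flagged at the start of the migration.
	rows := make([]row, 25)
	for i := range rows {
		rows[i] = row{id: i + 1, needsBackfill: true}
	}
	batches := 0
	for backfillBatch(rows, 10) > 0 {
		batches++
	}
	fmt.Println(batches) // 25 rows in batches of 10 -> 3
}
```

Rows touched by concurrent writes during the migration have their flag cleared by the triggers, so later batches skip them, which is exactly the double-backfill this PR prevents.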

At this point it's useful to look at the table data and schema to see what `pgroll` has done. Let's look at the data first:
@@ -169,19 +171,20 @@ SELECT * FROM users ORDER BY id LIMIT 10

You should see something like this:
```
+-----+----------+-------------------------+--------------------------+
| id | name | description | _pgroll_new_description |
+-----+----------+-------------------------+--------------------------+
| 1 | user_1 | <null> | description for user_1 |
| 2 | user_2 | description for user_2 | description for user_2 |
| 3 | user_3 | <null> | description for user_3 |
| 4 | user_4 | description for user_4 | description for user_4 |
| 5 | user_5 | <null> | description for user_5 |
| 6 | user_6 | description for user_6 | description for user_6 |
| 7 | user_7 | <null> | description for user_7 |
| 8 | user_8 | <null> | description for user_8 |
| 9 | user_9 | description for user_9 | description for user_9 |
| 10 | user_10 | description for user_10 | description for user_10 |
+----+---------+------------------------+-------------------------+------------------------+
| id | name | description | _pgroll_new_description | _pgroll_needs_backfill |
|----+---------+------------------------+-------------------------+------------------------|
| 1 | user_1 | <null> | description for user_1 | False |
| 2 | user_2 | description for user_2 | description for user_2 | False |
| 3 | user_3 | <null> | description for user_3 | False |
| 4 | user_4 | <null> | description for user_4 | False |
| 5 | user_5 | <null> | description for user_5 | False |
| 6 | user_6 | description for user_6 | description for user_6 | False |
| 7 | user_7 | <null> | description for user_7 | False |
| 8 | user_8 | description for user_8 | description for user_8 | False |
| 9 | user_9 | <null> | description for user_9 | False |
| 10 | user_10 | <null> | description for user_10 | False |
+----+---------+------------------------+-------------------------+------------------------+
```

This is the "expand" phase of the [expand/contract pattern](https://openpracticelibrary.com/practice/expand-and-contract-pattern/) in action; `pgroll` has added a `_pgroll_new_description` field to the table and populated the field for all rows using the `up` SQL from the `02_user_description_set_nullable.json` file:
@@ -201,21 +204,22 @@ DESCRIBE users
You should see something like this:

```
+-------------------------+------------------------+-----------------------------------------------------------------+
| Column | Type | Modifiers |
+-------------------------+------------------------+-----------------------------------------------------------------+
| id | integer | not null default nextval('_pgroll_new_users_id_seq'::regclass) |
| name | character varying(255) | not null |
| description | text | |
| _pgroll_new_description | text | |
+-------------------------+------------------------+-----------------------------------------------------------------+
+-------------------------+------------------------+-----------------------------------------------------+
| Column | Type | Modifiers |
|-------------------------+------------------------+-----------------------------------------------------|
| id | integer | not null default nextval('users_id_seq'::regclass) |
| name | character varying(255) | not null |
| description | text | |
| _pgroll_new_description | text | |
| _pgroll_needs_backfill | boolean | default true |
+-------------------------+------------------------+-----------------------------------------------------+
Indexes:
"_pgroll_new_users_pkey" PRIMARY KEY, btree (id)
"_pgroll_new_users_name_key" UNIQUE CONSTRAINT, btree (name)
"users_pkey" PRIMARY KEY, btree (id)
"users_name_key" UNIQUE CONSTRAINT, btree (name)
Check constraints:
"_pgroll_add_column_check_description" CHECK (_pgroll_new_description IS NOT NULL) NOT VALID
"_pgroll_check_not_null_description" CHECK (_pgroll_new_description IS NOT NULL) NOT VALID
Triggers:
_pgroll_trigger_users__pgroll_new_description BEFORE INSERT OR UPDATE ON users FOR EACH ROW EXECUTE FUNCTION _pgroll_trigger_users__pgroll_new_description>
_pgroll_trigger_users__pgroll_new_description BEFORE INSERT OR UPDATE ON users FOR EACH ROW EXECUTE FUNCTION _pgroll_trigger_users__pgroll_new_description()
_pgroll_trigger_users_description BEFORE INSERT OR UPDATE ON users FOR EACH ROW EXECUTE FUNCTION _pgroll_trigger_users_description()
```

@@ -411,6 +415,7 @@ Indexes:
A few things have happened:

- The temporary `_pgroll_new_description` column has been renamed to `description`.
- The `_pgroll_needs_backfill` column has been removed.
- The old `description` column has been removed.
- The `description` column is now marked as `NOT NULL`.
- The triggers to copy data back and forth between the old and new column have been removed.
7 changes: 4 additions & 3 deletions pkg/backfill/backfill.go
@@ -53,9 +53,10 @@ func (bf *Backfill) Start(ctx context.Context, table *schema.Table) error {
// Create a batcher for the table.
b := batcher{
BatchConfig: templates.BatchConfig{
TableName: table.Name,
PrimaryKey: identityColumns,
BatchSize: bf.batchSize,
TableName: table.Name,
PrimaryKey: identityColumns,
BatchSize: bf.batchSize,
NeedsBackfillColumn: "_pgroll_needs_backfill",
},
}

9 changes: 5 additions & 4 deletions pkg/backfill/templates/build.go
@@ -11,10 +11,11 @@ import (
)

type BatchConfig struct {
TableName string
PrimaryKey []string
LastValue []string
BatchSize int
TableName string
PrimaryKey []string
LastValue []string
BatchSize int
NeedsBackfillColumn string
}

func BuildSQL(cfg BatchConfig) (string, error) {
40 changes: 24 additions & 16 deletions pkg/backfill/templates/build_test.go
@@ -15,35 +15,39 @@ func TestBatchStatementBuilder(t *testing.T) {
}{
"single identity column no last value": {
config: BatchConfig{
TableName: "table_name",
PrimaryKey: []string{"id"},
BatchSize: 10,
TableName: "table_name",
PrimaryKey: []string{"id"},
NeedsBackfillColumn: "_pgroll_needs_backfill",
BatchSize: 10,
},
expected: expectSingleIDColumnNoLastValue,
},
"multiple identity columns no last value": {
config: BatchConfig{
TableName: "table_name",
PrimaryKey: []string{"id", "zip"},
BatchSize: 10,
TableName: "table_name",
PrimaryKey: []string{"id", "zip"},
NeedsBackfillColumn: "_pgroll_needs_backfill",
BatchSize: 10,
},
expected: multipleIDColumnsNoLastValue,
},
"single identity column with last value": {
config: BatchConfig{
TableName: "table_name",
PrimaryKey: []string{"id"},
LastValue: []string{"1"},
BatchSize: 10,
TableName: "table_name",
PrimaryKey: []string{"id"},
NeedsBackfillColumn: "_pgroll_needs_backfill",
LastValue: []string{"1"},
BatchSize: 10,
},
expected: singleIDColumnWithLastValue,
},
"multiple identity columns with last value": {
config: BatchConfig{
TableName: "table_name",
PrimaryKey: []string{"id", "zip"},
LastValue: []string{"1", "1234"},
BatchSize: 10,
TableName: "table_name",
PrimaryKey: []string{"id", "zip"},
NeedsBackfillColumn: "_pgroll_needs_backfill",
LastValue: []string{"1", "1234"},
BatchSize: 10,
},
expected: multipleIDColumnsWithLastValue,
},
@@ -63,6 +67,7 @@ const expectSingleIDColumnNoLastValue = `WITH batch AS
(
SELECT "id"
FROM "table_name"
WHERE "_pgroll_needs_backfill" = true
ORDER BY "id"
LIMIT 10
FOR NO KEY UPDATE
@@ -83,6 +88,7 @@ const multipleIDColumnsNoLastValue = `WITH batch AS
(
SELECT "id", "zip"
FROM "table_name"
WHERE "_pgroll_needs_backfill" = true
ORDER BY "id", "zip"
LIMIT 10
FOR NO KEY UPDATE
@@ -103,7 +109,8 @@ const singleIDColumnWithLastValue = `WITH batch AS
(
SELECT "id"
FROM "table_name"
WHERE ("id") > ('1')
WHERE "_pgroll_needs_backfill" = true
AND ("id") > ('1')
ORDER BY "id"
LIMIT 10
FOR NO KEY UPDATE
@@ -124,7 +131,8 @@ const multipleIDColumnsWithLastValue = `WITH batch AS
(
SELECT "id", "zip"
FROM "table_name"
WHERE ("id", "zip") > ('1', '1234')
WHERE "_pgroll_needs_backfill" = true
AND ("id", "zip") > ('1', '1234')
ORDER BY "id", "zip"
LIMIT 10
FOR NO KEY UPDATE
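The `("id", "zip") > ('1', '1234')` predicate in the expected SQL above is a row-value (lexicographic) comparison, which is what lets the batcher resume keyset pagination after the last-seen key. A minimal sketch of the same ordering in Go — string-based for illustration, whereas SQL compares the typed column values:

```go
package main

import "fmt"

// rowGreater reports whether key a sorts after key b under lexicographic
// (row-value) comparison, mirroring SQL's (a1, a2) > (b1, b2).
func rowGreater(a, b []string) bool {
	for i := range a {
		if a[i] != b[i] {
			return a[i] > b[i] // first differing component decides
		}
	}
	return false // equal keys are not greater
}

func main() {
	last := []string{"1", "1234"}
	fmt.Println(rowGreater([]string{"1", "2000"}, last)) // true: same id, later zip
	fmt.Println(rowGreater([]string{"1", "1234"}, last)) // false: equal key
	fmt.Println(rowGreater([]string{"2", "0000"}, last)) // true: later id wins
}
```

Because the equal key compares as not-greater, the last row of one batch is excluded from the next, so batches never overlap.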
3 changes: 2 additions & 1 deletion pkg/backfill/templates/sql.go
@@ -6,8 +6,9 @@ const SQL = `WITH batch AS
(
SELECT {{ commaSeparate (quoteIdentifiers .PrimaryKey) }}
FROM {{ .TableName | qi}}
WHERE {{ .NeedsBackfillColumn | qi }} = true
{{ if .LastValue -}}
WHERE ({{ commaSeparate (quoteIdentifiers .PrimaryKey) }}) > ({{ commaSeparate (quoteLiterals .LastValue) }})
AND ({{ commaSeparate (quoteIdentifiers .PrimaryKey) }}) > ({{ commaSeparate (quoteLiterals .LastValue) }})
{{ end -}}
ORDER BY {{ commaSeparate (quoteIdentifiers .PrimaryKey) }}
LIMIT {{ .BatchSize }}
17 changes: 17 additions & 0 deletions pkg/migrations/op_add_column.go
@@ -94,6 +94,14 @@ func (o *OpAddColumn) Complete(ctx context.Context, conn db.DB, tr SQLTransforme
return err
}

// Remove the needs backfill column
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(CNeedsBackfillColumn)))
if err != nil {
return err
}

if !o.Column.IsNullable() && o.Column.Default == nil {
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s VALIDATE CONSTRAINT %s",
pq.QuoteIdentifier(o.Table),
@@ -169,6 +177,15 @@ func (o *OpAddColumn) Rollback(ctx context.Context, conn db.DB, tr SQLTransforme

_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, o.Column.Name))))
if err != nil {
return err
}

// Remove the needs backfill column
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(table.Name),
pq.QuoteIdentifier(CNeedsBackfillColumn)))

return err
}

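Both `Complete` and `Rollback` now finish by dropping the tracking column, and the same cleanup recurs across the other operations in this PR. A sketch of building that statement, using a simplified stand-in for `pq.QuoteIdentifier` (not the library function itself):

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdentifier doubles embedded double-quotes and wraps the name, in the
// style of pq.QuoteIdentifier (simplified stand-in for illustration).
func quoteIdentifier(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// dropNeedsBackfillSQL builds the cleanup statement; the IF EXISTS clauses
// make it safe to run even if the table or column is already gone.
func dropNeedsBackfillSQL(table, column string) string {
	return fmt.Sprintf("ALTER TABLE IF EXISTS %s DROP COLUMN IF EXISTS %s",
		quoteIdentifier(table), quoteIdentifier(column))
}

func main() {
	fmt.Println(dropNeedsBackfillSQL("users", "_pgroll_needs_backfill"))
	// ALTER TABLE IF EXISTS "users" DROP COLUMN IF EXISTS "_pgroll_needs_backfill"
}
```

The idempotent `IF EXISTS` form matters here because cleanup runs on both the completion and rollback paths, and the column may already have been dropped by an earlier operation in the same migration.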
2 changes: 1 addition & 1 deletion pkg/migrations/op_add_column_test.go
@@ -1127,9 +1127,9 @@ func TestAddColumnWithUpSql(t *testing.T) {
// after rollback + restart + complete, all 'description' values are the backfilled ones.
res := MustSelect(t, db, schema, "02_add_column", "products")
assert.Equal(t, []map[string]any{
{"id": "c", "name": "cherries", "description": "CHERRIES"},
{"id": "a", "name": "apple", "description": "APPLE"},
{"id": "b", "name": "banana", "description": "BANANA"},
{"id": "c", "name": "cherries", "description": "CHERRIES"},
}, res)

// The trigger function has been dropped.
15 changes: 14 additions & 1 deletion pkg/migrations/op_alter_column.go
@@ -118,6 +118,14 @@ func (o *OpAlterColumn) Complete(ctx context.Context, conn db.DB, tr SQLTransfor
return err
}

// Remove the needs backfill column
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(CNeedsBackfillColumn)))
if err != nil {
return err
}

// Rename the new column to the old column name
table := s.GetTable(o.Table)
if table == nil {
@@ -177,7 +185,12 @@ func (o *OpAlterColumn) Rollback(ctx context.Context, conn db.DB, tr SQLTransfor
return err
}

return nil
// Remove the needs backfill column
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(table.Name),
pq.QuoteIdentifier(CNeedsBackfillColumn)))

return err
}

func (o *OpAlterColumn) Validate(ctx context.Context, s *schema.Schema) error {
4 changes: 4 additions & 0 deletions pkg/migrations/op_common_test.go
@@ -882,13 +882,17 @@ func MustSelect(t *testing.T, db *sql.DB, schema, version, table string) []map[s
// - The down functions for the columns no longer exist.
// - The up triggers for the columns no longer exist.
// - The down triggers for the columns no longer exist.
// - The _pgroll_needs_backfill column should not exist on the table.
func TableMustBeCleanedUp(t *testing.T, db *sql.DB, schema, table string, columns ...string) {
t.Helper()

for _, column := range columns {
// The temporary column should not exist on the underlying table.
ColumnMustNotExist(t, db, schema, table, migrations.TemporaryName(column))

// The _pgroll_needs_backfill column should not exist on the table.
ColumnMustNotExist(t, db, schema, table, migrations.CNeedsBackfillColumn)

// The up function for the column no longer exists.
FunctionMustNotExist(t, db, schema, migrations.TriggerFunctionName(table, column))
// The down function for the column no longer exists.
22 changes: 20 additions & 2 deletions pkg/migrations/op_create_constraint.go
@@ -157,7 +157,16 @@ func (o *OpCreateConstraint) Complete(ctx context.Context, conn db.DB, tr SQLTra
}
}

return o.removeTriggers(ctx, conn)
if err := o.removeTriggers(ctx, conn); err != nil {
return err
}

// Remove the needs backfill column
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(CNeedsBackfillColumn)))

return err
}

func (o *OpCreateConstraint) Rollback(ctx context.Context, conn db.DB, tr SQLTransformer, s *schema.Schema) error {
@@ -174,7 +183,16 @@ func (o *OpCreateConstraint) Rollback(ctx context.Context, conn db.DB, tr SQLTra
return err
}

return o.removeTriggers(ctx, conn)
if err := o.removeTriggers(ctx, conn); err != nil {
return err
}

// Remove the needs backfill column
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(table.Name),
pq.QuoteIdentifier(CNeedsBackfillColumn)))

return err
}

func (o *OpCreateConstraint) removeTriggers(ctx context.Context, conn db.DB) error {
28 changes: 26 additions & 2 deletions pkg/migrations/op_drop_column.go
@@ -53,19 +53,43 @@ func (o *OpDropColumn) Complete(ctx context.Context, conn db.DB, tr SQLTransform

_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, o.Column))))
if err != nil {
return err
}

// Remove the needs backfill column
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(CNeedsBackfillColumn)))
if err != nil {
return err
}

return err
return nil
}

func (o *OpDropColumn) Rollback(ctx context.Context, conn db.DB, tr SQLTransformer, s *schema.Schema) error {
table := s.GetTable(o.Table)

_, err := conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, o.Column))))
if err != nil {
return err
}

// Remove the needs backfill column
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(table.Name),
pq.QuoteIdentifier(CNeedsBackfillColumn)))
if err != nil {
return err
}

	// Mark the column as no longer deleted so that it's visible to preceding
// rollback operations in the same migration
s.GetTable(o.Table).UnRemoveColumn(o.Column)

return err
return nil
}

func (o *OpDropColumn) Validate(ctx context.Context, s *schema.Schema) error {