Add support for multi-deletes. by VaibhaveS · Pull Request #828 · dimitri/pgcopydb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Add support for multi-deletes. #828

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 199 additions & 75 deletions src/bin/pgcopydb/ld_transform.c
Original file line number Diff line number Diff line change
Expand Up @@ -1372,51 +1372,116 @@ coalesceLogicalTransactionStatement(LogicalTransaction *txn,
{
LogicalTransactionStatement *last = txn->last;

LogicalMessageValuesArray *lastValuesArray =
&(last->stmt.insert.new.array->values);
int action = last->action;

LogicalMessageValuesArray *newValuesArray =
&(new->stmt.insert.new.array->values);
switch (action)
{
case STREAM_ACTION_INSERT:
{
LogicalMessageValuesArray *lastValuesArray =
&(last->stmt.insert.new.array->values);

int capacity = lastValuesArray->capacity;
LogicalMessageValues *array = lastValuesArray->array;
LogicalMessageValuesArray *newValuesArray =
&(new->stmt.insert.new.array->values);

/*
* Check if the current LogicalMessageValues array has enough space to hold
* the values from the new statement. If not, resize the lastValuesArray
* using realloc.
*/
if (capacity < (lastValuesArray->count + 1))
{
/*
* Additionally, we allocate more space than currently needed to avoid
* repeated reallocation on every new value append. This trade-off
* increases memory usage slightly but reduces the reallocation overhead
* and potential heap memory fragmentation.
*/
capacity *= 2;
array = (LogicalMessageValues *)
realloc(array, sizeof(LogicalMessageValues) * capacity);
int capacity = lastValuesArray->capacity;
LogicalMessageValues *array = lastValuesArray->array;

if (array == NULL)
{
log_error(ALLOCATION_FAILED_ERROR);
return false;
/*
* Check if the current LogicalMessageValues array has enough space to hold
* the values from the new statement. If not, resize the lastValuesArray
* using realloc.
*/
if (capacity < (lastValuesArray->count + 1))
{
/*
* Additionally, we allocate more space than currently needed to avoid
* repeated reallocation on every new value append. This trade-off
* increases memory usage slightly but reduces the reallocation overhead
* and potential heap memory fragmentation.
*/
capacity *= 2;
array = (LogicalMessageValues *)
realloc(array, sizeof(LogicalMessageValues) * capacity);

if (array == NULL)
{
log_error(ALLOCATION_FAILED_ERROR);
return false;
}

lastValuesArray->array = array;
lastValuesArray->capacity = capacity;
}

/*
* Move the new value from the 'newValuesArray' to the 'lastValuesArray' of
* the existing statement. Additionally, set the count of the 'newValuesArray'
* to 0 to prevent it from being deallocated by FreeLogicalMessageTupleArray,
* as it has been moved to the 'lastValuesArray'.
*/
lastValuesArray->array[lastValuesArray->count++] = newValuesArray->array[0];
newValuesArray->count = 0;

return true;
}

lastValuesArray->array = array;
lastValuesArray->capacity = capacity;
}
case STREAM_ACTION_DELETE:
{
LogicalMessageValuesArray *lastIdentityValuesArray =
&(last->stmt.delete.old.array->values);

/*
* Move the new value from the 'newValuesArray' to the 'lastValuesArray' of
* the existing statement. Additionally, set the count of the 'newValuesArray'
* to 0 to prevent it from being deallocated by FreeLogicalMessageTupleArray,
* as it has been moved to the 'lastValuesArray'.
*/
lastValuesArray->array[lastValuesArray->count++] = newValuesArray->array[0];
newValuesArray->count = 0;
LogicalMessageValuesArray *newIdentityValuesArray =
&(new->stmt.delete.old.array->values);

int capacity = lastIdentityValuesArray->capacity;
LogicalMessageValues *array = lastIdentityValuesArray->array;

/*
* Check if the current LogicalMessageValues array has enough space to hold
* the values from the new statement. If not, resize the lastIdentityValuesArray
* using realloc.
*/
if (capacity < (lastIdentityValuesArray->count + 1))
{
/*
* Additionally, we allocate more space than currently needed to avoid
* repeated reallocation on every new value append. This trade-off
* increases memory usage slightly but reduces the reallocation overhead
* and potential heap memory fragmentation.
*/
capacity *= 2;
array = (LogicalMessageValues *)
realloc(array, sizeof(LogicalMessageValues) * capacity);

if (array == NULL)
{
log_error(ALLOCATION_FAILED_ERROR);
return false;
}

lastIdentityValuesArray->array = array;
lastIdentityValuesArray->capacity = capacity;
}

/*
* Move the new value from the 'newIdentityValuesArray' to the 'lastIdentityValuesArray' of
* the existing statement. Additionally, set the count of the 'newIdentityValuesArray'
* to 0 to prevent it from being deallocated by FreeLogicalMessageTupleArray,
* as it has been moved to the 'lastIdentityValuesArray'.
*/
lastIdentityValuesArray->array[lastIdentityValuesArray->count++] =
newIdentityValuesArray->array[0];
newIdentityValuesArray->count = 0;

return true;
}

default:
{
break;
}
}
return true;
}

Expand All @@ -1434,60 +1499,114 @@ canCoalesceLogicalTransactionStatement(LogicalTransaction *txn,
{
LogicalTransactionStatement *last = txn->last;

/* TODO: Support UPDATE and DELETE */
if (last->action != STREAM_ACTION_INSERT ||
new->action != STREAM_ACTION_INSERT)
if (last->action != new->action)
{
return false;
}

LogicalMessageInsert *lastInsert = &last->stmt.insert;
LogicalMessageInsert *newInsert = &new->stmt.insert;
int action = last->action;

/* Last and current statements must target same relation */
if (!streq(lastInsert->table.nspname, newInsert->table.nspname) ||
!streq(lastInsert->table.relname, newInsert->table.relname))
switch (action)
{
return false;
}
case STREAM_ACTION_INSERT:
{
LogicalMessageInsert *lastInsert = &last->stmt.insert;
LogicalMessageInsert *newInsert = &new->stmt.insert;

/* Last and current statements must target same relation */
if (!streq(lastInsert->table.nspname, newInsert->table.nspname) ||
!streq(lastInsert->table.relname, newInsert->table.relname))
{
return false;
}

LogicalMessageTuple *lastInsertColumns = lastInsert->new.array;
LogicalMessageTuple *newInsertColumns = newInsert->new.array;
LogicalMessageTuple *lastInsertColumns = lastInsert->new.array;
LogicalMessageTuple *newInsertColumns = newInsert->new.array;

/* Last and current statements must have same number of columns */
if (lastInsertColumns->cols != newInsertColumns->cols)
{
return false;
}
/* Last and current statements must have same number of columns */
if (lastInsertColumns->cols != newInsertColumns->cols)
{
return false;
}

LogicalMessageValuesArray *lastValuesArray = &(lastInsert->new.array->values);
LogicalMessageValuesArray *lastValuesArray = &(lastInsert->new.array->values);

/*
* Check if adding the new statement would exceed libpq's limit on the total
* number of parameters allowed in a single PQsendPrepare call.
* If it would exceed the limit, return false to indicate that coalescing
* should not be performed.
*
* TODO: This parameter limit check is not applicable for COPY operations.
* It should be removed once we switch to using COPY.
*/
if (((lastValuesArray->count + 1) * lastInsertColumns->cols) >
PQ_QUERY_PARAM_MAX_LIMIT)
{
return false;
}
/*
* Check if adding the new statement would exceed libpq's limit on the total
* number of parameters allowed in a single PQsendPrepare call.
* If it would exceed the limit, return false to indicate that coalescing
* should not be performed.
*
* TODO: This parameter limit check is not applicable for COPY operations.
* It should be removed once we switch to using COPY.
*/
if (((lastValuesArray->count + 1) * lastInsertColumns->cols) >
PQ_QUERY_PARAM_MAX_LIMIT)
{
return false;
}


/* Last and current statements cols must have same name and order */
for (int i = 0; i < lastInsertColumns->cols; i++)
{
if (!streq(lastInsertColumns->columns[i], newInsertColumns->columns[i]))
/* Last and current statements cols must have same name and order */
for (int i = 0; i < lastInsertColumns->cols; i++)
{
if (!streq(lastInsertColumns->columns[i], newInsertColumns->columns[i]))
{
return false;
}
}

return true;
}

case STREAM_ACTION_DELETE:
{
return false;
LogicalMessageDelete *lastDelete = &last->stmt.delete;
LogicalMessageDelete *newDelete = &new->stmt.delete;

/* Last and current statements must target same relation */
if (!streq(lastDelete->table.nspname, newDelete->table.nspname) ||
!streq(lastDelete->table.relname, newDelete->table.relname))
{
return false;
}

LogicalMessageTuple *lastDeleteIdentityColumns = lastDelete->old.array;
LogicalMessageTuple *newDeleteIdentityColumns = newDelete->old.array;

/* Last and current identity cols must have same name and order */
for (int i = 0; i < lastDeleteIdentityColumns->cols; i++)
{
if (!streq(lastDeleteIdentityColumns->columns[i],
newDeleteIdentityColumns->columns[i]))
{
return false;
}
}

/*
* Check if adding the new statement would exceed libpq's limit on the total
* number of parameters allowed in a single PQsendPrepare call.
* If it would exceed the limit, return false to indicate that coalescing
* should not be performed.
*/

if (((lastDeleteIdentityColumns->cols + 1) *
lastDeleteIdentityColumns->cols) > PQ_QUERY_PARAM_MAX_LIMIT)
{
return false;
}

return true;
}

default:
{
break;
}
}

return true;
return false;
}


Expand Down Expand Up @@ -2330,6 +2449,8 @@ stream_write_delete(FILE *out, LogicalMessageDelete *delete)
{
LogicalMessageValues *values = &(old->values.array[r]);

appendPQExpBuffer(buf, "(");

/* now loop over column values for this VALUES row */
for (int v = 0; v < values->cols; v++)
{
Expand Down Expand Up @@ -2370,6 +2491,9 @@ stream_write_delete(FILE *out, LogicalMessageDelete *delete)
}
}
}

appendPQExpBuffer(buf, ")%s",
r < old->values.count - 1 ? " or " : "");
}

uint32_t hash = hashlittle(buf->data, buf->len, 5381);
Expand Down
32 changes: 14 additions & 18 deletions tests/cdc-low-level/000000010000000000000002.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
BEGIN; -- {"xid":489,"lsn":"0/24D2208","timestamp":"2024-05-08 11:10:09.242465+0000","commit_lsn":"0/24D4588"}
BEGIN; -- {"xid":489,"lsn":"0/24D2168","timestamp":"2024-06-28 03:23:06.960051+0000","commit_lsn":"0/24D44E8"}
PREPARE 58013803 AS INSERT INTO public.rental (rental_id, rental_date, inventory_id, customer_id, return_date, staff_id, last_update) overriding system value VALUES ($1, $2, $3, $4, $5, $6, $7), ($8, $9, $10, $11, $12, $13, $14);
EXECUTE 58013803["16050","2022-06-01 00:00:00+00","371","291",null,"1","2022-06-01 00:00:00+00","16051","2022-06-01 00:00:00+00","373","293",null,"2","2022-06-01 00:00:00+00"];
PREPARE dd147129 AS INSERT INTO public.payment_p2022_06 (payment_id, customer_id, staff_id, rental_id, amount, payment_date) overriding system value VALUES ($1, $2, $3, $4, $5, $6), ($7, $8, $9, $10, $11, $12);
EXECUTE dd147129["32099","291","1","16050","5.99","2022-06-01 00:00:00+00","32100","293","2","16051","5.99","2022-06-01 00:00:00+00"];
COMMIT; -- {"xid":489,"lsn":"0/24D4588","timestamp":"2024-05-08 11:10:09.242465+0000"}
BEGIN; -- {"xid":490,"lsn":"0/24D4588","timestamp":"2024-05-08 11:10:09.242871+0000","commit_lsn":"0/24D5658"}
COMMIT; -- {"xid":489,"lsn":"0/24D44E8","timestamp":"2024-06-28 03:23:06.960051+0000"}
BEGIN; -- {"xid":490,"lsn":"0/24D44E8","timestamp":"2024-06-28 03:23:06.960576+0000","commit_lsn":"0/24D55B8"}
PREPARE b44633db AS UPDATE public.payment_p2022_02 SET amount = $1 WHERE payment_id = $2 and customer_id = $3 and staff_id = $4 and rental_id = $5 and amount = $6 and payment_date = $7;
EXECUTE b44633db["11.95","23757","116","2","14763","11.99","2022-02-11 03:52:25.634006+00"];
PREPARE b44633db AS UPDATE public.payment_p2022_02 SET amount = $1 WHERE payment_id = $2 and customer_id = $3 and staff_id = $4 and rental_id = $5 and amount = $6 and payment_date = $7;
Expand All @@ -25,18 +25,14 @@ PREPARE 4b3d4a5b AS UPDATE public.payment_p2022_07 SET amount = $1 WHERE payment
EXECUTE 4b3d4a5b["11.95","28814","592","1","3973","11.99","2022-07-06 12:15:38.928947+00"];
PREPARE 4b3d4a5b AS UPDATE public.payment_p2022_07 SET amount = $1 WHERE payment_id = $2 and customer_id = $3 and staff_id = $4 and rental_id = $5 and amount = $6 and payment_date = $7;
EXECUTE 4b3d4a5b["11.95","29136","13","2","8831","11.99","2022-07-22 16:15:40.797771+00"];
COMMIT; -- {"xid":490,"lsn":"0/24D5658","timestamp":"2024-05-08 11:10:09.242871+0000"}
BEGIN; -- {"xid":491,"lsn":"0/24D5818","timestamp":"2024-05-08 11:10:09.242917+0000","commit_lsn":"0/24D59C8"}
PREPARE e1d51ac7 AS DELETE FROM public.payment_p2022_06 WHERE payment_id = $1 and customer_id = $2 and staff_id = $3 and rental_id = $4 and amount = $5 and payment_date = $6;
EXECUTE e1d51ac7["32099","291","1","16050","5.99","2022-06-01 00:00:00+00"];
PREPARE e1d51ac7 AS DELETE FROM public.payment_p2022_06 WHERE payment_id = $1 and customer_id = $2 and staff_id = $3 and rental_id = $4 and amount = $5 and payment_date = $6;
EXECUTE e1d51ac7["32100","293","2","16051","5.99","2022-06-01 00:00:00+00"];
PREPARE 3f2797d9 AS DELETE FROM public.rental WHERE rental_id = $1;
EXECUTE 3f2797d9["16050"];
PREPARE 3f2797d9 AS DELETE FROM public.rental WHERE rental_id = $1;
EXECUTE 3f2797d9["16051"];
COMMIT; -- {"xid":491,"lsn":"0/24D59C8","timestamp":"2024-05-08 11:10:09.242917+0000"}
BEGIN; -- {"xid":492,"lsn":"0/24D59C8","timestamp":"2024-05-08 11:10:09.243006+0000","commit_lsn":"0/24D5F48"}
COMMIT; -- {"xid":490,"lsn":"0/24D55B8","timestamp":"2024-06-28 03:23:06.960576+0000"}
BEGIN; -- {"xid":491,"lsn":"0/24D5778","timestamp":"2024-06-28 03:23:06.960643+0000","commit_lsn":"0/24D5928"}
PREPARE a38ef4dc AS DELETE FROM public.payment_p2022_06 WHERE (payment_id = $1 and customer_id = $2 and staff_id = $3 and rental_id = $4 and amount = $5 and payment_date = $6) or (payment_id = $7 and customer_id = $8 and staff_id = $9 and rental_id = $10 and amount = $11 and payment_date = $12);
EXECUTE a38ef4dc["32099","291","1","16050","5.99","2022-06-01 00:00:00+00","32100","293","2","16051","5.99","2022-06-01 00:00:00+00"];
PREPARE e57c09e8 AS DELETE FROM public.rental WHERE (rental_id = $1) or (rental_id = $2);
EXECUTE e57c09e8["16050","16051"];
COMMIT; -- {"xid":491,"lsn":"0/24D5928","timestamp":"2024-06-28 03:23:06.960643+0000"}
BEGIN; -- {"xid":492,"lsn":"0/24D5928","timestamp":"2024-06-28 03:23:06.960791+0000","commit_lsn":"0/24D5EA8"}
PREPARE b44633db AS UPDATE public.payment_p2022_02 SET amount = $1 WHERE payment_id = $2 and customer_id = $3 and staff_id = $4 and rental_id = $5 and amount = $6 and payment_date = $7;
EXECUTE b44633db["11.99","23757","116","2","14763","11.95","2022-02-11 03:52:25.634006+00"];
PREPARE b44633db AS UPDATE public.payment_p2022_02 SET amount = $1 WHERE payment_id = $2 and customer_id = $3 and staff_id = $4 and rental_id = $5 and amount = $6 and payment_date = $7;
Expand All @@ -57,6 +53,6 @@ PREPARE 4b3d4a5b AS UPDATE public.payment_p2022_07 SET amount = $1 WHERE payment
EXECUTE 4b3d4a5b["11.99","28814","592","1","3973","11.95","2022-07-06 12:15:38.928947+00"];
PREPARE 4b3d4a5b AS UPDATE public.payment_p2022_07 SET amount = $1 WHERE payment_id = $2 and customer_id = $3 and staff_id = $4 and rental_id = $5 and amount = $6 and payment_date = $7;
EXECUTE 4b3d4a5b["11.99","29136","13","2","8831","11.95","2022-07-22 16:15:40.797771+00"];
COMMIT; -- {"xid":492,"lsn":"0/24D5F48","timestamp":"2024-05-08 11:10:09.243006+0000"}
-- KEEPALIVE {"lsn":"0/24D5F48","timestamp":"2024-05-08 11:10:09.243014+0000"}
-- ENDPOS {"lsn":"0/24D5F48"}
COMMIT; -- {"xid":492,"lsn":"0/24D5EA8","timestamp":"2024-06-28 03:23:06.960791+0000"}
-- KEEPALIVE {"lsn":"0/24D5EA8","timestamp":"2024-06-28 03:23:06.960801+0000"}
-- ENDPOS {"lsn":"0/24D5EA8"}
Loading
Loading