feat(sink): introduce exactly once iceberg sink by wcy-fdu · Pull Request #19771 · risingwavelabs/risingwave · GitHub

feat(sink): introduce exactly once iceberg sink #19771

Merged: 72 commits merged on Mar 27, 2025

@wcy-fdu (Contributor) commented Dec 12, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

This PR introduces exactly-once consistency semantics for the Iceberg sink.

Related RFC: Exactly Once File Sink

Implementation details

The implementation of this feature can be divided into four main parts:

  1. Introduce a system table in the meta store.
  2. Normal write process (when no recovery occurs):
  • Pre-commit the relevant metadata to the meta store before the Iceberg commit. Because the actual Iceberg commit is performed by the coordinator after it has collected the results of all parallel writers, this step is implemented directly in the coordinator.commit() interface.
  • Delete the pre-committed metadata immediately after the Iceberg commit returns successfully. This is safe because the data has already been written to Iceberg.
  3. Recovery logic
    All recovery logic lives in the coordinator.init() interface. This is because all in-memory state is cleared during recovery: the sink writer is rebuilt first, and the sink writer then builds the coordinator.
  • Check whether the pre-committed metadata recorded in the meta store for epochs earlier than the current epoch contains any data that was not committed to Iceberg:
    • If yes, the previous commit failed. Because repeated commits to Iceberg are idempotent, the data is simply re-committed.
    • If no, the previous commit succeeded, so the epoch is skipped.
  • Modify the log store rewind so that it can start rewinding from a given epoch.
  • After the re-commit (or skip) completes, the maximum recovered epoch (last_recommit_epoch in the code) is passed to the compute node (CN) through the existing StartCoordinationResponse and used for the log store rewind. The rewind starts after this epoch so that the same log store data is not consumed twice, since that data is guaranteed to already be in the downstream Iceberg table.
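The write and recovery steps above can be sketched as a small in-memory model. This is a hedged illustration, not the PR's code: MetaStore, IcebergTable, Crash, commit, recover, epochs_to_replay and run are hypothetical names; the meta store and the Iceberg table are plain in-memory collections; data file names stand in for the real commit metadata; and "was this epoch already committed?" is modeled as a set of committed epochs, which is an assumption about how the idempotent re-commit is achieved.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical stand-in for the system table in the meta store:
/// epoch -> metadata pre-committed for that epoch (here just data file names).
type MetaStore = BTreeMap<u64, Vec<String>>;

/// Hypothetical stand-in for an Iceberg table. Commits are keyed by epoch, so
/// committing the same epoch twice adds nothing (the assumed idempotence).
#[derive(Default)]
struct IcebergTable {
    committed_epochs: BTreeSet<u64>,
    data_files: Vec<String>,
}

impl IcebergTable {
    fn commit(&mut self, epoch: u64, files: &[String]) {
        // `insert` returns false if this epoch was already committed.
        if self.committed_epochs.insert(epoch) {
            self.data_files.extend_from_slice(files);
        }
    }
}

/// Where to inject a failure into the normal write path (demo only).
#[derive(Clone, Copy, PartialEq)]
enum Crash {
    Never,
    BeforeIcebergCommit,
    BeforeMetadataDelete,
}

/// Normal write path for one epoch (the role of `coordinator.commit()`).
fn commit(meta: &mut MetaStore, iceberg: &mut IcebergTable, epoch: u64, files: &[String], crash: Crash) -> Result<(), String> {
    // 1. Pre-commit: persist the metadata before touching Iceberg.
    meta.insert(epoch, files.to_vec());
    if crash == Crash::BeforeIcebergCommit {
        return Err(format!("crashed before committing epoch {epoch}"));
    }
    // 2. Commit to Iceberg.
    iceberg.commit(epoch, files);
    if crash == Crash::BeforeMetadataDelete {
        return Err(format!("crashed before deleting metadata of epoch {epoch}"));
    }
    // 3. The data is now in Iceberg, so deleting the metadata is safe.
    meta.remove(&epoch);
    Ok(())
}

/// Recovery path (the role of `coordinator.init()`). Returns the largest
/// recovered epoch (`last_recommit_epoch`), or None if nothing was pending.
fn recover(meta: &mut MetaStore, iceberg: &mut IcebergTable, current_epoch: u64) -> Option<u64> {
    let epochs: Vec<u64> = meta.range(..current_epoch).map(|(e, _)| *e).collect();
    let mut last_recommit_epoch = None;
    for epoch in epochs {
        let files = meta[&epoch].clone();
        if !iceberg.committed_epochs.contains(&epoch) {
            // The previous commit failed: re-commit it.
            iceberg.commit(epoch, &files);
        } // Otherwise the previous commit succeeded: skip it.
        meta.remove(&epoch);
        last_recommit_epoch = Some(epoch);
    }
    last_recommit_epoch
}

/// Log store rewind: replay only epochs after `last_recommit_epoch`.
fn epochs_to_replay(log_store_epochs: &[u64], last_recommit_epoch: Option<u64>) -> Vec<u64> {
    let after_last = |e: &u64| last_recommit_epoch.map_or(true, |last| *e > last);
    log_store_epochs.iter().copied().filter(after_last).collect()
}

/// Demo: epoch 1 commits cleanly, epoch 2 crashes at `crash`, and recovery runs
/// when the sink restarts at epoch 3. The log store is assumed to still hold
/// epochs 2 and 3 (epoch 1 already truncated).
fn run(crash: Crash) -> (Vec<String>, Vec<u64>) {
    let (mut meta, mut iceberg) = (MetaStore::new(), IcebergTable::default());
    commit(&mut meta, &mut iceberg, 1, &["f1".to_string()], Crash::Never).unwrap();
    let _ = commit(&mut meta, &mut iceberg, 2, &["f2".to_string()], crash);
    let last = recover(&mut meta, &mut iceberg, 3);
    (iceberg.data_files, epochs_to_replay(&[2, 3], last))
}

fn main() {
    for crash in [Crash::BeforeIcebergCommit, Crash::BeforeMetadataDelete] {
        let (files, replay) = run(crash);
        // Either way each file is committed exactly once and only epoch 3 is replayed.
        assert_eq!(files, vec!["f1", "f2"]);
        assert_eq!(replay, vec![3]);
    }
}
```

The ordering that matters in this model is pre-commit, then Iceberg commit, then delete. A crash between the pre-commit and the delete leaves either an epoch that never reached Iceberg (recovery re-commits it) or a committed epoch whose metadata is still present (recovery skips it); in both cases the log store resumes after last_recommit_epoch, so no file is written twice.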

Test logic

The testing part is still being improved and will be updated continuously in this PR.
After some offline discussion, we decided on the following approach: use madsim to simulate error injection, and sink the same batch of source data into two Iceberg tables, one with errors injected and one without. The test then compares the two tables and checks that they are exactly the same.
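One way to express the final check of this test is to compare the two tables as multisets of rows. This is a sketch under assumptions, not the PR's test harness: it assumes the rows of both tables have already been read back into memory as tuples, and row_counts and same_contents are hypothetical helpers. Rows are compared regardless of order, since a scan of an Iceberg table does not promise any particular row order; a plain set is not enough, because a duplicated row (for example from an epoch committed twice) must make the comparison fail.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Count how many times each row occurs (the table viewed as a multiset).
fn row_counts<R: Hash + Eq + Clone>(rows: &[R]) -> HashMap<R, usize> {
    let mut counts = HashMap::new();
    for row in rows {
        *counts.entry(row.clone()).or_insert(0) += 1;
    }
    counts
}

/// True if both tables hold the same rows with the same multiplicities,
/// ignoring row order.
fn same_contents<R: Hash + Eq + Clone>(a: &[R], b: &[R]) -> bool {
    a.len() == b.len() && row_counts(a) == row_counts(b)
}

fn main() {
    // Rows read back from the table sunk without failures ...
    let clean = vec![(1, "a"), (2, "b")];
    // ... and from the table sunk with injected failures (same rows, other order).
    let with_failures = vec![(2, "b"), (1, "a")];
    assert!(same_contents(&clean, &with_failures));

    // A duplicated row, as a double commit would produce, must be detected.
    let duplicated = vec![(1, "a"), (2, "b"), (2, "b")];
    assert!(!same_contents(&clean, &duplicated));
}
```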

Notes:

  1. Because Iceberg is currently hosted in RisingWave, the GC cycle is 2 weeks. This PR currently assumes that the original S3 files that failed to be committed to Iceberg will not be garbage-collected before the next successful commit. In a later PR, the re-commit step after recovery will be changed to rewind to the start epoch instead.
  2. All metadata deletions happen immediately after the Iceberg commit succeeds. Correct me if I am wrong.

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • My PR contains critical fixes that are necessary to be merged into the latest release.

Documentation

  • My PR needs documentation updates.
Release note

@wcy-fdu wcy-fdu requested a review from a team as a code owner December 12, 2024 06:25
@wcy-fdu wcy-fdu requested a review from cyliu0 December 12, 2024 06:25
@wcy-fdu wcy-fdu marked this pull request as draft December 12, 2024 06:25
@wcy-fdu wcy-fdu changed the title feat(sink): introduce exactly once iceberg sink feat(sink): introduce exactly once iceberg sink[WIP] Dec 12, 2024
@wcy-fdu wcy-fdu requested review from hzxa21 and chenzl25 January 22, 2025 07:03
@wcy-fdu (Contributor Author) commented Jan 22, 2025

The Chaos Mesh test is still being improved. Please review the functional part first when you have time. @chenzl25 @wenym1

@hzxa21 (Collaborator) left a comment

Left some early comments

```diff
@@ -377,13 +381,13 @@ where
 <S as risingwave_connector::sink::Sink>::Coordinator: std::marker::Send,
 <S as risingwave_connector::sink::Sink>::Coordinator: 'static,
 {
-if let Ok(coordinator) = sink.new_coordinator().await {
+if let Ok(coordinator) = sink.new_coordinator(DatabaseConnection::Disconnected).await {
```
Collaborator

Will passing DatabaseConnection::Disconnected cause the iceberg sink bench to fail?

Contributor Author

The Iceberg sink writes to the system table in only one case: when is_exactly_once = true is set in the CREATE SINK statement. Otherwise, this DatabaseConnection is never used.
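The reply can be illustrated with a small model of the branch it describes. This is a sketch under stated assumptions, not the actual RisingWave code: DatabaseConnection here is a hypothetical local enum standing in for the real connection type (only its Disconnected variant appears in the diff), and IcebergSinkCoordinator, pre_commit, and the error returned when an exactly-once sink has no connection are illustrative assumptions.

```rust
/// Hypothetical stand-in for the connection handed to `new_coordinator`.
/// Only the `Disconnected` variant mirrors the diff; `Connected` is illustrative.
enum DatabaseConnection {
    Disconnected,
    Connected { url: String },
}

/// Hypothetical coordinator that only needs the connection for exactly-once sinks.
struct IcebergSinkCoordinator {
    is_exactly_once: bool,
    db: DatabaseConnection,
}

impl IcebergSinkCoordinator {
    /// Writes pre-commit metadata to the system table, but only when the sink was
    /// created with `is_exactly_once = true`; otherwise `db` is never touched.
    fn pre_commit(&self, epoch: u64) -> Result<Option<String>, String> {
        if !self.is_exactly_once {
            return Ok(None); // Default sink: no system-table write at all.
        }
        match &self.db {
            DatabaseConnection::Connected { url } => {
                Ok(Some(format!("pre-committed epoch {epoch} via {url}")))
            }
            DatabaseConnection::Disconnected => {
                Err("exactly-once sink needs a meta store connection".to_string())
            }
        }
    }
}

fn main() {
    // A default sink (as in the bench) works with a disconnected handle.
    let bench = IcebergSinkCoordinator { is_exactly_once: false, db: DatabaseConnection::Disconnected };
    assert_eq!(bench.pre_commit(1), Ok(None));

    // An exactly-once sink needs a real connection.
    let eo = IcebergSinkCoordinator {
        is_exactly_once: true,
        db: DatabaseConnection::Connected { url: "meta-store".to_string() },
    };
    assert!(eo.pre_commit(1).unwrap().is_some());
}
```

In this model, a sink without is_exactly_once returns before it ever inspects db, which is why passing a disconnected handle in the bench is harmless.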

@wcy-fdu wcy-fdu changed the title feat(sink): introduce exactly once iceberg sink[WIP] feat(sink): introduce exactly once iceberg sink Mar 3, 2025
@wcy-fdu wcy-fdu marked this pull request as ready for review March 3, 2025 05:55
@wcy-fdu wcy-fdu enabled auto-merge March 25, 2025 06:42
@graphite-app graphite-app bot requested a review from a team March 25, 2025 07:15
@wcy-fdu wcy-fdu disabled auto-merge March 25, 2025 07:57
wcy-fdu and others added 12 commits March 25, 2025 19:28
@wcy-fdu wcy-fdu enabled auto-merge March 27, 2025 06:22
@wcy-fdu wcy-fdu added this pull request to the merge queue Mar 27, 2025
Merged via the queue into main with commit 7f08c38 Mar 27, 2025
47 of 48 checks passed
@wcy-fdu wcy-fdu deleted the wcy/exactlt_once_iceberg_sink.pr branch March 27, 2025 07:38
@lmatz lmatz added the user-facing-changes Contains changes that are visible to users label May 27, 2025
@lmatz lmatz removed the user-facing-changes Contains changes that are visible to users label May 27, 2025
7 participants