feat(sink): introduce exactly once iceberg sink #19771
Left some early comments
@@ -377,13 +381,13 @@ where
     <S as risingwave_connector::sink::Sink>::Coordinator: std::marker::Send,
     <S as risingwave_connector::sink::Sink>::Coordinator: 'static,
 {
-    if let Ok(coordinator) = sink.new_coordinator().await {
+    if let Ok(coordinator) = sink.new_coordinator(DatabaseConnection::Disconnected).await {
Will passing DatabaseConnection::Disconnected cause the iceberg sink bench to fail?
The iceberg sink writes to the system table in only one case: when is_exactly_once = true is set in the CREATE SINK statement. Otherwise, this DatabaseConnection is not used.
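The gating described in this reply can be sketched as follows. This is a minimal illustration, not the PR's code: `DbConn`, `SinkConfig`, `CommitOutcome`, and this `commit` function are hypothetical stand-ins, and the only behavior taken from the discussion is that the connection is consulted only when `is_exactly_once` is set.

```rust
// Hypothetical stand-ins for illustration; these are not the RisingWave types.
#[derive(Debug, Clone, PartialEq)]
enum DbConn {
    Disconnected,
    Connected(String),
}

struct SinkConfig {
    is_exactly_once: bool,
}

#[derive(Debug, PartialEq)]
enum CommitOutcome {
    // Data committed without touching the system table.
    Committed,
    // Data committed and commit metadata recorded through the connection.
    CommittedWithMetadata,
}

// Assumed behavior: the connection is inspected only on the exactly-once path.
fn commit(config: &SinkConfig, db: &DbConn, epoch: u64) -> Result<CommitOutcome, String> {
    if !config.is_exactly_once {
        // A disconnected handle is harmless here because it is never used.
        return Ok(CommitOutcome::Committed);
    }
    match db {
        DbConn::Connected(_) => Ok(CommitOutcome::CommittedWithMetadata),
        DbConn::Disconnected => Err(format!(
            "epoch {}: exactly-once commit needs a database connection",
            epoch
        )),
    }
}
```

Under these assumptions, a bench that never sets `is_exactly_once` can pass a disconnected handle safely, while an exactly-once sink would surface an error instead of silently skipping the metadata write.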
…/risingwavelabs/risingwave into wcy/exactlt_once_iceberg_sink.pr
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
This PR introduces exactly-once consistency semantics for the iceberg sink.
Related RFC: Exactly Once File Sink
implementation detail
The implementation of this feature can be mainly divided into four parts:
coordinator.commit() interface.)
All logic is completed in the coordinator.init() interface when recovery occurs. This is because all in-memory state is cleared during recovery: the sink writer is built first, and the sink writer then builds the coordinator.
re_commit.
max_epoch, called last_recommit_epoch in the code, is passed to the CN through the existing StartCoordinationResponse for log store rewind. The rewind avoids consuming the log store repeatedly, because this batch of data must already be in the downstream iceberg table at this point.
test logic
The tests are still being improved and will be updated continuously in this PR.
After some offline discussions, we decided to test it this way: inject errors through madsim, then sink the same batch of source data into iceberg twice, producing two iceberg tables. One run has errors injected and the other does not. When the two tables are compared, they should be exactly the same.
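The comparison idea above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: it does not use madsim or iceberg, and `Durable`, `CrashPoint`, `commit`, `recover`, and `run_sink` are hypothetical names. It assumes that a commit first records the epoch as pending, that recovery re-commits pending epochs, and that the writer then skips every epoch up to the last re-committed one.

```rust
use std::collections::BTreeMap;

// Hypothetical durable state that survives a simulated crash.
#[derive(Default)]
struct Durable {
    table: Vec<(u64, i64)>,           // rows visible downstream, as (epoch, value)
    pending: BTreeMap<u64, Vec<i64>>, // pre-committed epochs not yet confirmed
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum CrashPoint {
    BeforePreCommit,
    AfterPreCommit,
    AfterTableWrite,
}

// Append an epoch's rows unless that epoch is already in the table.
fn write_table(d: &mut Durable, epoch: u64, rows: &[i64]) {
    if !d.table.iter().any(|(e, _)| *e == epoch) {
        d.table.extend(rows.iter().map(|v| (epoch, *v)));
    }
}

// Pre-commit, write, then confirm; `crash` aborts at the chosen point.
fn commit(d: &mut Durable, epoch: u64, rows: &[i64], crash: Option<CrashPoint>) -> Result<(), String> {
    if crash == Some(CrashPoint::BeforePreCommit) {
        return Err("crash before pre-commit".to_string());
    }
    d.pending.insert(epoch, rows.to_vec());
    if crash == Some(CrashPoint::AfterPreCommit) {
        return Err("crash after pre-commit".to_string());
    }
    write_table(d, epoch, rows);
    if crash == Some(CrashPoint::AfterTableWrite) {
        return Err("crash after table write".to_string());
    }
    d.pending.remove(&epoch);
    Ok(())
}

// Re-commit every pending epoch and return the largest one, if any.
fn recover(d: &mut Durable) -> Option<u64> {
    let pending = std::mem::take(&mut d.pending);
    let mut last = None;
    for (epoch, rows) in pending {
        write_table(d, epoch, &rows);
        last = Some(epoch);
    }
    last
}

// Sink the whole log, optionally crashing once at (epoch, point).
fn run_sink(log: &[(u64, Vec<i64>)], mut crash: Option<(u64, CrashPoint)>) -> Vec<(u64, i64)> {
    let mut d = Durable::default();
    let mut next = 0usize;
    while next < log.len() {
        let (epoch, rows) = &log[next];
        let inject = match crash {
            Some((e, p)) if e == *epoch => Some(p),
            _ => None,
        };
        match commit(&mut d, *epoch, rows, inject) {
            Ok(()) => next += 1,
            Err(_) => {
                crash = None; // inject the failure only once
                if let Some(last) = recover(&mut d) {
                    // Rewind: skip everything already re-committed.
                    next = log.iter().position(|(e, _)| *e > last).unwrap_or(log.len());
                }
                // With nothing pending, the same epoch is simply retried.
            }
        }
    }
    d.table
}
```

In this model, calling `run_sink` once with `None` and once with an injected crash at any of the three points yields the same table, which is the property the madsim test is meant to check against real iceberg tables.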
Notes:
… will not be GC'd before the next successful commit. In a later PR, the re-commit step after recovery will be changed to rewind to the start epoch.
Checklist
Documentation
Release note