Add retention by time and size [INK-251] by ivanyu · Pull Request #325 · aiven/inkless

Open · wants to merge 6 commits into main from ivanyu/ink-251-retention

Conversation

@ivanyu (Member) commented Jun 16, 2025

This PR adds retention by time and size (retention.ms, retention.bytes) to both control planes, plus support for it on the broker side.

Note to the reviewer: I tried to decompose this into separate logical commits; it should be easier to review step by step.
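For reviewers trying the feature out: retention.ms and retention.bytes are the standard Kafka topic configs, so they can be set with the regular Admin client. A minimal sketch (the broker address and topic name are placeholders):

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class SetRetention {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            List<AlterConfigOp> ops = List.of(
                    // Keep data for at most 7 days ...
                    new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "604800000"),
                            AlterConfigOp.OpType.SET),
                    // ... or at most 1 GiB per partition, whichever limit is reached first.
                    new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, "1073741824"),
                            AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}
```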

@ivanyu force-pushed the ivanyu/ink-251-retention branch 9 times, most recently from 9d3f261 to f6952ed on June 16, 2025 12:40
@ivanyu force-pushed the ivanyu/ink-251-retention branch 3 times, most recently from d54decb to 5158167 on June 16, 2025 14:46
@ivanyu force-pushed the ivanyu/ink-251-retention branch 2 times, most recently from 107f5e1 to 8ad45cd on June 16, 2025 15:22
@ivanyu marked this pull request as ready for review on June 16, 2025 15:27
@ivanyu requested a review from jeqo on June 16, 2025 15:27
@ivanyu force-pushed the ivanyu/ink-251-retention branch from 8ad45cd to 059cfd7 on June 16, 2025 15:55
long bytesDeleted = 0;

// Enforce the size retention.
if (request.retentionBytes() >= 0) {
Contributor:

Should we include logInfo.byteSize > request.retentionBytes() as a condition before starting to check each batch?

@ivanyu (Member, Author):

A good shortcut, yes! I'll update

@ivanyu (Member, Author):

Done. Btw on the SQL side this is implicitly done by the combination of WHERE and LIMIT
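For illustration, a minimal sketch of what the agreed-upon shortcut could look like in the in-memory control plane (names taken from the excerpt above; the surrounding loop is elided):

```java
// Sketch only: skip the per-batch scan entirely when the log is already
// within the size limit.
if (request.retentionBytes() >= 0 && logInfo.byteSize > request.retentionBytes()) {
    long bytesDeleted = 0;
    // ... walk the batches oldest-first, marking them for deletion until
    // logInfo.byteSize - bytesDeleted <= request.retentionBytes() ...
}
```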

}
}

// Enforce the time retention.
if (request.retentionMs() >= 0) {
Contributor:

Maybe we could have a similar early validation here by adding the oldest batch max timestamp to the log info

@ivanyu (Member, Author):

In contrast to size, the oldest batch max timestamp is known right away as we start looking at the batches (setting aside rare situations where a rogue batch in the middle has a much older timestamp than its neighbors), so this shortcut probably wouldn't give us anything.
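To make the argument concrete, a hypothetical sketch of the time-retention scan (BatchInfo, batchesOldestFirst, and markForDeletion are illustrative names, not the PR's actual API):

```java
// Sketch only: batches are visited oldest-first, so the scan stops at the
// first batch still within retention; a precomputed oldest timestamp in the
// log info would not save any work here.
final long cutoff = timeMs - request.retentionMs();
for (final BatchInfo batch : batchesOldestFirst) {
    if (batch.maxTimestamp() >= cutoff) {
        break;  // this batch and all newer ones are retained
    }
    markForDeletion(batch);
}
```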

logInfo.byteSize -= bytesDeleted;
if (coordinates.isEmpty()) {
    logInfo.logStartOffset = logInfo.highWatermark;
    assert logInfo.byteSize == 0;
Contributor:

nit: given that assertions are disabled by default, should we instead throw a runtime exception here, or suggest enabling assertions?

@ivanyu (Member, Author):

I expected it to work only in tests, not at prod run time.
But thinking again: InMemoryControlPlane is not a prod control plane anyway, so we can fail as loudly as we like with it. I've replaced this with a proper if.
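A sketch of the replacement, assuming IllegalStateException is an acceptable way to fail loudly:

```java
// Sketch only: unlike `assert`, this fails even when JVM assertions (-ea)
// are disabled, which is acceptable here because InMemoryControlPlane is
// not used in production.
if (coordinates.isEmpty()) {
    logInfo.logStartOffset = logInfo.highWatermark;
    if (logInfo.byteSize != 0) {
        throw new IllegalStateException(
            "byteSize must be 0 when all batches are deleted, but was " + logInfo.byteSize);
    }
}
```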

Comment on lines 100 to 101
final LogConfig topicConfig = logConfigCache.get(partition.topicPartition(),
        tp -> LogConfig.fromProps(metadataView.getDefaultConfig(), metadataView.getTopicConfig(tp.topic())));
Contributor:

Isn't the metadata view already a cached view of the metadata on KRaft? I wonder if we could use it directly instead of adding a cache dependency here.

@ivanyu (Member, Author):

It is cached, but the problem is the LogConfig instantiation. Like in 847798c, though not from the memory-pressure point of view but because we do quite a bit of checks and validation inside.
We could of course hand-rewrite them here too, but that starts to be a bit fragile.

@ivanyu (Member, Author):

After offline discussion, I removed the cache. Left only a map to prevent multiple instantiations during a single run.
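A sketch of the per-run map approach (partitionsToEnforce and the surrounding types are assumptions; the LogConfig.fromProps call mirrors the excerpt above):

```java
// Sketch only: the map lives for a single enforcement run, so each topic's
// LogConfig is instantiated (and validated) at most once per run, and no
// cross-run staleness is possible.
final Map<String, LogConfig> logConfigs = new HashMap<>();
for (final PartitionInfo partition : partitionsToEnforce) {
    final LogConfig topicConfig = logConfigs.computeIfAbsent(
        partition.topicPartition().topic(),
        topic -> LogConfig.fromProps(
            metadataView.getDefaultConfig(),
            metadataView.getTopicConfig(topic)));
    // ... enforce retention for this partition using topicConfig ...
}
```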

/**
 * The class responsible for scheduling per-partition retention enforcement.
*/
class RetentionEnforcementScheduler {
Contributor:

Maybe worth adding a documentation note on how this scheduler is expected to behave in a distributed environment: e.g. there's no coordination, the scheduling times are randomized to avoid collisions, what would happen on a collision (probably nothing, as the control plane handles the concurrency), etc.
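A sketch of the kind of note (plus the jitter it describes); the class body and method names are illustrative, not the PR's actual code:

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Schedules per-partition retention enforcement.
 *
 * <p>There is no cross-broker coordination: every broker schedules
 * enforcement independently, and each partition's next enforcement time is
 * randomized (jittered) so that brokers are unlikely to enforce the same
 * partition simultaneously. A collision is harmless: the control plane
 * handles the concurrency, and the second enforcement simply finds nothing
 * left to delete.
 */
class RetentionEnforcementScheduler {
    // Sketch only: jitter the base interval by +/-50%.
    private long nextDelayMs(final long baseIntervalMs) {
        final double factor = 0.5 + ThreadLocalRandom.current().nextDouble();  // [0.5, 1.5)
        return (long) (baseIntervalMs * factor);
    }
}
```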

@@ -411,6 +412,8 @@ class ReplicaManager(val config: KafkaConfig,

// Inkless threads
inklessSharedState.map { sharedState =>
  scheduler.schedule("inkless-retention-enforcer", () => inklessRetentionEnforcer.foreach(_.run()), 500L, 500L) // the real interval is inside
Contributor:

Maybe we could start using LOG_INITIAL_TASK_DELAY_MS_DEFAULT as the initial delay.
About the frequency: does it need to be this small?
What if we piggyback on log.retention.check.interval.ms (default 5 min)?
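A sketch of the suggested wiring; LOG_INITIAL_TASK_DELAY_MS_DEFAULT lives in Kafka's ServerLogConfigs in recent versions, while the enforcer handle and the config.logCleanupIntervalMs() accessor are stand-ins for however the broker exposes these:

```java
// Sketch only: reuse Kafka's defaults instead of a hard-coded 500 ms period.
// Upstream, log.initial.task.delay.ms defaults to 30 s and
// log.retention.check.interval.ms to 5 min.
scheduler.schedule(
    "inkless-retention-enforcer",
    () -> retentionEnforcer.run(),                       // hypothetical enforcer handle
    ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT,  // initial delay
    config.logCleanupIntervalMs());                      // period, hypothetical accessor
```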


l_base_offset_of_first_batch_to_keep = NULL;

IF l_request.retention_bytes >= 0 OR l_request.retention_ms >= 0 THEN
Contributor:

A similar suggestion as for the in-memory control plane: use the log info for an early quick check of whether the retention check is needed at all.

@ivanyu force-pushed the ivanyu/ink-251-retention branch from da4c4bb to 994fb1f on June 17, 2025 11:35
Remove `LogConfig` cache. Leave only a map to prevent multiple instantiations in a single run.