Add retention by time and size [INK-251] #325
base: main
Conversation
long bytesDeleted = 0;

// Enforce the size retention.
if (request.retentionBytes() >= 0) {
Should we include logInfo.byteSize > request.retentionBytes() as a condition before starting to check each batch?
A good shortcut, yes! I'll update.
Done. Btw, on the SQL side this is implicitly done by the combination of WHERE and LIMIT.
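A minimal sketch of the early-exit guard discussed above. The names (logByteSize, retentionBytes) mirror the snippet under review, but the method and the array-of-batch-sizes shape are illustrative assumptions, not the PR's actual code.

```java
public class SizeRetentionSketch {
    // Returns how many bytes would be deleted to bring the log back under
    // the retention budget; batch sizes are assumed to be oldest-first.
    static long enforceSizeRetention(long logByteSize, long retentionBytes, long[] batchSizes) {
        long bytesDeleted = 0;
        // Early exit: retention disabled, or the log already fits the budget,
        // so there is no need to walk the batches at all.
        if (retentionBytes < 0 || logByteSize <= retentionBytes) {
            return bytesDeleted;
        }
        for (long batchSize : batchSizes) {
            if (logByteSize - bytesDeleted <= retentionBytes) {
                break;
            }
            bytesDeleted += batchSize;
        }
        return bytesDeleted;
    }

    public static void main(String[] args) {
        // 100-byte log with a 60-byte budget: the two oldest 30-byte batches go.
        System.out.println(enforceSizeRetention(100, 60, new long[] {30, 30, 40})); // 60
    }
}
```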
}
}

// Enforce the time retention.
if (request.retentionMs() >= 0) {
Maybe we could have a similar early validation here by adding the oldest batch max timestamp to the log info.
In contrast to size, the oldest batch max timestamp is known right away as we start looking at the batches (setting aside rare situations where a rogue batch in the middle has a much older timestamp than its neighbors). So this shortcut probably won't give us anything.
logInfo.byteSize -= bytesDeleted;
if (coordinates.isEmpty()) {
    logInfo.logStartOffset = logInfo.highWatermark;
    assert logInfo.byteSize == 0;
nit: given that assertions come disabled by default, should we instead throw a runtime exception here? Or suggest enabling assertions?
I expected it to work only in tests, not in a prod runtime. But thinking again: InMemoryControlPlane is not a prod control plane anyway, so we can fail as loudly as we like there. I've replaced this with a proper if.
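A sketch of the change described here: since `assert` is a no-op unless the JVM runs with `-ea`, the invariant is enforced with an explicit check that fails loudly. The method and exception message are illustrative, not the PR's actual code.

```java
public class InvariantCheckSketch {
    // Explicit check instead of `assert byteSize == 0`: fires even when
    // assertions are disabled, which is the JVM default.
    static void checkEmptyLogInvariant(long byteSize) {
        if (byteSize != 0) {
            throw new IllegalStateException(
                "Log has no batches left, but byteSize is " + byteSize);
        }
    }

    public static void main(String[] args) {
        checkEmptyLogInvariant(0); // passes silently
        try {
            checkEmptyLogInvariant(42);
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```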
final LogConfig topicConfig = logConfigCache.get(partition.topicPartition(),
    tp -> LogConfig.fromProps(metadataView.getDefaultConfig(), metadataView.getTopicConfig(tp.topic())));
Isn't the metadata view already a cached view of the metadata on KRaft? I wonder if we could use it directly instead of adding a cache dependency here.
It is cached, but the problem is LogConfig instantiation. Like in 847798c, though not from the memory-pressure point of view, but because we do quite a bit of checks and validation inside. We could of course hand-rewrite them here, too, but that starts being a bit fragile.
After offline discussion, I removed the cache. Left only a map to prevent multiple instantiations during a single run.
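A sketch of the approach described here: a plain map scoped to a single enforcement run memoizes the config objects, so each topic's config is built only once per run without a long-lived cache. The types are simplified stand-ins for LogConfig and the metadata view, not the PR's classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class PerRunConfigSketch {
    // Counts how many times the (stand-in) config factory runs for the
    // given sequence of topic lookups within one enforcement run.
    static int countBuilds(String[] topics) {
        AtomicInteger builds = new AtomicInteger();
        // Stand-in for LogConfig.fromProps(...): expensive validation work.
        Function<String, String> buildConfig = topic -> {
            builds.incrementAndGet();
            return "config-for-" + topic;
        };
        // The per-run map: created at the start of a run, discarded after.
        Map<String, String> perRun = new HashMap<>();
        for (String topic : topics) {
            perRun.computeIfAbsent(topic, buildConfig);
        }
        return builds.get();
    }

    public static void main(String[] args) {
        // Three lookups over two distinct topics: the factory runs twice.
        System.out.println(countBuilds(new String[] {"orders", "orders", "payments"})); // 2
    }
}
```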
/**
 * The class responsible for scheduling per partition retention enforcement.
 */
class RetentionEnforcementScheduler {
Maybe worth adding a documentation note on how this scheduler is expected to behave in a distributed environment, e.g. there's no coordination and the scheduling times are randomized to avoid collisions, and what would happen on a collision (probably nothing, as the control plane handles the concurrency), etc.
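An illustrative sketch of the randomization idea mentioned here: each partition's next enforcement time gets a random jitter around the base interval, so uncoordinated brokers rarely enforce the same partition at the same moment. The method name and jitter scheme are assumptions for illustration, not the PR's implementation.

```java
import java.util.concurrent.ThreadLocalRandom;

public class JitterSketch {
    // Returns a delay in [base * (1 - jitter), base * (1 + jitter)], so
    // schedules on different brokers drift apart instead of aligning.
    static long nextRunDelayMs(long baseIntervalMs, double jitterFraction) {
        long jitterRange = (long) (baseIntervalMs * jitterFraction);
        long jitter = ThreadLocalRandom.current().nextLong(-jitterRange, jitterRange + 1);
        return baseIntervalMs + jitter;
    }

    public static void main(String[] args) {
        // 5-minute base interval with +/-10% jitter stays within bounds.
        long delay = nextRunDelayMs(300_000L, 0.1);
        System.out.println(delay >= 270_000L && delay <= 330_000L); // true
    }
}
```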
@@ -411,6 +412,8 @@ class ReplicaManager(val config: KafkaConfig,

    // Inkless threads
    inklessSharedState.map { sharedState =>
      scheduler.schedule("inkless-retention-enforcer", () => inklessRetentionEnforcer.foreach(_.run()), 500L, 500L) // the real interval is inside
Maybe we could start using LOG_INITIAL_TASK_DELAY_MS_DEFAULT as the initial delay. About the frequency, does it need to be this small? What if we piggyback on log.retention.check.interval.ms (default 5 min)?
l_base_offset_of_first_batch_to_keep = NULL;

IF l_request.retention_bytes >= 0 OR l_request.retention_ms >= 0 THEN
A similar suggestion as for the in-memory one: use the log info for an early, quick check of whether the retention check is needed at all.
Replace assert with proper if
Pre-check log size
Remove `LogConfig` cache. Leave only a map to prevent multiple instantiations in a single run.
This PR adds retention by time and size (`retention.ms`, `retention.bytes`) to both control planes, and its support on the broker side.

Note to the reviewer: I tried to decompose this into separate logical commits, so it should be easier to review step by step.