Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.0
Describe the bug
Configs:
enable.auto.commit = false
AckMode: AckMode.MANUAL_IMMEDIATE
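When I partially acknowledge a batch of records from a partition, the next poll doesn't resume from the expected offset (the offset of the last acknowledged record + 1). Instead, the consumer behaves as if the entire batch had been acknowledged, and the next poll returns the records that follow that batch.
In the logs below, index 0 (offset 76783117) is acknowledged and offset 76783118 is committed, yet the next batch starts at offset 76783120: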
{"timestamp":"2025-06-21T18:30:15.060248Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"INFO","serviceArchPath":"NA","process":"NA","message":"Message Received with BatchId: 03c1157a-d5d2-4624-801b-a7f0e0ad8e76, batchIndex: 0, Offset: 76783117, Partition: 23"}
{"timestamp":"2025-06-21T18:30:15.060354Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"INFO","serviceArchPath":"NA","process":"NA","message":"Message Received with BatchId: 03c1157a-d5d2-4624-801b-a7f0e0ad8e76, batchIndex: 1, Offset: 76783118, Partition: 23"}
{"timestamp":"2025-06-21T18:30:15.060463Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"INFO","serviceArchPath":"NA","process":"NA","message":"Message Received with BatchId: 03c1157a-d5d2-4624-801b-a7f0e0ad8e76, batchIndex: 2, Offset: 76783119, Partition: 23"}
{"timestamp":"2025-06-21T18:30:15.061737Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"WARN","serviceArchPath":"NA","process":"NA","message":"Acknowledging messages upto index: 0 for batch id: 03c1157a-d5d2-4624-801b-a7f0e0ad8e76 "}
{"timestamp":"2025-06-21T18:30:20.318048Z","logger_name":"o.s.k.l.KafkaMessageListenerContainer","thread_name":"test_topic_listener-0-C-1","level":"DEBUG","serviceArchPath":"NA","process":"NA","message":"Committing: {test_topic-23=OffsetAndMetadata{offset=76783118, leaderEpoch=null, metadata=''}}"}
{"timestamp":"2025-06-21T18:30:20.320048Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"WARN","serviceArchPath":"NA","process":"NA","message":"Acknowledged messages upto index: 0 for batch id: 03c1157a-d5d2-4624-801b-a7f0e0ad8e76 "}
{"timestamp":"2025-06-21T18:30:20.475265Z","logger_name":"o.s.k.l.KafkaMessageListenerContainer","thread_name":"test_topic_listener-0-C-1","level":"DEBUG","serviceArchPath":"NA","process":"NA","message":"Received: 3 records"}
{"timestamp":"2025-06-21T18:30:20.475736Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"INFO","serviceArchPath":"NA","process":"NA","message":"Message Received with BatchId: 255d2ab5-3a41-4ec7-9359-ae8478c1d9c0, batchIndex: 0, Offset: 76783120, Partition: 23"}
{"timestamp":"2025-06-21T18:30:20.475843Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"INFO","serviceArchPath":"NA","process":"NA","message":"Message Received with BatchId: 255d2ab5-3a41-4ec7-9359-ae8478c1d9c0, batchIndex: 1, Offset: 76783121, Partition: 23"}
{"timestamp":"2025-06-21T18:30:20.475888Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"INFO","serviceArchPath":"NA","process":"NA","message":"Message Received with BatchId: 255d2ab5-3a41-4ec7-9359-ae8478c1d9c0, batchIndex: 2, Offset: 76783122, Partition: 23"}
Let me know if more information is required from my end.
To Reproduce
- Acknowledge part of a batch for a partition by calling acknowledgment.acknowledge(index).
- Check the offset of the first message in the next batch.
Expected behavior
The next poll should return messages starting from the offset committed by the partial acknowledgment, i.e. the offset of the last acknowledged record + 1.
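To make the expectation concrete, here is a minimal sketch of the offset arithmetic using the values from the logs above. The class and method names (PartialAckOffsets, committedOffset, skippedRecords) are hypothetical and are not part of Spring for Apache Kafka. It assumes a single partition with contiguous offsets.

```java
import java.util.List;

// Illustrative only: not part of Spring for Apache Kafka.
final class PartialAckOffsets {

    // Kafka commits the offset of the next record to read, so acknowledging
    // the record at ackIndex commits that record's offset + 1.
    static long committedOffset(List<Long> batchOffsets, int ackIndex) {
        return batchOffsets.get(ackIndex) + 1;
    }

    // Number of unacknowledged records that are never delivered if the next
    // poll starts at nextPollFirstOffset instead of the committed offset.
    static long skippedRecords(long committedOffset, long nextPollFirstOffset) {
        return Math.max(0L, nextPollFirstOffset - committedOffset);
    }

    public static void main(String[] args) {
        // Offsets of the first batch in the logs (partition 23).
        List<Long> batch = List.of(76783117L, 76783118L, 76783119L);
        long committed = committedOffset(batch, 0);
        System.out.println("committed = " + committed); // 76783118
        // The second batch in the logs starts at 76783120.
        System.out.println("skipped = " + skippedRecords(committed, 76783120L)); // 2
    }
}
```

With these numbers, the committed offset matches the "Committing" log line (76783118), while the records at offsets 76783118 and 76783119 are not delivered again.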
Sample
Sample code of the listener:
public class KafkaMessageBatchManualModeListener implements BatchAcknowledgingMessageListener<String, Object> {

    // Fields (logger, kafkaMessageProcessor, kafkaFanoutExecutor) and helper methods are omitted from this sample.

    @Override
    public void onMessage(List<ConsumerRecord<String, Object>> data, Acknowledgment acknowledgment) {
        if (CollectionUtils.isEmpty(data)) {
            return;
        }
        // Futures keyed by the record's index in the batch, in batch order
        Map<Integer, CompletableFuture<EventStatus>> futures = new LinkedHashMap<>();
        int recordIterationIndex = 0;
        String batchId = UUID.randomUUID().toString();
        try {
            for (ConsumerRecord<String, Object> record : data) {
                // Process each record asynchronously on the fan-out executor
                CompletableFuture<EventStatus> future = CompletableFuture.supplyAsync(
                        () -> kafkaMessageProcessor.processMessage(createKafkaEventRecord(record, batchId)),
                        kafkaFanoutExecutor
                );
                logger.info("Message Received with BatchId: {}, batchIndex: {}, Offset: {}, Partition: {}", batchId, recordIterationIndex, record.offset(), record.partition());
                futures.put(recordIterationIndex, future);
                recordIterationIndex += 1;
            }
            // Wait for completion of all futures
            waitForFuturesProcessing(futures);
            Integer ackIndex = getAcknowledgmentIndexForBatch(futures);
            // Acknowledge the messages up to and including ackIndex
            if (ackIndex != null) {
                logger.info("Acknowledging messages upto index:{} for batch id: {}", ackIndex, batchId);
                acknowledgment.acknowledge(ackIndex);
                logger.info("Acknowledged messages upto index:{} for batch id: {}", ackIndex, batchId);
            } else {
                logger.warn("No message to acknowledge for batch id: {} ", batchId);
            }
        } catch (Exception exception) {
            logger.error("Error while listening message in KafkaMessageBatchManualModeListener", exception);
        }
    }
}
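The helper getAcknowledgmentIndexForBatch is not shown above. For anyone trying to reproduce this, the following is a rough sketch of what such a helper might do, under the assumption that it returns the index of the last record in the leading run of successfully processed records. The EventStatus enum, the class name AckIndexSketch, and the method name ackIndexFor are hypothetical stand-ins, not the actual code from the application.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustrative only: a guess at the shape of the omitted helper.
final class AckIndexSketch {

    // Hypothetical stand-in for the EventStatus type used by the listener.
    enum EventStatus { SUCCESS, FAILED }

    // Returns the index of the last record in the leading run of successful
    // records, or null if the first record did not succeed.
    // Assumes the map iterates in batch order (e.g. a LinkedHashMap).
    static Integer ackIndexFor(Map<Integer, CompletableFuture<EventStatus>> futures) {
        Integer ackIndex = null;
        for (Map.Entry<Integer, CompletableFuture<EventStatus>> entry : futures.entrySet()) {
            CompletableFuture<EventStatus> future = entry.getValue();
            boolean succeeded = future.isDone()
                    && !future.isCompletedExceptionally()
                    && future.getNow(null) == EventStatus.SUCCESS;
            if (!succeeded) {
                break; // stop at the first record that was not processed successfully
            }
            ackIndex = entry.getKey();
        }
        return ackIndex;
    }

    public static void main(String[] args) {
        Map<Integer, CompletableFuture<EventStatus>> futures = new LinkedHashMap<>();
        futures.put(0, CompletableFuture.completedFuture(EventStatus.SUCCESS));
        futures.put(1, CompletableFuture.completedFuture(EventStatus.FAILED));
        futures.put(2, CompletableFuture.completedFuture(EventStatus.SUCCESS));
        System.out.println(ackIndexFor(futures)); // 0
    }
}
```

With a three-record batch where only the first record succeeds, this yields index 0, which corresponds to the partial acknowledgment seen in the logs.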