8000 Incorrect behaviour observed with Partial batch acknowledgment method: acknowledgment.acknowledge(index) · Issue #3983 · spring-projects/spring-kafka · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
Incorrect behaviour observed with Partial batch acknowledgment method: acknowledgment.acknowledge(index) #3983
Open
@HimanshuLakra

Description

@HimanshuLakra

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.3.0

Describe the bug

Configs:
enable.auto.commit = false
AckMode: AckMode.MANUAL_IMMEDIATE

When I partially acknowledge a batch of messages in a partition, the next poll doesn't resume from the expected offset (last committed offset + 1). Instead, it acts as if the entire previous batch was acknowledged, and I receive messages from the batch after that.

Please refer below logs:

{"timestamp":"2025-06-21T18:30:15.060248Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"INFO","serviceArchPath":"NA","process":"NA","message":"Message Received with BatchId: 03c1157a-d5d2-4624-801b-a7f0e0ad8e76, batchIndex: 0, Offset: 76783117, Partition: 23"}
{"timestamp":"2025-06-21T18:30:15.060354Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"INFO","serviceArchPath":"NA","process":"NA","message":"Message Received with BatchId: 03c1157a-d5d2-4624-801b-a7f0e0ad8e76, batchIndex: 1, Offset: 76783118, Partition: 23"}
{"timestamp":"2025-06-21T18:30:15.060463Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"INFO","serviceArchPath":"NA","process":"NA","message":"Message Received with BatchId: 03c1157a-d5d2-4624-801b-a7f0e0ad8e76, batchIndex: 2, Offset: 76783119, Partition: 23"}
{"timestamp":"2025-06-21T18:30:15.061737Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"WARN","serviceArchPath":"NA","process":"NA","message":"Acknowledging messages upto index: 0 for batch id: 03c1157a-d5d2-4624-801b-a7f0e0ad8e76 "}
{"timestamp":"2025-06-21T18:30:20.318048Z","logger_name":"o.s.k.l.KafkaMessageListenerContainer","thread_name":"test_topic_listener-0-C-1","level":"DEBUG","serviceArchPath":"NA","process":"NA","message":"Committing: {test_topic-23=OffsetAndMetadata{offset=76783118, leaderEpoch=null, metadata=''}}"}
{"timestamp":"2025-06-21T18:30:20.320048Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"WARN","serviceArchPath":"NA","process":"NA","message":"Acknowledged messages upto index: 0 for batch id: 03c1157a-d5d2-4624-801b-a7f0e0ad8e76 "}


{"timestamp":"2025-06-21T18:30:20.475265Z","logger_name":"o.s.k.l.KafkaMessageListenerContainer","thread_name":"test_topic_listener-0-C-1","level":"DEBUG","serviceArchPath":"NA","process":"NA","message":"Received: 3 records"}
{"timestamp":"2025-06-21T18:30:20.475736Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"INFO","serviceArchPath":"NA","process":"NA","message":"Message Received with BatchId: 255d2ab5-3a41-4ec7-9359-ae8478c1d9c0, batchIndex: 0, Offset: 76783120, Partition: 23"}
{"timestamp":"2025-06-21T18:30:20.475843Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"INFO","serviceArchPath":"NA","process":"NA","message":"Message Received with BatchId: 255d2ab5-3a41-4ec7-9359-ae8478c1d9c0, batchIndex: 1, Offset: 76783121, Partition: 23"}
{"timestamp":"2025-06-21T18:30:20.475888Z","logger_name":"c.a.d.c.k.l.KafkaMessageBatchManualModeListener","thread_name":"test_topic_listener-0-C-1","level":"INFO","serviceArchPath":"NA","process":"NA","message":"Message Received with BatchId: 255d2ab5-3a41-4ec7-9359-ae8478c1d9c0, batchIndex: 2, Offset: 76783122, Partition: 23"}

Let me know if more information is required from my end.

To Reproduce

  1. Acknowledge partial batch using acknowledgment.acknowledge(index) function for a partition.
  2. Check next batch first message offset

Expected behavior

The messages in the next poll should have started from the committed offset(of partial acknowledge) + 1.

Sample

Sample code of listener
public class KafkaMessageBatchManualModeListener implements BatchAcknowledgingMessageListener<String, Object>

@Override
public void onMessage(List<ConsumerRecord<String, Object>> data, Acknowledgment acknowledgment) {

    if(CollectionUtils.isEmpty(data)) {
        return;
    }

    Map<Integer, CompletableFuture<EventStatus>> futures = new LinkedHashMap<>();
    int recordIterationIndex = 0;
    String batchId = UUID.randomUUID().toString();

    try{
        for (ConsumerRecord<String, Object> record : data) {
            CompletableFuture<EventStatus> future = CompletableFuture.supplyAsync(
                    () -> {
                        return kafkaMessageProcessor.processMessage(createKafkaEventRecord(record, batchId));
                    }, kafkaFanoutExecutor
            );

            logger.info("Message Received with BatchId: {}, batchIndex: {}, Offset: {}, Partition: {}", batchId, recordIterationIndex, record.offset(), record.partition());
            futures.put(recordIterationIndex, future);
            recordIterationIndex += 1;
        }

        // Wait for completion of all futures
        waitForFuturesProcessing(futures);

        Integer ackIndex = getAcknowledgmentIndexForBatch(futures);

        // Acknowledge the messages
        if(ackIndex != null){
            logger.info("Acknowledging messages upto index:{} for batch id: {}", ackIndex, batchId);
            acknowledgment.acknowledge(ackIndex);
            logger.info("Acknowledged messages upto index:{} for batch id: {}", ackIndex, batchId);
        }else{
            logger.warn("No message to acknowledge for batch id: {} ", batchId);
        }
    }catch(Exception exception){
        logger.error("Error while listening message in KafkaMessageBatchManualModeListener", exception);
    }

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions

      0