From ec59f251fe8f29a358bf990dc13f4482fa4b35f2 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 4 Jun 2025 21:49:58 -0400 Subject: [PATCH 1/3] Add Kafka shared consumer container support - New AbstractShareKafkaMessageListenerContainer base class with lifecycle management - ShareKafkaMessageListenerContainer implementation for share consumer protocol - Integration tests for end-to-end message delivery validation Signed-off-by: Soby Chacko --- ...actShareKafkaMessageListenerContainer.java | 249 ++++++++++++++++++ .../ShareKafkaMessageListenerContainer.java | 233 ++++++++++++++++ ...sageListenerContainerIntegrationTests.java | 125 +++++++++ 3 files changed, 607 insertions(+) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java new file mode 100644 index 0000000000..cd39e8eaf2 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java @@ -0,0 +1,249 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; + +import org.springframework.beans.BeanUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.core.ShareConsumerFactory; +import org.springframework.kafka.support.TopicPartitionOffset; +import org.springframework.util.Assert; + +/** + * Abstract base class for share consumer message listener containers. + *

+ * Handles common lifecycle, configuration, and event publishing for containers using a + * {@link org.springframework.kafka.core.ShareConsumerFactory}. + *

+ * Subclasses are responsible for implementing the actual consumer loop and message dispatch logic. + * + * @param the key type + * @param the value type + * + * @author Soby Chacko + */ +public abstract class AbstractShareKafkaMessageListenerContainer + implements GenericMessageListenerContainer, BeanNameAware, ApplicationEventPublisherAware, + ApplicationContextAware { + + /** + * The default {@link org.springframework.context.SmartLifecycle} phase for listener containers. + */ + public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100; + + @NonNull + protected final ShareConsumerFactory shareConsumerFactory; + + protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); + + private final ContainerProperties containerProperties; + + protected final ReentrantLock lifecycleLock = new ReentrantLock(); + + @NonNull + private String beanName = "noBeanNameSet"; + + @Nullable + private ApplicationEventPublisher applicationEventPublisher; + + private boolean autoStartup = true; + + private int phase = DEFAULT_PHASE; + + @Nullable + private ApplicationContext applicationContext; + + private volatile boolean running = false; + + /** + * Construct an instance with the provided factory and properties. + * @param shareConsumerFactory the factory. + * @param containerProperties the properties. + */ + @SuppressWarnings("unchecked") + protected AbstractShareKafkaMessageListenerContainer(@Nullable ShareConsumerFactory shareConsumerFactory, + ContainerProperties containerProperties) { + Assert.notNull(containerProperties, "'containerProperties' cannot be null"); + Assert.notNull(shareConsumerFactory, "'shareConsumerFactory' cannot be null"); + this.shareConsumerFactory = (ShareConsumerFactory) shareConsumerFactory; + @Nullable String @Nullable [] topics = containerProperties.getTopics(); + if (topics != null) { + this.containerProperties = new ContainerProperties(topics); + } + else { + Pattern topicPattern = containerProperties.getTopicPattern(); + if (topicPattern != null) { + this.containerProperties = new ContainerProperties(topicPattern); + } + else { + @Nullable TopicPartitionOffset @Nullable [] topicPartitions = containerProperties.getTopicPartitions(); + if (topicPartitions != null) { + this.containerProperties = new ContainerProperties(topicPartitions); + } + else { + throw new IllegalStateException("topics, topicPattern, or topicPartitions must be provided"); + } + } + } + BeanUtils.copyProperties(containerProperties, this.containerProperties, + "topics", "topicPartitions", "topicPattern"); + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + @Nullable + public ApplicationContext getApplicationContext() { + return this.applicationContext; + } + + @Override + public void setBeanName(String name) { + this.beanName = name; + } + + /** + * Return the bean name. + * @return the bean name + */ + public String getBeanName() { + return this.beanName; + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + + /** + * Get the event publisher. + * @return the publisher + */ + @Nullable + public ApplicationEventPublisher getApplicationEventPublisher() { + return this.applicationEventPublisher; + } + + @Override + public boolean isAutoStartup() { + return this.autoStartup; + } + + @Override + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + + @Override + public int getPhase() { + return this.phase; + } + + public void setPhase(int phase) { + this.phase = phase; + } + + @Override + public void stop(Runnable callback) { + stop(); + callback.run(); + } + + @Override + public void start() { + this.lifecycleLock.lock(); + try { + if (!isRunning()) { + Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener, + () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided"); + doStart(); + } + } + finally { + this.lifecycleLock.unlock(); + } + } + + @Override + public void stop() { + this.lifecycleLock.lock(); + try { + if (isRunning()) { + doStop(); + } + } + finally { + this.lifecycleLock.unlock(); + } + } + + @Override + public boolean isRunning() { + return this.running; + } + + protected void setRunning(boolean running) { + this.running = running; + } + + @Override + public ContainerProperties getContainerProperties() { + return this.containerProperties; + } + + @Override + @Nullable + public String getGroupId() { + return this.containerProperties.getGroupId() == null + ? (String) this.shareConsumerFactory.getConfigurationProperties().get(ConsumerConfig.GROUP_ID_CONFIG) + : this.containerProperties.getGroupId(); + } + + @Override + public String getListenerId() { + return this.beanName; // the container factory sets the bean name to the id attribute + } + + @Override + public void setupMessageListener(Object messageListener) { + this.containerProperties.setMessageListener(messageListener); + } + + protected abstract void doStart(); + + protected abstract void doStop(); + + @Override + public void destroy() { + stop(); + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java new file mode 100644 index 0000000000..05c73c2aed --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java @@ -0,0 +1,233 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; + +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.jspecify.annotations.Nullable; + +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.core.log.LogAccessor; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.kafka.core.ShareConsumerFactory; +import org.springframework.kafka.event.ConsumerStartedEvent; +import org.springframework.kafka.event.ConsumerStartingEvent; +import org.springframework.util.Assert; + +/** + * {@code ShareKafkaMessageListenerContainer} is a message listener container for Kafka's share consumer model. + *

+ * This container manages a single-threaded consumer loop using a {@link org.springframework.kafka.core.ShareConsumerFactory}. + * It is designed for use cases where Kafka's cooperative sharing protocol is desired, and provides a simple polling loop + * with per-record dispatch and acknowledgement. + *

+ * Lifecycle events are published for consumer starting and started. The container supports direct setting of the client.id. + * + * @param the key type + * @param the value type + * + * @author Soby Chacko + */ +public class ShareKafkaMessageListenerContainer + extends AbstractShareKafkaMessageListenerContainer { + + @Nullable + private String clientId; + + @SuppressWarnings("NullAway.Init") + private volatile ShareListenerConsumer listenerConsumer; + + @SuppressWarnings("NullAway.Init") + private volatile CompletableFuture listenerConsumerFuture; + + private volatile CountDownLatch startLatch = new CountDownLatch(1); + + /** + * Construct an instance with the supplied configuration properties. + * @param shareConsumerFactory the share consumer factory + * @param containerProperties the container properties + */ + public ShareKafkaMessageListenerContainer(ShareConsumerFactory shareConsumerFactory, + ContainerProperties containerProperties) { + super(shareConsumerFactory, containerProperties); + Assert.notNull(shareConsumerFactory, "A ShareConsumerFactory must be provided"); + } + + /** + * Set the {@code client.id} to use for the consumer. + * @param clientId the client id to set + */ + public void setClientId(String clientId) { + this.clientId = clientId; + } + + /** + * Get the {@code client.id} for the consumer. + * @return the client id, or null if not set + */ + @Nullable + public String getClientId() { + return this.clientId; + } + + @Override + public boolean isInExpectedState() { + return isRunning(); + } + + @Override + public Map> metrics() { + ShareListenerConsumer listenerConsumerForMetrics = this.listenerConsumer; + if (listenerConsumerForMetrics != null) { + Map metrics = listenerConsumerForMetrics.consumer.metrics(); + return Collections.singletonMap(listenerConsumerForMetrics.getClientId(), metrics); + } + return Collections.emptyMap(); + } + + @Override + protected void doStart() { + if (isRunning()) { + return; + } + ContainerProperties containerProperties = getContainerProperties(); + Object messageListener = containerProperties.getMessageListener(); + AsyncTaskExecutor consumerExecutor = containerProperties.getListenerTaskExecutor(); + if (consumerExecutor == null) { + consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-"); + containerProperties.setListenerTaskExecutor(consumerExecutor); + } + GenericMessageListener listener = (GenericMessageListener) messageListener; + Assert.state(listener != null, "'messageListener' cannot be null"); + this.listenerConsumer = new ShareListenerConsumer(listener); + setRunning(true); + this.listenerConsumerFuture = CompletableFuture.runAsync(this.listenerConsumer, consumerExecutor); + } + + @Override + protected void doStop() { + setRunning(false); + // The consumer will exit its loop naturally when running becomes false. + } + + private void publishConsumerStartingEvent() { + this.startLatch.countDown(); + ApplicationEventPublisher publisher = getApplicationEventPublisher(); + if (publisher != null) { + publisher.publishEvent(new ConsumerStartingEvent(this, this)); + } + } + + private void publishConsumerStartedEvent() { + ApplicationEventPublisher publisher = getApplicationEventPublisher(); + if (publisher != null) { + publisher.publishEvent(new ConsumerStartedEvent(this, this)); + } + } + + /** + * The inner share consumer thread: polls for records and dispatches to the listener. + */ + private class ShareListenerConsumer implements Runnable { + + private final LogAccessor logger = ShareKafkaMessageListenerContainer.this.logger; + + private final ShareConsumer consumer; + + private final GenericMessageListener genericListener; + + private final @Nullable String consumerGroupId = ShareKafkaMessageListenerContainer.this.getGroupId(); + + private final @Nullable String clientId; + + ShareListenerConsumer(GenericMessageListener listener) { + this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer( + ShareKafkaMessageListenerContainer.this.getGroupId(), + ShareKafkaMessageListenerContainer.this.getClientId()); + this.genericListener = listener; + this.clientId = ShareKafkaMessageListenerContainer.this.getClientId(); + // Subscribe to topics, just like in the test + ContainerProperties containerProperties = getContainerProperties(); + this.consumer.subscribe(java.util.Arrays.asList(containerProperties.getTopics())); + } + + @Nullable + String getClientId() { + return this.clientId; + } + + @Override + public void run() { + initialize(); + Throwable exitThrowable = null; + while (isRunning()) { + try { + var records = this.consumer.poll(java.time.Duration.ofMillis(1000)); + if (records != null && records.count() > 0) { + for (var record : records) { + @SuppressWarnings("unchecked") + GenericMessageListener listener = (GenericMessageListener) this.genericListener; + listener.onMessage(record); + // Temporarily auto-acknowledge and commit. + // We will refactor it later on to support more production-like scenarios. + this.consumer.acknowledge(record, AcknowledgeType.ACCEPT); + this.consumer.commitSync(); + } + } + } + catch (Error e) { + this.logger.error(e, "Stopping share consumer due to an Error"); + throw e; + } + catch (Exception e) { + if (e.getCause() instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + this.logger.error(e, "Error in share consumer poll loop"); + exitThrowable = e; + break; + } + } + if (exitThrowable != null) { + this.logger.error(exitThrowable, "ShareListenerConsumer exiting due to error"); + } + this.consumer.close(); + this.logger.info(() -> this.consumerGroupId + ": Consumer stopped"); + } + + protected void initialize() { + publishConsumerStartingEvent(); + publishConsumerStartedEvent(); + } + + @Override + public String toString() { + return "ShareKafkaMessageListenerContainer.ShareListenerConsumer [" + + "consumerGroupId=" + this.consumerGroupId + + ", clientId=" + this.clientId + + "]"; + } + } +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java new file mode 100644 index 0000000000..c475ab788a --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java @@ -0,0 +1,125 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; + +import org.springframework.kafka.core.DefaultShareConsumerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Basic tests for {@link ShareKafkaMessageListenerContainer}. + * + * @author Soby Chacko + * @since 4.0 + */ +@EmbeddedKafka( + topics = {"share-listener-integration-test"}, partitions = 1, + brokerProperties = { + "unstable.api.versions.enable=true", + "group.coordinator.rebalance.protocols=classic,share", + "share.coordinator.state.topic.replication.factor=1", + "share.coordinator.state.topic.min.isr=1" + } +) +class ShareKafkaMessageListenerContainerIntegrationTests { + + @Test + void integrationTestShareKafkaMessageListenerContainer(EmbeddedKafkaBroker broker) throws Exception { + final String topic = "share-listener-integration-test"; + final String groupId = "shareListenerGroup"; + String bootstrapServers = broker.getBrokersAsString(); + + // Produce a record + var producerProps = new java.util.Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + try (var producer = new KafkaProducer(producerProps)) { + producer.send(new ProducerRecord<>(topic, "key", "integration-test-value")).get(); + } + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + var consumerProps = new java.util.HashMap(); + consumerProps.put("bootstrap.servers", bootstrapServers); + consumerProps.put("key.deserializer", StringDeserializer.class); + consumerProps.put("value.deserializer", StringDeserializer.class); + consumerProps.put("group.id", groupId); + + DefaultShareConsumerFactory consumerFactory = new DefaultShareConsumerFactory<>(consumerProps); + ContainerProperties containerProps = new ContainerProperties(topic); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference received = new AtomicReference<>(); + containerProps.setMessageListener((MessageListener) record -> { + received.set(record.value()); + latch.countDown(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(consumerFactory, containerProps); + container.setBeanName("integrationTestShareKafkaMessageListenerContainer"); + container.start(); + + try { + assertThat(latch.await(10, java.util.concurrent.TimeUnit.SECONDS) + && "integration-test-value".equals(received.get())) + .as("Message should be received and have expected value") + .isTrue(); + } + finally { + container.stop(); + } + } + + /** + * Sets the share.auto.offset.reset group config to earliest for the given groupId, + * using the provided bootstrapServers. + */ + private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception { + Map adminProperties = new HashMap<>(); + adminProperties.put("bootstrap.servers", bootstrapServers); + ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest"); + AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET); + Map> configs = Map.of( + new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op)); + try (Admin admin = AdminClient.create(adminProperties)) { + admin.incrementalAlterConfigs(configs).all().get(); + } + } + +} From f06ef14c5a1ceb7deab9fa35aeef437977073c71 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 5 Jun 2025 13:16:56 -0400 Subject: [PATCH 2/3] Addressing PR review --- ...ractShareKafkaMessageListenerContainer.java | 4 +--- .../ShareKafkaMessageListenerContainer.java | 18 ++++++++++++++---- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java index cd39e8eaf2..fbf2944a68 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java @@ -21,7 +21,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; import org.springframework.beans.BeanUtils; @@ -48,6 +47,7 @@ * @param the value type * * @author Soby Chacko + * @since 4.0 */ public abstract class AbstractShareKafkaMessageListenerContainer implements GenericMessageListenerContainer, BeanNameAware, ApplicationEventPublisherAware, @@ -58,7 +58,6 @@ public abstract class AbstractShareKafkaMessageListenerContainer */ public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100; - @NonNull protected final ShareConsumerFactory shareConsumerFactory; protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); @@ -67,7 +66,6 @@ public abstract class AbstractShareKafkaMessageListenerContainer protected final ReentrantLock lifecycleLock = new ReentrantLock(); - @NonNull private String beanName = "noBeanNameSet"; @Nullable diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java index 05c73c2aed..bdb9b41849 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java @@ -16,6 +16,7 @@ package org.springframework.kafka.listener; +import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -49,10 +50,13 @@ * @param the value type * * @author Soby Chacko + * @since 4.0 */ public class ShareKafkaMessageListenerContainer extends AbstractShareKafkaMessageListenerContainer { + private static final int POLL_TIMEOUT = 1000; + @Nullable private String clientId; @@ -166,11 +170,12 @@ private class ShareListenerConsumer implements Runnable { this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer( ShareKafkaMessageListenerContainer.this.getGroupId(), ShareKafkaMessageListenerContainer.this.getClientId()); + this.genericListener = listener; this.clientId = ShareKafkaMessageListenerContainer.this.getClientId(); // Subscribe to topics, just like in the test ContainerProperties containerProperties = getContainerProperties(); - this.consumer.subscribe(java.util.Arrays.asList(containerProperties.getTopics())); + this.consumer.subscribe(Arrays.asList(containerProperties.getTopics())); } @Nullable @@ -184,7 +189,7 @@ public void run() { Throwable exitThrowable = null; while (isRunning()) { try { - var records = this.consumer.poll(java.time.Duration.ofMillis(1000)); + var records = this.consumer.poll(java.time.Duration.ofMillis(POLL_TIMEOUT)); if (records != null && records.count() > 0) { for (var record : records) { @SuppressWarnings("unchecked") @@ -199,6 +204,7 @@ public void run() { } catch (Error e) { this.logger.error(e, "Stopping share consumer due to an Error"); + wrapUp(); throw e; } catch (Exception e) { @@ -213,8 +219,7 @@ public void run() { if (exitThrowable != null) { this.logger.error(exitThrowable, "ShareListenerConsumer exiting due to error"); } - this.consumer.close(); - this.logger.info(() -> this.consumerGroupId + ": Consumer stopped"); + wrapUp(); } protected void initialize() { @@ -222,6 +227,11 @@ protected void initialize() { publishConsumerStartedEvent(); } + private void wrapUp(){ + this.consumer.close(); + this.logger.info(() -> this.consumerGroupId + ": Consumer stopped"); + } + @Override public String toString() { return "ShareKafkaMessageListenerContainer.ShareListenerConsumer [" From 4c1d5e1405f945120a4c1017c648250abc07df75 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 6 Jun 2025 11:15:20 -0400 Subject: [PATCH 3/3] checkstyle --- .../kafka/listener/ShareKafkaMessageListenerContainer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java index bdb9b41849..ae26d7b982 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java @@ -227,7 +227,7 @@ protected void initialize() { publishConsumerStartedEvent(); } - private void wrapUp(){ + private void wrapUp() { this.consumer.close(); this.logger.info(() -> this.consumerGroupId + ": Consumer stopped"); }