From 4fbb35b4c718b7654eb839297161016198f38907 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 12 Oct 2022 18:38:31 -0400 Subject: [PATCH 1/2] Consumer restart causes duplicate messages When we stop the consumer, there is a race condition that forces the messages to not get acknowledged. This results in duplicate messages on the restart of the consumer. This PR addresses the issue by waiting for the consumer thread to complete before stopping the consumer. Resolves https://github.com/spring-projects-experimental/spring-pulsar/issues/161 --- ...DefaultPulsarMessageListenerContainer.java | 130 +++++++++++------- .../core/ConsumerAcknowledgmentTests.java | 60 ++++++++ ...ltPulsarMessageListenerContainerTests.java | 1 + .../ObservationIntegrationTests.java | 8 -- 4 files changed, 139 insertions(+), 60 deletions(-) diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java index 8f14c077..587e021f 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java @@ -24,8 +24,8 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -74,7 +74,7 @@ */ public class DefaultPulsarMessageListenerContainer extends AbstractPulsarMessageListenerContainer { - private volatile Future listenerConsumerFuture; + private volatile CompletableFuture listenerConsumerFuture; private volatile Listener listenerConsumer; @@ -82,6 +82,10 @@ public class DefaultPulsarMessageListenerContainer extends AbstractPulsarMess private final AbstractPulsarMessageListenerContainer thisOrParentContainer; + private volatile Thread listenerConsumerThread; + + private volatile boolean receiveInProgress; + public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory pulsarConsumerFactory, PulsarContainerProperties pulsarContainerProperties) { this(pulsarConsumerFactory, pulsarContainerProperties, null); @@ -113,7 +117,7 @@ protected void doStart() { this.getObservationRegistry()); setRunning(true); this.startLatch = new CountDownLatch(1); - this.listenerConsumerFuture = consumerExecutor.submit(this.listenerConsumer); + this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer); try { if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), @@ -133,6 +137,15 @@ public void doStop() { setRunning(false); this.logger.info("Pausing this consumer."); this.listenerConsumer.consumer.pause(); + if (this.receiveInProgress) { + this.listenerConsumerThread.interrupt(); + } + try { + this.listenerConsumerThread.join(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } try { this.logger.info("Closing this consumer."); this.listenerConsumer.consumer.close(); @@ -286,6 +299,7 @@ public boolean isLongLived() { @Override public void run() { + DefaultPulsarMessageListenerContainer.this.listenerConsumerThread = Thread.currentThread(); publishConsumerStartingEvent(); publishConsumerStartedEvent(); AtomicBoolean inRetryMode = new AtomicBoolean(false); @@ -296,71 +310,83 @@ public void run() { // Always receive messages in batch mode. try { if (!inRetryMode.get() && !messagesPendingInBatch.get()) { + DefaultPulsarMessageListenerContainer.this.receiveInProgress = true; messages = this.consumer.batchReceive(); + DefaultPulsarMessageListenerContainer.this.receiveInProgress = false; } } catch (PulsarClientException e) { - DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> "Error receiving messages."); - } - Assert.isTrue(messages != null, "Messages cannot be null."); - if (this.isBatchListener) { - if (!inRetryMode.get() && !messagesPendingInBatch.get()) { - messageList = new ArrayList<>(); - messages.forEach(messageList::add); + if (e.getCause() instanceof InterruptedException) { + DefaultPulsarMessageListenerContainer.this.logger.debug(e, () -> "Error receiving messages."); } - try { - if (messageList != null && messageList.size() > 0) { - if (this.batchMessageListener instanceof PulsarBatchAcknowledgingMessageListener) { - this.batchMessageListener.received(this.consumer, messageList, - this.ackMode.equals(AckMode.MANUAL) - ? new ConsumerBatchAcknowledgment(this.consumer) : null); - } - else { - this.batchMessageListener.received(this.consumer, messageList); - } - if (this.ackMode.equals(AckMode.BATCH)) { - try { - if (isSharedSubscriptionType()) { - this.consumer.acknowledge(messages); + else { + DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> "Error receiving messages."); + } + messages = null; + } + + if (messages != null) { + if (this.isBatchListener) { + if (!inRetryMode.get() && !messagesPendingInBatch.get()) { + messageList = new ArrayList<>(); + messages.forEach(messageList::add); + } + try { + if (messageList != null && messageList.size() > 0) { + if (this.batchMessageListener instanceof PulsarBatchAcknowledgingMessageListener) { + this.batchMessageListener.received(this.consumer, messageList, + this.ackMode.equals(AckMode.MANUAL) + ? new ConsumerBatchAcknowledgment(this.consumer) : null); + } + else { + this.batchMessageListener.received(this.consumer, messageList); + } + if (this.ackMode.equals(AckMode.BATCH)) { + try { + if (isSharedSubscriptionType()) { + this.consumer.acknowledge(messages); + } + else { + final Stream> stream = StreamSupport + .stream(messages.spliterator(), true); + Message last = stream.reduce((a, b) -> b).orElse(null); + this.consumer.acknowledgeCumulative(last); + } } - else { - final Stream> stream = StreamSupport.stream(messages.spliterator(), - true); - Message last = stream.reduce((a, b) -> b).orElse(null); - this.consumer.acknowledgeCumulative(last); + catch (PulsarClientException pce) { + this.consumer.negativeAcknowledge(messages); } } - catch (PulsarClientException pce) { - this.consumer.negativeAcknowledge(messages); + if (this.pulsarConsumerErrorHandler != null) { + pendingMessagesHandledSuccessfully(inRetryMode, messagesPendingInBatch); } } + } + catch (Exception e) { if (this.pulsarConsumerErrorHandler != null) { - pendingMessagesHandledSuccessfully(inRetryMode, messagesPendingInBatch); + messageList = invokeBatchListenerErrorHandler(inRetryMode, messagesPendingInBatch, + messageList, e); + } + else { + // the whole batch is negatively acknowledged in the event + // of + // an exception from the handler method. + this.consumer.negativeAcknowledge(messages); } } } - catch (Exception e) { - if (this.pulsarConsumerErrorHandler != null) { - messageList = invokeBatchListenerErrorHandler(inRetryMode, messagesPendingInBatch, - messageList, e); - } - else { - // the whole batch is negatively acknowledged in the event of - // an exception from the handler method. - this.consumer.negativeAcknowledge(messages); + else { + for (Message message : messages) { + do { + newObservation(message) + .observe(() -> this.dispatchMessageToListener(message, inRetryMode)); + } + while (inRetryMode.get()); } - } - } - else { - for (Message message : messages) { - do { - newObservation(message).observe(() -> this.dispatchMessageToListener(message, inRetryMode)); + // All the records are processed at this point. Handle acks. + if (this.ackMode.equals(AckMode.BATCH)) { + handleAcks(messages); } - while (inRetryMode.get()); - } - // All the records are processed at this point. Handle acks. - if (this.ackMode.equals(AckMode.BATCH)) { - handleAcks(messages); } } } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/ConsumerAcknowledgmentTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/ConsumerAcknowledgmentTests.java index c57adeca..2dc35b00 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/ConsumerAcknowledgmentTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/ConsumerAcknowledgmentTests.java @@ -361,4 +361,64 @@ void testBatchNackForEntireBatchWhenUsingBatchListener() throws Exception { pulsarClient.close(); } + // ********************* + + @Test + void messagesAreProperlyAckdOnContainerStopBeforeExitingListenerThread() throws Exception { + Map config = new HashMap<>(); + config.put("topicNames", Set.of("duplicate-message-test")); + config.put("subscriptionName", "duplicate-sub-1"); + final PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + final DefaultPulsarConsumerFactory pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>( + pulsarClient, config); + + PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties(); + final AtomicInteger counter1 = new AtomicInteger(0); + pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener) (consumer, msg) -> { + counter1.getAndIncrement(); + }); + pulsarContainerProperties.setSchema(Schema.STRING); + DefaultPulsarMessageListenerContainer container1 = new DefaultPulsarMessageListenerContainer<>( + pulsarConsumerFactory, pulsarContainerProperties); + container1.start(); + + Map prodConfig = new HashMap<>(); + prodConfig.put("topicName", "duplicate-message-test"); + final DefaultPulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>( + pulsarClient, prodConfig); + final PulsarTemplate pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); + pulsarTemplate.send("hello john doe"); + + while (counter1.get() == 0) { + // busy wait until counter1 is > 0 + } + // When we stop, if any acks are in progress, that should all be + // taken care of before exiting the listener thread, so that the + // next consumer under the same subscription will not receive the + // unacked message. + container1.stop(); + + final AtomicInteger counter2 = new AtomicInteger(0); + pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener) (consumer, msg) -> { + counter2.getAndIncrement(); + }); + pulsarContainerProperties.setSchema(Schema.STRING); + DefaultPulsarMessageListenerContainer container2 = new DefaultPulsarMessageListenerContainer<>( + pulsarConsumerFactory, pulsarContainerProperties); + container2.start(); + + pulsarTemplate.send("hello john doe"); + + while (counter2.get() == 0) { + // busy wait until counter2 > 0 + } + // Asserting that both consumers are only receiving the expected data. + assertThat(counter1.get()).isEqualTo(1); + assertThat(counter2.get()).isEqualTo(1); + + container2.stop(); + pulsarClient.close(); + } + } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java index a3c2427d..05683425 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java @@ -262,6 +262,7 @@ void deadLetterPolicy() throws Exception { // Normal consumer should receive 5 msg + 1 re-delivery assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); container.stop(); + dlqContainer.stop(); pulsarClient.close(); } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java index 593bab7e..10a462dd 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java @@ -93,14 +93,6 @@ public SampleTestRunnerConsumer yourCode() { assertThat(listen2Completed).withFailMessage( "Message %s not received in listen2 (latchesByMessageListen1 = %s and latchesByMessageListen2 = %s)", msg, listeners.latchesByMessageListen1, listeners.latchesByMessageListen2).isTrue(); - - // Without this sleep, the 2nd tracingSetup run sometimes fails due to - // messages from 1st run being - // delivered during the 2nd run. The test runs share the same listener - // config, including the - // same subscription names. Seems like the listener in run2 is getting - // message from run1. - Thread.sleep(5000); } List finishedSpans = bb.getFinishedSpans(); From dec3c2031e9d035f9ed9954f350abf646bfde3f8 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 19 Oct 2022 15:43:02 -0400 Subject: [PATCH 2/2] Addressing PR review comments --- ...DefaultPulsarMessageListenerContainer.java | 145 ++++++++++-------- .../core/ConsumerAcknowledgmentTests.java | 5 +- 2 files changed, 81 insertions(+), 69 deletions(-) diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java index 587e021f..62b11604 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java @@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -82,9 +83,9 @@ public class DefaultPulsarMessageListenerContainer extends AbstractPulsarMess private final AbstractPulsarMessageListenerContainer thisOrParentContainer; - private volatile Thread listenerConsumerThread; + private AtomicReference listenerConsumerThread; - private volatile boolean receiveInProgress; + private final AtomicBoolean receiveInProgress = new AtomicBoolean(); public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory pulsarConsumerFactory, PulsarContainerProperties pulsarContainerProperties) { @@ -137,14 +138,23 @@ public void doStop() { setRunning(false); this.logger.info("Pausing this consumer."); this.listenerConsumer.consumer.pause(); - if (this.receiveInProgress) { - this.listenerConsumerThread.interrupt(); - } - try { - this.listenerConsumerThread.join(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + if (this.listenerConsumerThread != null) { + // if there is a receive operation already in progress, we want to interrupt + // the listener thread. + if (this.receiveInProgress.get()) { + // All the records received so far in the current batch receive will be + // re-delivered. + this.listenerConsumerThread.get().interrupt(); + } + // if there is something other than receive operations are in progress, + // such as ack operations, wait for the listener thread to complete them. + try { + this.listenerConsumerThread.get().join(); + } + catch (InterruptedException e) { + this.logger.error(e, () -> "Interrupting the main thread"); + Thread.currentThread().interrupt(); + } } try { this.logger.info("Closing this consumer."); @@ -299,7 +309,8 @@ public boolean isLongLived() { @Override public void run() { - DefaultPulsarMessageListenerContainer.this.listenerConsumerThread = Thread.currentThread(); + DefaultPulsarMessageListenerContainer.this.listenerConsumerThread = new AtomicReference<>( + Thread.currentThread()); publishConsumerStartingEvent(); publishConsumerStartedEvent(); AtomicBoolean inRetryMode = new AtomicBoolean(false); @@ -310,85 +321,89 @@ public void run() { // Always receive messages in batch mode. try { if (!inRetryMode.get() && !messagesPendingInBatch.get()) { - DefaultPulsarMessageListenerContainer.this.receiveInProgress = true; + DefaultPulsarMessageListenerContainer.this.receiveInProgress.set(true); messages = this.consumer.batchReceive(); - DefaultPulsarMessageListenerContainer.this.receiveInProgress = false; } } catch (PulsarClientException e) { if (e.getCause() instanceof InterruptedException) { - DefaultPulsarMessageListenerContainer.this.logger.debug(e, () -> "Error receiving messages."); + DefaultPulsarMessageListenerContainer.this.logger.debug(e, + () -> "Error receiving messages due to a thread interrupt call from upstream."); } else { DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> "Error receiving messages."); } messages = null; } + finally { + DefaultPulsarMessageListenerContainer.this.receiveInProgress.set(false); + } - if (messages != null) { - if (this.isBatchListener) { - if (!inRetryMode.get() && !messagesPendingInBatch.get()) { - messageList = new ArrayList<>(); - messages.forEach(messageList::add); - } - try { - if (messageList != null && messageList.size() > 0) { - if (this.batchMessageListener instanceof PulsarBatchAcknowledgingMessageListener) { - this.batchMessageListener.received(this.consumer, messageList, - this.ackMode.equals(AckMode.MANUAL) - ? new ConsumerBatchAcknowledgment(this.consumer) : null); - } - else { - this.batchMessageListener.received(this.consumer, messageList); - } - if (this.ackMode.equals(AckMode.BATCH)) { - try { - if (isSharedSubscriptionType()) { - this.consumer.acknowledge(messages); - } - else { - final Stream> stream = StreamSupport - .stream(messages.spliterator(), true); - Message last = stream.reduce((a, b) -> b).orElse(null); - this.consumer.acknowledgeCumulative(last); - } + if (messages == null) { + continue; + } + + if (this.isBatchListener) { + if (!inRetryMode.get() && !messagesPendingInBatch.get()) { + messageList = new ArrayList<>(); + messages.forEach(messageList::add); + } + try { + if (messageList != null && messageList.size() > 0) { + if (this.batchMessageListener instanceof PulsarBatchAcknowledgingMessageListener) { + this.batchMessageListener.received(this.consumer, messageList, + this.ackMode.equals(AckMode.MANUAL) + ? new ConsumerBatchAcknowledgment(this.consumer) : null); + } + else { + this.batchMessageListener.received(this.consumer, messageList); + } + if (this.ackMode.equals(AckMode.BATCH)) { + try { + if (isSharedSubscriptionType()) { + this.consumer.acknowledge(messages); } - catch (PulsarClientException pce) { - this.consumer.negativeAcknowledge(messages); + else { + final Stream> stream = StreamSupport.stream(messages.spliterator(), + true); + Message last = stream.reduce((a, b) -> b).orElse(null); + this.consumer.acknowledgeCumulative(last); } } - if (this.pulsarConsumerErrorHandler != null) { - pendingMessagesHandledSuccessfully(inRetryMode, messagesPendingInBatch); + catch (PulsarClientException pce) { + this.consumer.negativeAcknowledge(messages); } } - } - catch (Exception e) { if (this.pulsarConsumerErrorHandler != null) { - messageList = invokeBatchListenerErrorHandler(inRetryMode, messagesPendingInBatch, - messageList, e); - } - else { - // the whole batch is negatively acknowledged in the event - // of - // an exception from the handler method. - this.consumer.negativeAcknowledge(messages); + pendingMessagesHandledSuccessfully(inRetryMode, messagesPendingInBatch); } } } - else { - for (Message message : messages) { - do { - newObservation(message) - .observe(() -> this.dispatchMessageToListener(message, inRetryMode)); - } - while (inRetryMode.get()); + catch (Exception e) { + if (this.pulsarConsumerErrorHandler != null) { + messageList = invokeBatchListenerErrorHandler(inRetryMode, messagesPendingInBatch, + messageList, e); } - // All the records are processed at this point. Handle acks. - if (this.ackMode.equals(AckMode.BATCH)) { - handleAcks(messages); + else { + // the whole batch is negatively acknowledged in the event + // of + // an exception from the handler method. + this.consumer.negativeAcknowledge(messages); } } } + else { + for (Message message : messages) { + do { + newObservation(message).observe(() -> this.dispatchMessageToListener(message, inRetryMode)); + } + while (inRetryMode.get()); + } + // All the records are processed at this point. Handle acks. + if (this.ackMode.equals(AckMode.BATCH)) { + handleAcks(messages); + } + } } } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/ConsumerAcknowledgmentTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/ConsumerAcknowledgmentTests.java index 2dc35b00..f6c15582 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/ConsumerAcknowledgmentTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/ConsumerAcknowledgmentTests.java @@ -361,8 +361,6 @@ void testBatchNackForEntireBatchWhenUsingBatchListener() throws Exception { pulsarClient.close(); } - // ********************* - @Test void messagesAreProperlyAckdOnContainerStopBeforeExitingListenerThread() throws Exception { Map config = new HashMap<>(); @@ -383,8 +381,7 @@ void messagesAreProperlyAckdOnContainerStopBeforeExitingListenerThread() throws pulsarConsumerFactory, pulsarContainerProperties); container1.start(); - Map prodConfig = new HashMap<>(); - prodConfig.put("topicName", "duplicate-message-test"); + Map prodConfig = Collections.singletonMap("topicName", "duplicate-message-test"); final DefaultPulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>( pulsarClient, prodConfig); final PulsarTemplate pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);