Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer restart causes duplicate messages #162

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -74,14 +75,18 @@
*/
public class DefaultPulsarMessageListenerContainer<T> extends AbstractPulsarMessageListenerContainer<T> {

private volatile Future<?> listenerConsumerFuture;
private volatile CompletableFuture<?> listenerConsumerFuture;

private volatile Listener listenerConsumer;

private volatile CountDownLatch startLatch = new CountDownLatch(1);

private final AbstractPulsarMessageListenerContainer<?> thisOrParentContainer;

private AtomicReference<Thread> listenerConsumerThread;
onobc marked this conversation as resolved.
Show resolved Hide resolved

private final AtomicBoolean receiveInProgress = new AtomicBoolean();

public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties) {
this(pulsarConsumerFactory, pulsarContainerProperties, null);
Expand Down Expand Up @@ -113,7 +118,7 @@ protected void doStart() {
this.getObservationRegistry());
setRunning(true);
this.startLatch = new CountDownLatch(1);
this.listenerConsumerFuture = consumerExecutor.submit(this.listenerConsumer);
this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer);

try {
if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(),
Expand All @@ -133,6 +138,24 @@ public void doStop() {
setRunning(false);
this.logger.info("Pausing this consumer.");
this.listenerConsumer.consumer.pause();
if (this.listenerConsumerThread != null) {
// if there is a receive operation already in progress, we want to interrupt
// the listener thread.
if (this.receiveInProgress.get()) {
// All the records received so far in the current batch receive will be
// re-delivered.
this.listenerConsumerThread.get().interrupt();
}
// if there is something other than receive operations are in progress,
// such as ack operations, wait for the listener thread to complete them.
try {
this.listenerConsumerThread.get().join();
}
catch (InterruptedException e) {
this.logger.error(e, () -> "Interrupting the main thread");
Thread.currentThread().interrupt();
}
}
try {
this.logger.info("Closing this consumer.");
this.listenerConsumer.consumer.close();
Expand Down Expand Up @@ -286,6 +309,8 @@ public boolean isLongLived() {

@Override
public void run() {
DefaultPulsarMessageListenerContainer.this.listenerConsumerThread = new AtomicReference<>(
Thread.currentThread());
publishConsumerStartingEvent();
publishConsumerStartedEvent();
AtomicBoolean inRetryMode = new AtomicBoolean(false);
Expand All @@ -296,13 +321,28 @@ public void run() {
// Always receive messages in batch mode.
try {
if (!inRetryMode.get() && !messagesPendingInBatch.get()) {
DefaultPulsarMessageListenerContainer.this.receiveInProgress.set(true);
messages = this.consumer.batchReceive();
}
}
catch (PulsarClientException e) {
DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> "Error receiving messages.");
if (e.getCause() instanceof InterruptedException) {
DefaultPulsarMessageListenerContainer.this.logger.debug(e,
() -> "Error receiving messages due to a thread interrupt call from upstream.");
}
else {
DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> "Error receiving messages.");
}
messages = null;
}
finally {
DefaultPulsarMessageListenerContainer.this.receiveInProgress.set(false);
}
Assert.isTrue(messages != null, "Messages cannot be null.");

if (messages == null) {
continue;
}

if (this.isBatchListener) {
if (!inRetryMode.get() && !messagesPendingInBatch.get()) {
messageList = new ArrayList<>();
Expand Down Expand Up @@ -345,7 +385,8 @@ public void run() {
messageList, e);
}
else {
// the whole batch is negatively acknowledged in the event of
// the whole batch is negatively acknowledged in the event
// of
// an exception from the handler method.
this.consumer.negativeAcknowledge(messages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,4 +361,61 @@ void testBatchNackForEntireBatchWhenUsingBatchListener() throws Exception {
pulsarClient.close();
}

@Test
void messagesAreProperlyAckdOnContainerStopBeforeExitingListenerThread() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("topicNames", Set.of("duplicate-message-test"));
config.put("subscriptionName", "duplicate-sub-1");
final PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build();
final DefaultPulsarConsumerFactory<String> pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(
pulsarClient, config);

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
final AtomicInteger counter1 = new AtomicInteger(0);
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
counter1.getAndIncrement();
});
pulsarContainerProperties.setSchema(Schema.STRING);
DefaultPulsarMessageListenerContainer<String> container1 = new DefaultPulsarMessageListenerContainer<>(
pulsarConsumerFactory, pulsarContainerProperties);
container1.start();

Map<String, Object> prodConfig = Collections.singletonMap("topicName", "duplicate-message-test");
final DefaultPulsarProducerFactory<String> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(
pulsarClient, prodConfig);
final PulsarTemplate<String> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
pulsarTemplate.send("hello john doe");

while (counter1.get() == 0) {
// busy wait until counter1 is > 0
}
// When we stop, if any acks are in progress, that should all be
// taken care of before exiting the listener thread, so that the
// next consumer under the same subscription will not receive the
// unacked message.
container1.stop();

final AtomicInteger counter2 = new AtomicInteger(0);
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
counter2.getAndIncrement();
});
pulsarContainerProperties.setSchema(Schema.STRING);
DefaultPulsarMessageListenerContainer<String> container2 = new DefaultPulsarMessageListenerContainer<>(
pulsarConsumerFactory, pulsarContainerProperties);
container2.start();

pulsarTemplate.send("hello john doe");

while (counter2.get() == 0) {
// busy wait until counter2 > 0
}
// Asserting that both consumers are only receiving the expected data.
assertThat(counter1.get()).isEqualTo(1);
sobychacko marked this conversation as resolved.
Show resolved Hide resolved
assertThat(counter2.get()).isEqualTo(1);

container2.stop();
pulsarClient.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ void deadLetterPolicy() throws Exception {
// Normal consumer should receive 5 msg + 1 re-delivery
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
container.stop();
dlqContainer.stop();
sobychacko marked this conversation as resolved.
Show resolved Hide resolved
pulsarClient.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ public SampleTestRunnerConsumer yourCode() {
assertThat(listen2Completed).withFailMessage(
"Message %s not received in listen2 (latchesByMessageListen1 = %s and latchesByMessageListen2 = %s)",
msg, listeners.latchesByMessageListen1, listeners.latchesByMessageListen2).isTrue();

// Without this sleep, the 2nd tracingSetup run sometimes fails due to
// messages from 1st run being
// delivered during the 2nd run. The test runs share the same listener
// config, including the
// same subscription names. Seems like the listener in run2 is getting
// message from run1.
Thread.sleep(5000);
sobychacko marked this conversation as resolved.
Show resolved Hide resolved
}

List<FinishedSpan> finishedSpans = bb.getFinishedSpans();
Expand Down