Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: [AMQ-9554] Browsing a DLQ in transacted mode should allow redelivered messages #1286

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -548,13 +548,18 @@ private void poisonAck(MessageDispatch md, String cause) throws JMSException {

private boolean redeliveryExceeded(MessageDispatch md) {
try {
return session.getTransacted()
&& redeliveryPolicy != null
&& redeliveryPolicy.isPreDispatchCheck()
&& redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()
// redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
&& md.getMessage().getProperty("redeliveryDelay") == null;
if(!session.getTransacted() || redeliveryPolicy == null || !redeliveryPolicy.isPreDispatchCheck()) {
return false;
}

if(info.isBrowser() && redeliveryPolicy.isQueueBrowserIgnored()) {
return false;
}

return redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()
// redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
&& md.getMessage().getProperty("redeliveryDelay") == null;
} catch (Exception ignored) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable,
protected double backOffMultiplier = 5.0;
protected long redeliveryDelay = initialRedeliveryDelay;
protected boolean preDispatchCheck = true;
protected boolean queueBrowserIgnored = true;

public RedeliveryPolicy() {
}
Expand Down Expand Up @@ -165,4 +166,12 @@ public void setPreDispatchCheck(boolean preDispatchCheck) {
public boolean isPreDispatchCheck() {
return preDispatchCheck;
}

public void setQueueBrowserIgnored(boolean queueBrowserIgnored) {
this.queueBrowserIgnored = queueBrowserIgnored;
}

public boolean isQueueBrowserIgnored() {
return queueBrowserIgnored;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
package org.apache.activemq.usecases;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import static org.junit.Assert.fail;

import java.io.IOException;
import java.net.URI;
import java.util.Enumeration;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
Expand All @@ -35,8 +38,14 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DestinationView;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -68,6 +77,10 @@ public void startBroker() throws Exception {

connectUri = connector.getConnectUri();
factory = new ActiveMQConnectionFactory(connectUri);
factory.setWatchTopicAdvisories(false);
factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0l);
factory.getRedeliveryPolicy().setRedeliveryDelay(0l);
factory.getRedeliveryPolicy().setMaximumRedeliveryDelay(0l);
}

public BrokerService createBroker() throws IOException {
Expand Down Expand Up @@ -217,4 +230,195 @@ public void testMemoryLimit() throws Exception {
browser.close();
assertTrue("got at least maxPageSize", received >= maxPageSize);
}

@Test // https://issues.apache.org/jira/browse/AMQ-9554
public void testBrowseRedeliveryMaxRedelivered() throws Exception {
browseRedelivery(0, true);
}

@Test // Ignore https://issues.apache.org/jira/browse/AMQ-9554
public void testBrowseRedeliveryIgnored() throws Exception {
browseRedelivery(1, false);
}

protected void browseRedelivery(int browseExpected, boolean dlqDlqExpected) throws Exception {
IndividualDeadLetterStrategy individualDeadLetterStrategy = new IndividualDeadLetterStrategy();
individualDeadLetterStrategy.setQueuePrefix("");
individualDeadLetterStrategy.setQueueSuffix(".dlq");
individualDeadLetterStrategy.setUseQueueForQueueMessages(true);
broker.getDestinationPolicy().getDefaultEntry().setDeadLetterStrategy(individualDeadLetterStrategy);
broker.getDestinationPolicy().getDefaultEntry().setPersistJMSRedelivered(true);

if(dlqDlqExpected) {
factory.getRedeliveryPolicy().setQueueBrowserIgnore(false);
}

String messageId = null;

String queueName = "browse.redeliverd.tx";
String dlqQueueName = "browse.redeliverd.tx.dlq";
String dlqDlqQueueName = "browse.redeliverd.tx.dlq.dlq";

ActiveMQQueue queue = new ActiveMQQueue(queueName + "?consumer.prefetchSize=0");
ActiveMQQueue queueDLQ = new ActiveMQQueue(dlqQueueName + "?consumer.prefetchSize=0");
ActiveMQQueue queueDLQDLQ = new ActiveMQQueue(dlqDlqQueueName);

broker.getAdminView().addQueue(queueName);
broker.getAdminView().addQueue(dlqQueueName);

DestinationView dlqQueueView = broker.getAdminView().getBroker().getQueueView(dlqQueueName);
DestinationView queueView = broker.getAdminView().getBroker().getQueueView(queueName);

verifyQueueStats(0l, 0l, 0l, dlqQueueView);
verifyQueueStats(0l, 0l, 0l, queueView);

Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(queue);

Message sendMessage = session.createTextMessage("Hello world!");
producer.send(sendMessage);
messageId = sendMessage.getJMSMessageID();
session.commit();
producer.close();

verifyQueueStats(0l, 0l, 0l, dlqQueueView);
verifyQueueStats(1l, 0l, 1l, queueView);

// Redeliver message to DLQ
Message message = null;
MessageConsumer consumer = session.createConsumer(queue);
int rollbackCount = 0;
do {
message = consumer.receive(2000l);
if(message != null) {
session.rollback();
rollbackCount++;
}
} while (message != null);

assertEquals(Integer.valueOf(7), Integer.valueOf(rollbackCount));
verifyQueueStats(1l, 0l, 1l, dlqQueueView);
verifyQueueStats(1l, 1l, 0l, queueView);

session.commit();
consumer.close();

// Increment redelivery counter on the message in the DLQ
// Close the consumer to force broker to dispatch
Message messageDLQ = null;
MessageConsumer consumerDLQ = session.createConsumer(queueDLQ);
int dlqRollbackCount = 0;
int dlqRollbackCountLimit = 5;
do {
messageDLQ = consumerDLQ.receive(2000l);
if(messageDLQ != null) {
session.rollback();
session.close();
consumerDLQ.close();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumerDLQ = session.createConsumer(queueDLQ);
dlqRollbackCount++;
}
} while (messageDLQ != null && dlqRollbackCount < dlqRollbackCountLimit);
session.commit();
consumerDLQ.close();

// Browse in tx mode works when we are at the edge of maxRedeliveries
// aka browse does not increment redeliverCounter as expected
Queue brokerQueueDLQ = resolveQueue(broker, queueDLQ);

for(int i=0; i<16; i++) {
QueueBrowser browser = session.createBrowser(queueDLQ);
Enumeration<?> enumeration = browser.getEnumeration();
ActiveMQMessage activemqMessage = null;
int received = 0;
while (enumeration.hasMoreElements()) {
activemqMessage = (ActiveMQMessage)enumeration.nextElement();
received++;
}
browser.close();
assertEquals(Integer.valueOf(1), Integer.valueOf(received));
assertEquals(Integer.valueOf(6), Integer.valueOf(activemqMessage.getRedeliveryCounter()));

// Confirm broker-side redeliveryCounter
QueueMessageReference queueMessageReference = brokerQueueDLQ.getMessage(messageId);
assertEquals(Integer.valueOf(6), Integer.valueOf(queueMessageReference.getRedeliveryCounter()));
}

session.close();
connection.close();

// Change redelivery max and the browser will fail
factory.getRedeliveryPolicy().setMaximumRedeliveries(3);
final Connection browseConnection = factory.createConnection();
browseConnection.start();

final AtomicInteger browseCounter = new AtomicInteger(0);
final AtomicInteger jmsExceptionCounter = new AtomicInteger(0);

final Session browseSession = browseConnection.createSession(true, Session.SESSION_TRANSACTED);

Thread browseThread = new Thread() {
public void run() {

QueueBrowser browser = null;
try {
browser = browseSession.createBrowser(queueDLQ);
Enumeration<?> enumeration = browser.getEnumeration();
if(Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
}
while (enumeration.hasMoreElements()) {
Message message = (Message)enumeration.nextElement();
Copy link
Contributor Author

@mattrpav mattrpav Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BUG: This blocks indefinitely

Scenario:

  1. Message on the queue at the edge of maximumRedeliveries
  2. Browse in transacted mode
  3. Consumer code poison-acks the one message
  4. Browse thinks there are messages to enumerate over, but there are not
  5. nextElement blocks indefinitely waiting to get messages b/c no conditions are met to break out of the while(true) loop

if(message != null) {
browseCounter.incrementAndGet();
}
}
} catch (JMSException e) {
jmsExceptionCounter.incrementAndGet();
} finally {
if(browser != null) { try { browser.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } }
if(browseSession != null) { try { browseSession.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } }
if(browseConnection != null) { try { browseConnection.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } }
}
}
};
browseThread.start();
Thread.sleep(2000l);
browseThread.interrupt();

assertEquals(Integer.valueOf(browseExpected), Integer.valueOf(browseCounter.get()));
assertEquals(Integer.valueOf(0), Integer.valueOf(jmsExceptionCounter.get()));

// ActiveMQConsumer sends a poison ack, messages gets moved to .dlq.dlq AND remains on the .dlq
DestinationView dlqDlqQueueView = broker.getAdminView().getBroker().getQueueView(dlqDlqQueueName);
verifyQueueStats(1l, 1l, 0l, queueView);
verifyQueueStats(1l, 0l, 1l, dlqQueueView);

if(dlqDlqExpected) {
verifyQueueStats(1l, 0l, 1l, dlqDlqQueueView);
} else {
assertNull(dlqDlqQueueView);
}
}
protected static void verifyQueueStats(long enqueueCount, long dequeueCount, long queueSize, DestinationView queueView) {
assertEquals(Long.valueOf(enqueueCount), Long.valueOf(queueView.getEnqueueCount()));
assertEquals(Long.valueOf(dequeueCount), Long.valueOf(queueView.getDequeueCount()));
assertEquals(Long.valueOf(queueSize), Long.valueOf(queueView.getQueueSize()));
}

protected static Queue resolveQueue(BrokerService brokerService, ActiveMQQueue activemqQueue) throws Exception {
Set<Destination> destinations = brokerService.getBroker().getDestinations(activemqQueue);
if(destinations == null || destinations.isEmpty()) {
return null;
}

if(destinations.size() > 1) {
fail("Expected one-and-only one queue for: " + activemqQueue);
}

return (Queue)destinations.iterator().next();
}
}