From 60d70d74bbe6cb4f7daa29b93c5f8a8fd4ed982c Mon Sep 17 00:00:00 2001 From: roeschter Date: Tue, 3 Sep 2024 12:29:01 +0200 Subject: [PATCH 1/3] createConsumerContext fail fast on push consumer --- src/main/java/io/nats/client/api/Error.java | 7 +++++- .../nats/client/impl/NatsConsumerContext.java | 13 ++++++++++- .../impl/NatsOrderedConsumerContext.java | 2 +- .../nats/client/impl/SimplificationTests.java | 22 +++++++++++++++++++ 4 files changed, 41 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/nats/client/api/Error.java b/src/main/java/io/nats/client/api/Error.java index 3ec3124b9..912dd2ebe 100644 --- a/src/main/java/io/nats/client/api/Error.java +++ b/src/main/java/io/nats/client/api/Error.java @@ -32,7 +32,12 @@ public class Error implements JsonSerializable { static Error optionalInstance(JsonValue vError) { return vError == null ? null : new Error(vError); } - + + public static Error instance(int code, String description) { + return new Error(code, NOT_SET, description); + } + + Error(JsonValue jv) { this.jv = jv; } diff --git a/src/main/java/io/nats/client/impl/NatsConsumerContext.java b/src/main/java/io/nats/client/impl/NatsConsumerContext.java index 36557522e..f46729e4e 100644 --- a/src/main/java/io/nats/client/impl/NatsConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsConsumerContext.java @@ -47,7 +47,11 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript private final AtomicReference defaultDispatcher; private final AtomicReference lastConsumer; - NatsConsumerContext(NatsStreamContext sc, ConsumerInfo unorderedConsumerInfo, OrderedConsumerConfiguration orderedCc) { + /* + * Only called from the internal implementation. + *

+ */ + NatsConsumerContext(NatsStreamContext sc, ConsumerInfo unorderedConsumerInfo, OrderedConsumerConfiguration orderedCc) throws JetStreamApiException { stateLock = new ReentrantLock(); streamCtx = sc; cachedConsumerInfo = new AtomicReference<>(); @@ -56,6 +60,13 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript defaultDispatcher = new AtomicReference<>(); lastConsumer = new AtomicReference<>(); if (unorderedConsumerInfo != null) { + if (unorderedConsumerInfo != null) { + //Fail fast on push consumers as all operation on ConsumerContext expect a pull consumer. + //If we don't fail fast, calls to consume() or iterat() will hang with the server returning: code=409, message='Consumer is push based' + ConsumerConfiguration consumerConfig = unorderedConsumerInfo.getConsumerConfiguration(); + if ( consumerConfig.getDeliverSubject() != null ) + throw new JetStreamApiException(io.nats.client.api.Error.instance(409, "Consumer is push based. ConsumerContext only supports pull consumers.")); + } ordered = false; originalOrderedCc = null; subscribeSubject = null; diff --git a/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java b/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java index ed3b64750..bba95cd61 100644 --- a/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java @@ -25,7 +25,7 @@ public class NatsOrderedConsumerContext implements OrderedConsumerContext { private final NatsConsumerContext impl; - NatsOrderedConsumerContext(NatsStreamContext streamContext, OrderedConsumerConfiguration config) { + NatsOrderedConsumerContext(NatsStreamContext streamContext, OrderedConsumerConfiguration config) throws JetStreamApiException { impl = new NatsConsumerContext(streamContext, null, config); } diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index de77a3b12..e927c323c 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -975,6 +975,28 @@ public void testOrderedConsumerBuilder() throws IOException, ClassNotFoundExcept check_values(roundTripSerialize(occ), zdt); } + + @Test + public void testConsumerContextRejectsPush() throws Exception { + jsServer.run(TestBase::atLeast2_9_1, nc -> { + JetStreamManagement jsm = nc.jetStreamManagement(); + JetStream js = nc.jetStream(); + + TestingStreamContainer tsc = new TestingStreamContainer(jsm); + jsPublish(js, tsc.subject(), 4); + + String name = name(); + + // Pre define a consumer - set delivery subject to make it a push consumer + ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(name).deliverSubject("delivery_"+name).build(); + jsm.addOrUpdateConsumer(tsc.stream, cc); + + // Consumer[Context] + assertThrows(JetStreamApiException.class, () -> js.getConsumerContext(tsc.stream, name)); // Push consumer not allowed + + }); + } + private static void check_default_values(OrderedConsumerConfiguration occ) { assertEquals(">", occ.getFilterSubject()); assertNull(occ.getDeliverPolicy()); From 2f86fd90cb9533ed1bdba4f7622ebeaa17bbfa10 Mon Sep 17 00:00:00 2001 From: roeschter Date: Tue, 3 Sep 2024 14:52:52 +0200 Subject: [PATCH 2/3] Update JetStream.java --- src/main/java/io/nats/client/JetStream.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/nats/client/JetStream.java b/src/main/java/io/nats/client/JetStream.java index 5b9677821..71cfdc340 100644 --- a/src/main/java/io/nats/client/JetStream.java +++ b/src/main/java/io/nats/client/JetStream.java @@ -35,11 +35,11 @@ * which can be checked for acknowledgement at a later point. * *

Use {@link #getStreamContext(String ) getStreamContext(String)} to access a simplified API for consuming/subscribing messages from Jetstream. - * It is recommened to manage consumers explicitely through {@link StreamContext StreamContext} or {@link JetStreamManagement JetStreamManagement} + * It is recommened to manage consumers explicitly through {@link StreamContext StreamContext} or {@link JetStreamManagement JetStreamManagement} * *

{@link #subscribe(String)} is a convenience method for implicitly creating a consumer on a stream and receiving messages. This method should be used for ephemeral (not durable) conusmers. * It can create a named durable consumers though Options, but we prefer to avoid creating durable consumers implictly. - * It is recommened to manage consumers explicitely through {@link StreamContext StreamContext} or {@link JetStreamManagement JetStreamManagement} + * It is recommened to manage consumers explicitly through {@link StreamContext StreamContext} or {@link JetStreamManagement JetStreamManagement} * * {@link ConsumerContext ConsumerContext} based subscription. * From 0927b0f5912461cca33918cf7d663183d40b5e3a Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Mon, 9 Sep 2024 09:36:11 -0400 Subject: [PATCH 3/3] Update JetStream.java --- src/main/java/io/nats/client/JetStream.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/nats/client/JetStream.java b/src/main/java/io/nats/client/JetStream.java index b478d361c..eef1b757d 100644 --- a/src/main/java/io/nats/client/JetStream.java +++ b/src/main/java/io/nats/client/JetStream.java @@ -33,15 +33,13 @@ * *

{@link #publishAsync(String, byte[]) PublishAsync} will not wait for acknowledgement but return a {@link CompletableFuture CompletableFuture}, * which can be checked for acknowledgement at a later point. - * - *

Use {@link #getStreamContext(String ) getStreamContext(String)} to access a simplified API for consuming/subscribing messages from Jetstream. - * It is recommened to manage consumers explicitly through {@link StreamContext StreamContext} or {@link JetStreamManagement JetStreamManagement} - * - *

{@link #subscribe(String)} is a convenience method for implicitly creating a consumer on a stream and receiving messages. This method should be used for ephemeral (not durable) conusmers. - * It can create a named durable consumers though Options, but we prefer to avoid creating durable consumers implictly. + * + *

Use {@link #getStreamContext(String ) getStreamContext(String)} to access a simplified API for consuming/subscribing messages from Jetstream. * It is recommened to manage consumers explicitly through {@link StreamContext StreamContext} or {@link JetStreamManagement JetStreamManagement} - * - * {@link ConsumerContext ConsumerContext} based subscription. + * + *

{@link #subscribe(String)} is a convenience method for implicitly creating a consumer on a stream and receiving messages. This method should be used for ephemeral (not durable) conusmers. + * It can create a named durable consumers though Options, but we prefer to avoid creating durable consumers implictly. + * It is recommened to manage consumers explicitly through {@link StreamContext StreamContext} and {@link ConsumerContext ConsumerContext} or {@link JetStreamManagement JetStreamManagement} * *

Recommended usage for creating streams, consumers, publish and listen on a stream

*