Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Not able to discard new messages when max_msgs_per_subject limit is reached #599

Open
rkunnamp opened this issue Aug 12, 2024 · 0 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@rkunnamp
Copy link

rkunnamp commented Aug 12, 2024

Observed behavior

The "DiscardNewPerSubject" mentioned here https://docs.nats.io/nats-concepts/jetstream/streams is not supported by python client.

As a result, not able to discard new messages when max_msgs_per_subject limit is reached

import os
import asyncio
import nats
import time


from nats.js.api import  StreamConfig, StreamSource, RetentionPolicy, StorageType, DiscardPolicy



servers = os.environ.get("NATS_URL", "nats://localhost:4222").split(",")



async def run():
    # Connect to NATS server
    nc = await nats.connect(servers=servers)
    
    # Get a JetStream context
    js = nc.jetstream()


    # Stream configuration
    stream_name = "TEST_STREAM"
    subject = "test.subject"

    # Attempt to add stream, or get the stream if it already exists
    try:
        await js.add_stream(name=stream_name, config=StreamConfig(
            retention=RetentionPolicy.LIMITS,
            subjects=[subject], 
            max_msgs_per_subject=5,
            discard=DiscardPolicy.NEW,
        ))
        print(f"Stream {stream_name} created with max_msgs_per_subject=5")
    except Exception as e:
        print(f"Stream {stream_name} already exists. Proceeding with the test.")

    # Publish 10 messages, which should exceed the max_msgs_per_subject limit
    for i in range(10):
        ack = await js.publish(subject, f"Message {i+1}".encode())
        print(f"Published message {i+1}, seq={ack.seq}")

    # Get stream info to check how many messages are retained
    stream_info = await js.stream_info(stream_name)
    print(f"Stream Info: {stream_info.config}")

    # List the messages retained in the stream
    msg = await js.get_last_msg("TEST_STREAM", "test.subject")
    print(f"Last message has the sequence {msg.seq}")



    # Close the NATS connection
    await nc.close()

if __name__ == "__main__":
    asyncio.run(run())


Stream TEST_STREAM created with max_msgs_per_subject=5
Published message 1, seq=1
Published message 2, seq=2
Published message 3, seq=3
Published message 4, seq=4
Published message 5, seq=5
Published message 6, seq=6
Published message 7, seq=7
Published message 8, seq=8
Published message 9, seq=9
Published message 10, seq=10
Stream Info: StreamConfig(name='TEST_STREAM', description=None, subjects=['test.subject'], retention='limits', max_consumers=-1, max_msgs=-1, max_bytes=-1, discard='new', max_age=0.0, max_msgs_per_subject=5, max_msg_size=-1, storage='file', num_replicas=1, no_ack=False, template_owner=None, duplicate_window=120.0, placement=None, mirror=None, sources=None, sealed=False, deny_delete=False, deny_purge=False, allow_rollup_hdrs=False, republish=None, subject_transform=None, allow_direct=False, mirror_direct=False, compression='none', metadata=None)
Last message has the sequence 10

Expected behavior

Ideally there should be a way to discard new messages when max_msgs_per_subject

Server and client version

nats --version
0.1.5

Host environment

No response

Steps to reproduce

No response

@rkunnamp rkunnamp added the defect Suspected defect such as a bug or regression label Aug 12, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
defect Suspected defect such as a bug or regression
Projects
None yet
Development

No branches or pull requests

1 participant