Skip to content

Commit

Permalink
bury overwrite behavior boolean
Browse files Browse the repository at this point in the history
  • Loading branch information
DoctorVin committed Jul 16, 2024
1 parent a2b2cde commit 73e114e
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 18 deletions.
5 changes: 4 additions & 1 deletion events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ type Stream interface {
// Publish publishes the message to the message broker.
//
// rollupSubject when set to true will cause any previous messages with the same subject to be overwritten by this new msg.
Publish(ctx context.Context, subject string, msg []byte, rollupSubject bool) error
Publish(ctx context.Context, subject string, msg []byte) error

// PublishOverwrite publishes the message to the message broker overwriting any existing message with that subject
PublishOverwrite(ctx context.Context, subject string, msg []byte) error

// Subscribe subscribes to one or more subjects on the stream returning a message channel for subscribers to read from.
Subscribe(ctx context.Context) (MsgCh, error)
Expand Down
69 changes: 58 additions & 11 deletions events/mock_stream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 10 additions & 3 deletions events/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,18 @@ func (n *NatsJetstream) consumerConfigIsEqual(consumerInfo *nats.ConsumerInfo) b

// Publish publishes an event onto the NATS Jetstream.
// The caller is responsible for message addressing and data serialization.
//
func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data []byte) error {
return n._publish(ctx, subjectSuffix, data, false)
}

// PublishOverwrite publishes an event and will overwrite any existing message with that subject in the queue
func (n *NatsJetstream) PublishOverwrite(ctx context.Context, subjectSuffix string, data []byte) error {
return n._publish(ctx, subjectSuffix, data, true)
}

// rollupSubject when set to true will cause any previous messages with the same subject to be overwritten by this new msg.
//
// NOTE: The subject passed here will be prepended with the configured PublisherSubjectPrefix.
func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data []byte, rollupSubject bool) error {
func (n *NatsJetstream) _publish(ctx context.Context, subjectSuffix string, data []byte, rollupSubject bool) error {
if n.jsctx == nil {
return errors.Wrap(ErrNatsJetstreamAddConsumer, "Jetstream context is not setup")
}
Expand Down
6 changes: 3 additions & 3 deletions events/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestPublishAndSubscribe(t *testing.T) {
require.NoError(t, err)

payload := []byte("test data")
require.NoError(t, njs.Publish(context.TODO(), "test", payload, false))
require.NoError(t, njs.Publish(context.TODO(), "test", payload))

msgs, err := njs.PullMsg(context.TODO(), 1)
require.NoError(t, err)
Expand Down Expand Up @@ -129,9 +129,9 @@ func TestPublishAndSubscribe_WithRollup(t *testing.T) {
require.NoError(t, err)

payload := []byte("test data")
require.NoError(t, njs.Publish(context.TODO(), "test", payload, true))
require.NoError(t, njs.PublishOverwrite(context.TODO(), "test", payload))
payload2 := []byte("rollup")
require.NoError(t, njs.Publish(context.TODO(), "test", payload2, true))
require.NoError(t, njs.PublishOverwrite(context.TODO(), "test", payload2))

msgs, err := njs.PullMsg(context.TODO(), 1)
require.NoError(t, err)
Expand Down

0 comments on commit 73e114e

Please sign in to comment.