Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vc/move events #21

Merged
merged 4 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
# golangci.com configuration
# https://github.com/golangci/golangci/wiki/Configuration
service:
golangci-lint-version: 1.52.0 # use the fixed version to not introduce new linters unexpectedly
golangci-lint-version: 1.55.2 # use the fixed version to not introduce new linters unexpectedly

linters-settings:
govet:
enable:
- fieldalignment
auto-fix: true
check-shadowing: true
settings:
Expand Down
26 changes: 12 additions & 14 deletions kv/kv.go → condition/status.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package kv
package condition

import (
"encoding/json"
"fmt"
"time"

"github.com/metal-toolbox/flasher/types"
"github.com/metal-toolbox/rivets/condition"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"go.hollow.sh/toolbox/events/registry"
Expand Down Expand Up @@ -52,15 +51,14 @@ const (

// StatusValue is the canonical structure for reporting status of an ongoing task
type StatusValue struct {
UpdatedAt time.Time `json:"updated"`
WorkerID string `json:"worker"`
Target string `json:"target"`
TraceID string `json:"traceID"`
SpanID string `json:"spanID"`
State string `json:"state"`
Status json.RawMessage `json:"status"`
ResourceVersion int64 `json:"resourceVersion"` // for updates to server-service
MsgVersion int32 `json:"msgVersion"`
UpdatedAt time.Time `json:"updated"`
WorkerID string `json:"worker"`
Target string `json:"target"`
TraceID string `json:"traceID"`
SpanID string `json:"spanID"`
State string `json:"state"`
Status json.RawMessage `json:"status"`
MsgVersion int32 `json:"msgVersion"`
// WorkSpec json.RawMessage `json:"spec"` XXX: for re-publish use-cases
}

Expand Down Expand Up @@ -97,10 +95,10 @@ const (
Indeterminate // we got an error in the process of making the check
)

// ConditionStatus returns the status of the task from the KV store
// CheckConditionInProgress returns the status of the task from the KV store
//
//nolint:gocyclo // status checks are cyclomatic
func ConditionStatus(conditionID, facilityCode, kvBucket string, js nats.JetStreamContext) (TaskState, error) {
func CheckConditionInProgress(conditionID, facilityCode, kvBucket string, js nats.JetStreamContext) (TaskState, error) {
handle, err := js.KeyValue(kvBucket)
if err != nil {
errKV := errors.Wrap(err, "bind to status KV bucket for condition lookup failed")
Expand Down Expand Up @@ -128,7 +126,7 @@ func ConditionStatus(conditionID, facilityCode, kvBucket string, js nats.JetStre
return Indeterminate, newErrQueryStatus(errJSON, kvBucket, lookupKey, "")
}

if condition.StateIsComplete(condition.State(sv.State)) {
if StateIsComplete(State(sv.State)) {
return Complete, nil
}

Expand Down
17 changes: 8 additions & 9 deletions kv/kv_test.go → condition/status_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package kv
package condition

// nolint // test file
//nolint:all // test file

import (
"fmt"
Expand All @@ -10,7 +10,6 @@ import (

"github.com/google/uuid"
"github.com/metal-toolbox/flasher/types"
"github.com/metal-toolbox/rivets/condition"
"github.com/nats-io/nats-server/v2/server"
srvtest "github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -57,7 +56,7 @@ func shutdownJetStream(t *testing.T, s *server.Server) {
s.WaitForShutdown()
}

func TestConditionStatus(t *testing.T) {
func TestCheckConditionInProgress(t *testing.T) {
srv := startJetStreamServer(t)
defer shutdownJetStream(t, srv)
nc, js := jetStreamContext(t, srv)
Expand Down Expand Up @@ -108,7 +107,7 @@ func TestConditionStatus(t *testing.T) {
Complete,
"",
func() []byte {
sv := &types.StatusValue{State: string(condition.Failed)}
sv := &types.StatusValue{State: string(Failed)}
return sv.MustBytes()
},
false,
Expand All @@ -119,7 +118,7 @@ func TestConditionStatus(t *testing.T) {
"bad worker ID",
func() []byte {
sv := &StatusValue{
State: string(condition.Pending),
State: string(Pending),
WorkerID: "some junk id",
}

Expand All @@ -140,7 +139,7 @@ func TestConditionStatus(t *testing.T) {
require.NoError(t, err, "register test controller")

sv := &StatusValue{
State: string(condition.Pending),
State: string(Pending),
WorkerID: workerRegistryID.String(),
}
return sv.MustBytes()
Expand All @@ -157,7 +156,7 @@ func TestConditionStatus(t *testing.T) {
require.NoError(t, err, "deregister controller")

sv := &StatusValue{
State: string(condition.Pending),
State: string(Pending),
WorkerID: workerRegistryID.String(),
}
return sv.MustBytes()
Expand All @@ -175,7 +174,7 @@ func TestConditionStatus(t *testing.T) {
}
}

gotState, err := ConditionStatus(conditionID.String(), facilityCode, testKvBucket, js)
gotState, err := CheckConditionInProgress(conditionID.String(), facilityCode, testKvBucket, js)
if tt.expectErrorContains != "" {
assert.ErrorContains(t, err, tt.expectErrorContains)
return
Expand Down
92 changes: 92 additions & 0 deletions events/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Events

Package events provides an interface and methods to interact with an events stream broker.

The package provides methods to serialize, deserialize data sent on the stream
as a [pubsubx.Message](https://github.com/infratographer/x/tree/main/pubsubx) along with methods
to parse the message URN through [urnx](https://github.com/infratographer/x/tree/main/urnx).


### Connect to a NATS Jetstream to publish messages.

Example below sets up a NATS stream broker with the parameters provided,
The stream, consumer and subscription(s) are initialized when defined, based on the configuration.

```go
options := events.NatsOptions{
AppName: "foo",
URL: "nats://nats:4222",
StreamUser: viper.GetString("nats.stream.user"),
StreamPass: viper.GetString("nats.stream.pass"),
CredsFile: viper.GetString("nats.creds.file"),
...

// Defining a stream will result in the stream being added if not present.
Stream: &events.NatsStreamOptions{
// Name of the stream to be created.
Name: viper.GetString("nats.stream.name"),

// Subjects associated with the stream.
Subjects: viper.GetStringSlice("nats.stream.subjects"),
},

// Defining a consumer will result in the consumer being added if not present.
Consumer: &events.NatsConsumerOptions{
// Pull indicates this is a pull based stream, subcriptions to it will be pull based.
Pull: viper.GetBool("nats.stream.consumer.pull")

// Sets the durable consumer name, by setting a durable consumer name
// the consumer is not epheremal and removed once there are no subscribers.
Name: viper.GetString("nats.stream.consumer.name")

....
}
}

// initialize broker - validates the configuration and returns a Stream
stream, err := events.NewStream(natsOptions(appName, streamURL))
if err != nil {
panic(err)
}

// Open connection - sets up required streams, consumers.
if err := stream.Open(); err != nil {
panic(err)
}


// publish asynchronously to subscribed consumer.
if err := stream.PublishAsyncWithContext(ctx, resourceTypeServer, eventTypeCreate, uuid.New(), &Server{}); err != nil {
panic(err)
}


// subscribe to one or more consumers, this returns a single channel.
eventsCh, err := o.streamBroker.Subscribe(ctx)
if err != nil {
o.logger.Fatal(err)
}

for _, msg := range {
// unpacks the data as a *pubsubx.Message
data, err := msg.Data()
if err != nil {
panic(err)
}

// parse and retrieve the Subject URN
urn, err := msg.SubjectURN(data)
if err != nil {
panic(err)
}

// ack the message
if err := msg.Ack(); err != nil {
panic(err)
}
}
```

## Implementations

TODO(joel) : Link to implementations of this library.
2 changes: 2 additions & 0 deletions events/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package events provides methods to initializ§e and interface with an events broker.
package events
88 changes: 88 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Package events provides types and methods to interact with a messaging stream broker.
package events

import (
"context"
)

type (
// ResourceType is the kind of the object included the message.
ResourceType string

// EventType is a type identifying the Event kind that has occurred on an object.
EventType string

// StreamParameters is the configuration for the Stream broker, the interface
// is type asserted by the stream broker implementation.
StreamParameters interface{}
)

const (
// Create action kind identifies objects that were created.
Create EventType = "create"

// Update action kind identifies objects that were updated.
Update EventType = "update"

// Delete action kind identifies objects that were removed.
Delete EventType = "delete"
)

// when updating any of the interfaces here, make sure to regenerate the mocks
//
// mockgen -package=mock_events -source=events/events.go > events/mock/events.go
//

// Stream provides methods to interact with the event stream.
type Stream interface {
// Open sets up the stream connection.
Open() error

// Publish publishes the message to the message broker.
Publish(ctx context.Context, subject string, msg []byte) error

// Subscribe subscribes to one or more subjects on the stream returning a message channel for subscribers to read from.
Subscribe(ctx context.Context) (MsgCh, error)

// PullMsg pulls upto batch count of messages from the stream through the pull based subscription.
PullMsg(ctx context.Context, batch int) ([]Message, error)

// Closes the connection to the stream, along with unsubscribing any subscriptions.
Close() error
}

// MsgCh is a channel over which messages arrive when subscribed.
type MsgCh chan Message

// Message interface defines the methods available on the messages received on the stream.
//
// These methods are to be implemented by the stream broker for its messages.
type Message interface {
// Ack the message as processed on the stream.
Ack() error

// Nak the message as not processed on the stream.
Nak() error

// Term signals to the broker that the message processing has failed and the message
// must not be redelivered.
Term() error

// InProgress resets the redelivery timer for the message on the stream
// to indicate the message is being worked on.
InProgress() error

// Subject returns the message subject.
Subject() string

// Data returns the data contained in the message.
Data() []byte

// ExtractOtelTraceContext returns a context populated with the parent trace if any.
ExtractOtelTraceContext(ctx context.Context) context.Context
}

// NewStream returns a Stream implementation.
func NewStream(parameters StreamParameters) (Stream, error) {
return NewNatsBroker(parameters)
}
63 changes: 63 additions & 0 deletions events/internal/test/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// internal test helpers for NATS kv stuff. Yes this means the tests are
// strictly "integration tests." Many electrons have been spilt in the NATS
// space as to whether they can/should/will provide something mockable for
// unit testing with or without fault injection. The current answer is "look
// at our tests for examples." While this is strictly speaking unsatisfying
// this is what we have.

//nolint:all
package test

import (
"os"
"testing"
"time"

"github.com/nats-io/nats-server/v2/server"
srvtest "github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nats.go"
)

// XXX: this will panic on an error
func StartJetStreamServer(t *testing.T) *server.Server {
t.Helper()
opts := srvtest.DefaultTestOptions
opts.Port = -1
opts.JetStream = true
return srvtest.RunServer(&opts)
}

func StartCoreServer(t *testing.T) *server.Server {
t.Helper()
opts := srvtest.DefaultTestOptions
opts.Port = -1
return srvtest.RunServer(&opts)
}

func JetStreamContext(t *testing.T, s *server.Server) (*nats.Conn, nats.JetStreamContext) {
t.Helper()
nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("connect => %v", err)
}
js, err := nc.JetStream(nats.MaxWait(10 * time.Second))
if err != nil {
t.Fatalf("JetStream => %v", err)
}
return nc, js
}

func ShutdownJetStream(t *testing.T, s *server.Server) {
t.Helper()
var sd string
if config := s.JetStreamConfig(); config != nil {
sd = config.StoreDir
}
s.Shutdown()
if sd != "" {
if err := os.RemoveAll(sd); err != nil {
t.Fatalf("Unable to remove storage %q: %v", sd, err)
}
}
s.WaitForShutdown()
}
Loading
Loading