Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish committed Aug 20, 2023
1 parent 2737c1d commit f216dfc
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 29 deletions.
2 changes: 1 addition & 1 deletion message/conditional.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (c *conditional) Structured() bool {
}

func (c *conditional) Loggable() bool {
return c.cond && safeDo(c.resolve(), func(c Composer) bool { return c.Loggable() })
return safeDo(c.resolve(), func(c Composer) bool { return c.Loggable() })
}

func (c *conditional) Priority() level.Priority {
Expand Down
2 changes: 1 addition & 1 deletion send/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (s *writerSenderImpl) doSend() error {
}
}

// Close writes any buffered messages to the underlying Sender. This does
// Close writbes any buffered messages to the underlying Sender. This does
// not close the underlying sender.
func (s *writerSenderImpl) Close() error {
s.mu.Lock()
Expand Down
39 changes: 19 additions & 20 deletions series/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/tychoish/fun/ers"
"github.com/tychoish/fun/ft"
"github.com/tychoish/fun/pubsub"
"github.com/tychoish/grip"
)

// MetricLabelRenderer provides an implementation for an ordered set
Expand Down Expand Up @@ -78,40 +79,35 @@ type MetricSnapshot struct {
// is controlled by the <>Renderer function in the Collector
// configuration.
func NewCollector(ctx context.Context, opts ...CollectorOptionProvider) (*Collector, error) {
conf := &CollectorConf{}
if err := fun.JoinOptionProviders(opts...).Apply(conf); err != nil {
c := &Collector{}

if err := fun.JoinOptionProviders(opts...).Apply(&c.CollectorConf); err != nil {
return nil, err
}

c := &Collector{broker: pubsub.NewDequeBroker(
ctx,
pubsub.NewUnlimitedDeque[MetricPublisher](),
conf.BrokerOptions,
)}
conf := c.CollectorConf
grip.Info(conf)

c.ctx, c.cancel = context.WithCancel(ctx)
c.local.Default.SetConstructor(func() *dt.List[*tracked] { return &dt.List[*tracked]{} })
c.pool.SetConstructor(func() *bytes.Buffer { return &bytes.Buffer{} })
c.pool.SetCleanupHook(func(buf *bytes.Buffer) *bytes.Buffer { buf.Reset(); return buf })
c.publish = pubsub.NewUnlimitedDeque[MetricPublisher]()
ec := c.errs.Handler().Join(func(err error) { ft.WhenCall(err != nil, c.cancel) }).Lock()

if len(conf.Backends) == 1 {
conf.Backends[0].Worker(c.publish.Distributor().Iterator()).
Lock().
Operation(ec).
Add(ctx, &c.wg)
c.wg.Launch(c.ctx, conf.Backends[0].Worker(c.publish.Distributor().Iterator()).Operation(ec))

return c, nil
}

c.broker = pubsub.NewDequeBroker[MetricPublisher](ctx, c.publish, c.BrokerOptions)
c.broker = pubsub.NewDequeBroker[MetricPublisher](c.ctx, c.publish, c.BrokerOptions)
for idx := range conf.Backends {
grip.Infoln("start collector backend", idx)
ch := c.broker.Subscribe(ctx)
conf.Backends[idx].Worker(fun.ChannelIterator(ch)).
Lock().

c.wg.Launch(c.ctx, conf.Backends[idx].Worker(fun.ChannelIterator(ch)).
Operation(ec).
PostHook(func() { c.broker.Unsubscribe(ctx, ch) }).
Add(ctx, &c.wg)
PostHook(func() { c.broker.Unsubscribe(ctx, ch) }))
}

if c.errs.HasErrors() {
Expand All @@ -127,9 +123,12 @@ func (c *Collector) Close() error {
c.broker.Stop()
}

c.errs.Add(c.publish.Close())
c.wg.Operation().Wait()

// c.errs.Add(c.publish.Close())
// c.wg.Operation().Wait()
grip.Infoln("shutting down", c.wg.Num())
time.Sleep(100 * time.Millisecond)
grip.Infoln("shut down wait over", c.wg.Num(), c.errs.Len())
grip.Warning(c.errs.Resolve())
return c.errs.Resolve()
}

Expand Down
2 changes: 1 addition & 1 deletion series/collector_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (conf *CollectorConf) Validate() error {
erc.When(ec, conf.MetricRenderer == nil, "must define a metric renderer")
erc.When(ec, conf.LabelRenderer == nil, "must define a label renderer")
erc.When(ec, conf.Buffer == 0, "must define buffer size (positive) or negative (unlimited)")
return nil
return ec.Resolve()
}

type CollectorOptionProvider = fun.OptionProvider[*CollectorConf]
Expand Down
33 changes: 28 additions & 5 deletions series/persistance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/tychoish/fun/ft"
"github.com/tychoish/fun/intish"
"github.com/tychoish/fun/pubsub"
"github.com/tychoish/grip"
"github.com/tychoish/grip/send"
)

Expand Down Expand Up @@ -106,6 +107,7 @@ func FileBackend(opts ...CollectorBakendFileOptionProvider) (CollectorBackend, e
op := iter.Value()

if err := op(wr); err != nil {

return errors.Join(err, ft.SafeDo(buf.Flush), ft.SafeDo(gzp.Close), ft.SafeDo(file.Close))
}

Expand All @@ -124,14 +126,17 @@ func FileBackend(opts ...CollectorBakendFileOptionProvider) (CollectorBackend, e
}

func LoggerBackend(sender send.Sender) CollectorBackend {
wr := send.MakeWriter(sender)
return func(ctx context.Context, iter *fun.Iterator[MetricPublisher]) error {
wr := send.MakeWriter(sender)
for iter.Next(ctx) {
op := iter.Value()
if err := op(wr); err != nil {
grip.Error(wr.Close())
return err
}
grip.Info("messagePushed")
}
grip.Error(wr.Close())
return nil
}
}
Expand Down Expand Up @@ -307,6 +312,7 @@ func (c *connCacheItem) Write(in []byte) (int, error) {
func (c *connCacheItem) Close() error { c.closed = true; return c.conn.Close() }

func SocketBackend(opts ...CollectorBakendSocketOptionProvider) (CollectorBackend, error) {
grip.Debug("socketz")
conf := &CollectorBackendSocketConf{}
if err := fun.JoinOptionProviders(opts...).Apply(conf); err != nil {
return nil, err
Expand All @@ -317,13 +323,17 @@ func SocketBackend(opts ...CollectorBakendSocketOptionProvider) (CollectorBacken
return nil, err
}

counter := &intish.Atomic[int]{}
ec := &erc.Collector{}
conPoolWorker := fun.Worker(func(ctx context.Context) error {
return fun.Worker(func(ctx context.Context) error {
timer := time.NewTimer(0)
defer timer.Stop()

var isFinal bool
grip.Notice("starting conn pool wokrer")

defer func() { grip.Infoln("con itters", counter.Load()) }()

LOOP:
for {
Expand All @@ -332,15 +342,18 @@ func SocketBackend(opts ...CollectorBakendSocketOptionProvider) (CollectorBacken
isFinal, err = conf.handleDialError(ec.Add, err)
switch {
case err == nil && cc != nil:
grip.Debug("pushing Connection")
if _, err = conf.handleDialError(ec.Add, connCache.WaitPushBack(ctx, &connCacheItem{conn: cc})); err != nil {
return err
}
counter.Add(1)
continue LOOP
case err == nil && cc == nil:
continue LOOP
case isFinal:
return err
case !isFinal:
continue
continue LOOP
}
}

Expand All @@ -363,22 +376,31 @@ func SocketBackend(opts ...CollectorBakendSocketOptionProvider) (CollectorBacken
})

return func(ctx context.Context, iter *fun.Iterator[MetricPublisher]) error {
counter := &intish.Atomic[int]{}
grip.Notice("starting socket writer")
defer func() { grip.Infoln("socket counter", counter.Load()) }()
return iter.ProcessParallel(
func(ctx context.Context, pub MetricPublisher) (err error) {
defer func() { grip.Error(err); grip.Infoln("done worker;", counter.Load()) }()
grip.Info("start worker")
conn, err := connCache.WaitFront(ctx)
grip.Info("got first conn")
if err != nil {
return err
}
defer func() {
grip.Infoln("exiting start", err)
if err == nil {
grip.Infoln("written", conn.written)
go ft.Ignore(connCache.WaitPushBack(ctx, conn))
return
}
if ers.ContextExpired(err) || conn == nil {
return
}

grip.Info("exiting end")
err = erc.Join(err, ft.IgnoreFirst(conf.handleMessageError(ec.Add, conn.conn.Close())))
grip.Error(err)
}()

timer := time.NewTimer(0)
Expand All @@ -393,8 +415,10 @@ func SocketBackend(opts ...CollectorBakendSocketOptionProvider) (CollectorBacken

err = pub(conn)

grip.Info("message socket pushing begin")
isFinal, err = conf.handleMessageError(ec.Add, err)
if err == nil {
grip.Info("message socket pushed")
return nil
}
if isFinal {
Expand All @@ -406,7 +430,6 @@ func SocketBackend(opts ...CollectorBakendSocketOptionProvider) (CollectorBacken
if err != nil {
return err
}

timer.Reset(conf.MinMessageRetryDelay +
intish.Max(0, time.Duration(
rand.Int63n(int64(conf.MaxMessageRetryDelay)))-conf.MinMessageRetryDelay,
Expand All @@ -419,6 +442,6 @@ func SocketBackend(opts ...CollectorBakendSocketOptionProvider) (CollectorBacken
fun.WorkerGroupConfErrorHandler(ec.Add),
fun.WorkerGroupConfNumWorkers(conf.MessageWorkers),
conf.MessageErrorHandling.poolErrorOptions(),
).PreHook(conPoolWorker.Operation(ec.Add)).PostHook(func() { /* do cancel */ }).Run(ctx)
).PreHook(conPoolWorker.Operation(ec.Add).Once().Go()).PostHook(func() { /* do cancel */ }).Run(ctx)
}, nil
}
2 changes: 1 addition & 1 deletion series/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ func RenderMetricOpenTSB(buf *bytes.Buffer, key string, labels fun.Future[[]byte

func RenderLabelsGraphite(builder *bytes.Buffer, labels []dt.Pair[string, string], extra ...dt.Pair[string, string]) {
for _, label := range append(labels, extra...) {
builder.WriteByte(';')
builder.WriteString(label.Key)
builder.WriteByte('=')
builder.WriteString(label.Value)
builder.WriteByte(';')
}
}

Expand Down
52 changes: 52 additions & 0 deletions series/series_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package series

import (
"fmt"
"math/rand"
"net"
"testing"
"time"

"github.com/tychoish/fun/assert"
"github.com/tychoish/fun/ft"
"github.com/tychoish/fun/testt"
"github.com/tychoish/grip"
)

func TestIntegration(t *testing.T) {
t.Run("EndToEnd", func(t *testing.T) {
grip.Infoln("starting", t.Name())
ctx := testt.Context(t)
coll, err := NewCollector(
ctx,
CollectorConfOutputGraphite(),
CollectorConfBuffer(9001),
CollectorConfAppendBackends(
LoggerBackend(grip.Sender()),
ft.Must(SocketBackend(
CollectorBackendSocketConfMessageWorkers(4),
CollectorBackendSocketConfDialer(net.Dialer{
Timeout: 2 * time.Second,
KeepAlive: time.Minute,
}),
CollectorBackendSocketConfNetowrkTCP(),
CollectorBackendSocketConfAddress("localhost:2003"),
CollectorBackendSocketConfMinDialRetryDelay(100*time.Millisecond),
CollectorBackendSocketConfIdleConns(6),
CollectorBackendSocketConfMaxDialRetryDelay(time.Second),
CollectorBackendSocketConfMessageErrorHandling(CollectorBackendSocketErrorCollect),
CollectorBackendSocketConfDialErrorHandling(CollectorBackendSocketErrorCollect),
),
),
))
assert.NotError(t, err)
assert.True(t, coll != nil)

for i := int64(0); i < 1000; i++ {
coll.Push(Counter("grip_counter").Label("one", "hundred").Label("itermod", fmt.Sprint(i%10)).Add(100))
coll.Push(Gauge("grip_gauge").Label("one", "random").Label("itermod", fmt.Sprint(i%10)).Add(rand.Int63n((i + 1) * 100)))
}
time.Sleep(2500 * time.Millisecond)
assert.NotError(t, coll.Close())
})
}

0 comments on commit f216dfc

Please sign in to comment.