diff --git a/send/async_group.go b/send/async_group.go index 03206e2..7af1433 100644 --- a/send/async_group.go +++ b/send/async_group.go @@ -61,21 +61,21 @@ func MakeAsyncGroup(ctx context.Context, bufferSize int, senders ...Sender) Send wg := &s.wg s.closer.Set(func() (err error) { s.doClose.Do(func() { - catcher := &erc.Collector{} - defer func() { err = catcher.Resolve() }() + ec := &erc.Collector{} + defer func() { err = ec.Resolve() }() defer s.cancel() - catcher.Add(s.senders.Close()) + ec.Add(s.senders.Close()) wg.Add(1) go func() { defer wg.Done() - catcher.Add(s.senders.Iterator().Observe(ctx, func(sender Sender) { - catcher.Add(sender.Close()) - })) + ec.Add(s.senders.Iterator().Observe(func(sender Sender) { + ec.Add(sender.Close()) + }).Run(ctx)) }() - catcher.Add(s.senders.Close()) + ec.Add(s.senders.Close()) close(shutdown) wg.Wait(ctx) s.cancel() @@ -111,24 +111,24 @@ func (s *asyncGroupSender) startSenderWorker(newSender Sender) { func (s *asyncGroupSender) SetPriority(p level.Priority) { s.Base.SetPriority(p) - fun.Invariant.Must(s.senders.Iterator().Observe(s.ctx, func(sender Sender) { + fun.Invariant.Must(s.senders.Iterator().Observe(func(sender Sender) { sender.SetPriority(p) - })) + }).Run(s.ctx)) } func (s *asyncGroupSender) SetErrorHandler(erh ErrorHandler) { s.Base.SetErrorHandler(erh) - fun.Invariant.Must(s.senders.Iterator().Observe(s.ctx, func(sender Sender) { + fun.Invariant.Must(s.senders.Iterator().Observe(func(sender Sender) { sender.SetErrorHandler(erh) - })) + }).Run(s.ctx)) } func (s *asyncGroupSender) SetFormatter(fmtr MessageFormatter) { s.Base.SetFormatter(fmtr) - fun.Invariant.Must(s.senders.Iterator().Observe(s.ctx, func(sender Sender) { + fun.Invariant.Must(s.senders.Iterator().Observe(func(sender Sender) { sender.SetFormatter(fmtr) - })) + }).Run(s.ctx)) } func (s *asyncGroupSender) Send(m message.Composer) { @@ -141,9 +141,9 @@ func (s *asyncGroupSender) Send(m message.Composer) { func (s *asyncGroupSender) Flush(ctx context.Context) error { catcher := &erc.Collector{} - fun.Invariant.Must(s.senders.Iterator().Observe(s.ctx, func(sender Sender) { + fun.Invariant.Must(s.senders.Iterator().Observe(func(sender Sender) { catcher.Add(sender.Flush(ctx)) - })) + }).Run(ctx)) return catcher.Resolve() } diff --git a/send/base.go b/send/base.go index 62b4b9e..eeb8a06 100644 --- a/send/base.go +++ b/send/base.go @@ -7,7 +7,7 @@ import ( "sync/atomic" "github.com/tychoish/fun/adt" - "github.com/tychoish/fun/erc" + "github.com/tychoish/fun/ers" "github.com/tychoish/grip/level" "github.com/tychoish/grip/message" ) @@ -43,7 +43,7 @@ var ErrAlreadyClosed = errors.New("sender already closed") // an error rooted in ErrAlreadyClosed. func (b *Base) Close() error { if swapped := b.closed.CompareAndSwap(false, true); !swapped { - return erc.Join(fmt.Errorf("sender %q is already closed: %w", b.Name(), ErrAlreadyClosed), b.doClose()) + return ers.Join(fmt.Errorf("sender %q is already closed: %w", b.Name(), ErrAlreadyClosed), b.doClose()) } return b.doClose() diff --git a/send/error_handler.go b/send/error_handler.go index f1346d9..6293499 100644 --- a/send/error_handler.go +++ b/send/error_handler.go @@ -5,7 +5,6 @@ import ( "io" "log" - "github.com/tychoish/fun/erc" "github.com/tychoish/fun/ers" "github.com/tychoish/grip/level" "github.com/tychoish/grip/message" @@ -22,7 +21,7 @@ func WrapError(err error, m message.Composer) error { return nil } - return erc.Join(ErrGripMessageSendError, err, ers.Error(m.String())) + return ers.Join(ErrGripMessageSendError, err, ers.Error(m.String())) } func ErrorHandlerWriter(writer io.Writer) ErrorHandler { diff --git a/series/collector.go b/series/collector.go index 17629eb..b983ce5 100644 --- a/series/collector.go +++ b/series/collector.go @@ -303,8 +303,7 @@ func (c *Collector) spawnBackground(dur time.Duration, tr *tracked) { case <-ctx.Done(): return case <-ticker.C: - if err := pipe.Iterator().Observe(ctx, func(tr *tracked) { - + if err := pipe.Iterator().Observe(func(tr *tracked) { c.broker.Publish(c.ctx, func(wr io.Writer, r Renderer) error { buf := c.pool.Get() defer c.pool.Put(buf) @@ -313,7 +312,7 @@ func (c *Collector) spawnBackground(dur time.Duration, tr *tracked) { return ft.IgnoreFirst(wr.Write(buf.Bytes())) }) - }); err != nil { + }).Run(ctx); err != nil { return } } diff --git a/series/persistance.go b/series/persistance.go index 3c8d244..f6d174b 100644 --- a/series/persistance.go +++ b/series/persistance.go @@ -139,7 +139,7 @@ func LoggerBackend(sender send.Sender, r Renderer) CollectorBackend { for iter.Next(ctx) { op := iter.Value() if err := op(wr, r); err != nil { - return erc.Join(err, wr.Close()) + return ers.Join(err, wr.Close()) } count++ } @@ -417,7 +417,7 @@ func SocketBackend(opts ...CollectorBakendSocketOptionProvider) (CollectorBacken connCacheSize.Add(1) return } - err = erc.Join(err, ft.IgnoreFirst(conf.handleMessageError(ec.Add, conn.conn.Close()))) + err = ers.Join(err, ft.IgnoreFirst(conf.handleMessageError(ec.Add, conn.conn.Close()))) }() timer := time.NewTimer(0) diff --git a/x/metrics/go.mod b/x/metrics/go.mod index a4ac746..e2b9c5f 100644 --- a/x/metrics/go.mod +++ b/x/metrics/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( github.com/shirou/gopsutil v3.21.11+incompatible github.com/tychoish/birch v0.2.3-0.20230815160103-70c52db1b271 - github.com/tychoish/birch/x/ftdc v0.0.0-20230815160103-70c52db1b271 + github.com/tychoish/birch/x/ftdc v0.0.0-20230824231239-7522c174b74b github.com/tychoish/fun v0.10.4 github.com/tychoish/grip v0.3.5 ) diff --git a/x/metrics/go.sum b/x/metrics/go.sum index 70cfa56..72d8ae6 100644 --- a/x/metrics/go.sum +++ b/x/metrics/go.sum @@ -15,7 +15,10 @@ github.com/tychoish/birch v0.2.3-0.20230815160103-70c52db1b271 h1:4BBpigJOisXsnT github.com/tychoish/birch v0.2.3-0.20230815160103-70c52db1b271/go.mod h1:6VXjKviYBKvDKj5pADE1Ecw79+4AQMQ2PJTkfLXd3Vg= github.com/tychoish/birch/x/ftdc v0.0.0-20230815160103-70c52db1b271 h1:atNiog1swK3hIgzPTzdMU20a+rexlCDlbCndAzkzjAo= github.com/tychoish/birch/x/ftdc v0.0.0-20230815160103-70c52db1b271/go.mod h1:eAkHGxL73BATG+UC2yeIO9vnyitSjfNbImsN4k+Cx/o= +github.com/tychoish/birch/x/ftdc v0.0.0-20230824231239-7522c174b74b h1:iIB16kwCNPVRlz6T5Nv5wwzut5acOCTFTMqCIcduVHE= +github.com/tychoish/birch/x/ftdc v0.0.0-20230824231239-7522c174b74b/go.mod h1:r89lWK437DDM2fS4JO3kYKaYRSTCmrVeSeH6tKvNXos= github.com/tychoish/fun v0.10.4 h1:ZV//9hmGCKt5QDdEhEYLVWWjkjJkzhpIwXiHeTjUAOo= +github.com/tychoish/fun v0.10.4/go.mod h1:ZZfrwtsnHHV81ecZCBPp57DjjYY9Io39JH2QSXNpKn4= github.com/tychoish/grip v0.3.5 h1:HPJTW9xaO+kFubhSNCPGBiIBsXWCvGmX8kCtawX0NOA= github.com/tychoish/grip v0.3.5/go.mod h1:yUxPpU6c/Z40pyHGlmdUXQyOIW9QooUsw9qB3y0NmBA= github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= diff --git a/x/metrics/series.go b/x/metrics/series.go index b50225b..e3a6c98 100644 --- a/x/metrics/series.go +++ b/x/metrics/series.go @@ -13,28 +13,19 @@ import ( "github.com/tychoish/grip/series" ) -func CollectorConfOutputBSON() series.CollectorOptionProvider { - return func(conf *series.CollectorConf) error { - conf.LabelRenderer = RenderLabelsBSON - conf.MetricRenderer = RenderMetricBSON - conf.DefaultHistogramRender = RenderHistogramBSON - return nil +func SeriesRendererBSON() series.Renderer { + return series.Renderer{ + Metric: RenderMetricBSON, + Histogram: RenderHistogramBSON, } } -func RenderLabelsBSON(output *bytes.Buffer, labels []dt.Pair[string, string], extra ...dt.Pair[string, string]) { - doc := birch.DC.Make(len(labels)) - dt.Sliceify(append(labels, extra...)).Observe(func(label dt.Pair[string, string]) { - doc.Append(birch.EC.String(label.Key, label.Value)) - }) - - fun.Invariant.Must(ft.IgnoreFirst(doc.WriteTo(output))) -} - -func RenderMetricBSON(buf *bytes.Buffer, key string, labels fun.Future[[]byte], value int64, ts time.Time) { +func RenderMetricBSON(buf *bytes.Buffer, key string, labels fun.Future[*dt.Pairs[string, string]], value int64, ts time.Time) { doc := birch.DC.Elements(birch.EC.String("metric", key)) if tags := labels(); tags != nil { - doc.Append(birch.EC.SubDocumentFromReader("labels", birch.Reader(tags))) + tagdoc := birch.DC.Make(tags.Len()) + tags.Observe(func(kv dt.Pair[string, string]) { tagdoc.Append(birch.EC.String(kv.Key, kv.Value)) }) + doc.Append(birch.EC.SubDocument("labels", tagdoc)) } doc.Append( birch.EC.Time("ts", ts), @@ -46,15 +37,18 @@ func RenderMetricBSON(buf *bytes.Buffer, key string, labels fun.Future[[]byte], func RenderHistogramBSON( wr *bytes.Buffer, key string, - labels fun.Future[[]byte], + labels fun.Future[*dt.Pairs[string, string]], sample *dt.Pairs[float64, int64], ts time.Time, ) { doc := birch.DC.Elements(birch.EC.String("metric", key)) if tags := labels(); tags != nil { - doc.Append(birch.EC.SubDocumentFromReader("labels", birch.Reader(tags))) + tagdoc := birch.DC.Make(tags.Len()) + tags.Observe(func(kv dt.Pair[string, string]) { tagdoc.Append(birch.EC.String(kv.Key, kv.Value)) }) + doc.Append(birch.EC.SubDocument("labels", tagdoc)) } + doc.Append(birch.EC.Time("ts", ts)) quants := birch.DC.Make(sample.Len()) risky.Observe(sample.Iterator(), func(pair dt.Pair[float64, int64]) {