Skip to content

Commit

Permalink
build: compile and dependency upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish committed Aug 24, 2023
1 parent fb914b0 commit fd39190
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 44 deletions.
30 changes: 15 additions & 15 deletions send/async_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,21 @@ func MakeAsyncGroup(ctx context.Context, bufferSize int, senders ...Sender) Send
wg := &s.wg
s.closer.Set(func() (err error) {
s.doClose.Do(func() {
catcher := &erc.Collector{}
defer func() { err = catcher.Resolve() }()
ec := &erc.Collector{}
defer func() { err = ec.Resolve() }()
defer s.cancel()
catcher.Add(s.senders.Close())
ec.Add(s.senders.Close())

wg.Add(1)
go func() {
defer wg.Done()
catcher.Add(s.senders.Iterator().Observe(ctx, func(sender Sender) {
catcher.Add(sender.Close())
}))
ec.Add(s.senders.Iterator().Observe(func(sender Sender) {
ec.Add(sender.Close())
}).Run(ctx))

}()

catcher.Add(s.senders.Close())
ec.Add(s.senders.Close())
close(shutdown)
wg.Wait(ctx)
s.cancel()
Expand Down Expand Up @@ -111,24 +111,24 @@ func (s *asyncGroupSender) startSenderWorker(newSender Sender) {
func (s *asyncGroupSender) SetPriority(p level.Priority) {
s.Base.SetPriority(p)

fun.Invariant.Must(s.senders.Iterator().Observe(s.ctx, func(sender Sender) {
fun.Invariant.Must(s.senders.Iterator().Observe(func(sender Sender) {
sender.SetPriority(p)
}))
}).Run(s.ctx))
}

func (s *asyncGroupSender) SetErrorHandler(erh ErrorHandler) {
s.Base.SetErrorHandler(erh)

fun.Invariant.Must(s.senders.Iterator().Observe(s.ctx, func(sender Sender) {
fun.Invariant.Must(s.senders.Iterator().Observe(func(sender Sender) {
sender.SetErrorHandler(erh)
}))
}).Run(s.ctx))
}

func (s *asyncGroupSender) SetFormatter(fmtr MessageFormatter) {
s.Base.SetFormatter(fmtr)
fun.Invariant.Must(s.senders.Iterator().Observe(s.ctx, func(sender Sender) {
fun.Invariant.Must(s.senders.Iterator().Observe(func(sender Sender) {
sender.SetFormatter(fmtr)
}))
}).Run(s.ctx))
}

func (s *asyncGroupSender) Send(m message.Composer) {
Expand All @@ -141,9 +141,9 @@ func (s *asyncGroupSender) Send(m message.Composer) {
func (s *asyncGroupSender) Flush(ctx context.Context) error {
catcher := &erc.Collector{}

fun.Invariant.Must(s.senders.Iterator().Observe(s.ctx, func(sender Sender) {
fun.Invariant.Must(s.senders.Iterator().Observe(func(sender Sender) {
catcher.Add(sender.Flush(ctx))
}))
}).Run(ctx))

return catcher.Resolve()
}
4 changes: 2 additions & 2 deletions send/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync/atomic"

"github.com/tychoish/fun/adt"
"github.com/tychoish/fun/erc"
"github.com/tychoish/fun/ers"
"github.com/tychoish/grip/level"
"github.com/tychoish/grip/message"
)
Expand Down Expand Up @@ -43,7 +43,7 @@ var ErrAlreadyClosed = errors.New("sender already closed")
// an error rooted in ErrAlreadyClosed.
func (b *Base) Close() error {
if swapped := b.closed.CompareAndSwap(false, true); !swapped {
return erc.Join(fmt.Errorf("sender %q is already closed: %w", b.Name(), ErrAlreadyClosed), b.doClose())
return ers.Join(fmt.Errorf("sender %q is already closed: %w", b.Name(), ErrAlreadyClosed), b.doClose())
}

return b.doClose()
Expand Down
3 changes: 1 addition & 2 deletions send/error_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"io"
"log"

"github.com/tychoish/fun/erc"
"github.com/tychoish/fun/ers"
"github.com/tychoish/grip/level"
"github.com/tychoish/grip/message"
Expand All @@ -22,7 +21,7 @@ func WrapError(err error, m message.Composer) error {
return nil
}

return erc.Join(ErrGripMessageSendError, err, ers.Error(m.String()))
return ers.Join(ErrGripMessageSendError, err, ers.Error(m.String()))
}

func ErrorHandlerWriter(writer io.Writer) ErrorHandler {
Expand Down
5 changes: 2 additions & 3 deletions series/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,7 @@ func (c *Collector) spawnBackground(dur time.Duration, tr *tracked) {
case <-ctx.Done():
return
case <-ticker.C:
if err := pipe.Iterator().Observe(ctx, func(tr *tracked) {

if err := pipe.Iterator().Observe(func(tr *tracked) {
c.broker.Publish(c.ctx, func(wr io.Writer, r Renderer) error {
buf := c.pool.Get()
defer c.pool.Put(buf)
Expand All @@ -313,7 +312,7 @@ func (c *Collector) spawnBackground(dur time.Duration, tr *tracked) {

return ft.IgnoreFirst(wr.Write(buf.Bytes()))
})
}); err != nil {
}).Run(ctx); err != nil {
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions series/persistance.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func LoggerBackend(sender send.Sender, r Renderer) CollectorBackend {
for iter.Next(ctx) {
op := iter.Value()
if err := op(wr, r); err != nil {
return erc.Join(err, wr.Close())
return ers.Join(err, wr.Close())
}
count++
}
Expand Down Expand Up @@ -417,7 +417,7 @@ func SocketBackend(opts ...CollectorBakendSocketOptionProvider) (CollectorBacken
connCacheSize.Add(1)
return
}
err = erc.Join(err, ft.IgnoreFirst(conf.handleMessageError(ec.Add, conn.conn.Close())))
err = ers.Join(err, ft.IgnoreFirst(conf.handleMessageError(ec.Add, conn.conn.Close())))
}()

timer := time.NewTimer(0)
Expand Down
2 changes: 1 addition & 1 deletion x/metrics/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/tychoish/birch v0.2.3-0.20230815160103-70c52db1b271
github.com/tychoish/birch/x/ftdc v0.0.0-20230815160103-70c52db1b271
github.com/tychoish/birch/x/ftdc v0.0.0-20230824231239-7522c174b74b
github.com/tychoish/fun v0.10.4
github.com/tychoish/grip v0.3.5
)
Expand Down
3 changes: 3 additions & 0 deletions x/metrics/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ github.com/tychoish/birch v0.2.3-0.20230815160103-70c52db1b271 h1:4BBpigJOisXsnT
github.com/tychoish/birch v0.2.3-0.20230815160103-70c52db1b271/go.mod h1:6VXjKviYBKvDKj5pADE1Ecw79+4AQMQ2PJTkfLXd3Vg=
github.com/tychoish/birch/x/ftdc v0.0.0-20230815160103-70c52db1b271 h1:atNiog1swK3hIgzPTzdMU20a+rexlCDlbCndAzkzjAo=
github.com/tychoish/birch/x/ftdc v0.0.0-20230815160103-70c52db1b271/go.mod h1:eAkHGxL73BATG+UC2yeIO9vnyitSjfNbImsN4k+Cx/o=
github.com/tychoish/birch/x/ftdc v0.0.0-20230824231239-7522c174b74b h1:iIB16kwCNPVRlz6T5Nv5wwzut5acOCTFTMqCIcduVHE=
github.com/tychoish/birch/x/ftdc v0.0.0-20230824231239-7522c174b74b/go.mod h1:r89lWK437DDM2fS4JO3kYKaYRSTCmrVeSeH6tKvNXos=
github.com/tychoish/fun v0.10.4 h1:ZV//9hmGCKt5QDdEhEYLVWWjkjJkzhpIwXiHeTjUAOo=
github.com/tychoish/fun v0.10.4/go.mod h1:ZZfrwtsnHHV81ecZCBPp57DjjYY9Io39JH2QSXNpKn4=
github.com/tychoish/grip v0.3.5 h1:HPJTW9xaO+kFubhSNCPGBiIBsXWCvGmX8kCtawX0NOA=
github.com/tychoish/grip v0.3.5/go.mod h1:yUxPpU6c/Z40pyHGlmdUXQyOIW9QooUsw9qB3y0NmBA=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
Expand Down
32 changes: 13 additions & 19 deletions x/metrics/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,19 @@ import (
"github.com/tychoish/grip/series"
)

func CollectorConfOutputBSON() series.CollectorOptionProvider {
return func(conf *series.CollectorConf) error {
conf.LabelRenderer = RenderLabelsBSON
conf.MetricRenderer = RenderMetricBSON
conf.DefaultHistogramRender = RenderHistogramBSON
return nil
func SeriesRendererBSON() series.Renderer {
return series.Renderer{
Metric: RenderMetricBSON,
Histogram: RenderHistogramBSON,
}
}

func RenderLabelsBSON(output *bytes.Buffer, labels []dt.Pair[string, string], extra ...dt.Pair[string, string]) {
doc := birch.DC.Make(len(labels))
dt.Sliceify(append(labels, extra...)).Observe(func(label dt.Pair[string, string]) {
doc.Append(birch.EC.String(label.Key, label.Value))
})

fun.Invariant.Must(ft.IgnoreFirst(doc.WriteTo(output)))
}

func RenderMetricBSON(buf *bytes.Buffer, key string, labels fun.Future[[]byte], value int64, ts time.Time) {
func RenderMetricBSON(buf *bytes.Buffer, key string, labels fun.Future[*dt.Pairs[string, string]], value int64, ts time.Time) {
doc := birch.DC.Elements(birch.EC.String("metric", key))
if tags := labels(); tags != nil {
doc.Append(birch.EC.SubDocumentFromReader("labels", birch.Reader(tags)))
tagdoc := birch.DC.Make(tags.Len())
tags.Observe(func(kv dt.Pair[string, string]) { tagdoc.Append(birch.EC.String(kv.Key, kv.Value)) })
doc.Append(birch.EC.SubDocument("labels", tagdoc))
}
doc.Append(
birch.EC.Time("ts", ts),
Expand All @@ -46,15 +37,18 @@ func RenderMetricBSON(buf *bytes.Buffer, key string, labels fun.Future[[]byte],
func RenderHistogramBSON(
wr *bytes.Buffer,
key string,
labels fun.Future[[]byte],
labels fun.Future[*dt.Pairs[string, string]],
sample *dt.Pairs[float64, int64],
ts time.Time,
) {
doc := birch.DC.Elements(birch.EC.String("metric", key))

if tags := labels(); tags != nil {
doc.Append(birch.EC.SubDocumentFromReader("labels", birch.Reader(tags)))
tagdoc := birch.DC.Make(tags.Len())
tags.Observe(func(kv dt.Pair[string, string]) { tagdoc.Append(birch.EC.String(kv.Key, kv.Value)) })
doc.Append(birch.EC.SubDocument("labels", tagdoc))
}

doc.Append(birch.EC.Time("ts", ts))
quants := birch.DC.Make(sample.Len())
risky.Observe(sample.Iterator(), func(pair dt.Pair[float64, int64]) {
Expand Down

0 comments on commit fd39190

Please sign in to comment.