diff --git a/sync3/connmap.go b/sync3/connmap.go index ebe24195..f37907b5 100644 --- a/sync3/connmap.go +++ b/sync3/connmap.go @@ -5,6 +5,7 @@ import ( "time" "github.com/ReneKroon/ttlcache/v2" + "github.com/prometheus/client_golang/prometheus" ) // ConnMap stores a collection of Conns. @@ -15,10 +16,15 @@ type ConnMap struct { userIDToConn map[string][]*Conn connIDToConn map[string]*Conn + numConns prometheus.Gauge + // counters for reasons why connections have expired + expiryTimedOutCounter prometheus.Counter + expiryBufferFullCounter prometheus.Counter + mu *sync.Mutex } -func NewConnMap() *ConnMap { +func NewConnMap(enablePrometheus bool) *ConnMap { cm := &ConnMap{ userIDToConn: make(map[string][]*Conn), connIDToConn: make(map[string]*Conn), @@ -27,17 +33,61 @@ func NewConnMap() *ConnMap { } cm.cache.SetTTL(30 * time.Minute) // TODO: customisable cm.cache.SetExpirationCallback(cm.closeConnExpires) + + if enablePrometheus { + cm.expiryTimedOutCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "sliding_sync", + Subsystem: "api", + Name: "expiry_conn_timed_out", + Help: "Counter of expired API connections due to reaching TTL limit", + }) + prometheus.MustRegister(cm.expiryTimedOutCounter) + cm.expiryBufferFullCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "sliding_sync", + Subsystem: "api", + Name: "expiry_conn_buffer_full", + Help: "Counter of expired API connections due to reaching buffer update limit", + }) + prometheus.MustRegister(cm.expiryBufferFullCounter) + cm.numConns = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "sliding_sync", + Subsystem: "api", + Name: "num_active_conns", + Help: "Number of active sliding sync connections.", + }) + prometheus.MustRegister(cm.numConns) + } return cm } func (m *ConnMap) Teardown() { m.cache.Close() + + if m.numConns != nil { + prometheus.Unregister(m.numConns) + } + if m.expiryBufferFullCounter != nil { + prometheus.Unregister(m.expiryBufferFullCounter) + } + if 
m.expiryTimedOutCounter != nil { + prometheus.Unregister(m.expiryTimedOutCounter) + } } -func (m *ConnMap) Len() int { +// UpdateMetrics recalculates the number of active connections. Do this when you think there is a change. +func (m *ConnMap) UpdateMetrics() { m.mu.Lock() defer m.mu.Unlock() - return len(m.connIDToConn) + m.updateMetrics(len(m.connIDToConn)) +} + +// updateMetrics is like UpdateMetrics but doesn't touch connIDToConn and hence doesn't need to take the lock. We use this internally +// when we need to update the metric and already have the lock held, as calling UpdateMetrics would deadlock. +func (m *ConnMap) updateMetrics(numConns int) { + if m.numConns == nil { + return + } + m.numConns.Set(float64(numConns)) } // Conns return all connections for this user|device @@ -64,8 +114,9 @@ func (m *ConnMap) Conn(cid ConnID) *Conn { return conn } // e.g buffer exceeded, close it and remove it from the cache - logger.Trace().Str("conn", cid.String()).Msg("closing connection due to dead connection (buffer full)") + logger.Info().Str("conn", cid.String()).Msg("closing connection due to dead connection (buffer full)") m.closeConn(conn) + if m.expiryBufferFullCounter != nil { m.expiryBufferFullCounter.Inc() } return nil } @@ -92,6 +143,7 @@ func (m *ConnMap) CreateConn(cid ConnID, newConnHandler func() ConnHandler) (*Co m.cache.Set(cid.String(), conn) m.connIDToConn[cid.String()] = conn m.userIDToConn[cid.UserID] = append(m.userIDToConn[cid.UserID], conn) + m.updateMetrics(len(m.connIDToConn)) return conn, true } @@ -121,7 +173,8 @@ func (m *ConnMap) closeConnExpires(connID string, value interface{}) { m.mu.Lock() defer m.mu.Unlock() conn := value.(*Conn) - logger.Trace().Str("conn", connID).Msg("closing connection due to expired TTL in cache") + logger.Info().Str("conn", connID).Msg("closing connection due to expired TTL in cache") + if m.expiryTimedOutCounter != nil { m.expiryTimedOutCounter.Inc() } m.closeConn(conn) } @@ -147,4 +200,5 @@ func (m *ConnMap) closeConn(conn *Conn) { m.userIDToConn[conn.UserID] = conns // remove user cache listeners etc 
h.Destroy() + m.updateMetrics(len(m.connIDToConn)) } diff --git a/sync3/handler/handler.go b/sync3/handler/handler.go index 84c461c3..c02c5a29 100644 --- a/sync3/handler/handler.go +++ b/sync3/handler/handler.go @@ -59,7 +59,6 @@ type SyncLiveHandler struct { GlobalCache *caches.GlobalCache maxPendingEventUpdates int - numConns prometheus.Gauge setupHistVec *prometheus.HistogramVec histVec *prometheus.HistogramVec slowReqs prometheus.Counter @@ -74,7 +73,7 @@ func NewSync3Handler( V2: v2Client, Storage: store, V2Store: storev2, - ConnMap: sync3.NewConnMap(), + ConnMap: sync3.NewConnMap(enablePrometheus), userCaches: &sync.Map{}, Dispatcher: sync3.NewDispatcher(), GlobalCache: caches.NewGlobalCache(store), @@ -128,9 +127,6 @@ func (h *SyncLiveHandler) Teardown() { h.V2Sub.Teardown() h.EnsurePoller.Teardown() h.ConnMap.Teardown() - if h.numConns != nil { - prometheus.Unregister(h.numConns) - } if h.setupHistVec != nil { prometheus.Unregister(h.setupHistVec) } @@ -142,20 +138,7 @@ func (h *SyncLiveHandler) Teardown() { } } -func (h *SyncLiveHandler) updateMetrics() { - if h.numConns == nil { - return - } - h.numConns.Set(float64(h.ConnMap.Len())) -} - func (h *SyncLiveHandler) addPrometheusMetrics() { - h.numConns = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "sliding_sync", - Subsystem: "api", - Name: "num_active_conns", - Help: "Number of active sliding sync connections.", - }) h.setupHistVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "sliding_sync", Subsystem: "api", @@ -176,7 +159,6 @@ func (h *SyncLiveHandler) addPrometheusMetrics() { Name: "slow_requests", Help: "Counter of slow (>=50s) requests, initial or otherwise.", }) - prometheus.MustRegister(h.numConns) prometheus.MustRegister(h.setupHistVec) prometheus.MustRegister(h.histVec) prometheus.MustRegister(h.slowReqs) @@ -398,7 +380,7 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Requ } pid := sync2.PollerID{UserID: token.UserID, DeviceID: 
token.DeviceID} - log.Trace().Any("pid", pid).Msg("checking poller exists and is running") + log.Trace().Any("pid", pid).Msg("checking poller exists and is running") h.EnsurePoller.EnsurePolling(req.Context(), pid, token.AccessTokenHash) log.Trace().Msg("poller exists and is running") // this may take a while so if the client has given up (e.g timed out) by this point, just stop. @@ -421,7 +403,7 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Requ } // once we have the conn, make sure our metrics are correct - defer h.updateMetrics() + defer h.ConnMap.UpdateMetrics() // Now the v2 side of things are running, we can make a v3 live sync conn // NB: this isn't inherently racey (we did the check for an existing conn before EnsurePolling)