Add more metrics around connection buffers
kegsay committed Jul 24, 2023
1 parent d19bbbd commit 7dc999a
Showing 2 changed files with 62 additions and 26 deletions.
64 changes: 59 additions & 5 deletions sync3/connmap.go
@@ -5,6 +5,7 @@ import (
"time"

"github.com/ReneKroon/ttlcache/v2"
"github.com/prometheus/client_golang/prometheus"
)

// ConnMap stores a collection of Conns.
@@ -15,10 +16,15 @@ type ConnMap struct {
userIDToConn map[string][]*Conn
connIDToConn map[string]*Conn

numConns prometheus.Gauge
// counters for reasons why connections have expired
expiryTimedOutCounter prometheus.Counter
expiryBufferFullCounter prometheus.Counter

mu *sync.Mutex
}

func NewConnMap() *ConnMap {
func NewConnMap(enablePrometheus bool) *ConnMap {
cm := &ConnMap{
userIDToConn: make(map[string][]*Conn),
connIDToConn: make(map[string]*Conn),
@@ -27,17 +33,61 @@ func NewConnMap() *ConnMap {
}
cm.cache.SetTTL(30 * time.Minute) // TODO: customisable
cm.cache.SetExpirationCallback(cm.closeConnExpires)

if enablePrometheus {
cm.expiryTimedOutCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "sliding_sync",
Subsystem: "api",
Name: "expiry_conn_timed_out",
Help: "Counter of expired API connections due to reaching TTL limit",
})
prometheus.MustRegister(cm.expiryTimedOutCounter)
cm.expiryBufferFullCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "sliding_sync",
Subsystem: "api",
Name: "expiry_conn_buffer_full",
Help: "Counter of expired API connections due to reaching buffer update limit",
})
prometheus.MustRegister(cm.expiryBufferFullCounter)
cm.numConns = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "sliding_sync",
Subsystem: "api",
Name: "num_active_conns",
Help: "Number of active sliding sync connections.",
})
prometheus.MustRegister(cm.numConns)
}
return cm
}

func (m *ConnMap) Teardown() {
m.cache.Close()

if m.numConns != nil {
prometheus.Unregister(m.numConns)
}
if m.expiryBufferFullCounter != nil {
prometheus.Unregister(m.expiryBufferFullCounter)
}
if m.expiryTimedOutCounter != nil {
prometheus.Unregister(m.expiryTimedOutCounter)
}
}

func (m *ConnMap) Len() int {
// UpdateMetrics recalculates the number of active connections. Do this when you think there is a change.
func (m *ConnMap) UpdateMetrics() {
m.mu.Lock()
defer m.mu.Unlock()
return len(m.connIDToConn)
m.updateMetrics(len(m.connIDToConn))
}

// updateMetrics is like UpdateMetrics but doesn't touch connIDToConn and hence doesn't need the lock. We use this internally
// when we need to update the metric and already have the lock held, as calling UpdateMetrics would deadlock.
func (m *ConnMap) updateMetrics(numConns int) {
if m.numConns == nil {
return
}
m.numConns.Set(float64(numConns))
}

// Conns return all connections for this user|device
@@ -64,8 +114,9 @@ func (m *ConnMap) Conn(cid ConnID) *Conn {
return conn
}
// e.g buffer exceeded, close it and remove it from the cache
logger.Trace().Str("conn", cid.String()).Msg("closing connection due to dead connection (buffer full)")
logger.Info().Str("conn", cid.String()).Msg("closing connection due to dead connection (buffer full)")
m.closeConn(conn)
m.expiryBufferFullCounter.Inc()
return nil
}

@@ -92,6 +143,7 @@ func (m *ConnMap) CreateConn(cid ConnID, newConnHandler func() ConnHandler) (*Co
m.cache.Set(cid.String(), conn)
m.connIDToConn[cid.String()] = conn
m.userIDToConn[cid.UserID] = append(m.userIDToConn[cid.UserID], conn)
m.updateMetrics(len(m.connIDToConn))
return conn, true
}

@@ -121,7 +173,8 @@ func (m *ConnMap) closeConnExpires(connID string, value interface{}) {
m.mu.Lock()
defer m.mu.Unlock()
conn := value.(*Conn)
logger.Trace().Str("conn", connID).Msg("closing connection due to expired TTL in cache")
logger.Info().Str("conn", connID).Msg("closing connection due to expired TTL in cache")
m.expiryTimedOutCounter.Inc()
m.closeConn(conn)
}

@@ -147,4 +200,5 @@ func (m *ConnMap) closeConn(conn *Conn) {
m.userIDToConn[conn.UserID] = conns
// remove user cache listeners etc
h.Destroy()
m.updateMetrics(len(m.connIDToConn))
}
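
For context, the metrics lifecycle this change moves into ConnMap follows the standard client_golang pattern: create the collector, register it, update it, and unregister it on teardown. Below is a minimal standalone sketch of that pattern; the `demo_*` metric names are illustrative only and are not part of this commit.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Gauge tracking a current value, like num_active_conns above.
	demoGauge := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "sliding_sync",
		Subsystem: "api",
		Name:      "demo_active_conns",
		Help:      "Illustrative gauge mirroring num_active_conns.",
	})
	// Counter that only ever increases, like the expiry counters above.
	demoCounter := prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "sliding_sync",
		Subsystem: "api",
		Name:      "demo_conn_expired",
		Help:      "Illustrative counter mirroring the expiry counters.",
	})
	prometheus.MustRegister(demoGauge, demoCounter)

	demoGauge.Set(3)  // e.g. after recalculating len(connIDToConn)
	demoCounter.Inc() // e.g. when a connection expires

	fmt.Println("metrics registered")

	// Unregister on teardown so repeated construction (e.g. in tests)
	// does not panic with a duplicate-registration error.
	prometheus.Unregister(demoGauge)
	prometheus.Unregister(demoCounter)
}
```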
24 changes: 3 additions & 21 deletions sync3/handler/handler.go
@@ -59,7 +59,6 @@ type SyncLiveHandler struct {
GlobalCache *caches.GlobalCache
maxPendingEventUpdates int

numConns prometheus.Gauge
setupHistVec *prometheus.HistogramVec
histVec *prometheus.HistogramVec
slowReqs prometheus.Counter
@@ -74,7 +73,7 @@ func NewSync3Handler(
V2: v2Client,
Storage: store,
V2Store: storev2,
ConnMap: sync3.NewConnMap(),
ConnMap: sync3.NewConnMap(enablePrometheus),
userCaches: &sync.Map{},
Dispatcher: sync3.NewDispatcher(),
GlobalCache: caches.NewGlobalCache(store),
@@ -128,9 +127,6 @@ func (h *SyncLiveHandler) Teardown() {
h.V2Sub.Teardown()
h.EnsurePoller.Teardown()
h.ConnMap.Teardown()
if h.numConns != nil {
prometheus.Unregister(h.numConns)
}
if h.setupHistVec != nil {
prometheus.Unregister(h.setupHistVec)
}
@@ -142,20 +138,7 @@ }
}
}

func (h *SyncLiveHandler) updateMetrics() {
if h.numConns == nil {
return
}
h.numConns.Set(float64(h.ConnMap.Len()))
}

func (h *SyncLiveHandler) addPrometheusMetrics() {
h.numConns = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "sliding_sync",
Subsystem: "api",
Name: "num_active_conns",
Help: "Number of active sliding sync connections.",
})
h.setupHistVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "sliding_sync",
Subsystem: "api",
@@ -176,7 +159,6 @@
Name: "slow_requests",
Help: "Counter of slow (>=50s) requests, initial or otherwise.",
})
prometheus.MustRegister(h.numConns)
prometheus.MustRegister(h.setupHistVec)
prometheus.MustRegister(h.histVec)
prometheus.MustRegister(h.slowReqs)
@@ -398,7 +380,7 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Requ
}

pid := sync2.PollerID{UserID: token.UserID, DeviceID: token.DeviceID}
log.Trace().Any("pid", pid).Msg("checking poller exists and is running")
log.Trace().Any("pid", pid).Msg("checking poller exists and is running")
h.EnsurePoller.EnsurePolling(req.Context(), pid, token.AccessTokenHash)
log.Trace().Msg("poller exists and is running")
// this may take a while so if the client has given up (e.g timed out) by this point, just stop.
@@ -421,7 +403,7 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Requ
}

// once we have the conn, make sure our metrics are correct
defer h.updateMetrics()
defer h.ConnMap.UpdateMetrics()

// Now the v2 side of things are running, we can make a v3 live sync conn
// NB: this isn't inherently racey (we did the check for an existing conn before EnsurePolling)
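
Since the gauge now lives in ConnMap, handler-level code only needs to pass `enablePrometheus` through and ask the map to refresh its gauge. A rough usage sketch follows, under two assumptions that are not part of this commit: the module import path is `github.com/matrix-org/sliding-sync`, and the metrics are exposed via promhttp on an arbitrary port.

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/matrix-org/sliding-sync/sync3" // assumed module path
)

func main() {
	// ConnMap now registers num_active_conns and the expiry counters itself
	// when Prometheus is enabled.
	connMap := sync3.NewConnMap(true)
	defer connMap.Teardown() // unregisters whatever it registered

	// After anything that may have opened or closed connections, have the
	// map recompute the gauge (it takes its own lock).
	connMap.UpdateMetrics()

	// Serve the default registry, which includes the ConnMap collectors.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil) // port is arbitrary for the sketch
}
```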
Expand Down

0 comments on commit 7dc999a

Please sign in to comment.