Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track the time before processing a request #183

Merged
merged 3 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions sync3/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ConnHandler interface {
// Callback which is allowed to block as long as the context is active. Return the response
// to send back or an error. Errors of type *internal.HandlerError are inspected for the correct
// status code to send back.
OnIncomingRequest(ctx context.Context, cid ConnID, req *Request, isInitial bool) (*Response, error)
OnIncomingRequest(ctx context.Context, cid ConnID, req *Request, isInitial bool, start time.Time) (*Response, error)
OnUpdate(ctx context.Context, update caches.Update)
Destroy()
Alive() bool
Expand Down Expand Up @@ -88,7 +88,7 @@ func (c *Conn) OnUpdate(ctx context.Context, update caches.Update) {
// upwards but will NOT be logged to Sentry (neither here nor by the caller). Errors
// should be reported to Sentry as close as possible to the point of creating the error,
// to provide the best possible Sentry traceback.
func (c *Conn) tryRequest(ctx context.Context, req *Request) (res *Response, err error) {
func (c *Conn) tryRequest(ctx context.Context, req *Request, start time.Time) (res *Response, err error) {
// TODO: include useful information from the request in the sentry hub/context
// Might be better done in the caller though?
defer func() {
Expand Down Expand Up @@ -116,7 +116,7 @@ func (c *Conn) tryRequest(ctx context.Context, req *Request) (res *Response, err
ctx, task := internal.StartTask(ctx, taskType)
defer task.End()
internal.Logf(ctx, "connstate", "starting user=%v device=%v pos=%v", c.UserID, c.ConnID.DeviceID, req.pos)
return c.handler.OnIncomingRequest(ctx, c.ConnID, req, req.pos == 0)
return c.handler.OnIncomingRequest(ctx, c.ConnID, req, req.pos == 0, start)
}

func (c *Conn) isOutstanding(pos int64) bool {
Expand All @@ -132,7 +132,7 @@ func (c *Conn) isOutstanding(pos int64) bool {
// If an error is returned, it will be logged by the caller and transmitted to the
// client. It will NOT be reported to Sentry---this should happen as close as possible
// to the creation of the error (or else Sentry cannot provide a meaningful traceback.)
func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request) (resp *Response, herr *internal.HandlerError) {
func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.Time) (resp *Response, herr *internal.HandlerError) {
c.cancelOutstandingRequestMu.Lock()
if c.cancelOutstandingRequest != nil {
c.cancelOutstandingRequest()
Expand Down Expand Up @@ -217,7 +217,7 @@ func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request) (resp *Respo
req.SetTimeoutMSecs(1)
}

resp, err := c.tryRequest(ctx, req)
resp, err := c.tryRequest(ctx, req, start)
if err != nil {
herr, ok := err.(*internal.HandlerError)
if !ok {
Expand Down
40 changes: 20 additions & 20 deletions sync3/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type connHandlerMock struct {
fn func(ctx context.Context, cid ConnID, req *Request, isInitial bool) (*Response, error)
}

func (c *connHandlerMock) OnIncomingRequest(ctx context.Context, cid ConnID, req *Request, init bool) (*Response, error) {
func (c *connHandlerMock) OnIncomingRequest(ctx context.Context, cid ConnID, req *Request, init bool, start time.Time) (*Response, error) {
return c.fn(ctx, cid, req, init)
}
func (c *connHandlerMock) UserID() string {
Expand Down Expand Up @@ -47,22 +47,22 @@ func TestConn(t *testing.T) {
// initial request
resp, err := c.OnIncomingRequest(ctx, &Request{
pos: 0,
})
}, time.Now())
assertNoError(t, err)
assertPos(t, resp.Pos, 1)
assertInt(t, resp.Lists["a"].Count, 101)

// happy case, pos=1
resp, err = c.OnIncomingRequest(ctx, &Request{
pos: 1,
})
}, time.Now())
assertPos(t, resp.Pos, 2)
assertInt(t, resp.Lists["a"].Count, 102)
assertNoError(t, err)
// bogus position returns a 400
_, err = c.OnIncomingRequest(ctx, &Request{
pos: 31415,
})
}, time.Now())
if err == nil {
t.Fatalf("expected error, got none")
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestConnBlocking(t *testing.T) {
Sort: []string{"hi"},
},
},
})
}, time.Now())
}()
go func() {
defer wg.Done()
Expand All @@ -118,7 +118,7 @@ func TestConnBlocking(t *testing.T) {
Sort: []string{"hi2"},
},
},
})
}, time.Now())
}()
go func() {
wg.Wait()
Expand Down Expand Up @@ -148,18 +148,18 @@ func TestConnRetries(t *testing.T) {
},
}}, nil
}})
resp, err := c.OnIncomingRequest(ctx, &Request{})
resp, err := c.OnIncomingRequest(ctx, &Request{}, time.Now())
assertPos(t, resp.Pos, 1)
assertInt(t, resp.Lists["a"].Count, 20)
assertInt(t, callCount, 1)
assertNoError(t, err)
resp, err = c.OnIncomingRequest(ctx, &Request{pos: 1})
resp, err = c.OnIncomingRequest(ctx, &Request{pos: 1}, time.Now())
assertPos(t, resp.Pos, 2)
assertInt(t, resp.Lists["a"].Count, 20)
assertInt(t, callCount, 2)
assertNoError(t, err)
// retry! Shouldn't invoke handler again
resp, err = c.OnIncomingRequest(ctx, &Request{pos: 1})
resp, err = c.OnIncomingRequest(ctx, &Request{pos: 1}, time.Now())
assertPos(t, resp.Pos, 2)
assertInt(t, resp.Lists["a"].Count, 20)
assertInt(t, callCount, 2) // this doesn't increment
Expand All @@ -170,7 +170,7 @@ func TestConnRetries(t *testing.T) {
"a": {
Sort: []string{SortByName},
},
}})
}}, time.Now())
assertPos(t, resp.Pos, 2)
assertInt(t, resp.Lists["a"].Count, 20)
assertInt(t, callCount, 3) // this doesn't increment
Expand All @@ -191,25 +191,25 @@ func TestConnBufferRes(t *testing.T) {
},
}}, nil
}})
resp, err := c.OnIncomingRequest(ctx, &Request{})
resp, err := c.OnIncomingRequest(ctx, &Request{}, time.Now())
assertNoError(t, err)
assertPos(t, resp.Pos, 1)
assertInt(t, resp.Lists["a"].Count, 1)
assertInt(t, callCount, 1)
resp, err = c.OnIncomingRequest(ctx, &Request{pos: 1})
resp, err = c.OnIncomingRequest(ctx, &Request{pos: 1}, time.Now())
assertNoError(t, err)
assertPos(t, resp.Pos, 2)
assertInt(t, resp.Lists["a"].Count, 2)
assertInt(t, callCount, 2)
// retry with modified request data that shouldn't prompt data to be returned.
// should invoke handler again!
resp, err = c.OnIncomingRequest(ctx, &Request{pos: 1, UnsubscribeRooms: []string{"a"}})
resp, err = c.OnIncomingRequest(ctx, &Request{pos: 1, UnsubscribeRooms: []string{"a"}}, time.Now())
assertNoError(t, err)
assertPos(t, resp.Pos, 2)
assertInt(t, resp.Lists["a"].Count, 2)
assertInt(t, callCount, 3) // this DOES increment, the response is buffered and not returned yet.
// retry with same request body, so should NOT invoke handler again and return buffered response
resp, err = c.OnIncomingRequest(ctx, &Request{pos: 2, UnsubscribeRooms: []string{"a"}})
resp, err = c.OnIncomingRequest(ctx, &Request{pos: 2, UnsubscribeRooms: []string{"a"}}, time.Now())
assertNoError(t, err)
assertPos(t, resp.Pos, 3)
assertInt(t, resp.Lists["a"].Count, 3)
Expand All @@ -228,7 +228,7 @@ func TestConnErrors(t *testing.T) {

// random errors = 500
errCh <- errors.New("oops")
_, herr := c.OnIncomingRequest(ctx, &Request{})
_, herr := c.OnIncomingRequest(ctx, &Request{}, time.Now())
if herr.StatusCode != 500 {
t.Fatalf("random errors should be status 500, got %d", herr.StatusCode)
}
Expand All @@ -237,7 +237,7 @@ func TestConnErrors(t *testing.T) {
StatusCode: 400,
Err: errors.New("no way!"),
}
_, herr = c.OnIncomingRequest(ctx, &Request{})
_, herr = c.OnIncomingRequest(ctx, &Request{}, time.Now())
if herr.StatusCode != 400 {
t.Fatalf("expected status 400, got %d", herr.StatusCode)
}
Expand All @@ -258,7 +258,7 @@ func TestConnErrorsNoCache(t *testing.T) {
}
}})
// errors should not be cached
resp, herr := c.OnIncomingRequest(ctx, &Request{})
resp, herr := c.OnIncomingRequest(ctx, &Request{}, time.Now())
if herr != nil {
t.Fatalf("expected no error, got %+v", herr)
}
Expand All @@ -267,12 +267,12 @@ func TestConnErrorsNoCache(t *testing.T) {
StatusCode: 400,
Err: errors.New("no way!"),
}
_, herr = c.OnIncomingRequest(ctx, &Request{pos: resp.PosInt()})
_, herr = c.OnIncomingRequest(ctx, &Request{pos: resp.PosInt()}, time.Now())
if herr.StatusCode != 400 {
t.Fatalf("expected status 400, got %d", herr.StatusCode)
}
// but doing the exact same request should now work
_, herr = c.OnIncomingRequest(ctx, &Request{pos: resp.PosInt()})
_, herr = c.OnIncomingRequest(ctx, &Request{pos: resp.PosInt()}, time.Now())
if herr != nil {
t.Fatalf("expected no error, got %+v", herr)
}
Expand Down Expand Up @@ -361,7 +361,7 @@ func TestConnBufferRememberInflight(t *testing.T) {
var err *internal.HandlerError
for i, step := range steps {
t.Logf("Executing step %d", i)
resp, err = c.OnIncomingRequest(ctx, step.req)
resp, err = c.OnIncomingRequest(ctx, step.req, time.Now())
if !step.wantErr {
assertNoError(t, err)
}
Expand Down
19 changes: 17 additions & 2 deletions sync3/handler/connstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ type ConnState struct {
joinChecker JoinChecker

extensionsHandler extensions.HandlerInterface
setupHistogramVec *prometheus.HistogramVec
processHistogramVec *prometheus.HistogramVec
}

func NewConnState(
userID, deviceID string, userCache *caches.UserCache, globalCache *caches.GlobalCache,
ex extensions.HandlerInterface, joinChecker JoinChecker, histVec *prometheus.HistogramVec,
ex extensions.HandlerInterface, joinChecker JoinChecker, setupHistVec *prometheus.HistogramVec, histVec *prometheus.HistogramVec,
maxPendingEventUpdates int,
) *ConnState {
cs := &ConnState{
Expand All @@ -65,6 +66,7 @@ func NewConnState(
extensionsHandler: ex,
joinChecker: joinChecker,
lazyCache: NewLazyCache(),
setupHistogramVec: setupHistVec,
processHistogramVec: histVec,
}
cs.live = &connStateLive{
Expand Down Expand Up @@ -148,13 +150,15 @@ func (s *ConnState) load(ctx context.Context, req *sync3.Request) error {
}

// OnIncomingRequest is guaranteed to be called sequentially (it's protected by a mutex in conn.go)
func (s *ConnState) OnIncomingRequest(ctx context.Context, cid sync3.ConnID, req *sync3.Request, isInitial bool) (*sync3.Response, error) {
func (s *ConnState) OnIncomingRequest(ctx context.Context, cid sync3.ConnID, req *sync3.Request, isInitial bool, start time.Time) (*sync3.Response, error) {
if s.loadPosition == -1 {
// load() needs no ctx so drop it
_, region := internal.StartSpan(ctx, "load")
s.load(ctx, req)
region.End()
}
setupTime := time.Since(start)
s.trackSetupDuration(setupTime, isInitial)
return s.onIncomingRequest(ctx, req, isInitial)
}

Expand Down Expand Up @@ -577,6 +581,17 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
return rooms
}

func (s *ConnState) trackSetupDuration(dur time.Duration, isInitial bool) {
if s.setupHistogramVec == nil {
return
}
val := "0"
if isInitial {
val = "1"
}
s.setupHistogramVec.WithLabelValues(val).Observe(float64(dur.Seconds()))
}

func (s *ConnState) trackProcessDuration(dur time.Duration, isInitial bool) {
if s.processHistogramVec == nil {
return
Expand Down
Loading
Loading