diff --git a/cmd/syncv3/main.go b/cmd/syncv3/main.go
index 274bd084..9938ecc3 100644
--- a/cmd/syncv3/main.go
+++ b/cmd/syncv3/main.go
@@ -2,6 +2,14 @@ package main
 
 import (
 	"fmt"
+	"net/http"
+	_ "net/http/pprof"
+	"os"
+	"os/signal"
+	"strings"
+	"syscall"
+	"time"
+
 	"github.com/getsentry/sentry-go"
 	sentryhttp "github.com/getsentry/sentry-go/http"
 	syncv3 "github.com/matrix-org/sliding-sync"
@@ -10,13 +18,6 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/rs/zerolog"
 	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
-	"net/http"
-	_ "net/http/pprof"
-	"os"
-	"os/signal"
-	"strings"
-	"syscall"
-	"time"
 )
 
 var GitCommit string
@@ -163,6 +164,8 @@ func main() {
 
 	h2, h3 := syncv3.Setup(args[EnvServer], args[EnvDB], args[EnvSecret], syncv3.Opts{
 		AddPrometheusMetrics: args[EnvPrometheus] != "",
+		DBMaxConns:           100,
+		DBConnMaxIdleTime:    time.Hour,
 	})
 
 	go h2.StartV2Pollers()
diff --git a/state/storage.go b/state/storage.go
index a6af1c71..ff70d981 100644
--- a/state/storage.go
+++ b/state/storage.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"os"
 	"strings"
-	"time"
 
 	"github.com/getsentry/sentry-go"
 
@@ -58,9 +57,6 @@ func NewStorage(postgresURI string) *Storage {
 		// TODO: if we panic(), will sentry have a chance to flush the event?
 		logger.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB")
 	}
-	db.SetMaxOpenConns(100)
-	db.SetMaxIdleConns(80)
-	db.SetConnMaxLifetime(time.Hour)
 	acc := &Accumulator{
 		db:         db,
 		roomsTable: NewRoomsTable(db),
diff --git a/sync2/storage.go b/sync2/storage.go
index 88d3c759..5f484179 100644
--- a/sync2/storage.go
+++ b/sync2/storage.go
@@ -2,7 +2,6 @@ package sync2
 
 import (
 	"os"
-	"time"
 
 	"github.com/getsentry/sentry-go"
 	"github.com/jmoiron/sqlx"
@@ -27,9 +26,6 @@ func NewStore(postgresURI, secret string) *Storage {
 		// TODO: if we panic(), will sentry have a chance to flush the event?
 		logger.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB")
 	}
-	db.SetMaxOpenConns(100)
-	db.SetMaxIdleConns(80)
-	db.SetConnMaxLifetime(time.Hour)
 	return &Storage{
 		DevicesTable: NewDevicesTable(db),
 		TokensTable:  NewTokensTable(db, secret),
diff --git a/sync3/handler/handler.go b/sync3/handler/handler.go
index 98d4e610..31870c5d 100644
--- a/sync3/handler/handler.go
+++ b/sync3/handler/handler.go
@@ -65,7 +65,7 @@ type SyncLiveHandler struct {
 }
 
 func NewSync3Handler(
-	store *state.Storage, storev2 *sync2.Storage, v2Client sync2.Client, postgresDBURI, secret string,
+	store *state.Storage, storev2 *sync2.Storage, v2Client sync2.Client, secret string,
 	pub pubsub.Notifier, sub pubsub.Listener, enablePrometheus bool, maxPendingEventUpdates int,
 ) (*SyncLiveHandler, error) {
 	logger.Info().Msg("creating handler")
diff --git a/tests-integration/db_test.go b/tests-integration/db_test.go
new file mode 100644
index 00000000..3897009e
--- /dev/null
+++ b/tests-integration/db_test.go
@@ -0,0 +1,107 @@
+package syncv3
+
+import (
+	"encoding/json"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	syncv3 "github.com/matrix-org/sliding-sync"
+	"github.com/matrix-org/sliding-sync/sync2"
+	"github.com/matrix-org/sliding-sync/sync3"
+	"github.com/matrix-org/sliding-sync/testutils"
+	"github.com/matrix-org/sliding-sync/testutils/m"
+)
+
+// Test that the proxy works fine with a low max conns limit. A low limit can be a
+// problem: if a request needs 2 conns to respond but can only obtain 1, it stalls
+// and blocks forward progress on the server.
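+//
+// Here the pool is capped at a single connection (DBMaxConns: 1) while several
+// users concurrently perform an initial sync followed by a drip-fed update; the
+// whole exercise is repeated after a restart to check the limit is re-applied.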
+func TestMaxDBConns(t *testing.T) {
+	pqString := testutils.PrepareDBConnectionString()
+	// setup code
+	v2 := runTestV2Server(t)
+	opts := syncv3.Opts{
+		DBMaxConns: 1,
+	}
+	v3 := runTestServer(t, v2, pqString, opts)
+	defer v2.close()
+	defer v3.close()
+
+	testMaxDBConns := func() {
+		// make N users and drip feed some events, make sure they are all seen
+		numUsers := 5
+		var wg sync.WaitGroup
+		wg.Add(numUsers)
+		for i := 0; i < numUsers; i++ {
+			go func(n int) {
+				defer wg.Done()
+				userID := fmt.Sprintf("@maxconns_%d:localhost", n)
+				token := fmt.Sprintf("maxconns_%d", n)
+				roomID := fmt.Sprintf("!maxconns_%d", n)
+				v2.addAccount(t, userID, token)
+				v2.queueResponse(userID, sync2.SyncResponse{
+					Rooms: sync2.SyncRoomsResponse{
+						Join: v2JoinTimeline(roomEvents{
+							roomID: roomID,
+							state:  createRoomState(t, userID, time.Now()),
+						}),
+					},
+				})
+				// initial sync
+				res := v3.mustDoV3Request(t, token, sync3.Request{
+					Lists: map[string]sync3.RequestList{"a": {
+						Ranges: sync3.SliceRanges{
+							[2]int64{0, 1},
+						},
+						RoomSubscription: sync3.RoomSubscription{
+							TimelineLimit: 1,
+						},
+					}},
+				})
+				t.Logf("user %s has done an initial /sync OK", userID)
+				m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Count(1), m.MatchV3Ops(
+					m.MatchV3SyncOp(0, 0, []string{roomID}),
+				)), m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
+					roomID: {
+						m.MatchJoinCount(1),
+					},
+				}))
+				// drip feed and get update
+				dripMsg := testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{
+					"msgtype": "m.text",
+					"body":    "drip drip",
+				})
+				v2.queueResponse(userID, sync2.SyncResponse{
+					Rooms: sync2.SyncRoomsResponse{
+						Join: v2JoinTimeline(roomEvents{
+							roomID: roomID,
+							events: []json.RawMessage{
+								dripMsg,
+							},
+						}),
+					},
+				})
+				t.Logf("user %s has queued the drip", userID)
+				v2.waitUntilEmpty(t, userID)
+				t.Logf("user %s poller has received the drip", userID)
+				res = v3.mustDoV3RequestWithPos(t, token, res.Pos, sync3.Request{})
+				m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
+					roomID: {
+						m.MatchRoomTimelineMostRecent(1, []json.RawMessage{dripMsg}),
+					},
+				}))
+				t.Logf("user %s has received the drip", userID)
+			}(i)
+		}
+		wg.Wait()
+	}
+
+	testMaxDBConns()
+	v3.restart(t, v2, pqString, opts)
+	testMaxDBConns()
+}
diff --git a/tests-integration/v3_test.go b/tests-integration/v3_test.go
index cd8bd2c1..de72d954 100644
--- a/tests-integration/v3_test.go
+++ b/tests-integration/v3_test.go
@@ -291,11 +291,11 @@ func (s *testV3Server) close() {
 	s.h2.Teardown()
 }
 
-func (s *testV3Server) restart(t *testing.T, v2 *testV2Server, pq string) {
+func (s *testV3Server) restart(t *testing.T, v2 *testV2Server, pq string, opts ...syncv3.Opts) {
 	t.Helper()
 	log.Printf("restarting server")
 	s.close()
-	ss := runTestServer(t, v2, pq)
+	ss := runTestServer(t, v2, pq, opts...)
 	// replace all the fields which will be close()d to ensure we don't leak
 	s.srv = ss.srv
 	s.h2 = ss.h2
@@ -366,20 +366,22 @@ func runTestServer(t testutils.TestBenchInterface, v2Server *testV2Server, postg
 	//tests often repeat requests. To ensure tests remain fast, reduce the spam protection limits.
 	sync3.SpamProtectionInterval = time.Millisecond
 
-	metricsEnabled := false
-	maxPendingEventUpdates := 200
+	combinedOpts := syncv3.Opts{
+		TestingSynchronousPubsub: true, // critical to avoid flakey tests
+		AddPrometheusMetrics:     false,
+		MaxPendingEventUpdates:   200,
+	}
 	if len(opts) > 0 {
-		metricsEnabled = opts[0].AddPrometheusMetrics
-		if opts[0].MaxPendingEventUpdates > 0 {
-			maxPendingEventUpdates = opts[0].MaxPendingEventUpdates
+		opt := opts[0]
+		combinedOpts.AddPrometheusMetrics = opt.AddPrometheusMetrics
+		combinedOpts.DBConnMaxIdleTime = opt.DBConnMaxIdleTime
+		combinedOpts.DBMaxConns = opt.DBMaxConns
+		if opt.MaxPendingEventUpdates > 0 {
+			combinedOpts.MaxPendingEventUpdates = opt.MaxPendingEventUpdates
 			handler.BufferWaitTime = 5 * time.Millisecond
 		}
 	}
-	h2, h3 := syncv3.Setup(v2Server.url(), postgresConnectionString, os.Getenv("SYNCV3_SECRET"), syncv3.Opts{
-		TestingSynchronousPubsub: true, // critical to avoid flakey tests
-		MaxPendingEventUpdates:   maxPendingEventUpdates,
-		AddPrometheusMetrics:     metricsEnabled,
-	})
+	h2, h3 := syncv3.Setup(v2Server.url(), postgresConnectionString, os.Getenv("SYNCV3_SECRET"), combinedOpts)
 	// for ease of use we don't start v2 pollers at startup in tests
 	r := mux.NewRouter()
 	r.Use(hlog.NewHandler(logger))
diff --git a/v3.go b/v3.go
index acb87575..74ae2be2 100644
--- a/v3.go
+++ b/v3.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/getsentry/sentry-go"
+	"github.com/jmoiron/sqlx"
 
 	"github.com/gorilla/mux"
 	"github.com/matrix-org/sliding-sync/internal"
@@ -36,6 +37,9 @@ type Opts struct {
 	// if true, publishing messages will block until the consumer has consumed it.
 	// Assumes a single producer and a single consumer.
 	TestingSynchronousPubsub bool
+
+	DBMaxConns        int
+	DBConnMaxIdleTime time.Duration
 }
 
 type server struct {
@@ -75,6 +79,18 @@ func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Han
 	}
 	store := state.NewStorage(postgresURI)
 	storev2 := sync2.NewStore(postgresURI, secret)
+	for _, db := range []*sqlx.DB{store.DB, storev2.DB} {
+		if opts.DBMaxConns > 0 {
+			// https://github.com/go-sql-driver/mysql#important-settings
+			// "db.SetMaxIdleConns() is recommended to be set same to db.SetMaxOpenConns(). When it is smaller
+			// than SetMaxOpenConns(), connections can be opened and closed much more frequently than you expect."
+			db.SetMaxOpenConns(opts.DBMaxConns)
+			db.SetMaxIdleConns(opts.DBMaxConns)
+		}
+		if opts.DBConnMaxIdleTime > 0 {
+			db.SetConnMaxIdleTime(opts.DBConnMaxIdleTime)
+		}
+	}
 	bufferSize := 50
 	if opts.TestingSynchronousPubsub {
 		bufferSize = 0
@@ -93,7 +109,7 @@ func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Han
 	pMap.SetCallbacks(h2)
 
 	// create v3 handler
-	h3, err := handler.NewSync3Handler(store, storev2, v2Client, postgresURI, secret, pubSub, pubSub, opts.AddPrometheusMetrics, opts.MaxPendingEventUpdates)
+	h3, err := handler.NewSync3Handler(store, storev2, v2Client, secret, pubSub, pubSub, opts.AddPrometheusMetrics, opts.MaxPendingEventUpdates)
 	if err != nil {
 		panic(err)
 	}