Skip to content

Commit

Permalink
Merge pull request #172 from matrix-org/kegan/max-db-conns-test
Browse files Browse the repository at this point in the history
Test varying DB max conns
  • Loading branch information
kegsay committed Jun 19, 2023
2 parents fd198b3 + 77b49f7 commit fc1fce5
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 29 deletions.
17 changes: 10 additions & 7 deletions cmd/syncv3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ package main

import (
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/getsentry/sentry-go"
sentryhttp "github.com/getsentry/sentry-go/http"
syncv3 "github.com/matrix-org/sliding-sync"
Expand All @@ -10,13 +18,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"
)

var GitCommit string
Expand Down Expand Up @@ -163,6 +164,8 @@ func main() {

h2, h3 := syncv3.Setup(args[EnvServer], args[EnvDB], args[EnvSecret], syncv3.Opts{
AddPrometheusMetrics: args[EnvPrometheus] != "",
DBMaxConns: 100,
DBConnMaxIdleTime: time.Hour,
})

go h2.StartV2Pollers()
Expand Down
4 changes: 0 additions & 4 deletions state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/getsentry/sentry-go"

Expand Down Expand Up @@ -58,9 +57,6 @@ func NewStorage(postgresURI string) *Storage {
// TODO: if we panic(), will sentry have a chance to flush the event?
logger.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB")
}
db.SetMaxOpenConns(100)
db.SetMaxIdleConns(80)
db.SetConnMaxLifetime(time.Hour)
acc := &Accumulator{
db: db,
roomsTable: NewRoomsTable(db),
Expand Down
4 changes: 0 additions & 4 deletions sync2/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package sync2

import (
"os"
"time"

"github.com/getsentry/sentry-go"
"github.com/jmoiron/sqlx"
Expand All @@ -27,9 +26,6 @@ func NewStore(postgresURI, secret string) *Storage {
// TODO: if we panic(), will sentry have a chance to flush the event?
logger.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB")
}
db.SetMaxOpenConns(100)
db.SetMaxIdleConns(80)
db.SetConnMaxLifetime(time.Hour)
return &Storage{
DevicesTable: NewDevicesTable(db),
TokensTable: NewTokensTable(db, secret),
Expand Down
2 changes: 1 addition & 1 deletion sync3/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type SyncLiveHandler struct {
}

func NewSync3Handler(
store *state.Storage, storev2 *sync2.Storage, v2Client sync2.Client, postgresDBURI, secret string,
store *state.Storage, storev2 *sync2.Storage, v2Client sync2.Client, secret string,
pub pubsub.Notifier, sub pubsub.Listener, enablePrometheus bool, maxPendingEventUpdates int,
) (*SyncLiveHandler, error) {
logger.Info().Msg("creating handler")
Expand Down
103 changes: 103 additions & 0 deletions tests-integration/db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package syncv3

import (
"encoding/json"
"fmt"
"sync"
"testing"
"time"

syncv3 "github.com/matrix-org/sliding-sync"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/testutils"
"github.com/matrix-org/sliding-sync/testutils/m"
)

// Test that the proxy works fine with low max conns. Low max conns can be a problem
// if a request A needs 2 conns to respond and that blocks forward progress on the server,
// and the request can only obtain 1 conn.
func TestMaxDBConns(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
// setup code
v2 := runTestV2Server(t)
opts := syncv3.Opts{
DBMaxConns: 1,
}
v3 := runTestServer(t, v2, pqString, opts)
defer v2.close()
defer v3.close()

testMaxDBConns := func() {
// make N users and drip feed some events, make sure they are all seen
numUsers := 5
var wg sync.WaitGroup
wg.Add(numUsers)
for i := 0; i < numUsers; i++ {
go func(n int) {
defer wg.Done()
userID := fmt.Sprintf("@maxconns_%d:localhost", n)
token := fmt.Sprintf("maxconns_%d", n)
roomID := fmt.Sprintf("!maxconns_%d", n)
v2.addAccount(t, userID, token)
v2.queueResponse(userID, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: roomID,
state: createRoomState(t, userID, time.Now()),
}),
},
})
// initial sync
res := v3.mustDoV3Request(t, token, sync3.Request{
Lists: map[string]sync3.RequestList{"a": {
Ranges: sync3.SliceRanges{
[2]int64{0, 1},
},
RoomSubscription: sync3.RoomSubscription{
TimelineLimit: 1,
},
}},
})
t.Logf("user %s has done an initial /sync OK", userID)
m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Count(1), m.MatchV3Ops(
m.MatchV3SyncOp(0, 0, []string{roomID}),
)), m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
roomID: {
m.MatchJoinCount(1),
},
}))
// drip feed and get update
dripMsg := testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{
"msgtype": "m.text",
"body": "drip drip",
})
v2.queueResponse(userID, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: roomID,
events: []json.RawMessage{
dripMsg,
},
}),
},
})
t.Logf("user %s has queued the drip", userID)
v2.waitUntilEmpty(t, userID)
t.Logf("user %s poller has received the drip", userID)
res = v3.mustDoV3RequestWithPos(t, token, res.Pos, sync3.Request{})
m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
roomID: {
m.MatchRoomTimelineMostRecent(1, []json.RawMessage{dripMsg}),
},
}))
t.Logf("user %s has received the drip", userID)
}(i)
}
wg.Wait()
}

testMaxDBConns()
v3.restart(t, v2, pqString, opts)
testMaxDBConns()
}
26 changes: 14 additions & 12 deletions tests-integration/v3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,11 @@ func (s *testV3Server) close() {
s.h2.Teardown()
}

func (s *testV3Server) restart(t *testing.T, v2 *testV2Server, pq string) {
func (s *testV3Server) restart(t *testing.T, v2 *testV2Server, pq string, opts ...syncv3.Opts) {
t.Helper()
log.Printf("restarting server")
s.close()
ss := runTestServer(t, v2, pq)
ss := runTestServer(t, v2, pq, opts...)
// replace all the fields which will be close()d to ensure we don't leak
s.srv = ss.srv
s.h2 = ss.h2
Expand Down Expand Up @@ -366,20 +366,22 @@ func runTestServer(t testutils.TestBenchInterface, v2Server *testV2Server, postg
//tests often repeat requests. To ensure tests remain fast, reduce the spam protection limits.
sync3.SpamProtectionInterval = time.Millisecond

metricsEnabled := false
maxPendingEventUpdates := 200
combinedOpts := syncv3.Opts{
TestingSynchronousPubsub: true, // critical to avoid flakey tests
AddPrometheusMetrics: false,
MaxPendingEventUpdates: 200,
}
if len(opts) > 0 {
metricsEnabled = opts[0].AddPrometheusMetrics
if opts[0].MaxPendingEventUpdates > 0 {
maxPendingEventUpdates = opts[0].MaxPendingEventUpdates
opt := opts[0]
combinedOpts.AddPrometheusMetrics = opt.AddPrometheusMetrics
combinedOpts.DBConnMaxIdleTime = opt.DBConnMaxIdleTime
combinedOpts.DBMaxConns = opt.DBMaxConns
if opt.MaxPendingEventUpdates > 0 {
combinedOpts.MaxPendingEventUpdates = opt.MaxPendingEventUpdates
handler.BufferWaitTime = 5 * time.Millisecond
}
}
h2, h3 := syncv3.Setup(v2Server.url(), postgresConnectionString, os.Getenv("SYNCV3_SECRET"), syncv3.Opts{
TestingSynchronousPubsub: true, // critical to avoid flakey tests
MaxPendingEventUpdates: maxPendingEventUpdates,
AddPrometheusMetrics: metricsEnabled,
})
h2, h3 := syncv3.Setup(v2Server.url(), postgresConnectionString, os.Getenv("SYNCV3_SECRET"), combinedOpts)
// for ease of use we don't start v2 pollers at startup in tests
r := mux.NewRouter()
r.Use(hlog.NewHandler(logger))
Expand Down
18 changes: 17 additions & 1 deletion v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/getsentry/sentry-go"
"github.com/jmoiron/sqlx"

"github.com/gorilla/mux"
"github.com/matrix-org/sliding-sync/internal"
Expand Down Expand Up @@ -36,6 +37,9 @@ type Opts struct {
// if true, publishing messages will block until the consumer has consumed it.
// Assumes a single producer and a single consumer.
TestingSynchronousPubsub bool

DBMaxConns int
DBConnMaxIdleTime time.Duration
}

type server struct {
Expand Down Expand Up @@ -75,6 +79,18 @@ func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Han
}
store := state.NewStorage(postgresURI)
storev2 := sync2.NewStore(postgresURI, secret)
for _, db := range []*sqlx.DB{store.DB, storev2.DB} {
if opts.DBMaxConns > 0 {
// https://github.com/go-sql-driver/mysql#important-settings
// "db.SetMaxIdleConns() is recommended to be set same to db.SetMaxOpenConns(). When it is smaller
// than SetMaxOpenConns(), connections can be opened and closed much more frequently than you expect."
db.SetMaxOpenConns(opts.DBMaxConns)
db.SetMaxIdleConns(opts.DBMaxConns)
}
if opts.DBConnMaxIdleTime > 0 {
db.SetConnMaxIdleTime(opts.DBConnMaxIdleTime)
}
}
bufferSize := 50
if opts.TestingSynchronousPubsub {
bufferSize = 0
Expand All @@ -93,7 +109,7 @@ func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Han
pMap.SetCallbacks(h2)

// create v3 handler
h3, err := handler.NewSync3Handler(store, storev2, v2Client, postgresURI, secret, pubSub, pubSub, opts.AddPrometheusMetrics, opts.MaxPendingEventUpdates)
h3, err := handler.NewSync3Handler(store, storev2, v2Client, secret, pubSub, pubSub, opts.AddPrometheusMetrics, opts.MaxPendingEventUpdates)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit fc1fce5

Please sign in to comment.