Ensure clients see their transaction_ids #146

Merged Jul 31, 2023

Commits (46)
a825375 Reproduce the problem (Jun 8, 2023)
f9d4972 Construct a gadget for tracking pending txn IDs (Jul 25, 2023)
8592fe3 TODO note (Jun 12, 2023)
c5d7570 poller: mark txn IDs as seen (Jul 25, 2023)
008157c poller: send all-clear (Jul 25, 2023)
59d547d Stub txn id waiter (Jul 25, 2023)
db69afc Never call s.live.onUpdate outside of s.OnUpdate (Jul 25, 2023)
9804518 Plumb in txn id waiter (Jul 25, 2023)
a59e8d5 Ask InternalRequestLists if room is Visible (Jul 25, 2023)
99d042e Decide whether we should queue the update (Jul 25, 2023)
4a6623f Include room ID in the txnid payload (Jul 25, 2023)
c154293 Queue events for up to 1s (Jul 25, 2023)
c897887 todo marker (Jul 25, 2023)
142290f WIP make test pass?? (Jul 25, 2023)
56411d6 gotestfmt: hide successful jobs, packages, tests and downloads (Jul 26, 2023)
f74794b Include txn_id (if present) in room event updates (Jul 26, 2023)
5904e5a Make transaction id delay time configurable (Jul 26, 2023)
d7440ab correctly pass arg to gotestfmt (Jul 26, 2023)
1bb082d fixup NewConnState args (Jul 26, 2023)
d67445b Correctly pop from left of queue (Jul 26, 2023)
3af5624 runTestServer: if opts are given, specify MaxDelay explicitly (Jul 26, 2023)
726868c Bypass queue logic if delay is turned off (Jul 26, 2023)
9d86406 make Publish func public (Jul 26, 2023)
fe74488 Clear queues on receipt of txn payload (Jul 26, 2023)
f3750d2 Tweak comments (Jul 26, 2023)
ad309b5 Remove unused matcher (Jul 26, 2023)
14113e2 WIP test cases for the TxnIDWaiter (Jul 26, 2023)
6c81134 PollerMap.deviceIDs: use zero-inited string slice (Jul 27, 2023)
210e95b PendingTransactionIDs: defer mutex unlock (Jul 27, 2023)
5d98629 Fixup tests (Jul 27, 2023)
6e8bbcc Don't use txn ID buffering in integration tests (Jul 27, 2023)
d010ff4 Add missing interface method on connHandlerMock (Jul 27, 2023)
d006700 Fix new integration test timing (Jul 27, 2023)
e35f6b8 Remove visibility nonsense (Jul 27, 2023)
b6dbe91 WIP test cases for TxnIDWaiter (Jul 27, 2023)
347eb45 Remove debug (Jul 27, 2023)
7bb244d TxnIDWaiter: Use mutex to guard queues (Jul 27, 2023)
504e024 Remove more debug (Jul 27, 2023)
1cdb4dc TestyMcTestFace (Jul 27, 2023)
70243a4 Don't delay state events (Jul 27, 2023)
e64de5c Make it easier to debug TestInviteAcceptance (Jul 27, 2023)
5878828 Add t.Helper calls (Jul 28, 2023)
9248d4e Review comments (Jul 28, 2023)
7c5442d Integration test review comments (Jul 28, 2023)
baa3596 E2E test (Jul 31, 2023)
28b11f8 Emit txn id payloads when there are no new events (Jul 31, 2023)
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
@@ -59,7 +59,7 @@ jobs:
       - name: Test
         run: |
           set -euo pipefail
-          go test -count=1 -covermode=atomic -coverpkg ./... -p 1 -v -json $(go list ./... | grep -v tests-e2e) -coverprofile synccoverage.out 2>&1 | tee ./test-integration.log | gotestfmt
+          go test -count=1 -covermode=atomic -coverpkg ./... -p 1 -v -json $(go list ./... | grep -v tests-e2e) -coverprofile synccoverage.out 2>&1 | tee ./test-integration.log | gotestfmt -hide all
         shell: bash
         env:
           POSTGRES_HOST: localhost
@@ -144,7 +144,7 @@ jobs:
       - name: Run end-to-end tests
         run: |
           set -euo pipefail
-          ./run-tests.sh -count=1 -v -json . 2>&1 | tee test-e2e-runner.log | gotestfmt
+          ./run-tests.sh -count=1 -v -json . 2>&1 | tee test-e2e-runner.log | gotestfmt -hide all
         working-directory: tests-e2e
         shell: bash
         env:
7 changes: 4 additions & 3 deletions cmd/syncv3/main.go
@@ -173,9 +173,10 @@ func main() {
         panic("invalid value for " + EnvMaxConns + ": " + args[EnvMaxConns])
     }
     h2, h3 := syncv3.Setup(args[EnvServer], args[EnvDB], args[EnvSecret], syncv3.Opts{
-        AddPrometheusMetrics: args[EnvPrometheus] != "",
-        DBMaxConns:           maxConnsInt,
-        DBConnMaxIdleTime:    time.Hour,
+        AddPrometheusMetrics:  args[EnvPrometheus] != "",
+        DBMaxConns:            maxConnsInt,
+        DBConnMaxIdleTime:     time.Hour,
+        MaxTransactionIDDelay: time.Second,
     })

     go h2.StartV2Pollers()
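Editor's note: the new `MaxTransactionIDDelay` option bounds how long the proxy holds back a sender's own events while waiting for a transaction ID (or the all-clear) to arrive; judging by the "Bypass queue logic if delay is turned off" commit above, a zero value disables buffering entirely. A minimal sketch of wiring it, assuming the root package imports as `github.com/matrix-org/sliding-sync` under the `syncv3` alias used in main.go:

```go
package main

import (
	"fmt"
	"time"

	syncv3 "github.com/matrix-org/sliding-sync"
)

// opts returns server options with txn ID buffering either on (wait up to
// one second, as main.go above does) or off (zero delay bypasses the queue).
func opts(buffer bool) syncv3.Opts {
	delay := time.Duration(0)
	if buffer {
		delay = time.Second
	}
	return syncv3.Opts{MaxTransactionIDDelay: delay}
}

func main() {
	fmt.Printf("%+v\n", opts(true))
}
```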
9 changes: 6 additions & 3 deletions pubsub/v2.go
@@ -41,12 +41,15 @@ type V2Accumulate struct {

 func (*V2Accumulate) Type() string { return "V2Accumulate" }

-// V2TransactionID is emitted by a poller when it sees an event with a transaction ID.
+// V2TransactionID is emitted by a poller when it sees an event with a transaction ID,
+// or when it is certain that no other poller will see a transaction ID for this event
+// (the "all-clear").
 type V2TransactionID struct {
     EventID       string
-    UserID        string
+    RoomID        string
+    UserID        string // of the sender
     DeviceID      string
-    TransactionID string
+    TransactionID string // Note: an empty transaction ID represents the all-clear.
     NID           int64
 }

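Editor's note: the empty-string sentinel above is load-bearing, so here is a small, self-contained sketch of how a subscriber distinguishes the two cases. The payload struct is mirrored locally and the handler is illustrative; neither is part of this diff.

```go
package main

import "fmt"

// Local mirror of the V2TransactionID payload above, for illustration only.
type V2TransactionID struct {
	EventID, RoomID, UserID, DeviceID, TransactionID string
	NID                                              int64
}

// handleTxnID branches on the two meanings of the payload: a concrete
// transaction ID seen by one of the sender's pollers, or the all-clear.
func handleTxnID(p V2TransactionID) {
	if p.TransactionID == "" {
		// All-clear: every device poller for the sender reported in without
		// a txn ID, so the event can be released down /sync as-is.
		fmt.Printf("all-clear for %s in %s\n", p.EventID, p.RoomID)
		return
	}
	// A poller saw the txn ID: attach it to the event before release.
	fmt.Printf("event %s has txn ID %s\n", p.EventID, p.TransactionID)
}

func main() {
	handleTxnID(V2TransactionID{EventID: "$e", RoomID: "!r", TransactionID: "m123"})
	handleTxnID(V2TransactionID{EventID: "$e", RoomID: "!r"})
}
```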
85 changes: 50 additions & 35 deletions sync2/handler2/handler.go
@@ -3,7 +3,6 @@ package handler2
 import (
     "context"
     "encoding/json"
-    "fmt"
     "hash/fnv"
     "os"
     "sync"
@@ -240,15 +239,24 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prev
     // Remember any transaction IDs that may be unique to this user
     eventIDsWithTxns := make([]string, 0, len(timeline))     // in timeline order
     eventIDToTxnID := make(map[string]string, len(timeline)) // event_id -> txn_id
+    // Also remember events which were sent by this user but lack a transaction ID.
+    eventIDsLackingTxns := make([]string, 0, len(timeline))
+
     for _, e := range timeline {
-        txnID := gjson.GetBytes(e, "unsigned.transaction_id")
-        if !txnID.Exists() {
+        parsed := gjson.ParseBytes(e)
+        eventID := parsed.Get("event_id").Str
+
+        if txnID := parsed.Get("unsigned.transaction_id"); txnID.Exists() {
+            eventIDsWithTxns = append(eventIDsWithTxns, eventID)
+            eventIDToTxnID[eventID] = txnID.Str
             continue
         }
-        eventID := gjson.GetBytes(e, "event_id").Str
-        eventIDsWithTxns = append(eventIDsWithTxns, eventID)
-        eventIDToTxnID[eventID] = txnID.Str
+
+        if sender := parsed.Get("sender"); sender.Str == userID {
+            eventIDsLackingTxns = append(eventIDsLackingTxns, eventID)
+        }
     }
+
     if len(eventIDToTxnID) > 0 {
         // persist the txn IDs
         err := h.Store.TransactionsTable.Insert(userID, deviceID, eventIDToTxnID)
@@ -265,56 +273,63 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prev
         internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
         return
     }
-    if numNew == 0 {
-        // no new events
-        return
+
+    // We've updated the database. Now tell any pubsub listeners what we learned.
+    if numNew != 0 {
+        h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2Accumulate{
+            RoomID:    roomID,
+            PrevBatch: prevBatch,
+            EventNIDs: latestNIDs,
+        })
     }
-    h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2Accumulate{
-        RoomID:    roomID,
-        PrevBatch: prevBatch,
-        EventNIDs: latestNIDs,
-    })

-    if len(eventIDToTxnID) > 0 {
+    if len(eventIDToTxnID) > 0 || len(eventIDsLackingTxns) > 0 {
         // The call to h.Store.Accumulate above only tells us about new events' NIDS;
         // for existing events we need to requery the database to fetch them.
         // Rather than try to reuse work, keep things simple and just fetch NIDs for
         // all events with txnIDs.
         var nidsByIDs map[string]int64
+        eventIDsToFetch := append(eventIDsWithTxns, eventIDsLackingTxns...)
         err = sqlutil.WithTransaction(h.Store.DB, func(txn *sqlx.Tx) error {
-            nidsByIDs, err = h.Store.EventsTable.SelectNIDsByIDs(txn, eventIDsWithTxns)
+            nidsByIDs, err = h.Store.EventsTable.SelectNIDsByIDs(txn, eventIDsToFetch)
             return err
         })
         if err != nil {
             logger.Err(err).
                 Int("timeline", len(timeline)).
                 Int("num_transaction_ids", len(eventIDsWithTxns)).
+                Int("num_missing_transaction_ids", len(eventIDsLackingTxns)).
                 Str("room", roomID).
-                Msg("V2: failed to fetch nids for events with transaction_ids")
+                Msg("V2: failed to fetch nids for event transaction_id handling")
             internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
             return
         }

-        for _, eventID := range eventIDsWithTxns {
+        for eventID, nid := range nidsByIDs {
             txnID, ok := eventIDToTxnID[eventID]
-            if !ok {
-                continue
-            }
-            nid, ok := nidsByIDs[eventID]
-            if !ok {
-                errMsg := "V2: failed to fetch NID for txnID"
-                logger.Error().Str("user", userID).Str("device", deviceID).Msg(errMsg)
-                internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(fmt.Errorf("errMsg"))
-                continue
+            if ok {
+                h.pMap.SeenTxnID(eventID)
+                h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2TransactionID{
+                    EventID:       eventID,
+                    RoomID:        roomID,
+                    UserID:        userID,
+                    DeviceID:      deviceID,
+                    TransactionID: txnID,
+                    NID:           nid,
+                })
+            } else {
+                allClear, _ := h.pMap.MissingTxnID(eventID, userID, deviceID)
+                if allClear {
+                    h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2TransactionID{
+                        EventID:       eventID,
+                        RoomID:        roomID,
+                        UserID:        userID,
+                        DeviceID:      deviceID,
+                        TransactionID: "",
+                        NID:           nid,
+                    })
+                }
             }
-
-            h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2TransactionID{
-                EventID:       eventID,
-                UserID:        userID,
-                DeviceID:      deviceID,
-                TransactionID: txnID,
-                NID:           nid,
-            })
         }
     }
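Editor's note: to see the new classification step in isolation, here is a runnable sketch of just that loop. The sample events are made up; gjson is the same library the handler already uses.

```go
package main

import (
	"fmt"

	"github.com/tidwall/gjson"
)

func main() {
	userID := "@me:example.org"
	// Two events as a poller might receive them: the first carries the
	// sender's transaction ID in `unsigned`, the second lacks one.
	timeline := [][]byte{
		[]byte(`{"event_id":"$a","sender":"@me:example.org","unsigned":{"transaction_id":"m1"}}`),
		[]byte(`{"event_id":"$b","sender":"@me:example.org"}`),
	}

	eventIDsWithTxns := make([]string, 0, len(timeline))
	eventIDToTxnID := make(map[string]string, len(timeline))
	eventIDsLackingTxns := make([]string, 0, len(timeline))

	for _, e := range timeline {
		parsed := gjson.ParseBytes(e)
		eventID := parsed.Get("event_id").Str
		if txnID := parsed.Get("unsigned.transaction_id"); txnID.Exists() {
			eventIDsWithTxns = append(eventIDsWithTxns, eventID)
			eventIDToTxnID[eventID] = txnID.Str
			continue
		}
		// Sent by the polled user but with no txn ID: some *other* device
		// of theirs may yet supply one, so track it for the all-clear logic.
		if parsed.Get("sender").Str == userID {
			eventIDsLackingTxns = append(eventIDsLackingTxns, eventID)
		}
	}
	fmt.Println(eventIDsWithTxns, eventIDToTxnID, eventIDsLackingTxns)
	// Output: [$a] map[$a:m1] [$b]
}
```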
8 changes: 8 additions & 0 deletions sync2/handler2/handler_test.go
@@ -42,6 +42,14 @@ func (p *mockPollerMap) NumPollers() int {
 }
 func (p *mockPollerMap) Terminate() {}

+func (p *mockPollerMap) MissingTxnID(eventID, userID, deviceID string) (bool, error) {
+    return false, nil
+}
+
+func (p *mockPollerMap) SeenTxnID(eventID string) error {
+    return nil
+}
+
 func (p *mockPollerMap) EnsurePolling(pid sync2.PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger) {
     p.calls = append(p.calls, pollInfo{
         pid: pid,
26 changes: 26 additions & 0 deletions sync2/poller.go
@@ -64,6 +64,8 @@ type IPollerMap interface {
     EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger)
     NumPollers() int
     Terminate()
+    MissingTxnID(eventID, userID, deviceID string) (bool, error)
+    SeenTxnID(eventID string) error
 }

 // PollerMap is a map of device ID to Poller
@@ -72,6 +74,7 @@ type PollerMap struct {
     callbacks       V2DataReceiver
     pollerMu        *sync.Mutex
     Pollers         map[PollerID]*poller
+    pendingTxnIDs   *PendingTransactionIDs
     executor        chan func()
     executorRunning bool
     processHistogramVec *prometheus.HistogramVec
@@ -112,6 +115,7 @@ func NewPollerMap(v2Client Client, enablePrometheus bool) *PollerMap {
         Pollers:  make(map[PollerID]*poller),
         executor: make(chan func(), 0),
     }
+    pm.pendingTxnIDs = NewPendingTransactionIDs(pm.deviceIDs)
     if enablePrometheus {
         pm.processHistogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
             Namespace: "sliding_sync",
@@ -195,6 +199,28 @@ func (h *PollerMap) NumPollers() (count int) {
     return
 }

+// deviceIDs returns the slice of all devices currently being polled for by this user.
+// The return value is brand-new and is fully owned by the caller.
+func (h *PollerMap) deviceIDs(userID string) []string {
+    h.pollerMu.Lock()
+    defer h.pollerMu.Unlock()
+    var devices []string
+    for _, p := range h.Pollers {
+        if !p.terminated.Load() && p.userID == userID {
+            devices = append(devices, p.deviceID)
+        }
+    }
+    return devices
+}
+
+func (h *PollerMap) MissingTxnID(eventID, userID, deviceID string) (bool, error) {
+    return h.pendingTxnIDs.MissingTxnID(eventID, userID, deviceID)
+}
+
+func (h *PollerMap) SeenTxnID(eventID string) error {
+    return h.pendingTxnIDs.SeenTxnID(eventID)
+}
+
 // EnsurePolling makes sure there is a poller for this device, making one if need be.
 // Blocks until at least 1 sync is done if and only if the poller was just created.
 // This ensures that calls to the database will return data.
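Editor's note on the "brand-new and is fully owned by the caller" promise in deviceIDs: it matters because the removeDevice helper in sync2/txnid.go below trims the device list with append, which shifts elements inside the input's backing array even though the slice header it was given is untouched. A standalone illustration with made-up device IDs:

```go
package main

import "fmt"

// Copy of removeDevice from sync2/txnid.go below, reproduced to show why
// the loader must hand PendingTransactionIDs a slice nobody else holds.
func removeDevice(device string, devices []string) ([]string, bool) {
	for i, otherDevice := range devices {
		if otherDevice == device {
			// No reallocation happens here: elements after index i are
			// shifted left within the existing backing array.
			return append(devices[:i], devices[i+1:]...), true
		}
	}
	return devices, false
}

func main() {
	shared := []string{"dev1", "dev2", "dev3"}
	trimmed, changed := removeDevice("dev1", shared)
	fmt.Println(trimmed, changed) // [dev2 dev3] true
	// The caller's slice now shows shifted contents, because the two
	// slices share storage:
	fmt.Println(shared) // [dev2 dev3 dev3]
}
```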
117 changes: 100 additions & 17 deletions sync2/txnid.go
@@ -1,38 +1,121 @@
 package sync2

 import (
+    "fmt"
+    "sync"
     "time"

     "github.com/ReneKroon/ttlcache/v2"
 )

-type TransactionIDCache struct {
-    cache *ttlcache.Cache
+type loaderFunc func(userID string) (deviceIDs []string)
+
+// PendingTransactionIDs is (conceptually) a map from event IDs to a list of device IDs.
+// Its keys are the IDs of event we've seen which a) lack a transaction ID, and b) were
+// sent by one of the users we are polling for. The values are the list of the sender's
+// devices whose pollers are yet to see a transaction ID.
+//
+// If another poller sees the same event
+//
+//   - with a transaction ID, it emits a V2TransactionID payload with that ID and
+//     removes the event ID from this map.
+//
+//   - without a transaction ID, it removes the polling device ID from the values
+//     list. If the device ID list is now empty, the poller emits an "all clear"
+//     V2TransactionID payload.
+//
+// This is a best-effort affair to ensure that the rest of the proxy can wait for
+// transaction IDs to appear before transmitting an event down /sync to its sender.
+//
+// It's possible that we add an entry to this map and then the list of remaining
+// device IDs becomes out of date, either due to a new device creation or an
+// existing device expiring. We choose not to handle this case, because it is
+// relatively rare.
+//
+// To avoid the map growing without bound, we use a ttlcache and drop entries
+// after a short period of time.
+type PendingTransactionIDs struct {
+    // mu guards the pending field. See MissingTxnID for rationale.
+    mu      sync.Mutex
+    pending *ttlcache.Cache
+    // loader should provide the list of device IDs
+    loader loaderFunc
 }

-func NewTransactionIDCache() *TransactionIDCache {
+func NewPendingTransactionIDs(loader loaderFunc) *PendingTransactionIDs {
     c := ttlcache.NewCache()
     c.SetTTL(5 * time.Minute)     // keep transaction IDs for 5 minutes before forgetting about them
     c.SkipTTLExtensionOnHit(true) // we don't care how many times they ask for the item, 5min is the limit.
-    return &TransactionIDCache{
-        cache: c,
+    return &PendingTransactionIDs{
+        mu:      sync.Mutex{},
+        pending: c,
+        loader:  loader,
     }
 }

-// Store a new transaction ID received via v2 /sync
-func (c *TransactionIDCache) Store(userID, eventID, txnID string) {
-    c.cache.Set(cacheKey(userID, eventID), txnID)
-}
+// MissingTxnID should be called to report that this device ID did not see a
+// transaction ID for this event ID. Returns true if this is the first time we know
+// for sure that we'll never see a txn ID for this event.
+func (c *PendingTransactionIDs) MissingTxnID(eventID, userID, myDeviceID string) (bool, error) {
+    // While ttlcache is threadsafe, it does not provide a way to atomically update
+    // (get+set) a value, which means we are still open to races. For example:
+    //
+    //  - We have three pollers A, B, C.
+    //  - Poller A sees an event without txn id and calls MissingTxnID.
+    //  - `c.pending.Get()` fails, so we load up all device IDs: [A, B, C].
+    //  - Then `c.pending.Set()` with [B, C].
+    //  - Poller B sees the same event, also missing txn ID and calls MissingTxnID.
+    //  - Poller C does the same concurrently.
+    //
+    // If the Get+Set isn't atomic, then we might do e.g.
+    //  - B gets [B, C] and prepares to write [C].
+    //  - C gets [B, C] and prepares to write [B].
+    //  - Last writer wins. Either way, we never write [] and so never return true
+    //    (the all-clear signal.)
+    //
+    // This wouldn't be the end of the world (the API process has a maximum delay, and
+    // the ttlcache will expire the entry), but it would still be nice to avoid it.
+    c.mu.Lock()
+    defer c.mu.Unlock()

-// Get a transaction ID previously stored.
-func (c *TransactionIDCache) Get(userID, eventID string) string {
-    val, _ := c.cache.Get(cacheKey(userID, eventID))
-    if val != nil {
-        return val.(string)
+    data, err := c.pending.Get(eventID)
+    if err == ttlcache.ErrNotFound {
+        data = c.loader(userID)
+    } else if err != nil {
+        return false, fmt.Errorf("PendingTransactionIDs: failed to get device ids: %w", err)
     }
-    return ""
+
+    deviceIDs, ok := data.([]string)
+    if !ok {
+        return false, fmt.Errorf("PendingTransactionIDs: failed to cast device IDs")
+    }
+
+    deviceIDs, changed := removeDevice(myDeviceID, deviceIDs)
+    if changed {
+        err = c.pending.Set(eventID, deviceIDs)
+        if err != nil {
+            return false, fmt.Errorf("PendingTransactionIDs: failed to set device IDs: %w", err)
+        }
+    }
+    return changed && len(deviceIDs) == 0, nil
 }

-func cacheKey(userID, eventID string) string {
-    return userID + " " + eventID
+// SeenTxnID should be called to report that this device saw a transaction ID
+// for this event.
+func (c *PendingTransactionIDs) SeenTxnID(eventID string) error {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+    return c.pending.Set(eventID, []string{})
+}
+
+// removeDevice takes a device ID slice and returns a device ID slice with one
+// particular string removed. Assumes that the given slice has no duplicates.
+// Does not modify the given slice in situ.
+func removeDevice(device string, devices []string) ([]string, bool) {
+    for i, otherDevice := range devices {
+        if otherDevice == device {
+            return append(devices[:i], devices[i+1:]...), true
+        }
+    }
+    return devices, false
 }
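Editor's note: to close, a hedged end-to-end sketch of the new gadget (assuming the module path github.com/matrix-org/sliding-sync; the user, event, and device IDs are made up). The first device to report a missing txn ID does not trigger the all-clear; the last one does.

```go
package main

import (
	"fmt"

	"github.com/matrix-org/sliding-sync/sync2"
)

func main() {
	// Loader reporting that the sender is being polled on two devices.
	loader := func(userID string) []string {
		return []string{"dev1", "dev2"}
	}
	pending := sync2.NewPendingTransactionIDs(loader)

	// dev1's poller saw the event without a txn ID. Not yet conclusive:
	// dev2's poller may still see one.
	allClear, err := pending.MissingTxnID("$event", "@me:example.org", "dev1")
	fmt.Println(allClear, err) // false <nil>

	// dev2's poller also missed it: now no poller will ever see a txn ID,
	// so this call reports the all-clear.
	allClear, err = pending.MissingTxnID("$event", "@me:example.org", "dev2")
	fmt.Println(allClear, err) // true <nil>
}
```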