Skip to content

Commit

Permalink
Merge pull request #368 from matrix-org/kegan/fix-create-event
Browse files Browse the repository at this point in the history
bugfix: handle malformed state/timeline responses
  • Loading branch information
kegsay committed Nov 8, 2023
2 parents 421a572 + be0fb09 commit bb003c1
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 2 deletions.
46 changes: 44 additions & 2 deletions sync2/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/exp/slices"

"github.com/getsentry/sentry-go"

"github.com/matrix-org/sliding-sync/internal"
Expand Down Expand Up @@ -791,8 +793,48 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
stateCalls++
prependStateEvents, err := p.receiver.Initialise(ctx, roomID, roomData.State.Events)
if err != nil {
lastErrs = append(lastErrs, fmt.Errorf("Initialise[%s]: %w", roomID, err))
continue
_, ok := err.(*internal.DataError)
if ok {
// This typically happens when we are missing an m.room.create event.
// Synapse may sometimes send the m.room.create event erroneously in the timeline,
// so check if that is the case here. See https://github.com/matrix-org/complement/pull/690
var createEvent json.RawMessage
for i, ev := range roomData.Timeline.Events {
evv := gjson.ParseBytes(ev)
if evv.Get("type").Str == "m.room.create" && evv.Get("state_key").Exists() && evv.Get("state_key").Str == "" {
createEvent = roomData.Timeline.Events[i]
// remove the create event from the timeline so we don't double process it
roomData.Timeline.Events = append(roomData.Timeline.Events[:i], roomData.Timeline.Events[i+1:]...)
break
}
}
if createEvent != nil {
roomData.State.Events = slices.Insert(roomData.State.Events, 0, createEvent)
// retry the processing of the room state
prependStateEvents, err = p.receiver.Initialise(ctx, roomID, roomData.State.Events)
if err == nil {
const warnMsg = "parseRoomsResponse: m.room.create event was found in the timeline not state, info after moving create event"
logger.Warn().Str("user_id", p.userID).Str("room_id", roomID).Int(
"timeline", len(roomData.Timeline.Events),
).Int("state", len(roomData.State.Events)).Msg(warnMsg)
hub := internal.GetSentryHubFromContextOrDefault(ctx)
hub.WithScope(func(scope *sentry.Scope) {
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
"room_id": roomID,
"timeline": len(roomData.Timeline.Events),
"state": len(roomData.State.Events),
})
hub.CaptureMessage(warnMsg)
})
}
}
}
// either err isn't a data error OR we retried Initialise and it still returned an error
// either way, give up.
if err != nil {
lastErrs = append(lastErrs, fmt.Errorf("Initialise[%s]: %w", roomID, err))
continue
}
}
if len(prependStateEvents) > 0 {
// The poller has just learned of these state events due to an
Expand Down
69 changes: 69 additions & 0 deletions tests-integration/timeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"testing"
"time"

"github.com/jmoiron/sqlx"
slidingsync "github.com/matrix-org/sliding-sync"

"github.com/matrix-org/sliding-sync/sqlutil"
"github.com/matrix-org/sliding-sync/state"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/testutils"
Expand Down Expand Up @@ -1335,3 +1338,69 @@ func TestNumLiveBulk(t *testing.T) {
},
))
}

// Regression test for a thing which Synapse can sometimes send down sync v2.
// See https://github.com/matrix-org/sliding-sync/issues/367
// This would cause this room to not be processed at all, which is bad.
func TestSeeCreateEvent(t *testing.T) {
// setup code
pqString := testutils.PrepareDBConnectionString()
db, err := sqlx.Open("postgres", pqString)
if err != nil {
t.Fatalf("failed to open postgres: %s", err)
}
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()

roomID := "!TestSeeCreateEvent:localhost"
userID := "@TestSeeCreateEvent:localhost"
token := "TestSeeCreateEvent_TOKEN"
v2.addAccount(t, userID, token)
v2.queueResponse(userID, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
roomID: {
State: sync2.EventsResponse{
Events: []json.RawMessage{
testutils.NewStateEvent(t, "m.room.join_rules", "", "@someone:somewhere", map[string]interface{}{"join_rule": "invite"}),
testutils.NewStateEvent(t, "m.room.power_levels", "", "@someone:somewhere", map[string]interface{}{"users_default": 0}),
testutils.NewStateEvent(t, "m.room.history_visibility", "", "@someone:somewhere", map[string]interface{}{"history_visibility": "shared"}),
},
},
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{
testutils.NewStateEvent(t, "m.room.create", "", "@someone:somewhere", map[string]interface{}{"room_version": "10"}),
testutils.NewJoinEvent(t, "@someone:somewhere"),
testutils.NewJoinEvent(t, "@someone2:somewhere"),
},
},
},
},
},
})
v3.mustDoV3Request(t, token, sync3.Request{})
// ensure the room exists
roomsTable := state.NewRoomsTable(db)
err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
nids, err := roomsTable.LatestNIDs(txn, []string{roomID})
if err != nil {
t.Fatalf("LatestNIDs: %s", err)
}
if len(nids) != 1 {
t.Fatalf("LatestNIDs missing: %+v", nids)
}
nid, ok := nids[roomID]
if !ok {
t.Fatalf("LatestNIDs missing room %s : %+v", roomID, nids)
}
if nid == 0 {
t.Fatalf("LatestNIDs 0 nid for room %s", roomID)
}
return nil
})
if err != nil {
t.Fatalf("WithTransaction: %s", err)
}
}

0 comments on commit bb003c1

Please sign in to comment.