From 6d49b6cabe331c4224dd7c5bf1456ff05eafc109 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 25 Jul 2023 14:18:58 +0100 Subject: [PATCH 1/4] Add malformed/unusual events test --- tests-integration/regressions_test.go | 52 +++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tests-integration/regressions_test.go b/tests-integration/regressions_test.go index d24869ce..7ec68b72 100644 --- a/tests-integration/regressions_test.go +++ b/tests-integration/regressions_test.go @@ -110,3 +110,55 @@ func TestBackfillInviteDoesntCorruptState(t *testing.T) { }, )) } + +func TestMalformedEvents(t *testing.T) { + pqString := testutils.PrepareDBConnectionString() + // setup code + v2 := runTestV2Server(t) + v3 := runTestServer(t, v2, pqString) + defer v2.close() + defer v3.close() + + // unusual events ARE VALID EVENTS and should be sent to the client, but are unusual for some reason. + unusualEvents := []json.RawMessage{ + testutils.NewStateEvent(t, "", "", alice, map[string]interface{}{ + "empty string": "for event type", + }), + } + // malformed events are INVALID and should be ignored by the proxy. + malformedEvents := []json.RawMessage{ + testutils.NewStateEvent(t, "", "", alice, []string{"content", "as", "an", "array"}), + } + + room := roomEvents{ + roomID: "!TestMalformedEvents:localhost", + events: append(malformedEvents, unusualEvents...), + state: createRoomState(t, alice, time.Now()), + } + v2.addAccount(t, alice, aliceToken) + v2.queueResponse(alice, sync2.SyncResponse{ + Rooms: sync2.SyncRoomsResponse{ + Join: v2JoinTimeline(room), + }, + }) + + // alice syncs and should see the room. + aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + Ranges: sync3.SliceRanges{{0, 20}}, + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: int64(len(unusualEvents)), + }, + }, + }, + }) + m.MatchResponse(t, aliceRes, m.MatchList("a", m.MatchV3Count(1), m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{room.roomID}))), + m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{ + room.roomID: { + m.MatchJoinCount(1), + m.MatchRoomTimeline(unusualEvents), + }, + })) + +} From d745c90d95cf9e79f2c02f860b13a6d2d58eccba Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 25 Jul 2023 14:25:30 +0100 Subject: [PATCH 2/4] Ignore malformed events But handle unusual events. With regression test. Fixes https://github.com/matrix-org/sliding-sync/issues/223 --- state/accumulator.go | 5 ++++- state/event_table.go | 2 +- tests-integration/regressions_test.go | 9 +++++++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index 7e29badb..89f595c2 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -415,7 +415,10 @@ func (a *Accumulator) filterAndParseTimelineEvents(txn *sqlx.Tx, roomID string, RoomID: roomID, } if err := e.ensureFieldsSetOnEvent(); err != nil { - return nil, fmt.Errorf("event malformed: %s", err) + logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Err(err).Msg( + "Accumulator.filterAndParseTimelineEvents: failed to parse event, ignoring", + ) + continue } if _, ok := seenEvents[e.ID]; ok { logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg( diff --git a/state/event_table.go b/state/event_table.go index 46f43df4..d45bd3bc 100644 --- a/state/event_table.go +++ b/state/event_table.go @@ -57,7 +57,7 @@ func (ev *Event) ensureFieldsSetOnEvent() error { } if ev.Type == "" { typeResult := evJSON.Get("type") - if !typeResult.Exists() || typeResult.Str == "" { + if !typeResult.Exists() { // empty strings for 'type' are valid apparently return fmt.Errorf("event JSON missing type key") } ev.Type = typeResult.Str diff --git a/tests-integration/regressions_test.go b/tests-integration/regressions_test.go index 7ec68b72..f0afba5f 100644 --- a/tests-integration/regressions_test.go +++ b/tests-integration/regressions_test.go @@ -127,12 +127,17 @@ func TestMalformedEvents(t *testing.T) { } // malformed events are INVALID and should be ignored by the proxy. malformedEvents := []json.RawMessage{ - testutils.NewStateEvent(t, "", "", alice, []string{"content", "as", "an", "array"}), + json.RawMessage(`{}`), // empty object + json.RawMessage(`{"type":5}`), // type is an integer + json.RawMessage(`{"type":"foo","content":{},"event_id":""}`), // 0-length string as event ID + json.RawMessage(`{"type":"foo","content":{}}`), // missing event ID } room := roomEvents{ roomID: "!TestMalformedEvents:localhost", - events: append(malformedEvents, unusualEvents...), + // append malformed after unusual. All malformed events should be dropped, + // leaving only unusualEvents. + events: append(unusualEvents, malformedEvents...), state: createRoomState(t, alice, time.Now()), } v2.addAccount(t, alice, aliceToken) From 0fea507b65fec9c5af9f42f92344a32ef22d1af0 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 25 Jul 2023 14:41:23 +0100 Subject: [PATCH 3/4] Add malformed tests for room state --- state/accumulator.go | 5 ++- state/event_table.go | 16 ++++--- tests-integration/regressions_test.go | 65 ++++++++++++++++++++++++++- 3 files changed, 76 insertions(+), 10 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index 89f595c2..99471484 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -207,8 +207,9 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia IsState: true, } } - if err := ensureFieldsSet(events); err != nil { - return fmt.Errorf("events malformed: %s", err) + events = filterAndEnsureFieldsSet(events) + if len(events) == 0 { + return fmt.Errorf("failed to insert events, all events were filtered out: %w", err) } eventIDToNID, err := a.eventsTable.Insert(txn, events, false) if err != nil { diff --git a/state/event_table.go b/state/event_table.go index d45bd3bc..1c360b1b 100644 --- a/state/event_table.go +++ b/state/event_table.go @@ -57,7 +57,7 @@ func (ev *Event) ensureFieldsSetOnEvent() error { } if ev.Type == "" { typeResult := evJSON.Get("type") - if !typeResult.Exists() { // empty strings for 'type' are valid apparently + if !typeResult.Exists() || typeResult.Type != gjson.String { // empty strings for 'type' are valid apparently return fmt.Errorf("event JSON missing type key") } ev.Type = typeResult.Str @@ -153,7 +153,7 @@ func (t *EventTable) SelectHighestNID() (highest int64, err error) { // we insert new events A and B in that order, then NID(A) < NID(B). func (t *EventTable) Insert(txn *sqlx.Tx, events []Event, checkFields bool) (map[string]int64, error) { if checkFields { - ensureFieldsSet(events) + events = filterAndEnsureFieldsSet(events) } result := make(map[string]int64) for i := range events { @@ -449,14 +449,18 @@ func (c EventChunker) Subslice(i, j int) sqlutil.Chunker { return c[i:j] } -func ensureFieldsSet(events []Event) error { +func filterAndEnsureFieldsSet(events []Event) []Event { + result := make([]Event, 0, len(events)) // ensure fields are set for i := range events { ev := events[i] if err := ev.ensureFieldsSetOnEvent(); err != nil { - return err + logger.Warn().Str("event_id", ev.ID).Err(err).Msg( + "filterAndEnsureFieldsSet: failed to parse event, ignoring", + ) + continue } - events[i] = ev + result = append(result, ev) } - return nil + return result } diff --git a/tests-integration/regressions_test.go b/tests-integration/regressions_test.go index f0afba5f..fc6e1984 100644 --- a/tests-integration/regressions_test.go +++ b/tests-integration/regressions_test.go @@ -111,7 +111,7 @@ func TestBackfillInviteDoesntCorruptState(t *testing.T) { )) } -func TestMalformedEvents(t *testing.T) { +func TestMalformedEventsTimeline(t *testing.T) { pqString := testutils.PrepareDBConnectionString() // setup code v2 := runTestV2Server(t) @@ -134,7 +134,7 @@ func TestMalformedEvents(t *testing.T) { } room := roomEvents{ - roomID: "!TestMalformedEvents:localhost", + roomID: "!TestMalformedEventsTimeline:localhost", // append malformed after unusual. All malformed events should be dropped, // leaving only unusualEvents. events: append(unusualEvents, malformedEvents...), @@ -165,5 +165,66 @@ func TestMalformedEvents(t *testing.T) { m.MatchRoomTimeline(unusualEvents), }, })) +} + +func TestMalformedEventsState(t *testing.T) { + pqString := testutils.PrepareDBConnectionString() + // setup code + v2 := runTestV2Server(t) + v3 := runTestServer(t, v2, pqString) + defer v2.close() + defer v3.close() + + // unusual events ARE VALID EVENTS and should be sent to the client, but are unusual for some reason. + unusualEvents := []json.RawMessage{ + testutils.NewStateEvent(t, "", "", alice, map[string]interface{}{ + "empty string": "for event type", + }), + } + // malformed events are INVALID and should be ignored by the proxy. + malformedEvents := []json.RawMessage{ + json.RawMessage(`{}`), // empty object + json.RawMessage(`{"type":5,"content":{},"event_id":"f","state_key":""}`), // type is an integer + json.RawMessage(`{"type":"foo","content":{},"event_id":"","state_key":""}`), // 0-length string as event ID + json.RawMessage(`{"type":"foo","content":{},"state_key":""}`), // missing event ID + } + latestEvent := testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "hi"}) + + room := roomEvents{ + roomID: "!TestMalformedEventsState:localhost", + events: []json.RawMessage{latestEvent}, + // append malformed after unusual. All malformed events should be dropped, + // leaving only unusualEvents. + state: append(createRoomState(t, alice, time.Now()), append(unusualEvents, malformedEvents...)...), + } + v2.addAccount(t, alice, aliceToken) + v2.queueResponse(alice, sync2.SyncResponse{ + Rooms: sync2.SyncRoomsResponse{ + Join: v2JoinTimeline(room), + }, + }) + + // alice syncs and should see the room. + aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + Ranges: sync3.SliceRanges{{0, 20}}, + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: int64(len(unusualEvents)), + RequiredState: [][2]string{{"", ""}}, + }, + }, + }, + }) + m.MatchResponse(t, aliceRes, m.MatchList("a", m.MatchV3Count(1), m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{room.roomID}))), + m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{ + room.roomID: { + m.MatchJoinCount(1), + m.MatchRoomTimeline([]json.RawMessage{latestEvent}), + m.MatchRoomRequiredState([]json.RawMessage{ + unusualEvents[0], + }), + }, + })) } From 86d156c5113a39c500622d86612bb7f4dddbdadb Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 26 Jul 2023 17:12:42 +0100 Subject: [PATCH 4/4] Actually use the modified Event in result --- state/event_table.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/state/event_table.go b/state/event_table.go index 1c360b1b..c01b454e 100644 --- a/state/event_table.go +++ b/state/event_table.go @@ -453,14 +453,14 @@ func filterAndEnsureFieldsSet(events []Event) []Event { result := make([]Event, 0, len(events)) // ensure fields are set for i := range events { - ev := events[i] + ev := &events[i] if err := ev.ensureFieldsSetOnEvent(); err != nil { logger.Warn().Str("event_id", ev.ID).Err(err).Msg( "filterAndEnsureFieldsSet: failed to parse event, ignoring", ) continue } - result = append(result, ev) + result = append(result, *ev) } return result }