From fbae11fecc222f2b5a8aecfc718699a568117ac1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 16 Sep 2024 16:09:53 -0500 Subject: [PATCH 1/9] Add tracing and test --- .../storage/databases/main/events_worker.py | 29 ++++++++++++++++++- .../databases/main/test_events_worker.py | 26 +++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 029f4bd87d..3407ba39f4 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -61,7 +61,13 @@ current_context, make_deferred_yieldable, ) -from synapse.logging.opentracing import start_active_span, tag_args, trace +from synapse.logging.opentracing import ( + SynapseTags, + set_tag, + start_active_span, + tag_args, + trace, +) from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, @@ -603,6 +609,10 @@ async def get_events_as_list( Note that the returned list may be smaller than the list of event IDs if not all events could be fetched. """ + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", + str(len(event_ids)), + ) if not event_ids: return [] @@ -723,6 +733,7 @@ async def get_events_as_list( return events + @trace @cancellable async def get_unredacted_events_from_cache_or_db( self, @@ -748,6 +759,11 @@ async def get_unredacted_events_from_cache_or_db( Returns: map from event id to result """ + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", + str(len(event_ids)), + ) + # Shortcut: check if we have any events in the *in memory* cache - this function # may be called repeatedly for the same event so at this point we cannot reach # out to any external cache for performance reasons. The external cache is @@ -946,6 +962,7 @@ async def _get_events_from_cache( return event_map + @trace async def _get_events_from_external_cache( self, events: Iterable[str], update_metrics: bool = True ) -> Dict[str, EventCacheEntry]: @@ -957,6 +974,10 @@ async def _get_events_from_external_cache( events: list of event_ids to fetch update_metrics: Whether to update the cache hit ratio metrics """ + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "events.length", + str(len(events)), + ) event_map = {} for event_id in events: @@ -1222,6 +1243,7 @@ def fire_errback(exc: Exception) -> None: with PreserveLoggingContext(): self.hs.get_reactor().callFromThread(fire_errback, e) + @trace async def _get_events_from_db( self, event_ids: Collection[str] ) -> Dict[str, EventCacheEntry]: @@ -1240,6 +1262,11 @@ async def _get_events_from_db( map from event id to result. May return extra events which weren't asked for. """ + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", + str(len(event_ids)), + ) + fetched_event_ids: Set[str] = set() fetched_events: Dict[str, _EventRow] = {} diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index fd1f5e7fd5..8c62367ca2 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -295,6 +295,32 @@ def test_dedupe(self) -> None: self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1) +class GetEventsTestCase(unittest.HomeserverTestCase): + """Test `get_events(...)`/`get_events_as_list(...)`""" + + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store: EventsWorkerStore = hs.get_datastores().main + + def test_get_lots_of_messages(self) -> None: + user_id = self.register_user("user", "pass") + user_tok = self.login(user_id, "pass") + + room_id = self.helper.create_room_as(user_id, tok=user_tok) + + event_ids = [] + for _ in range(1000): + res = self.helper.send(room_id, tok=user_tok) + event_ids.append(res["event_id"]) + + self.get_success(self.store.get_events(event_ids)) + + class DatabaseOutageTestCase(unittest.HomeserverTestCase): """Test event fetching during a database outage.""" From fed75020f4fb07fcf9ee488ce657af8ef7ad26f5 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 16 Sep 2024 18:11:47 -0500 Subject: [PATCH 2/9] More tracing and test --- .../storage/databases/main/events_worker.py | 12 +++- .../databases/main/test_events_worker.py | 63 ++++++++++++++++--- tests/unittest.py | 5 ++ 3 files changed, 70 insertions(+), 10 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 3407ba39f4..049d8d7efa 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -531,6 +531,7 @@ async def get_event( return event + @trace async def get_events( self, event_ids: Collection[str], @@ -562,6 +563,11 @@ async def get_events( Returns: A mapping from event_id to event. """ + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", + str(len(event_ids)), + ) + events = await self.get_events_as_list( event_ids, redact_behaviour=redact_behaviour, @@ -737,7 +743,7 @@ async def get_events_as_list( @cancellable async def get_unredacted_events_from_cache_or_db( self, - event_ids: Iterable[str], + event_ids: Collection[str], allow_rejected: bool = False, ) -> Dict[str, EventCacheEntry]: """Fetch a bunch of events from the cache or the database. @@ -952,7 +958,7 @@ async def _get_events_from_cache( events, update_metrics=update_metrics ) - missing_event_ids = (e for e in events if e not in event_map) + missing_event_ids = [e for e in events if e not in event_map] event_map.update( await self._get_events_from_external_cache( events=missing_event_ids, @@ -964,7 +970,7 @@ async def _get_events_from_cache( @trace async def _get_events_from_external_cache( - self, events: Iterable[str], update_metrics: bool = True + self, events: Collection[str], update_metrics: bool = True ) -> Dict[str, EventCacheEntry]: """Fetch events from any configured external cache. diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 8c62367ca2..f161d5641f 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -19,8 +19,9 @@ # # import json +import time from contextlib import contextmanager -from typing import Generator, List, Tuple +from typing import Dict, Generator, List, Set, Tuple from unittest import mock from twisted.enterprise.adbapi import ConnectionPool @@ -28,8 +29,12 @@ from twisted.test.proto_helpers import MemoryReactor from synapse.api.room_versions import EventFormatVersions, RoomVersions -from synapse.events import make_event_from_dict +from synapse.events import EventBase, make_event_from_dict from synapse.logging.context import LoggingContext +from synapse.logging.opentracing import ( + set_tag, + start_active_span, +) from synapse.rest import admin from synapse.rest.client import login, room from synapse.server import HomeServer @@ -38,6 +43,7 @@ EventsWorkerStore, ) from synapse.storage.types import Connection +from synapse.types import JsonDict from synapse.util import Clock from synapse.util.async_helpers import yieldable_gather_results @@ -304,21 +310,64 @@ class GetEventsTestCase(unittest.HomeserverTestCase): login.register_servlets, ] + def default_config(self) -> JsonDict: + config = super().default_config() + config["opentracing"] = { + "enabled": True, + "jaeger_config": { + "sampler": {"type": "const", "param": 1}, + "logging": False, + }, + } + return config + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store: EventsWorkerStore = hs.get_datastores().main def test_get_lots_of_messages(self) -> None: + num_events = 10000 + user_id = self.register_user("user", "pass") user_tok = self.login(user_id, "pass") room_id = self.helper.create_room_as(user_id, tok=user_tok) - event_ids = [] - for _ in range(1000): - res = self.helper.send(room_id, tok=user_tok) - event_ids.append(res["event_id"]) + setup_start_ts = time.time() + event_ids: Set[str] = set() + for i in range(num_events): + event = self.get_success( + inject_event( + self.hs, + room_id=room_id, + type="m.room.message", + sender=user_id, + content={ + "body": f"foo{i}", + "msgtype": "m.text", + }, + ) + ) + event_ids.add(event.event_id) + + setup_end_ts = time.time() + # Sanity check that we actually created the events + self.assertEqual(len(event_ids), num_events) + + fetched_event_map: Dict[str, EventBase] = {} + with LoggingContext("test") as _ctx: + with start_active_span("test_get_lots_of_messages"): + set_tag("num_events", num_events) + set_tag("setup_time", setup_end_ts - setup_start_ts) + + # This is the function under test + fetched_event_map = self.get_success(self.store.get_events(event_ids)) + + # Sanity check that we go the events back + self.assertIncludes(fetched_event_map.keys(), event_ids, exact=True) - self.get_success(self.store.get_events(event_ids)) + # Sleep so the traces get flushed + # TODO: Remove + time.sleep(6) class DatabaseOutageTestCase(unittest.HomeserverTestCase): diff --git a/tests/unittest.py b/tests/unittest.py index 614e805abd..1485cea424 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -395,6 +395,11 @@ def setUp(self) -> None: self._hs_args = {"clock": self.clock, "reactor": self.reactor} self.hs = self.make_homeserver(self.reactor, self.clock) + from synapse.logging.opentracing import init_tracer + + # Start the tracer + init_tracer(self.hs) # noqa + self.hs.get_datastores().main.tests_allow_no_chain_cover_index = False # Honour the `use_frozen_dicts` config option. We have to do this From d3e67f1304ac7cd1a833e01793a42b2060e30c97 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 16 Sep 2024 18:19:47 -0500 Subject: [PATCH 3/9] Reduce test event count --- tests/storage/databases/main/test_events_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index f161d5641f..d3624bde32 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -325,7 +325,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store: EventsWorkerStore = hs.get_datastores().main def test_get_lots_of_messages(self) -> None: - num_events = 10000 + num_events = 100 user_id = self.register_user("user", "pass") user_tok = self.login(user_id, "pass") @@ -357,7 +357,7 @@ def test_get_lots_of_messages(self) -> None: with LoggingContext("test") as _ctx: with start_active_span("test_get_lots_of_messages"): set_tag("num_events", num_events) - set_tag("setup_time", setup_end_ts - setup_start_ts) + set_tag("setup_time_seconds", setup_end_ts - setup_start_ts) # This is the function under test fetched_event_map = self.get_success(self.store.get_events(event_ids)) From c84ee8f71dad4ba95899046a1c1b782be4c30322 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 16 Sep 2024 18:42:10 -0500 Subject: [PATCH 4/9] No need to make an intermediate map if we are just iterating over it anyway --- synapse/handlers/sliding_sync/__init__.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index c3b5bbbf6f..63f48f3561 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -443,13 +443,11 @@ async def get_current_state_at( to_token=to_token, ) - event_map = await self.store.get_events(list(state_ids.values())) + events = await self.store.get_events_as_list(list(state_ids.values())) state_map = {} - for key, event_id in state_ids.items(): - event = event_map.get(event_id) - if event: - state_map[key] = event + for event in events: + state_map[(event.type, event.state_key)] = event return state_map From 4b6254651cb0812dd2859ae39bbbe5041eebdbda Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 16 Sep 2024 19:00:20 -0500 Subject: [PATCH 5/9] Remove tracing specifics --- tests/storage/databases/main/test_events_worker.py | 4 ---- tests/unittest.py | 5 ----- 2 files changed, 9 deletions(-) diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index d3624bde32..2533d31244 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -365,10 +365,6 @@ def test_get_lots_of_messages(self) -> None: # Sanity check that we go the events back self.assertIncludes(fetched_event_map.keys(), event_ids, exact=True) - # Sleep so the traces get flushed - # TODO: Remove - time.sleep(6) - class DatabaseOutageTestCase(unittest.HomeserverTestCase): """Test event fetching during a database outage.""" diff --git a/tests/unittest.py b/tests/unittest.py index 1485cea424..614e805abd 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -395,11 +395,6 @@ def setUp(self) -> None: self._hs_args = {"clock": self.clock, "reactor": self.reactor} self.hs = self.make_homeserver(self.reactor, self.clock) - from synapse.logging.opentracing import init_tracer - - # Start the tracer - init_tracer(self.hs) # noqa - self.hs.get_datastores().main.tests_allow_no_chain_cover_index = False # Honour the `use_frozen_dicts` config option. We have to do this From be025a41e3c3b143bdc430a6e9c56f019a24b629 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 16 Sep 2024 19:00:29 -0500 Subject: [PATCH 6/9] Fix typo --- tests/storage/databases/main/test_events_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 2533d31244..e8af267694 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -362,7 +362,7 @@ def test_get_lots_of_messages(self) -> None: # This is the function under test fetched_event_map = self.get_success(self.store.get_events(event_ids)) - # Sanity check that we go the events back + # Sanity check that we got the events back self.assertIncludes(fetched_event_map.keys(), event_ids, exact=True) From 55c5811472adbab11c148ac433a42fa5ab016e40 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 16 Sep 2024 19:01:18 -0500 Subject: [PATCH 7/9] Add changelog --- changelog.d/17718.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17718.misc diff --git a/changelog.d/17718.misc b/changelog.d/17718.misc new file mode 100644 index 0000000000..ea73a03f53 --- /dev/null +++ b/changelog.d/17718.misc @@ -0,0 +1 @@ +Slight optimization when fetching state/events for Sliding Sync. From 58550d0b4b374f6dcb3732db9f6df159002245f0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 16 Sep 2024 19:04:26 -0500 Subject: [PATCH 8/9] Remove tracing generic things that could stick around --- .../databases/main/test_events_worker.py | 33 +++---------------- 1 file changed, 4 insertions(+), 29 deletions(-) diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index e8af267694..629ded7b8f 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -19,9 +19,8 @@ # # import json -import time from contextlib import contextmanager -from typing import Dict, Generator, List, Set, Tuple +from typing import Generator, List, Set, Tuple from unittest import mock from twisted.enterprise.adbapi import ConnectionPool @@ -29,12 +28,8 @@ from twisted.test.proto_helpers import MemoryReactor from synapse.api.room_versions import EventFormatVersions, RoomVersions -from synapse.events import EventBase, make_event_from_dict +from synapse.events import make_event_from_dict from synapse.logging.context import LoggingContext -from synapse.logging.opentracing import ( - set_tag, - start_active_span, -) from synapse.rest import admin from synapse.rest.client import login, room from synapse.server import HomeServer @@ -43,7 +38,6 @@ EventsWorkerStore, ) from synapse.storage.types import Connection -from synapse.types import JsonDict from synapse.util import Clock from synapse.util.async_helpers import yieldable_gather_results @@ -310,17 +304,6 @@ class GetEventsTestCase(unittest.HomeserverTestCase): login.register_servlets, ] - def default_config(self) -> JsonDict: - config = super().default_config() - config["opentracing"] = { - "enabled": True, - "jaeger_config": { - "sampler": {"type": "const", "param": 1}, - "logging": False, - }, - } - return config - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store: EventsWorkerStore = hs.get_datastores().main @@ -332,7 +315,6 @@ def test_get_lots_of_messages(self) -> None: room_id = self.helper.create_room_as(user_id, tok=user_tok) - setup_start_ts = time.time() event_ids: Set[str] = set() for i in range(num_events): event = self.get_success( @@ -349,18 +331,11 @@ def test_get_lots_of_messages(self) -> None: ) event_ids.add(event.event_id) - setup_end_ts = time.time() # Sanity check that we actually created the events self.assertEqual(len(event_ids), num_events) - fetched_event_map: Dict[str, EventBase] = {} - with LoggingContext("test") as _ctx: - with start_active_span("test_get_lots_of_messages"): - set_tag("num_events", num_events) - set_tag("setup_time_seconds", setup_end_ts - setup_start_ts) - - # This is the function under test - fetched_event_map = self.get_success(self.store.get_events(event_ids)) + # This is the function under test + fetched_event_map = self.get_success(self.store.get_events(event_ids)) # Sanity check that we got the events back self.assertIncludes(fetched_event_map.keys(), event_ids, exact=True) From ff6e51f3b3f0136f8c4fe060cb4bedf82e4f7b82 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 16 Sep 2024 19:05:22 -0500 Subject: [PATCH 9/9] Add test description --- tests/storage/databases/main/test_events_worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 629ded7b8f..104d141a72 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -308,6 +308,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store: EventsWorkerStore = hs.get_datastores().main def test_get_lots_of_messages(self) -> None: + """Sanity check that `get_events(...)`/`get_events_as_list(...)` works""" num_events = 100 user_id = self.register_user("user", "pass")