Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use cache store remove base slaved #13329

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13329.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove old base slaved store and de-duplicate cache ID generators. Contributed by Nick @ Beeper (@fizzadar).
2 changes: 0 additions & 2 deletions synapse/app/admin_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from synapse.config.logger import setup_logging
from synapse.events import EventBase
from synapse.handlers.admin import ExfiltrationWriter
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
Expand Down Expand Up @@ -58,7 +57,6 @@ class AdminCmdSlavedStore(
SlavedDeviceStore,
SlavedPushRuleStore,
SlavedEventStore,
BaseSlavedStore,
RoomWorkerStore,
):
def __init__(
Expand Down
2 changes: 0 additions & 2 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
Expand Down Expand Up @@ -251,7 +250,6 @@ class GenericWorkerSlavedStore(
TransactionWorkerStore,
LockStore,
SessionStore,
BaseSlavedStore,
):
# Properties that multiple storage classes define. Tell mypy what the
# expected type is.
Expand Down
58 changes: 0 additions & 58 deletions synapse/replication/slave/storage/_base.py

This file was deleted.

3 changes: 1 addition & 2 deletions synapse/replication/slave/storage/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.storage.databases.main.account_data import AccountDataWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore


class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore):
pass
3 changes: 1 addition & 2 deletions synapse/replication/slave/storage/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore


class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
class SlavedDeviceInboxStore(DeviceInboxWorkerStore):
pass
3 changes: 1 addition & 2 deletions synapse/replication/slave/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

from typing import TYPE_CHECKING, Any, Iterable

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
Expand All @@ -24,7 +23,7 @@
from synapse.server import HomeServer


class SlavedDeviceStore(DeviceWorkerStore, BaseSlavedStore):
class SlavedDeviceStore(DeviceWorkerStore):
def __init__(
self,
database: DatabasePool,
Expand Down
4 changes: 1 addition & 3 deletions synapse/replication/slave/storage/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

from synapse.storage.databases.main.directory import DirectoryWorkerStore

from ._base import BaseSlavedStore


class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore):
class DirectoryStore(DirectoryWorkerStore):
pass
3 changes: 0 additions & 3 deletions synapse/replication/slave/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
from synapse.util.caches.stream_change_cache import StreamChangeCache

from ._base import BaseSlavedStore

if TYPE_CHECKING:
from synapse.server import HomeServer

Expand All @@ -56,7 +54,6 @@ class SlavedEventStore(
EventsWorkerStore,
UserErasureWorkerStore,
RelationsWorkerStore,
BaseSlavedStore,
):
def __init__(
self,
Expand Down
5 changes: 2 additions & 3 deletions synapse/replication/slave/storage/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@

from typing import TYPE_CHECKING

from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.filtering import FilteringStore

from ._base import BaseSlavedStore

if TYPE_CHECKING:
from synapse.server import HomeServer


class SlavedFilteringStore(BaseSlavedStore):
class SlavedFilteringStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
Expand Down
3 changes: 1 addition & 2 deletions synapse/replication/slave/storage/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.storage.databases.main.profile import ProfileWorkerStore


class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore):
class SlavedProfileStore(ProfileWorkerStore):
pass
3 changes: 1 addition & 2 deletions synapse/replication/slave/storage/pushers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.pusher import PusherWorkerStore

from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker

if TYPE_CHECKING:
from synapse.server import HomeServer


class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
class SlavedPusherStore(PusherWorkerStore):
def __init__(
self,
database: DatabasePool,
Expand Down
4 changes: 1 addition & 3 deletions synapse/replication/slave/storage/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

from synapse.storage.databases.main.receipts import ReceiptsWorkerStore

from ._base import BaseSlavedStore


class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
class SlavedReceiptsStore(ReceiptsWorkerStore):
pass
4 changes: 1 addition & 3 deletions synapse/replication/slave/storage/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

from synapse.storage.databases.main.registration import RegistrationWorkerStore

from ._base import BaseSlavedStore


class SlavedRegistrationStore(RegistrationWorkerStore, BaseSlavedStore):
class SlavedRegistrationStore(RegistrationWorkerStore):
pass
29 changes: 2 additions & 27 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
LoggingTransaction,
)
from synapse.storage.databases.main.stats import UserSortOrder
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache

Expand Down Expand Up @@ -149,31 +149,6 @@ def __init__(
],
)

self._cache_id_gen: Optional[MultiWriterIdGenerator]
if isinstance(self.database_engine, PostgresEngine):
# We set the `writers` to an empty list here as we don't care about
# missing updates over restarts, as we'll not have anything in our
# caches to invalidate. (This reduces the amount of writes to the DB
# that happen).
self._cache_id_gen = MultiWriterIdGenerator(
db_conn,
database,
stream_name="caches",
instance_name=hs.get_instance_name(),
tables=[
(
"cache_invalidation_stream_by_instance",
"instance_name",
"stream_id",
)
],
sequence_name="cache_invalidation_stream_seq",
writers=[],
)

else:
self._cache_id_gen = None

super().__init__(database, db_conn, hs)

events_max = self._stream_id_gen.get_current_token()
Expand Down
26 changes: 26 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.util.caches.descriptors import _CachedFunction
from synapse.util.iterutils import batch_iter

Expand Down Expand Up @@ -65,6 +66,31 @@ def __init__(
psql_only=True, # The table is only on postgres DBs.
)

self._cache_id_gen: Optional[MultiWriterIdGenerator]
if isinstance(self.database_engine, PostgresEngine):
# We set the `writers` to an empty list here as we don't care about
# missing updates over restarts, as we'll not have anything in our
# caches to invalidate. (This reduces the amount of writes to the DB
# that happen).
self._cache_id_gen = MultiWriterIdGenerator(
db_conn,
database,
stream_name="caches",
instance_name=hs.get_instance_name(),
tables=[
(
"cache_invalidation_stream_by_instance",
"instance_name",
"stream_id",
)
],
sequence_name="cache_invalidation_stream_seq",
writers=[],
)

else:
self._cache_id_gen = None

async def get_all_updated_caches(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
Expand Down