This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make SlavedIdTracker.advance have same interface as MultiWriterIDGenerator (#8171)
erikjohnston authored Aug 26, 2020
1 parent 4c6c56d commit e3c91a3
Showing 12 changed files with 16 additions and 15 deletions.
1 change: 1 addition & 0 deletions changelog.d/8171.misc
@@ -0,0 +1 @@
+Make `SlavedIdTracker.advance` have the same interface as `MultiWriterIDGenerator`.
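
For context, a minimal sketch (not Synapse's real code) of the calling pattern this lines up: replication handlers hold some ID generator and advance it without knowing whether it is a SlavedIdTracker on a worker or a MultiWriterIDGenerator, so both classes need to accept the same (instance_name, new_id) arguments. The stub class and handler below are hypothetical.

# Hypothetical illustration of the shared call shape; not Synapse code.
class StubIdGenerator:
    """Stands in for either SlavedIdTracker or MultiWriterIDGenerator."""

    def __init__(self):
        self.calls = []

    def advance(self, instance_name, new_id):
        # Both real classes now accept (instance_name, new_id).
        self.calls.append((instance_name, new_id))


def on_replication_position(id_gen, instance_name, token):
    # The caller no longer needs to know which generator class it holds.
    id_gen.advance(instance_name, token)


gen = StubIdGenerator()
on_replication_position(gen, "master", 42)
assert gen.calls == [("master", 42)]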
4 changes: 2 additions & 2 deletions synapse/replication/slave/storage/_slaved_id_tracker.py
@@ -21,9 +21,9 @@ def __init__(self, db_conn, table, column, extra_tables=[], step=1):
         self.step = step
         self._current = _load_current_id(db_conn, table, column, step)
         for table, column in extra_tables:
-            self.advance(_load_current_id(db_conn, table, column))
+            self.advance(None, _load_current_id(db_conn, table, column))
 
-    def advance(self, new_id):
+    def advance(self, instance_name, new_id):
         self._current = (max if self.step > 0 else min)(self._current, new_id)
 
     def get_current_token(self):
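
To see why the extra argument exists at all: a multi-writer generator uses instance_name to track each writer's position separately, while SlavedIdTracker simply ignores it. The toy tracker below illustrates that idea only; the real MultiWriterIDGenerator (in synapse/storage/util/id_generators.py) does considerably more.

# Toy example: one position per writer; not the real MultiWriterIDGenerator.
class ToyMultiWriterTracker:
    def __init__(self):
        self._positions = {}  # writer instance name -> last stream ID seen

    def advance(self, instance_name, new_id):
        self._positions[instance_name] = max(
            self._positions.get(instance_name, 0), new_id
        )

    def get_current_token(self):
        # Only IDs that every known writer has reached are safe to hand out.
        return min(self._positions.values(), default=0)


tracker = ToyMultiWriterTracker()
tracker.advance("worker1", 10)
tracker.advance("worker2", 7)
assert tracker.get_current_token() == 7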
4 changes: 2 additions & 2 deletions synapse/replication/slave/storage/account_data.py
@@ -41,12 +41,12 @@ def get_max_account_data_stream_id(self):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == TagAccountDataStream.NAME:
-            self._account_data_id_gen.advance(token)
+            self._account_data_id_gen.advance(instance_name, token)
             for row in rows:
                 self.get_tags_for_user.invalidate((row.user_id,))
                 self._account_data_stream_cache.entity_has_changed(row.user_id, token)
         elif stream_name == AccountDataStream.NAME:
-            self._account_data_id_gen.advance(token)
+            self._account_data_id_gen.advance(instance_name, token)
             for row in rows:
                 if not row.room_id:
                     self.get_global_account_data_by_type_for_user.invalidate(
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/deviceinbox.py
@@ -46,7 +46,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == ToDeviceStream.NAME:
-            self._device_inbox_id_gen.advance(token)
+            self._device_inbox_id_gen.advance(instance_name, token)
             for row in rows:
                 if row.entity.startswith("@"):
                     self._device_inbox_stream_cache.entity_has_changed(
4 changes: 2 additions & 2 deletions synapse/replication/slave/storage/devices.py
@@ -50,10 +50,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == DeviceListsStream.NAME:
-            self._device_list_id_gen.advance(token)
+            self._device_list_id_gen.advance(instance_name, token)
             self._invalidate_caches_for_devices(token, rows)
         elif stream_name == UserSignatureStream.NAME:
-            self._device_list_id_gen.advance(token)
+            self._device_list_id_gen.advance(instance_name, token)
             for row in rows:
                 self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
         return super().process_replication_rows(stream_name, instance_name, token, rows)
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/groups.py
@@ -40,7 +40,7 @@ def get_group_stream_token(self):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == GroupServerStream.NAME:
-            self._group_updates_id_gen.advance(token)
+            self._group_updates_id_gen.advance(instance_name, token)
             for row in rows:
                 self._group_updates_stream_cache.entity_has_changed(row.user_id, token)
 
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/presence.py
@@ -44,7 +44,7 @@ def get_current_presence_token(self):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == PresenceStream.NAME:
-            self._presence_id_gen.advance(token)
+            self._presence_id_gen.advance(instance_name, token)
             for row in rows:
                 self.presence_stream_cache.entity_has_changed(row.user_id, token)
                 self._get_presence_for_user.invalidate((row.user_id,))
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/push_rule.py
@@ -30,7 +30,7 @@ def process_replication_rows(self, stream_name, instance_name, token, rows):
         assert isinstance(self._push_rules_stream_id_gen, SlavedIdTracker)
 
         if stream_name == PushRulesStream.NAME:
-            self._push_rules_stream_id_gen.advance(token)
+            self._push_rules_stream_id_gen.advance(instance_name, token)
             for row in rows:
                 self.get_push_rules_for_user.invalidate((row.user_id,))
                 self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/pushers.py
@@ -34,5 +34,5 @@ def get_pushers_stream_token(self):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == PushersStream.NAME:
-            self._pushers_id_gen.advance(token)
+            self._pushers_id_gen.advance(instance_name, token)
         return super().process_replication_rows(stream_name, instance_name, token, rows)
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/receipts.py
@@ -46,7 +46,7 @@ def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == ReceiptsStream.NAME:
-            self._receipts_id_gen.advance(token)
+            self._receipts_id_gen.advance(instance_name, token)
             for row in rows:
                 self.invalidate_caches_for_receipt(
                     row.room_id, row.receipt_type, row.user_id
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/room.py
@@ -33,6 +33,6 @@ def get_current_public_room_stream_id(self):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == PublicRoomsStream.NAME:
-            self._public_room_id_gen.advance(token)
+            self._public_room_id_gen.advance(instance_name, token)
 
         return super().process_replication_rows(stream_name, instance_name, token, rows)
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/events_worker.py
@@ -113,9 +113,9 @@ def __init__(self, database: DatabasePool, db_conn, hs):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == EventsStream.NAME:
-            self._stream_id_gen.advance(token)
+            self._stream_id_gen.advance(instance_name, token)
         elif stream_name == BackfillStream.NAME:
-            self._backfill_id_gen.advance(-token)
+            self._backfill_id_gen.advance(instance_name, -token)
 
         super().process_replication_rows(stream_name, instance_name, token, rows)
 
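One call site worth a note: the BackfillStream branch above advances with -token. The assumption here (not stated in this diff) is that backfill stream IDs grow downwards as negative numbers while the replication token arrives as a positive value, so the sign is flipped before advancing. A tracker built with a negative step then keeps the most negative ID it has seen, as in this small sketch mirroring the SlavedIdTracker logic:

# Hedged sketch of a descending tracker; not the real events_worker code.
class DescendingTracker:
    def __init__(self, current, step=-1):
        self.step = step
        self._current = current

    def advance(self, instance_name, new_id):
        # With step < 0, "advancing" means moving to the more negative ID.
        self._current = (max if self.step > 0 else min)(self._current, new_id)


backfill = DescendingTracker(current=-5)
backfill.advance("master", -10)  # replication delivered token 10
assert backfill._current == -10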
