Skip to content

Commit

Permalink
Fix the issue of ignoring callback calls for removed keys. (#811)
Browse files Browse the repository at this point in the history
**What I did**
Fix the issue of ignoring callback calls for removed keys.

**Why I did it**
ConfigDBConnector.listen method has a caching mechanism (added in #587 PR) that preloads the DB state before starting. When the notification about the changed key is received the listen method gets key data from the DB (in all cases when the key was added, updated, or removed) and compares the data with the cache. It fires the callback only if data differ from the cache. Otherwise, the callback is ignored. 

If the `client.hgetall(key)` is called for the removed key it returns an empty dictionary (`{}`). This can be confused with the data of the key with no attributes. For example: `{"TABLE": {"KEY": {}}}`. 

So if preloaded data contains a key with no attributes and the listener method receives a notification about the removal of such key the callback is not called. The listener will simply remove the key from the cache without calling the callback. This leads to the situation when the removal of the key is not handled.

The solution is to get data for the added or updated keys, and for the removed keys use `None` instead. This will ensure that the comparison with the cache will work as expected.

**How I verified it**
Compile the package and run the unit test. Unit tests are extended to cover the expected behavior.
  • Loading branch information
oleksandrivantsiv authored Aug 29, 2023
1 parent 2365fe8 commit dc14d36
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 21 deletions.
7 changes: 5 additions & 2 deletions common/configdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
try:
(table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
if table in self.handlers:
client = self.get_redis_client(self.db_name)
data = self.raw_to_typed(client.hgetall(key))
if item['data'] == 'del':
data = None
else:
client = self.get_redis_client(self.db_name)
data = self.raw_to_typed(client.hgetall(key))
if table in init_data and row in init_data[table]:
cache_hit = init_data[table][row] == data
del init_data[table][row]
Expand Down
90 changes: 71 additions & 19 deletions tests/test_redis_ut.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,46 +634,98 @@ def thread_coming_entry():
def test_ConfigDBInit():
table_name_1 = 'TEST_TABLE_1'
table_name_2 = 'TEST_TABLE_2'
table_name_3 = 'TEST_TABLE_3'
test_key = 'key1'
test_data = {'field1': 'value1'}
test_data_update = {'field1': 'value2'}
test_data = {'field1': 'value1', 'field2': 'value2'}

queue = multiprocessing.Queue()

manager = multiprocessing.Manager()
ret_data = manager.dict()

def test_handler(table, key, data, ret):
ret[table] = {key: data}

def test_init_handler(data, ret):
def test_handler(table, key, data, ret, q=None):
if table not in ret:
ret[table] = {}
if data is None:
ret[table] = {k: v for k, v in ret[table].items() if k != key}
if q:
q.put(ret[table])
elif key not in ret[table] or ret[table][key] != data:
ret[table] = {key: data}
if q:
q.put(ret[table])

def test_init_handler(data, ret, queue):
ret.update(data)
queue.put(ret)

def thread_listen(ret):
def thread_listen(ret, queue):
config_db = ConfigDBConnector()
config_db.connect(wait_for_init=False)

config_db.subscribe(table_name_1, lambda table, key, data: test_handler(table, key, data, ret),
config_db.subscribe(table_name_1, lambda table, key, data: test_handler(table, key, data, ret, queue),
fire_init_data=False)
config_db.subscribe(table_name_2, lambda table, key, data: test_handler(table, key, data, ret),
config_db.subscribe(table_name_2, lambda table, key, data: test_handler(table, key, data, ret, queue),
fire_init_data=True)
config_db.subscribe(table_name_3, lambda table, key, data: test_handler(table, key, data, ret, queue),
fire_init_data=False)

config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret))
config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret, queue))

config_db = ConfigDBConnector()
config_db.connect(wait_for_init=False)
client = config_db.get_redis_client(config_db.CONFIG_DB)
client.flushdb()

# Init table data
config_db.set_entry(table_name_1, test_key, test_data)
config_db.set_entry(table_name_2, test_key, test_data)
# Prepare unique data per each table to track if correct data are received in the update
table_1_data = {f'{table_name_1}_{k}': v for k, v in test_data.items()}
config_db.set_entry(table_name_1, test_key, table_1_data)
table_2_data = {f'{table_name_2}_{k}': v for k, v in test_data.items()}
config_db.set_entry(table_name_2, test_key, table_2_data)
config_db.set_entry(table_name_3, test_key, {})

thread = multiprocessing.Process(target=thread_listen, args=(ret_data,))
thread.start()
time.sleep(5)
thread.terminate()
# Run the listener in a separate process. It is not possible to stop a listener when it is running as a thread.
# When it runs in a separate process we can terminate it with a signal.
listener = multiprocessing.Process(target=thread_listen, args=(ret_data, queue))
listener.start()

assert ret_data[table_name_1] == {test_key: test_data}
assert ret_data[table_name_2] == {test_key: test_data}
try:
# During the subscription to table 2 'fire_init_data=True' is passed. The callback should be called before the listener.
# Verify that callback is fired before listener initialization.
data = queue.get(timeout=5)
assert data == {test_key: table_2_data}

# Wait for init data
init_data = queue.get(timeout=5)

# Verify that all tables initialized correctly
assert init_data[table_name_1] == {test_key: table_1_data}
assert init_data[table_name_2] == {test_key: table_2_data}
assert init_data[table_name_3] == {test_key: {}}

# Remove one key-value pair from the data. Verify that the entry is updated correctly
table_1_data.popitem()
config_db.set_entry(table_name_1, test_key, table_1_data)
data = queue.get(timeout=5)
assert data == {test_key: table_1_data}

# Remove all key-value pairs. Verify that the table still contains key
config_db.set_entry(table_name_1, test_key, {})
data = queue.get(timeout=5)
assert data == {test_key: {}}

# Remove the key
config_db.set_entry(table_name_1, test_key, None)
data = queue.get(timeout=5)
assert test_key not in data

# Remove the entry (with no attributes) from the table.
# Verify that the update is received and a callback is called
config_db.set_entry(table_name_3, test_key, None)
table_3_data = queue.get(timeout=5)
assert test_key not in table_3_data
finally:
listener.terminate()


def test_DBConnectFailure():
Expand Down

0 comments on commit dc14d36

Please sign in to comment.