Skip to content

Commit

Permalink
Merge pull request sonic-net#83 from tahmed-dev/taahme/add-redis-pipe…
Browse files Browse the repository at this point in the history
…line-operation

[configdb] Add Ability to Query/Update Redis Using Pipelines
  • Loading branch information
tahmed-dev authored Aug 19, 2020
2 parents f574b95 + 198d143 commit c25d492
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/swsssdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

try:
from .dbconnector import SonicDBConfig, SonicV2Connector
from .configdb import ConfigDBConnector
from .configdb import ConfigDBConnector, ConfigDBPipeConnector
from .sonic_db_dump_load import sonic_db_dump_load
except (KeyError, ValueError):
msg = "Failed to database connector objects -- incorrect database config schema."
Expand Down
159 changes: 142 additions & 17 deletions src/swsssdk/configdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,19 @@ def listen(self):
(table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
if self.handlers.has_key(table):
client = self.get_redis_client(self.db_name)
data = self.__raw_to_typed(client.hgetall(key))
data = self.raw_to_typed(client.hgetall(key))
self.__fire(table, row, data)
except ValueError:
pass #Ignore non table-formated redis entries

def __raw_to_typed(self, raw_data):
if raw_data == None:
def raw_to_typed(self, raw_data):
if raw_data is None:
return None
typed_data = {}
for raw_key in raw_data:
key = raw_key
if PY3K:
key = raw_key.decode('utf-8')
key = raw_key.decode()

# "NULL:NULL" is used as a placeholder for objects with no attributes
if key == "NULL":
Expand All @@ -136,13 +136,13 @@ def __raw_to_typed(self, raw_data):
typed_data[key[:-1]] = value
else:
if PY3K:
typed_data[key] = raw_data[raw_key].decode('utf-8')
typed_data[key] = raw_data[raw_key].decode()
else:
typed_data[key] = raw_data[raw_key]
return typed_data

def __typed_to_raw(self, typed_data):
if typed_data == None:
def typed_to_raw(self, typed_data):
if typed_data is None:
return None
elif typed_data == {}:
return { "NULL": "NULL" }
Expand Down Expand Up @@ -183,11 +183,11 @@ def set_entry(self, table, key, data):
key = self.serialize_key(key)
client = self.get_redis_client(self.db_name)
_hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key)
if data == None:
if data is None:
client.delete(_hash)
else:
original = self.get_entry(table, key)
client.hmset(_hash, self.__typed_to_raw(data))
client.hmset(_hash, self.typed_to_raw(data))
for k in [ k for k in original.keys() if k not in data.keys() ]:
if type(original[k]) == list:
k = k + '@'
Expand All @@ -205,10 +205,10 @@ def mod_entry(self, table, key, data):
key = self.serialize_key(key)
client = self.get_redis_client(self.db_name)
_hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key)
if data == None:
if data is None:
client.delete(_hash)
else:
client.hmset(_hash, self.__typed_to_raw(data))
client.hmset(_hash, self.typed_to_raw(data))

def get_entry(self, table, key):
"""Read a table entry from config db.
Expand All @@ -222,7 +222,7 @@ def get_entry(self, table, key):
key = self.serialize_key(key)
client = self.get_redis_client(self.db_name)
_hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key)
return self.__raw_to_typed(client.hgetall(_hash))
return self.raw_to_typed(client.hgetall(_hash))

def get_keys(self, table, split=True):
"""Read all keys of a table from config db.
Expand All @@ -240,7 +240,7 @@ def get_keys(self, table, split=True):
for key in keys:
try:
if PY3K:
key = key.decode('utf-8')
key = key.decode()
if split:
(_, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
data.append(self.deserialize_key(row))
Expand All @@ -266,10 +266,10 @@ def get_table(self, table):
data = {}
for key in keys:
try:
entry = self.__raw_to_typed(client.hgetall(key))
entry = self.raw_to_typed(client.hgetall(key))
if entry != None:
if PY3K:
key = key.decode('utf-8')
key = key.decode()
(_, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
data[self.deserialize_key(row)] = entry
else:
Expand Down Expand Up @@ -325,13 +325,138 @@ def get_config(self):
data = {}
for key in keys:
if PY3K:
key = key.decode('utf-8')
key = key.decode()
try:
(table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
entry = self.__raw_to_typed(client.hgetall(key))
entry = self.raw_to_typed(client.hgetall(key))
if entry != None:
data.setdefault(table_name, {})[self.deserialize_key(row)] = entry
except ValueError:
pass #Ignore non table-formated redis entries
return data


class ConfigDBPipeConnector(ConfigDBConnector):
REDIS_SCAN_BATCH_SIZE = 30

def __init__(self, **kwargs):
super(ConfigDBPipeConnector, self).__init__(**kwargs)

def __delete_entries(self, client, pipe, pattern, cursor):
"""Helper method to delete table entries from config db using Redis pipeline
with batch size of REDIS_SCAN_BATCH_SIZE.
The caller should call pipeline execute once ready
Args:
client: Redis client
pipe: Redis DB pipe
pattern: key pattern
cursor: position to start scanning from
Returns:
cur: poition of next item to scan
"""
cur, keys = client.scan(cursor=cursor, match=pattern, count=self.REDIS_SCAN_BATCH_SIZE)
for key in keys:
pipe.delete(key)

return cur

def __delete_table(self, client, pipe, table):
"""Helper method to delete table entries from config db using Redis pipeline.
The caller should call pipeline execute once ready
Args:
client: Redis client
pipe: Redis DB pipe
table: Table name.
"""
pattern = '{}{}*'.format(table.upper(), self.TABLE_NAME_SEPARATOR)
cur = self.__delete_entries(client, pipe, pattern, 0)
while cur != 0:
cur = self.__delete_entries(client, pipe, pattern, cur)

def __mod_entry(self, pipe, table, key, data):
"""Modify a table entry to config db.
Args:
table: Table name.
pipe: Redis DB pipe
table: Table name.
key: Key of table entry, or a tuple of keys if it is a multi-key table.
data: Table row data in a form of dictionary {'column_key': 'value', ...}.
Pass {} as data will create an entry with no column if not already existed.
Pass None as data will delete the entry.
"""
key = self.serialize_key(key)
_hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key)
if data is None:
pipe.delete(_hash)
else:
pipe.hmset(_hash, self.typed_to_raw(data))

def mod_config(self, data):
"""Write multiple tables into config db.
Extra entries/fields in the db which are not in the data are kept.
Args:
data: config data in a dictionary form
{
'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
...
}
"""
client = self.get_redis_client(self.db_name)
pipe = client.pipeline()
for table_name in data:
table_data = data[table_name]
if table_data is None:
self.__delete_table(client, pipe, table_name)
continue
for key in table_data:
self.__mod_entry(pipe, table_name, key, table_data[key])
pipe.execute()
client.bgsave()

def __get_config(self, client, pipe, data, cursor):
"""Read config data in batches of size REDIS_SCAN_BATCH_SIZE using Redis pipelines
Args:
client: Redis client
pipe: Redis DB pipe
data: config dictionary
cursor: position to start scanning from
Returns:
cur: poition of next item to scan
"""
cur, keys = client.scan(cursor=cursor, match='*', count=self.REDIS_SCAN_BATCH_SIZE)
keys = [key.decode() for key in keys if key != self.INIT_INDICATOR]
for key in keys:
pipe.hgetall(key)
records = pipe.execute()

for index, key in enumerate(keys):
(table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
entry = self.raw_to_typed(records[index])
if entry is not None:
data.setdefault(table_name, {})[self.deserialize_key(row)] = entry

return cur

def get_config(self):
"""Read all config data.
Returns:
Config data in a dictionary form of
{
'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
...
}
"""
client = self.get_redis_client(self.db_name)
pipe = client.pipeline()
data = {}

cur = self.__get_config(client, pipe, data, 0)
while cur != 0:
cur = self.__get_config(client, pipe, data, cur)

return data

0 comments on commit c25d492

Please sign in to comment.