From 9c071b03be97f7ed9edba422784fe98ef1c593b1 Mon Sep 17 00:00:00 2001 From: Matthew D'Alonzo Date: Tue, 16 Apr 2024 15:00:25 -0400 Subject: [PATCH] Clean up Lint Resolve issues related to lint issues. Went through issues returned on flake8 and resolved. --- .github/workflows/tck-test.yml | 2 +- dispatcher/dispatcher.py | 32 +- test_agent/python/testagent.py | 143 ++++++--- test_manager/features/environment.py | 30 +- .../steps/tck_step_implementations.py | 277 +++++++++++------- test_manager/features/utils/loggerutils.py | 2 - test_manager/testmanager.py | 168 +++++++---- up_client_socket/python/socket_transport.py | 3 +- 8 files changed, 425 insertions(+), 232 deletions(-) diff --git a/.github/workflows/tck-test.yml b/.github/workflows/tck-test.yml index 68ec3c61..35c72831 100644 --- a/.github/workflows/tck-test.yml +++ b/.github/workflows/tck-test.yml @@ -45,7 +45,7 @@ jobs: # stop the build if there are Python syntax errors or undefined names flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide - flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + flake8 --ignore E203,E402,W503,W504,F811 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Get Behave Scripts uses: actions/github-script@v6 id: check-env diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py index b2dbc800..3de77eee 100644 --- a/dispatcher/dispatcher.py +++ b/dispatcher/dispatcher.py @@ -31,8 +31,10 @@ from threading import Lock from typing import Set -logging.basicConfig(format='%(levelname)s| %(filename)s:%(lineno)s %(message)s') -logger = logging.getLogger('File:Line# Debugger') +logging.basicConfig( + format="%(levelname)s| %(filename)s:%(lineno)s %(message)s" +) +logger = logging.getLogger("File:Line# Debugger") logger.setLevel(logging.DEBUG) DISPATCHER_ADDR = ("127.0.0.1", 44444) BYTES_MSG_LENGTH: int = 32767 @@ -64,8 +66,10 @@ def __init__(self): logger.info("Dispatcher server is running/listening") # Register server socket for accepting connections - self.selector.register(self.server, selectors.EVENT_READ, self._accept_client_conn) - + self.selector.register( + self.server, selectors.EVENT_READ, self._accept_client_conn + ) + # Cleanup essentials self.dispatcher_exit = False @@ -77,13 +81,17 @@ def _accept_client_conn(self, server: socket.socket): """ up_client_socket, _ = server.accept() - logger.info(f'accepted conn. {up_client_socket.getpeername()}') + logger.info(f"accepted conn. {up_client_socket.getpeername()}") with self.lock: self.connected_sockets.add(up_client_socket) # Register socket for receiving data - self.selector.register(up_client_socket, selectors.EVENT_READ, self._receive_from_up_client) + self.selector.register( + up_client_socket, + selectors.EVENT_READ, + self._receive_from_up_client, + ) def _receive_from_up_client(self, up_client_socket: socket.socket): """ @@ -101,7 +109,7 @@ def _receive_from_up_client(self, up_client_socket: socket.socket): logger.info(f"received data: {recv_data}") self._flood_to_sockets(recv_data) - except: + except Exception: logger.error("Received error while reading data from up-client") self._close_connected_socket(up_client_socket) @@ -113,11 +121,13 @@ def _flood_to_sockets(self, data: bytes): :param data: The data to be sent. """ # for up_client_socket in self.connected_sockets.copy(): # copy() to avoid RuntimeError - for up_client_socket in self.connected_sockets: + for up_client_socket in self.connected_sockets: try: up_client_socket.sendall(data) except ConnectionAbortedError as e: - logger.error(f"Error sending data to {up_client_socket.getpeername()}: {e}") + logger.error( + f"Error sending data to {up_client_socket.getpeername()}: {e}" + ) self._close_connected_socket(up_client_socket) def listen_for_client_connections(self): @@ -139,10 +149,10 @@ def _close_connected_socket(self, up_client_socket: socket.socket): logger.info(f"closing socket {up_client_socket.getpeername()}") with self.lock: self.connected_sockets.remove(up_client_socket) - + self.selector.unregister(up_client_socket) up_client_socket.close() - + def close(self): self.dispatcher_exit = True for utransport_socket in self.connected_sockets.copy(): diff --git a/test_agent/python/testagent.py b/test_agent/python/testagent.py index a17fcb05..7a16323e 100644 --- a/test_agent/python/testagent.py +++ b/test_agent/python/testagent.py @@ -35,7 +35,11 @@ from google.protobuf.message import Message from google.protobuf.descriptor import FieldDescriptor from google.protobuf.wrappers_pb2 import StringValue -from uprotocol.proto.uattributes_pb2 import UPriority, UMessageType, CallOptions +from uprotocol.proto.uattributes_pb2 import ( + UPriority, + UMessageType, + CallOptions, +) from uprotocol.proto.umessage_pb2 import UMessage from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat from uprotocol.proto.ustatus_pb2 import UStatus @@ -53,8 +57,10 @@ sys.path.append("../") from up_client_socket.python.socket_transport import SocketUTransport -logging.basicConfig(format='%(levelname)s| %(filename)s:%(lineno)s %(message)s') -logger = logging.getLogger('File:Line# Debugger') +logging.basicConfig( + format="%(levelname)s| %(filename)s:%(lineno)s %(message)s" +) +logger = logging.getLogger("File:Line# Debugger") logger.setLevel(logging.DEBUG) @@ -63,16 +69,26 @@ class SocketUListener(UListener): def on_receive(self, umsg: UMessage) -> None: logger.info("Listener received") if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST: - attributes = UAttributesBuilder.response(umsg.attributes.sink, umsg.attributes.source, - UPriority.UPRIORITY_CS4, umsg.attributes.id).build() + attributes = UAttributesBuilder.response( + umsg.attributes.sink, + umsg.attributes.source, + UPriority.UPRIORITY_CS4, + umsg.attributes.id, + ).build() any_obj = any_pb2.Any() any_obj.Pack(StringValue(value="SuccessRPCResponse")) - res_msg = UMessage(attributes=attributes, payload=UPayload(value=any_obj.SerializeToString(), - format=UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY)) + res_msg = UMessage( + attributes=attributes, + payload=UPayload( + value=any_obj.SerializeToString(), + format=UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY, + ), + ) transport.send(res_msg) else: send_to_test_manager(umsg, CONSTANTS.RESPONSE_ON_RECEIVE) + def message_to_dict(message: Message) -> Dict[str, Any]: """Converts protobuf Message to Dict and keeping respective data types @@ -83,39 +99,51 @@ def message_to_dict(message: Message) -> Dict[str, Any]: Dict[str, Any]: Dict/JSON version of the Message """ result: Dict[str, Any] = {} - + all_fields: List[FieldDescriptor] = message.DESCRIPTOR.fields for field in all_fields: - + value = getattr(message, field.name, field.default_value) if isinstance(value, bytes): value: str = value.decode() - if hasattr(value, 'DESCRIPTOR'): + if hasattr(value, "DESCRIPTOR"): result[field.name] = message_to_dict(value) elif field.label == FieldDescriptor.LABEL_REPEATED: repeated = [] for sub_msg in value: - if hasattr(sub_msg, 'DESCRIPTOR'): + if hasattr(sub_msg, "DESCRIPTOR"): repeated.append(message_to_dict(sub_msg)) else: repeated.append(value) result[field.name] = repeated - elif field.label == FieldDescriptor.LABEL_REQUIRED or field.label == FieldDescriptor.LABEL_OPTIONAL: + elif ( + field.label == FieldDescriptor.LABEL_REQUIRED + or field.label == FieldDescriptor.LABEL_OPTIONAL + ): result[field.name] = value return result -def send_to_test_manager(response: Union[Message, str, dict], action: str, received_test_id: str = ""): +def send_to_test_manager( + response: Union[Message, str, dict], + action: str, + received_test_id: str = "", +): if not isinstance(response, (dict, str)): # converts protobuf to dict response = message_to_dict(response) # Create response as json/dict - response_dict = {'data': response, 'action': action, 'ue': 'python', 'test_id': received_test_id} - response_dict = json.dumps(response_dict).encode('utf-8') + response_dict = { + "data": response, + "action": action, + "ue": "python", + "test_id": received_test_id, + } + response_dict = json.dumps(response_dict).encode("utf-8") ta_socket.sendall(response_dict) logger.info(f"Sent to TM {response_dict}") @@ -123,9 +151,9 @@ def send_to_test_manager(response: Union[Message, str, dict], action: str, recei def dict_to_proto(parent_json_obj, parent_proto_obj): def populate_fields(json_obj, proto_obj): for field_name, value in json_obj.items(): - if 'BYTES:' in value: - value = value.replace('BYTES:', '') - value = value.encode('utf-8') + if "BYTES:" in value: + value = value.replace("BYTES:", "") + value = value.encode("utf-8") if hasattr(proto_obj, field_name): if isinstance(value, dict): # Recursively update the nested message object @@ -137,7 +165,7 @@ def populate_fields(json_obj, proto_obj): value = int(value) elif field_type == float: value = float(value) - except: + except Exception: pass setattr(proto_obj, field_name, value) return proto_obj @@ -167,11 +195,17 @@ def handle_unregister_listener_command(json_msg): def handle_invoke_method_command(json_msg): uri = dict_to_proto(json_msg["data"], UUri()) payload = dict_to_proto(json_msg["data"]["payload"], UPayload()) - res_future: Future = transport.invoke_method(uri, payload, CallOptions(ttl=10000)) + res_future: Future = transport.invoke_method( + uri, payload, CallOptions(ttl=10000) + ) def handle_response(message): message: Message = message.result() - send_to_test_manager(message, CONSTANTS.INVOKE_METHOD_COMMAND, received_test_id=json_msg["test_id"]) + send_to_test_manager( + message, + CONSTANTS.INVOKE_METHOD_COMMAND, + received_test_id=json_msg["test_id"], + ) res_future.add_done_callback(handle_response) @@ -179,29 +213,41 @@ def handle_response(message): def send_longserialize_uuri(json_msg: Dict[str, Any]): uri: UUri = dict_to_proto(json_msg["data"], UUri()) serialized_uuri: str = LongUriSerializer().serialize(uri) - send_to_test_manager(serialized_uuri, CONSTANTS.SERIALIZE_URI, received_test_id=json_msg["test_id"]) + send_to_test_manager( + serialized_uuri, + CONSTANTS.SERIALIZE_URI, + received_test_id=json_msg["test_id"], + ) def send_longdeserialize_uri(json_msg: Dict[str, Any]): uuri: UUri = LongUriSerializer().deserialize(json_msg["data"]) - send_to_test_manager(uuri, CONSTANTS.DESERIALIZE_URI, received_test_id=json_msg["test_id"]) + send_to_test_manager( + uuri, CONSTANTS.DESERIALIZE_URI, received_test_id=json_msg["test_id"] + ) def send_longdeserialize_uuid(json_msg: Dict[str, Any]): uuid: UUID = LongUuidSerializer().deserialize(json_msg["data"]) - send_to_test_manager(uuid, CONSTANTS.DESERIALIZE_UUID, received_test_id=json_msg["test_id"]) + send_to_test_manager( + uuid, CONSTANTS.DESERIALIZE_UUID, received_test_id=json_msg["test_id"] + ) def send_longserialize_uuid(json_msg: Dict[str, Any]): uuid: UUID = dict_to_proto(json_msg["data"], UUID()) serialized_uuid: str = LongUuidSerializer().serialize(uuid) - send_to_test_manager(serialized_uuid, CONSTANTS.SERIALIZE_UUID, received_test_id=json_msg["test_id"]) + send_to_test_manager( + serialized_uuid, + CONSTANTS.SERIALIZE_UUID, + received_test_id=json_msg["test_id"], + ) def handle_uri_validate_command(json_msg): val_type = json_msg["data"]["type"] uri = LongUriSerializer().deserialize(json_msg["data"].get("uri")) - + validator_func = { "uri": UriValidator.validate, "rpc_response": UriValidator.validate_rpc_response, @@ -211,7 +257,7 @@ def handle_uri_validate_command(json_msg): "is_micro_form": UriValidator.is_micro_form, "is_long_form_uuri": UriValidator.is_long_form, "is_long_form_uauthority": UriValidator.is_long_form, - "is_local": UriValidator.is_local + "is_local": UriValidator.is_local, }.get(val_type) if validator_func: @@ -222,19 +268,24 @@ def handle_uri_validate_command(json_msg): else: result = str(status.is_success()) message = status.get_message() - send_to_test_manager({"result": result, "message": message}, CONSTANTS.VALIDATE_URI, received_test_id=json_msg["test_id"]) - - -action_handlers = {CONSTANTS.SEND_COMMAND: handle_send_command, - CONSTANTS.REGISTER_LISTENER_COMMAND: handle_register_listener_command, - CONSTANTS.UNREGISTER_LISTENER_COMMAND: handle_unregister_listener_command, - CONSTANTS.INVOKE_METHOD_COMMAND: handle_invoke_method_command, - CONSTANTS.SERIALIZE_URI: send_longserialize_uuri, - CONSTANTS.DESERIALIZE_URI: send_longdeserialize_uri, - CONSTANTS.SERIALIZE_UUID: send_longserialize_uuid, - CONSTANTS.DESERIALIZE_UUID: send_longdeserialize_uuid, - CONSTANTS.VALIDATE_URI: handle_uri_validate_command, - } + send_to_test_manager( + {"result": result, "message": message}, + CONSTANTS.VALIDATE_URI, + received_test_id=json_msg["test_id"], + ) + + +action_handlers = { + CONSTANTS.SEND_COMMAND: handle_send_command, + CONSTANTS.REGISTER_LISTENER_COMMAND: handle_register_listener_command, + CONSTANTS.UNREGISTER_LISTENER_COMMAND: handle_unregister_listener_command, + CONSTANTS.INVOKE_METHOD_COMMAND: handle_invoke_method_command, + CONSTANTS.SERIALIZE_URI: send_longserialize_uuri, + CONSTANTS.DESERIALIZE_URI: send_longdeserialize_uri, + CONSTANTS.SERIALIZE_UUID: send_longserialize_uuid, + CONSTANTS.DESERIALIZE_UUID: send_longdeserialize_uuid, + CONSTANTS.VALIDATE_URI: handle_uri_validate_command, +} def process_message(json_data): @@ -245,7 +296,9 @@ def process_message(json_data): # For UTransport interface methods if status is not None: - send_to_test_manager(status, action, received_test_id=json_data["test_id"]) + send_to_test_manager( + status, action, received_test_id=json_data["test_id"] + ) def receive_from_tm(): @@ -254,16 +307,16 @@ def receive_from_tm(): if not recv_data or recv_data == b"": return # Deserialize the JSON data - json_data = json.loads(recv_data.decode('utf-8')) - logger.info('Received data from test manager: %s', json_data) + json_data = json.loads(recv_data.decode("utf-8")) + logger.info("Received data from test manager: %s", json_data) process_message(json_data) -if __name__ == '__main__': +if __name__ == "__main__": listener = SocketUListener() transport = SocketUTransport() ta_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ta_socket.connect(CONSTANTS.TEST_MANAGER_ADDR) thread = Thread(target=receive_from_tm) thread.start() - send_to_test_manager({'SDK_name': "python"}, "initialize") + send_to_test_manager({"SDK_name": "python"}, "initialize") diff --git a/test_manager/features/environment.py b/test_manager/features/environment.py index 132ab351..5c5dd54b 100644 --- a/test_manager/features/environment.py +++ b/test_manager/features/environment.py @@ -32,7 +32,6 @@ from typing import List import git -from behave import formatter from behave.runner import Context sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) @@ -40,35 +39,50 @@ from utils import loggerutils PYTHON_TA_PATH = "/test_agent/python/testagent.py" -JAVA_TA_PATH = "/test_agent/java/target/tck-test-agent-java-jar-with-dependencies.jar" +JAVA_TA_PATH = ( + "/test_agent/java/target/tck-test-agent-java-jar-with-dependencies.jar" +) DISPATCHER_PATH = "/dispatcher/dispatcher.py" -repo = git.Repo('.', search_parent_directories=True) +repo = git.Repo(".", search_parent_directories=True) sys.path.append(repo.working_tree_dir) from dispatcher.dispatcher import Dispatcher + def create_command(filepath_from_root_repo: str) -> List[str]: command: List[str] = [] - if filepath_from_root_repo.endswith('.jar'): + if filepath_from_root_repo.endswith(".jar"): command.append("java") command.append("-jar") - elif filepath_from_root_repo.endswith('.py'): + elif filepath_from_root_repo.endswith(".py"): if sys.platform == "win32": command.append("python") - elif sys.platform == "linux" or sys.platform == "linux2" or sys.platform == "darwin": + elif ( + sys.platform == "linux" + or sys.platform == "linux2" + or sys.platform == "darwin" + ): command.append("python3") else: raise Exception("only accept .jar and .py files") - command.append(os.path.abspath(os.path.dirname(os.getcwd()) + "/" + filepath_from_root_repo)) + command.append( + os.path.abspath( + os.path.dirname(os.getcwd()) + "/" + filepath_from_root_repo + ) + ) return command def create_subprocess(command: List[str]) -> subprocess.Popen: if sys.platform == "win32": process = subprocess.Popen(command, shell=True) - elif sys.platform == "linux" or sys.platform == "linux2" or sys.platform == "darwin": + elif ( + sys.platform == "linux" + or sys.platform == "linux2" + or sys.platform == "darwin" + ): process = subprocess.Popen(command) else: print(sys.platform) diff --git a/test_manager/features/steps/tck_step_implementations.py b/test_manager/features/steps/tck_step_implementations.py index ae8baf3c..11dd015c 100644 --- a/test_manager/features/steps/tck_step_implementations.py +++ b/test_manager/features/steps/tck_step_implementations.py @@ -27,16 +27,14 @@ import codecs from typing import Any, Dict from uprotocol.proto.ustatus_pb2 import UCode -import time -import re from behave import when, then, given from behave.runner import Context from hamcrest import assert_that, equal_to -@given(u'"{sdk_name}" creates data for "{command}"') -@when(u'"{sdk_name}" creates data for "{command}"') +@given('"{sdk_name}" creates data for "{command}"') +@when('"{sdk_name}" creates data for "{command}"') def create_sdk_data(context, sdk_name: str, command: str): context.json_dict = {} context.status_json = None @@ -48,44 +46,51 @@ def create_sdk_data(context, sdk_name: str, command: str): context.action = command -@then(u'receives uri serialization "{expected_uri}"') +@then('receives uri serialization "{expected_uri}"') def serialized_uri_received(context, expected_uri: str): try: actual_uuid: str = context.response_data assert_that(expected_uri, equal_to(actual_uuid)) - except AssertionError as ae: - raise AssertionError(f"Assertion error. Expected is {expected_uri} but " - f"received {actual_uuid}") + except AssertionError: + raise AssertionError( + f"Assertion error. Expected is {expected_uri} but " + f"received {actual_uuid}" + ) except Exception as ae: raise ValueError(f"Expection occured. {ae}") -@then(u'receives uuid serialization "{expected_uuid}"') +@then('receives uuid serialization "{expected_uuid}"') def serialized_uuid_received(context, expected_uuid: str): try: actual_uuid: str = context.response_data assert_that(expected_uuid, equal_to(actual_uuid)) - except AssertionError as ae: - raise AssertionError(f"Assertion error. Expected is {expected_uuid} but " - f"received {actual_uuid}") + except AssertionError: + raise AssertionError( + f"Assertion error. Expected is {expected_uuid} but " + f"received {actual_uuid}" + ) except Exception as ae: raise ValueError(f"Expection occured. {ae}") - -@then(u'receives validation result as "{expected_result}"') + + +@then('receives validation result as "{expected_result}"') def receive_validation_result(context, expected_result): try: expected_result = expected_result.strip() actual_val_res = context.response_data["result"] assert_that(expected_result, equal_to(actual_val_res)) - except AssertionError as ae: - raise AssertionError(f"Assertion error. Expected is {expected_result} but " - f"received {repr(actual_val_res)}") + except AssertionError: + raise AssertionError( + f"Assertion error. Expected is {expected_result} but " + f"received {repr(actual_val_res)}" + ) except Exception as ae: raise ValueError(f"Expection occured. {ae}") - -@then(u'receives validation message as "{expected_message}"') + +@then('receives validation message as "{expected_message}"') def receive_validation_result(context, expected_message): if expected_message == "none": return @@ -93,69 +98,94 @@ def receive_validation_result(context, expected_message): expected_message = expected_message.strip() actual_val_msg = context.response_data["message"] assert_that(expected_message, equal_to(actual_val_msg)) - except AssertionError as ae: - raise AssertionError(f"Assertion error. Expected is {expected_message} but " - f"received {repr(actual_val_msg)}") + except AssertionError: + raise AssertionError( + f"Assertion error. Expected is {expected_message} but " + f"received {repr(actual_val_msg)}" + ) except Exception as ae: raise ValueError(f"Expection occured. {ae}") -@when(u'sends a "{command}" request with serialized input "{serialized}"') +@when('sends a "{command}" request with serialized input "{serialized}"') def send_serialized_command(context, command: str, serialized: str): context.logger.info(f"Json request for {command} -> {serialized}") - response_json: Dict[str, Any] = context.tm.request(context.ue, context.action, serialized) + response_json: Dict[str, Any] = context.tm.request( + context.ue, context.action, serialized + ) context.logger.info(f"Response Json {command} -> {response_json}") - + if response_json is None: raise AssertionError("Response from Test Manager is None") - elif 'data' not in response_json: - raise AssertionError("\"data\" field name doesn't exist on top response JSON level") - context.response_data = response_json['data'] + elif "data" not in response_json: + raise AssertionError( + '"data" field name doesn\'t exist on top response JSON level' + ) + context.response_data = response_json["data"] -@then(u'the deserialized uri received should have the following properties') + +@then("the deserialized uri received should have the following properties") def verify_uri_received_properties(context): deserialized_uri: Dict[str, Any] = flatten_dict(context.response_data) context.logger.info(f"deserialized_uri_dict -> {deserialized_uri}") - - # Iterate over the rows of the table and verify the received properties - int_type_fields = set(["entity.id", "entity.version_major", "entity.version_minor", 'resource.id']) + + # Iterate over the rows of the table and verify the received properties + int_type_fields = set( + [ + "entity.id", + "entity.version_major", + "entity.version_minor", + "resource.id", + ] + ) bytes_type_fields = set(["authority.id", "authority.ip"]) try: for row in context.table: - field: str = row['Field'] - expected_value: str = row['Value'] - context.logger.info(f"field {field}; {deserialized_uri[field]} vs. {expected_value}") + field: str = row["Field"] + expected_value: str = row["Value"] + context.logger.info( + f"field {field}; {deserialized_uri[field]} vs. {expected_value}" + ) if len(expected_value) > 0: if field in int_type_fields: expected_value = int(expected_value) elif field in bytes_type_fields: expected_value: bytes = expected_value.encode() - deserialized_uri[field] = str(deserialized_uri[field]).encode() + deserialized_uri[field] = str( + deserialized_uri[field] + ).encode() assert_that(deserialized_uri[field], equal_to(expected_value)) else: - assert_that(len(deserialized_uri[field]) > 0, equal_to(len(expected_value) > 0)) + assert_that( + len(deserialized_uri[field]) > 0, + equal_to(len(expected_value) > 0), + ) except AssertionError as ae: raise AssertionError(f"Assertion error. {ae}") -@then(u'the deserialized uuid received should have the following properties') +@then("the deserialized uuid received should have the following properties") def verify_uuid_received_properties(context): - context.logger.info(f"deserialized context.response_data -> {context.response_data}") + context.logger.info( + f"deserialized context.response_data -> {context.response_data}" + ) - deserialized_uuid: Dict[str, int]= flatten_dict(context.response_data) + deserialized_uuid: Dict[str, int] = flatten_dict(context.response_data) context.logger.info(f"deserialized_uuid_dict -> {deserialized_uuid}") - + # Iterate over the rows of the table and verify the received properties int_type_fields = set(["msb", "lsb"]) try: for row in context.table: - field = row['Field'] - expected_value = row['Value'] - assert_that(field in deserialized_uuid, equal_to(len(expected_value) > 0)) - + field = row["Field"] + expected_value = row["Value"] + assert_that( + field in deserialized_uuid, equal_to(len(expected_value) > 0) + ) + if len(expected_value) > 0: if field in int_type_fields: expected_value: int = int(expected_value) @@ -164,82 +194,111 @@ def verify_uuid_received_properties(context): raise AssertionError(f"Assertion error. {ae}") -@given(u'sets "{key}" to "{value}"') -@when(u'sets "{key}" to "{value}"') -def set_key_to_val(context: Context, key: str, value: str): +@given('sets "{key}" to "{value}"') +@when('sets "{key}" to "{value}"') +def set_key_to_val(context: Context, key: str, value: str): if key not in context.json_dict: context.json_dict[key] = value -@given(u'sets "{key}" to ""') +@given('sets "{key}" to ""') def set_blank_key(context, key): pass -@given(u'sets "{key}" to b"{value}"') -@when(u'sets "{key}" to b"{value}"') +@given('sets "{key}" to b"{value}"') +@when('sets "{key}" to b"{value}"') def set_key_to_bytes(context, key: str, value: str): if key not in context.json_dict: context.json_dict[key] = "BYTES:" + value -@given(u'sends "{command}" request') -@when(u'sends "{command}" request') +@given('sends "{command}" request') +@when('sends "{command}" request') def send_command_request(context, command: str): context.json_dict = unflatten_dict(context.json_dict) - context.logger.info(f"Json request for {command} -> {str(context.json_dict)}") + context.logger.info( + f"Json request for {command} -> {str(context.json_dict)}" + ) - response_json: Dict[str, Any] = context.tm.request(context.ue, command, context.json_dict) + response_json: Dict[str, Any] = context.tm.request( + context.ue, command, context.json_dict + ) context.logger.info(f"Response Json {command} -> {response_json}") - context.response_data = response_json['data'] + context.response_data = response_json["data"] + -@then(u'the status received with "{field_name}" is "{expected_value}"') +@then('the status received with "{field_name}" is "{expected_value}"') def receive_status(context, field_name: str, expected_value: str): try: actual_value: str = context.response_data[field_name] expected_value: int = getattr(UCode, expected_value) assert_that(expected_value, equal_to(actual_value)) - except AssertionError as ae: - raise AssertionError(f"Assertion error. Expected is {expected_value} but " - f"received {context.response_data[field_name]}") + except AssertionError: + raise AssertionError( + f"Assertion error. Expected is {expected_value} but " + f"received {context.response_data[field_name]}" + ) except Exception as ae: raise ValueError(f"Expection occured. {ae}") -@then(u'"{sender_sdk_name}" sends onreceive message with field "{field_name}" as b"{expected_value}"') -def receive_value_as_bytes(context, sender_sdk_name: str, field_name: str, expected_value: str): +@then( + '"{sender_sdk_name}" sends onreceive message with field "{field_name}" as b"{expected_value}"' +) +def receive_value_as_bytes( + context, sender_sdk_name: str, field_name: str, expected_value: str +): try: expected_value = expected_value.strip() context.logger.info(f"getting on_receive_msg from {sender_sdk_name}") - on_receive_msg: Dict[str, Any] = context.tm.get_onreceive(sender_sdk_name) + on_receive_msg: Dict[str, Any] = context.tm.get_onreceive( + sender_sdk_name + ) context.logger.info(f"got on_receive_msg: {on_receive_msg}") val = access_nested_dict(on_receive_msg["data"], field_name) context.logger.info(f"val {field_name}: {val}") - rec_field_value = val.encode('utf-8') - assert rec_field_value.split(b'googleapis.com/')[1] == expected_value.encode('utf-8').split(b'googleapis.com/')[1] - - except AssertionError as ae: - raise AssertionError(f"Assertion error. Expected is {expected_value.encode('utf-8')} but " - f"received {rec_field_value}") + rec_field_value = val.encode("utf-8") + assert ( + rec_field_value.split(b"googleapis.com/")[1] + == expected_value.encode("utf-8").split(b"googleapis.com/")[1] + ) + + except AssertionError: + raise AssertionError( + f"Assertion error. Expected is {expected_value.encode('utf-8')} but " + f"received {rec_field_value}" + ) except Exception as ae: raise ValueError(f"Expection occured. {ae}") -@then(u'"{sdk_name}" receives data field "{field_name}" as b"{expected_value}"') -def receive_rpc_response_as_bytes(context, sdk_name, field_name: str, expected_value: str): +@then('"{sdk_name}" receives data field "{field_name}" as b"{expected_value}"') +def receive_rpc_response_as_bytes( + context, sdk_name, field_name: str, expected_value: str +): try: - actual_value: str = access_nested_dict(context.response_data, field_name) - actual_value: bytes = actual_value.encode('utf-8') - + actual_value: str = access_nested_dict( + context.response_data, field_name + ) + actual_value: bytes = actual_value.encode("utf-8") + # Convert bytes to byte string with escape sequences - actual_value = codecs.encode(actual_value.decode('utf-8'), 'unicode_escape') - assert actual_value.split(b'googleapis.com/')[1] == expected_value.encode('utf-8').split(b'googleapis.com/')[1] - except KeyError as ke: + actual_value = codecs.encode( + actual_value.decode("utf-8"), "unicode_escape" + ) + assert ( + actual_value.split(b"googleapis.com/")[1] + == expected_value.encode("utf-8").split(b"googleapis.com/")[1] + ) + except KeyError: raise KeyError(f"Key error. {sdk_name} has not received rpc response.") - except AssertionError as ae: - raise AssertionError(f"Assertion error. Expected is {expected_value.encode('utf-8')} but " - f"received {repr(actual_value)}") + except AssertionError: + raise AssertionError( + f"Assertion error. Expected is {expected_value.encode('utf-8')} but " + f"received {repr(actual_value)}" + ) except Exception as ae: raise ValueError(f"Expection occured. {ae}") @@ -247,53 +306,71 @@ def receive_rpc_response_as_bytes(context, sdk_name, field_name: str, expected_v def bytes_to_base64_str(b: bytes) -> str: return base64.b64encode(b).decode("ascii") + def base64_str_to_bytes(base64_str: str) -> bytes: - base64_bytes: bytes = base64_str.encode("ascii") + base64_bytes: bytes = base64_str.encode("ascii") return base64.b64decode(base64_bytes) -@then(u'receives micro serialized uri "{expected_bytes_as_base64_str}"') + +@then('receives micro serialized uri "{expected_bytes_as_base64_str}"') def receive_micro_serialized_uuri(context, expected_bytes_as_base64_str: str): if expected_bytes_as_base64_str == "": expected_bytes_as_base64_str = "" - + expected_bytes: bytes = base64_str_to_bytes(expected_bytes_as_base64_str) context.logger.info(f"expected_bytes: {expected_bytes}") try: actual_bytes_as_str: str = context.response_data actual_bytes: bytes = actual_bytes_as_str.encode("iso-8859-1") - - context.logger.info(f"actual: {actual_bytes} | expect: {expected_bytes}") + + context.logger.info( + f"actual: {actual_bytes} | expect: {expected_bytes}" + ) assert_that(expected_bytes, equal_to(actual_bytes)) - except AssertionError as ae: - raise AssertionError(f"Assertion error. Expected is {expected_bytes} but " - f"received {actual_bytes}") + except AssertionError: + raise AssertionError( + f"Assertion error. Expected is {expected_bytes} but " + f"received {actual_bytes}" + ) except Exception as ae: raise ValueError(f"Expection occured. {ae}") -@when(u'sends a "{command}" request with micro serialized input "{micro_serialized_uri_as_base64_str}"') -def send_micro_serialized_command(context, command: str, micro_serialized_uri_as_base64_str: str): + +@when( + 'sends a "{command}" request with micro serialized input "{micro_serialized_uri_as_base64_str}"' +) +def send_micro_serialized_command( + context, command: str, micro_serialized_uri_as_base64_str: str +): if micro_serialized_uri_as_base64_str == "": micro_serialized_uri_as_base64_str = "" - - micro_serialized_uri: bytes = base64_str_to_bytes(micro_serialized_uri_as_base64_str) - context.logger.info(f"Json request for {command} -> {micro_serialized_uri}") - + + micro_serialized_uri: bytes = base64_str_to_bytes( + micro_serialized_uri_as_base64_str + ) + context.logger.info( + f"Json request for {command} -> {micro_serialized_uri}" + ) + micro_serialized_uri_as_str = micro_serialized_uri.decode("iso-8859-1") - response_json: Dict[str, Any] = context.tm.request(context.ue, command, micro_serialized_uri_as_str) - + response_json: Dict[str, Any] = context.tm.request( + context.ue, command, micro_serialized_uri_as_str + ) + context.logger.info(f"Response Json {command} -> {response_json}") - context.response_data = response_json['data'] + context.response_data = response_json["data"] + def access_nested_dict(dictionary, keys): - keys = keys.split('.') + keys = keys.split(".") value = dictionary for key in keys: value = value[key] return value -def flatten_dict(nested_dict, parent_key='', sep='.'): +def flatten_dict(nested_dict, parent_key="", sep="."): flattened = {} for k, v in nested_dict.items(): new_key = parent_key + sep + k if parent_key else k @@ -304,7 +381,7 @@ def flatten_dict(nested_dict, parent_key='', sep='.'): return flattened -def unflatten_dict(d, delimiter='.'): +def unflatten_dict(d, delimiter="."): unflattened = {} for key, value in d.items(): parts = key.split(delimiter) diff --git a/test_manager/features/utils/loggerutils.py b/test_manager/features/utils/loggerutils.py index 247e4642..05e23611 100644 --- a/test_manager/features/utils/loggerutils.py +++ b/test_manager/features/utils/loggerutils.py @@ -26,12 +26,10 @@ # -*- coding: UTF-8 -*- -import os import logging import time from logging.config import dictConfig -from time import strftime # Log folder directory LOG_DIR = "logs" diff --git a/test_manager/testmanager.py b/test_manager/testmanager.py index ef5034ad..16aed277 100644 --- a/test_manager/testmanager.py +++ b/test_manager/testmanager.py @@ -35,8 +35,10 @@ from multimethod import multimethod import sys -logging.basicConfig(format='%(levelname)s| %(filename)s:%(lineno)s %(message)s') -logger = logging.getLogger('File:Line# Debugger') +logging.basicConfig( + format="%(levelname)s| %(filename)s:%(lineno)s %(message)s" +) +logger = logging.getLogger("File:Line# Debugger") logger.setLevel(logging.DEBUG) BYTES_MSG_LENGTH: int = 32767 @@ -44,29 +46,38 @@ def convert_json_to_jsonstring(j: Dict[str, AnyType]) -> str: return json.dumps(j) + def convert_str_to_bytes(string: str) -> bytes: return str.encode(string) + def send_socket_data(s: socket.socket, msg: bytes): s.sendall(msg) - + + def is_close_socket_signal(received_data: bytes) -> bool: - return received_data == b'' + return received_data == b"" class TestAgentConnectionDatabase: def __init__(self) -> None: - self.test_agent_address_to_name: Dict[tuple[str, int], str] = defaultdict(str) - self.test_agent_name_to_address: Dict[str, socket.socket] = {} + self.test_agent_address_to_name: Dict[tuple[str, int], str] = ( + defaultdict(str) + ) + self.test_agent_name_to_address: Dict[str, socket.socket] = {} self.lock = Lock() - + def add(self, test_agent_socket: socket.socket, test_agent_name: str): test_agent_address: tuple[str, int] = test_agent_socket.getpeername() - + with self.lock: - self.test_agent_address_to_name[test_agent_address] = test_agent_name - self.test_agent_name_to_address[test_agent_name] = test_agent_socket - + self.test_agent_address_to_name[test_agent_address] = ( + test_agent_name + ) + self.test_agent_name_to_address[test_agent_name] = ( + test_agent_socket + ) + @multimethod def get(self, address: Tuple[str, int]) -> socket.socket: test_agent_name: str = self.test_agent_address_to_name[address] @@ -75,59 +86,67 @@ def get(self, address: Tuple[str, int]) -> socket.socket: @multimethod def get(self, name: str) -> socket.socket: return self.test_agent_name_to_address[name] - + def contains(self, test_agent_name: str): return test_agent_name in self.test_agent_name_to_address - + @multimethod def close(self, test_agent_name: str): if test_agent_name is None or test_agent_name == "": return test_agent_socket: socket.socket = self.get(test_agent_name) self.close(test_agent_socket) - + @multimethod def close(self, test_agent_socket: socket.socket): test_agent_address: tuple[str, int] = test_agent_socket.getpeername() - test_agent_name: str = self.test_agent_address_to_name.get(test_agent_address, None) - + test_agent_name: str = self.test_agent_address_to_name.get( + test_agent_address, None + ) + if test_agent_name is None: return with self.lock: del self.test_agent_address_to_name[test_agent_address] del self.test_agent_name_to_address[test_agent_name] - + test_agent_socket.close() class DictWithQueue: def __init__(self) -> None: - self.key_to_queue: Dict[str, Deque[Dict[str, Any]]] = defaultdict(deque) + self.key_to_queue: Dict[str, Deque[Dict[str, Any]]] = defaultdict( + deque + ) self.lock = Lock() - + def append(self, key: str, msg: Dict[str, Any]) -> None: with self.lock: self.key_to_queue[key].append(msg) - logger.info(f'self.key_to_queue append {self.key_to_queue}') - - def contains(self, key: str, inner_key: str, inner_expected_value: str) -> bool: + logger.info(f"self.key_to_queue append {self.key_to_queue}") + + def contains( + self, key: str, inner_key: str, inner_expected_value: str + ) -> bool: queue: Deque[Dict[str, Any]] = self.key_to_queue[key] if len(queue) == 0: return False - + response_json: Dict[str, Any] = queue[0] incoming_req_id: str = response_json[inner_key] - - return incoming_req_id == inner_expected_value - + + return incoming_req_id == inner_expected_value + def popleft(self, key: str) -> Any: with self.lock: onreceive: Any = self.key_to_queue[key].popleft() - logger.info(f'self.key_to_queue popleft {onreceive["action"]} {self.key_to_queue}') + logger.info( + f'self.key_to_queue popleft {onreceive["action"]} {self.key_to_queue}' + ) return onreceive - - + + class TestManager: def __init__(self, bdd_context, ip_addr: str, port: int): self.exit_manager = False @@ -137,7 +156,7 @@ def __init__(self, bdd_context, ip_addr: str, port: int): self.action_type_to_response_queue = DictWithQueue() self.lock = Lock() self.bdd_context = bdd_context - + # Create server socket self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if sys.platform != "win32": @@ -149,7 +168,9 @@ def __init__(self, bdd_context, ip_addr: str, port: int): logger.info("TM server is running/listening") # Register server socket for accepting connections - self.socket_event_receiver.register(self.server, selectors.EVENT_READ, self._accept_client_conn) + self.socket_event_receiver.register( + self.server, selectors.EVENT_READ, self._accept_client_conn + ) def _accept_client_conn(self, server: socket.socket): """ @@ -158,10 +179,12 @@ def _accept_client_conn(self, server: socket.socket): :param server: The server socket. """ ta_socket, _ = server.accept() - logger.info(f'accepted conn. {ta_socket.getpeername()}') + logger.info(f"accepted conn. {ta_socket.getpeername()}") # Register socket for receiving data - self.socket_event_receiver.register(ta_socket, selectors.EVENT_READ, self._receive_from_test_agent) + self.socket_event_receiver.register( + ta_socket, selectors.EVENT_READ, self._receive_from_test_agent + ) def _receive_from_test_agent(self, test_agent: socket.socket): """ @@ -174,20 +197,24 @@ def _receive_from_test_agent(self, test_agent: socket.socket): if is_close_socket_signal(recv_data): self.close_test_agent(test_agent) return - json_data = json.loads(recv_data.decode('utf-8')) - logger.info('Received from test agent: %s', json_data) + json_data = json.loads(recv_data.decode("utf-8")) + logger.info("Received from test agent: %s", json_data) # self._process_message(json_data, test_agent) - self._process_receive_message(json_data,test_agent ) + self._process_receive_message(json_data, test_agent) - def _process_receive_message(self, response_json: Dict[str, Any], ta_socket: socket.socket): - if response_json['action'] == 'initialize': - test_agent_sdk: str = response_json['data']["SDK_name"].lower().strip() + def _process_receive_message( + self, response_json: Dict[str, Any], ta_socket: socket.socket + ): + if response_json["action"] == "initialize": + test_agent_sdk: str = ( + response_json["data"]["SDK_name"].lower().strip() + ) self.test_agent_database.add(ta_socket, test_agent_sdk) return - + action_type: str = response_json["action"] self.action_type_to_response_queue.append(action_type, response_json) - + def has_sdk_connection(self, test_agent_name: str) -> bool: return self.test_agent_database.contains(test_agent_name) @@ -195,65 +222,80 @@ def listen_for_incoming_events(self): """ Listens for Test Agent connections and messages, then creates a thread to start the init process """ - + while not self.exit_manager: # Wait until some registered file objects or sockets become ready, or the timeout expires. events = self.socket_event_receiver.select(timeout=0) for key, mask in events: callback = key.data callback(key.fileobj) - - def request(self, test_agent_name: str, action: str, data: Dict[str, AnyType], payload: Dict[str, AnyType]=None): - """Sends a blocking request message to sdk Test Agent (ex: Java, Rust, C++ Test Agent) - """ + + def request( + self, + test_agent_name: str, + action: str, + data: Dict[str, AnyType], + payload: Dict[str, AnyType] = None, + ): + """Sends a blocking request message to sdk Test Agent (ex: Java, Rust, C++ Test Agent)""" # Get Test Agent's socket test_agent_name = test_agent_name.lower().strip() - test_agent_socket: socket.socket = self.test_agent_database.get(test_agent_name) + test_agent_socket: socket.socket = self.test_agent_database.get( + test_agent_name + ) # Create a request json to send to specific Test Agent test_id: str = str(uuid.uuid4()) - request_json = {'data': data, 'action': action, "test_id": test_id} + request_json = {"data": data, "action": action, "test_id": test_id} if payload is not None: - request_json['payload'] = payload - + request_json["payload"] = payload + # Pack json as binary request_str: str = convert_json_to_jsonstring(request_json) request_bytes: bytes = convert_str_to_bytes(request_str) - + send_socket_data(test_agent_socket, request_bytes) logger.info(f"Sent to TestAgent{request_json}") - + # Wait until get response logger.info(f"Waiting test_id {test_id}") - while not self.action_type_to_response_queue.contains(action, "test_id", test_id): + while not self.action_type_to_response_queue.contains( + action, "test_id", test_id + ): pass logger.info(f"Received test_id {test_id}") # Get response - response_json: Dict[str, Any] = self.action_type_to_response_queue.popleft(action) + response_json: Dict[str, Any] = ( + self.action_type_to_response_queue.popleft(action) + ) return response_json - + def _wait_for_onreceive(self, test_agent_name: str): - while not self.action_type_to_response_queue.contains('onreceive', 'ue', test_agent_name): + while not self.action_type_to_response_queue.contains( + "onreceive", "ue", test_agent_name + ): pass - + def get_onreceive(self, test_agent_name: str) -> Dict[str, Any]: self._wait_for_onreceive(test_agent_name) - - return self.action_type_to_response_queue.popleft('onreceive') - + + return self.action_type_to_response_queue.popleft("onreceive") + @multimethod def close_test_agent(self, test_agent_socket: socket.socket): # Stop monitoring socket/fileobj. A file object shall be unregistered prior to being closed. self.socket_event_receiver.unregister(test_agent_socket) self.test_agent_database.close(test_agent_socket) - + @multimethod def close_test_agent(self, test_agent_name: str): if self.test_agent_database.contains(test_agent_name): - test_agent: socket.socket = self.test_agent_database.get(test_agent_name) + test_agent: socket.socket = self.test_agent_database.get( + test_agent_name + ) self.close_test_agent(test_agent) - + def close(self): """Close the selector / test manager's server, BUT need to free its individual SDK TA connections using self.close_ta(sdk) first diff --git a/up_client_socket/python/socket_transport.py b/up_client_socket/python/socket_transport.py index 73aff24b..c7310c16 100644 --- a/up_client_socket/python/socket_transport.py +++ b/up_client_socket/python/socket_transport.py @@ -42,7 +42,6 @@ from uprotocol.transport.ulistener import UListener from uprotocol.transport.utransport import UTransport from uprotocol.uri.factory.uresource_builder import UResourceBuilder -from uprotocol.uri.serializer.longuriserializer import LongUriSerializer from uprotocol.uri.validator.urivalidator import UriValidator from uprotocol.uuid.serializer.longuuidserializer import LongUuidSerializer @@ -153,7 +152,7 @@ def send(self, message: UMessage) -> UStatus: umsg_serialized: bytes = message.SerializeToString() try: self.socket.sendall(umsg_serialized) - logger.info(f"uMessage Sent to dispatcher from python socket transport") + logger.info("uMessage Sent to dispatcher from python socket transport") except OSError as e: logger.exception(f"INTERNAL ERROR: {e}") return UStatus(code=UCode.INTERNAL, message=f"INTERNAL ERROR: {e}")