Skip to content

Commit

Permalink
Clean up Lint
Browse files Browse the repository at this point in the history
Resolve issues related to lint issues. Went through issues returned on flake8 and resolved.
  • Loading branch information
matthewd0123 committed Apr 16, 2024
1 parent 8b044f0 commit 9c071b0
Show file tree
Hide file tree
Showing 8 changed files with 425 additions and 232 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tck-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
flake8 --ignore E203,E402,W503,W504,F811 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Get Behave Scripts
uses: actions/github-script@v6
id: check-env
Expand Down
32 changes: 21 additions & 11 deletions dispatcher/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
from threading import Lock
from typing import Set

logging.basicConfig(format='%(levelname)s| %(filename)s:%(lineno)s %(message)s')
logger = logging.getLogger('File:Line# Debugger')
logging.basicConfig(
format="%(levelname)s| %(filename)s:%(lineno)s %(message)s"
)
logger = logging.getLogger("File:Line# Debugger")
logger.setLevel(logging.DEBUG)
DISPATCHER_ADDR = ("127.0.0.1", 44444)
BYTES_MSG_LENGTH: int = 32767
Expand Down Expand Up @@ -64,8 +66,10 @@ def __init__(self):
logger.info("Dispatcher server is running/listening")

# Register server socket for accepting connections
self.selector.register(self.server, selectors.EVENT_READ, self._accept_client_conn)

self.selector.register(
self.server, selectors.EVENT_READ, self._accept_client_conn
)

# Cleanup essentials
self.dispatcher_exit = False

Expand All @@ -77,13 +81,17 @@ def _accept_client_conn(self, server: socket.socket):
"""

up_client_socket, _ = server.accept()
logger.info(f'accepted conn. {up_client_socket.getpeername()}')
logger.info(f"accepted conn. {up_client_socket.getpeername()}")

with self.lock:
self.connected_sockets.add(up_client_socket)

# Register socket for receiving data
self.selector.register(up_client_socket, selectors.EVENT_READ, self._receive_from_up_client)
self.selector.register(
up_client_socket,
selectors.EVENT_READ,
self._receive_from_up_client,
)

def _receive_from_up_client(self, up_client_socket: socket.socket):
"""
Expand All @@ -101,7 +109,7 @@ def _receive_from_up_client(self, up_client_socket: socket.socket):

logger.info(f"received data: {recv_data}")
self._flood_to_sockets(recv_data)
except:
except Exception:
logger.error("Received error while reading data from up-client")
self._close_connected_socket(up_client_socket)

Expand All @@ -113,11 +121,13 @@ def _flood_to_sockets(self, data: bytes):
:param data: The data to be sent.
"""
# for up_client_socket in self.connected_sockets.copy(): # copy() to avoid RuntimeError
for up_client_socket in self.connected_sockets:
for up_client_socket in self.connected_sockets:
try:
up_client_socket.sendall(data)
except ConnectionAbortedError as e:
logger.error(f"Error sending data to {up_client_socket.getpeername()}: {e}")
logger.error(
f"Error sending data to {up_client_socket.getpeername()}: {e}"
)
self._close_connected_socket(up_client_socket)

def listen_for_client_connections(self):
Expand All @@ -139,10 +149,10 @@ def _close_connected_socket(self, up_client_socket: socket.socket):
logger.info(f"closing socket {up_client_socket.getpeername()}")
with self.lock:
self.connected_sockets.remove(up_client_socket)

self.selector.unregister(up_client_socket)
up_client_socket.close()

def close(self):
self.dispatcher_exit = True
for utransport_socket in self.connected_sockets.copy():
Expand Down
143 changes: 98 additions & 45 deletions test_agent/python/testagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@
from google.protobuf.message import Message
from google.protobuf.descriptor import FieldDescriptor
from google.protobuf.wrappers_pb2 import StringValue
from uprotocol.proto.uattributes_pb2 import UPriority, UMessageType, CallOptions
from uprotocol.proto.uattributes_pb2 import (
UPriority,
UMessageType,
CallOptions,
)
from uprotocol.proto.umessage_pb2 import UMessage
from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat
from uprotocol.proto.ustatus_pb2 import UStatus
Expand All @@ -53,8 +57,10 @@
sys.path.append("../")
from up_client_socket.python.socket_transport import SocketUTransport

logging.basicConfig(format='%(levelname)s| %(filename)s:%(lineno)s %(message)s')
logger = logging.getLogger('File:Line# Debugger')
logging.basicConfig(
format="%(levelname)s| %(filename)s:%(lineno)s %(message)s"
)
logger = logging.getLogger("File:Line# Debugger")
logger.setLevel(logging.DEBUG)


Expand All @@ -63,16 +69,26 @@ class SocketUListener(UListener):
def on_receive(self, umsg: UMessage) -> None:
logger.info("Listener received")
if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST:
attributes = UAttributesBuilder.response(umsg.attributes.sink, umsg.attributes.source,
UPriority.UPRIORITY_CS4, umsg.attributes.id).build()
attributes = UAttributesBuilder.response(
umsg.attributes.sink,
umsg.attributes.source,
UPriority.UPRIORITY_CS4,
umsg.attributes.id,
).build()
any_obj = any_pb2.Any()
any_obj.Pack(StringValue(value="SuccessRPCResponse"))
res_msg = UMessage(attributes=attributes, payload=UPayload(value=any_obj.SerializeToString(),
format=UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY))
res_msg = UMessage(
attributes=attributes,
payload=UPayload(
value=any_obj.SerializeToString(),
format=UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
),
)
transport.send(res_msg)
else:
send_to_test_manager(umsg, CONSTANTS.RESPONSE_ON_RECEIVE)


def message_to_dict(message: Message) -> Dict[str, Any]:
"""Converts protobuf Message to Dict and keeping respective data types
Expand All @@ -83,49 +99,61 @@ def message_to_dict(message: Message) -> Dict[str, Any]:
Dict[str, Any]: Dict/JSON version of the Message
"""
result: Dict[str, Any] = {}

all_fields: List[FieldDescriptor] = message.DESCRIPTOR.fields
for field in all_fields:

value = getattr(message, field.name, field.default_value)
if isinstance(value, bytes):
value: str = value.decode()

if hasattr(value, 'DESCRIPTOR'):
if hasattr(value, "DESCRIPTOR"):
result[field.name] = message_to_dict(value)
elif field.label == FieldDescriptor.LABEL_REPEATED:
repeated = []
for sub_msg in value:
if hasattr(sub_msg, 'DESCRIPTOR'):
if hasattr(sub_msg, "DESCRIPTOR"):
repeated.append(message_to_dict(sub_msg))
else:
repeated.append(value)
result[field.name] = repeated

elif field.label == FieldDescriptor.LABEL_REQUIRED or field.label == FieldDescriptor.LABEL_OPTIONAL:
elif (
field.label == FieldDescriptor.LABEL_REQUIRED
or field.label == FieldDescriptor.LABEL_OPTIONAL
):
result[field.name] = value

return result


def send_to_test_manager(response: Union[Message, str, dict], action: str, received_test_id: str = ""):
def send_to_test_manager(
response: Union[Message, str, dict],
action: str,
received_test_id: str = "",
):
if not isinstance(response, (dict, str)):
# converts protobuf to dict
response = message_to_dict(response)

# Create response as json/dict
response_dict = {'data': response, 'action': action, 'ue': 'python', 'test_id': received_test_id}
response_dict = json.dumps(response_dict).encode('utf-8')
response_dict = {
"data": response,
"action": action,
"ue": "python",
"test_id": received_test_id,
}
response_dict = json.dumps(response_dict).encode("utf-8")
ta_socket.sendall(response_dict)
logger.info(f"Sent to TM {response_dict}")


def dict_to_proto(parent_json_obj, parent_proto_obj):
def populate_fields(json_obj, proto_obj):
for field_name, value in json_obj.items():
if 'BYTES:' in value:
value = value.replace('BYTES:', '')
value = value.encode('utf-8')
if "BYTES:" in value:
value = value.replace("BYTES:", "")
value = value.encode("utf-8")
if hasattr(proto_obj, field_name):
if isinstance(value, dict):
# Recursively update the nested message object
Expand All @@ -137,7 +165,7 @@ def populate_fields(json_obj, proto_obj):
value = int(value)
elif field_type == float:
value = float(value)
except:
except Exception:
pass
setattr(proto_obj, field_name, value)
return proto_obj
Expand Down Expand Up @@ -167,41 +195,59 @@ def handle_unregister_listener_command(json_msg):
def handle_invoke_method_command(json_msg):
uri = dict_to_proto(json_msg["data"], UUri())
payload = dict_to_proto(json_msg["data"]["payload"], UPayload())
res_future: Future = transport.invoke_method(uri, payload, CallOptions(ttl=10000))
res_future: Future = transport.invoke_method(
uri, payload, CallOptions(ttl=10000)
)

def handle_response(message):
message: Message = message.result()
send_to_test_manager(message, CONSTANTS.INVOKE_METHOD_COMMAND, received_test_id=json_msg["test_id"])
send_to_test_manager(
message,
CONSTANTS.INVOKE_METHOD_COMMAND,
received_test_id=json_msg["test_id"],
)

res_future.add_done_callback(handle_response)


def send_longserialize_uuri(json_msg: Dict[str, Any]):
uri: UUri = dict_to_proto(json_msg["data"], UUri())
serialized_uuri: str = LongUriSerializer().serialize(uri)
send_to_test_manager(serialized_uuri, CONSTANTS.SERIALIZE_URI, received_test_id=json_msg["test_id"])
send_to_test_manager(
serialized_uuri,
CONSTANTS.SERIALIZE_URI,
received_test_id=json_msg["test_id"],
)


def send_longdeserialize_uri(json_msg: Dict[str, Any]):
uuri: UUri = LongUriSerializer().deserialize(json_msg["data"])
send_to_test_manager(uuri, CONSTANTS.DESERIALIZE_URI, received_test_id=json_msg["test_id"])
send_to_test_manager(
uuri, CONSTANTS.DESERIALIZE_URI, received_test_id=json_msg["test_id"]
)


def send_longdeserialize_uuid(json_msg: Dict[str, Any]):
uuid: UUID = LongUuidSerializer().deserialize(json_msg["data"])
send_to_test_manager(uuid, CONSTANTS.DESERIALIZE_UUID, received_test_id=json_msg["test_id"])
send_to_test_manager(
uuid, CONSTANTS.DESERIALIZE_UUID, received_test_id=json_msg["test_id"]
)


def send_longserialize_uuid(json_msg: Dict[str, Any]):
uuid: UUID = dict_to_proto(json_msg["data"], UUID())
serialized_uuid: str = LongUuidSerializer().serialize(uuid)
send_to_test_manager(serialized_uuid, CONSTANTS.SERIALIZE_UUID, received_test_id=json_msg["test_id"])
send_to_test_manager(
serialized_uuid,
CONSTANTS.SERIALIZE_UUID,
received_test_id=json_msg["test_id"],
)


def handle_uri_validate_command(json_msg):
val_type = json_msg["data"]["type"]
uri = LongUriSerializer().deserialize(json_msg["data"].get("uri"))

validator_func = {
"uri": UriValidator.validate,
"rpc_response": UriValidator.validate_rpc_response,
Expand All @@ -211,7 +257,7 @@ def handle_uri_validate_command(json_msg):
"is_micro_form": UriValidator.is_micro_form,
"is_long_form_uuri": UriValidator.is_long_form,
"is_long_form_uauthority": UriValidator.is_long_form,
"is_local": UriValidator.is_local
"is_local": UriValidator.is_local,
}.get(val_type)

if validator_func:
Expand All @@ -222,19 +268,24 @@ def handle_uri_validate_command(json_msg):
else:
result = str(status.is_success())
message = status.get_message()
send_to_test_manager({"result": result, "message": message}, CONSTANTS.VALIDATE_URI, received_test_id=json_msg["test_id"])


action_handlers = {CONSTANTS.SEND_COMMAND: handle_send_command,
CONSTANTS.REGISTER_LISTENER_COMMAND: handle_register_listener_command,
CONSTANTS.UNREGISTER_LISTENER_COMMAND: handle_unregister_listener_command,
CONSTANTS.INVOKE_METHOD_COMMAND: handle_invoke_method_command,
CONSTANTS.SERIALIZE_URI: send_longserialize_uuri,
CONSTANTS.DESERIALIZE_URI: send_longdeserialize_uri,
CONSTANTS.SERIALIZE_UUID: send_longserialize_uuid,
CONSTANTS.DESERIALIZE_UUID: send_longdeserialize_uuid,
CONSTANTS.VALIDATE_URI: handle_uri_validate_command,
}
send_to_test_manager(
{"result": result, "message": message},
CONSTANTS.VALIDATE_URI,
received_test_id=json_msg["test_id"],
)


action_handlers = {
CONSTANTS.SEND_COMMAND: handle_send_command,
CONSTANTS.REGISTER_LISTENER_COMMAND: handle_register_listener_command,
CONSTANTS.UNREGISTER_LISTENER_COMMAND: handle_unregister_listener_command,
CONSTANTS.INVOKE_METHOD_COMMAND: handle_invoke_method_command,
CONSTANTS.SERIALIZE_URI: send_longserialize_uuri,
CONSTANTS.DESERIALIZE_URI: send_longdeserialize_uri,
CONSTANTS.SERIALIZE_UUID: send_longserialize_uuid,
CONSTANTS.DESERIALIZE_UUID: send_longdeserialize_uuid,
CONSTANTS.VALIDATE_URI: handle_uri_validate_command,
}


def process_message(json_data):
Expand All @@ -245,7 +296,9 @@ def process_message(json_data):

# For UTransport interface methods
if status is not None:
send_to_test_manager(status, action, received_test_id=json_data["test_id"])
send_to_test_manager(
status, action, received_test_id=json_data["test_id"]
)


def receive_from_tm():
Expand All @@ -254,16 +307,16 @@ def receive_from_tm():
if not recv_data or recv_data == b"":
return
# Deserialize the JSON data
json_data = json.loads(recv_data.decode('utf-8'))
logger.info('Received data from test manager: %s', json_data)
json_data = json.loads(recv_data.decode("utf-8"))
logger.info("Received data from test manager: %s", json_data)
process_message(json_data)


if __name__ == '__main__':
if __name__ == "__main__":
listener = SocketUListener()
transport = SocketUTransport()
ta_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ta_socket.connect(CONSTANTS.TEST_MANAGER_ADDR)
thread = Thread(target=receive_from_tm)
thread.start()
send_to_test_manager({'SDK_name': "python"}, "initialize")
send_to_test_manager({"SDK_name": "python"}, "initialize")
Loading

0 comments on commit 9c071b0

Please sign in to comment.