From ec84af4a5adfeac65b4d87d7c0dc9d24ef1459af Mon Sep 17 00:00:00 2001 From: vdahiya12 <67608553+vdahiya12@users.noreply.github.com> Date: Thu, 16 Jun 2022 12:15:16 -0700 Subject: [PATCH] [sonic-ycabled] fix grpc logic for timeout,cli HWSTATUS value retrival logic for active-active cable (#264) Signed-off-by: vaibhav-dahiya vdahiya@microsoft.com This PR adds some improvement logic to ycabled daemon correct the loopback IP's for active-active cable type for libra gRPC read_side establishment improves the code for timeout value injection for different RPC's for gRPC for cli HW_STATUS the daemon tries to use cached old value, if not present try to do RPC for getting ForwardingState with timeout of 0.1 ms Description Motivation and Context Required for gRPC logic improvement in few active-active scenario's How Has This Been Tested? UT and deploying the changes in actual testbed Signed-off-by: vaibhav-dahiya --- sonic-ycabled/tests/test_y_cable_helper.py | 62 ++++++++++++++++++- .../ycable/ycable_utilities/y_cable_helper.py | 58 +++++++++++------ 2 files changed, 100 insertions(+), 20 deletions(-) diff --git a/sonic-ycabled/tests/test_y_cable_helper.py b/sonic-ycabled/tests/test_y_cable_helper.py index bf7d187e4ac9..bb66be89a987 100644 --- a/sonic-ycabled/tests/test_y_cable_helper.py +++ b/sonic-ycabled/tests/test_y_cable_helper.py @@ -4699,7 +4699,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_no_instance(self, mock_s @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-standby"))) @patch('os.path.isfile', MagicMock(return_value=True)) @patch('time.sleep', MagicMock(return_value=True)) - def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto(self, mock_swsscommon_table, platform_sfputil): + def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto_active_standby(self, mock_swsscommon_table, platform_sfputil): mock_table = MagicMock() mock_table.get = MagicMock( @@ -4747,6 +4747,66 @@ def get_mux_direction(self): fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) assert(rc == None) + @patch('swsscommon.swsscommon.Table') + @patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil') + @patch('ycable.ycable_utilities.y_cable_helper.get_ycable_physical_port_from_logical_port', MagicMock(return_value=(0))) + @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('ycable.ycable_utilities.y_cable_helper.y_cable_wrapper_get_presence', MagicMock(return_value=True)) + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-active"))) + def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto_active_active_none(self, mock_swsscommon_table, platform_sfputil): + + mock_table = MagicMock() + mock_table.get = MagicMock( + side_effect=[(True, (('state', "auto"), ("soc_ipv4", "192.168.0.1/32"))), (True, (('index', 2), ))]) + mock_swsscommon_table.return_value = mock_table + + xcvrd_config_hwmode_state_cmd_sts_tbl = mock_swsscommon_table + xcvrd_config_hwmode_state_rsp_tbl = mock_swsscommon_table + xcvrd_show_hwmode_dir_cmd_sts_tbl = mock_swsscommon_table + xcvrd_show_hwmode_dir_rsp_tbl = mock_swsscommon_table + xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table + asic_index = 0 + port = "Ethernet0" + platform_sfputil.get_asic_id_for_logical_port = 0 + fvp = {"state": "active"} + swsscommon.Table.return_value.get.return_value = ( + True, {"read_side": "2", "state": "active"}) + + rc = handle_show_hwmode_state_cmd_arg_tbl_notification( + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + assert(rc == None) + + @patch('swsscommon.swsscommon.Table') + @patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil') + @patch('ycable.ycable_utilities.y_cable_helper.get_ycable_physical_port_from_logical_port', MagicMock(return_value=(0))) + @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('ycable.ycable_utilities.y_cable_helper.y_cable_wrapper_get_presence', MagicMock(return_value=True)) + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-active"))) + def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto_active_active(self, mock_swsscommon_table, platform_sfputil): + + mock_table = MagicMock() + mock_table.get = MagicMock( + side_effect=[(True, (('state', "auto"), ("soc_ipv4", "192.168.0.1/32"))), (True, (('index', 2), ))]) + mock_swsscommon_table.return_value = mock_table + + xcvrd_config_hwmode_state_cmd_sts_tbl = mock_swsscommon_table + xcvrd_config_hwmode_state_rsp_tbl = mock_swsscommon_table + xcvrd_show_hwmode_dir_cmd_sts_tbl = mock_swsscommon_table + xcvrd_show_hwmode_dir_rsp_tbl = mock_swsscommon_table + xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table + asic_index = 0 + port = "Ethernet0" + platform_sfputil.get_asic_id_for_logical_port = 0 + fvp = {"state": "active"} + swsscommon.Table.return_value.get.return_value = ( + False, {"read_side": "2", "state": "active"}) + + rc = handle_show_hwmode_state_cmd_arg_tbl_notification( + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + assert(rc == -1) + def test_retry_setup_grpc_channel_for_port_incorrect(self): status = True diff --git a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py index da247650603a..027bcae851d4 100644 --- a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py +++ b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py @@ -29,6 +29,10 @@ SELECT_TIMEOUT = 1000 +#gRPC timeouts for RPC +QUERY_ADMIN_FORWARDING_TIMEOUT = 0.1 +SET_ADMIN_FORWARDING_TIMEOUT = 0.5 + y_cable_platform_sfputil = None y_cable_platform_chassis = None y_cable_is_platform_vs = None @@ -46,8 +50,8 @@ LOOPBACK_INTERFACE_T0 = "10.212.64.1/32" LOOPBACK_INTERFACE_LT0 = "10.212.64.2/32" -LOOPBACK_INTERFACE_T0_NIC = "10.1.0.37/32" -LOOPBACK_INTERFACE_LT0_NIC = "10.1.0.38/32" +LOOPBACK_INTERFACE_T0_NIC = "10.1.0.38/32" +LOOPBACK_INTERFACE_LT0_NIC = "10.1.0.39/32" # rename and put in right place # port id 0 -> maps to T0 # port id 1 -> maps to LT0 @@ -321,6 +325,9 @@ def wrapper(*args, **kwargs): def retry_setup_grpc_channel_for_port(port, asic_index): + global grpc_port_stubs + global grpc_port_channels + config_db, port_tbl = {}, {} namespaces = multi_asic.get_front_end_namespaces() for namespace in namespaces: @@ -354,7 +361,6 @@ def retry_setup_grpc_channel_for_port(port, asic_index): return True def setup_grpc_channel_for_port(port, soc_ip): - "TODO make these configurable like RESTAPI" """ root_cert = open('/etc/sonic/credentials/ca-chain-bundle.cert.pem', 'rb').read() key = open('/etc/sonic/credentials/client.key.pem', 'rb').read() @@ -386,9 +392,9 @@ def setup_grpc_channel_for_port(port, soc_ip): stub = None if stub is None: - helper_logger.log_warning("stub was not setup gRPC ip {} port {}, no gRPC server running ".format(soc_ip, port)) + helper_logger.log_warning("stub was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port)) if channel is None: - helper_logger.log_warning("channel was not setup gRPC ip {} port {}, no gRPC server running".format(soc_ip, port)) + helper_logger.log_warning("channel was not setup for gRPC soc ip {} port {}, no gRPC server running ?".format(soc_ip, port)) return channel, stub @@ -536,10 +542,11 @@ def setup_grpc_channels(stop_event): "Could not retreive port inside config_db PORT table {} for gRPC channel initiation".format(logical_port_name)) -def try_grpc(callback, *args, **kwargs): +def try_grpc(callback, rpc_timeout, *args, **kwargs): """ Handy function to invoke the callback and catch NotImplementedError :param callback: Callback to be invoked + :param rpc_timeout: timeout for RPC in seconds :param args: Arguments to be passed to callback :param kwargs: Default return value if exception occur :return: Default return value if exception occur else return value of the callback @@ -547,7 +554,11 @@ def try_grpc(callback, *args, **kwargs): return_val = True try: - resp = callback(*args) + if rpc_timeout is not None: + resp = callback(*args, timeout=rpc_timeout) + else: + resp = callback(*args) + if resp is None: return_val = False except grpc.RpcError as e: @@ -557,7 +568,9 @@ def try_grpc(callback, *args, **kwargs): elif e.code() == grpc.StatusCode.UNAVAILABLE: helper_logger.log_notice("rpc unavailable for port= {}".format(str(e.code()))) elif e.code() == grpc.StatusCode.INVALID_ARGUMENT: - helper_logger.log_notice("rpc invalid for port= {}".format(str(e.code()))) + helper_logger.log_notice("rpc invalid arguement for port= {}".format(str(e.code()))) + elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: + helper_logger.log_notice("rpc timeout exceeded for port= {} timeout = {}".format(str(e.code()), rpc_timeout)) else: helper_logger.log_notice("rpc exception error for port= {}".format(str(e.code()))) resp = None @@ -2919,9 +2932,15 @@ def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, port_tbl, xcvrd_show_ helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table while responding to cli cmd show mux status {}".format( port, hw_mux_cable_tbl[asic_index].getTableName())) set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return -1 mux_port_dict = dict(fv) read_side = mux_port_dict.get("read_side", None) + state = mux_port_dict.get("state", None) + if state is not None: + set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return + helper_logger.log_debug("Y_CABLE_DEBUG:before invoking RPC fwd_state read_side = {}".format(read_side)) # TODO state only for dummy value in this request MSG remove this request = linkmgr_grpc_driver_pb2.AdminRequest(portid=[int(read_side), 1 - int(read_side)], state=[0, 0]) @@ -2930,16 +2949,12 @@ def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, port_tbl, xcvrd_show_ stub = grpc_port_stubs.get(port, None) if stub is None: - helper_logger.log_notice("stub is None for getting forwarding state RPC port {}".format(port)) - retry_setup_grpc_channel_for_port(port, asic_index) - stub = grpc_port_stubs.get(port, None) - if stub is None: - helper_logger.log_warning( - "stub was None for performing cli fwd mux RPC port {}, setting it up again did not work".format(port)) + # no need to retry setup channels for mux cli hw mode command + helper_logger.log_warning("stub is None for getting forwarding state RPC port for cli query {}".format(port)) set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) return - ret, response = try_grpc(stub.QueryAdminForwardingPortState, request, timeout=0.1) + ret, response = try_grpc(stub.QueryAdminForwardingPortState, QUERY_ADMIN_FORWARDING_TIMEOUT , request) (self_state, peer_state) = parse_grpc_response_forwarding_state(ret, response, read_side) state = self_state @@ -3041,7 +3056,7 @@ def handle_fwd_state_command_grpc_notification(fvp_m, hw_mux_cable_tbl, fwd_stat peer_state = "unknown" stub = grpc_port_stubs.get(port, None) if stub is None: - helper_logger.log_notice("stub is None for getting forwarding state RPC port {}".format(port)) + helper_logger.log_notice("stub is None for getting admin port forwarding state RPC port {}".format(port)) retry_setup_grpc_channel_for_port(port, asic_index) stub = grpc_port_stubs.get(port, None) if stub is None: @@ -3052,7 +3067,7 @@ def handle_fwd_state_command_grpc_notification(fvp_m, hw_mux_cable_tbl, fwd_stat fwd_state_response_tbl[asic_index].set(port, fvs_updated) return - ret, response = try_grpc(stub.QueryAdminForwardingPortState, request, timeout=0.1) + ret, response = try_grpc(stub.QueryAdminForwardingPortState, QUERY_ADMIN_FORWARDING_TIMEOUT, request) (self_state, peer_state) = parse_grpc_response_forwarding_state(ret, response, read_side) if response is not None: @@ -3117,7 +3132,7 @@ def handle_hw_mux_cable_table_grpc_notification(fvp, hw_mux_cable_tbl, asic_inde state_req = 0 helper_logger.log_notice( - "calling RPC for hw mux_cable set state state peer = {} portid {} Ethernet port".format(peer, port)) + "calling RPC for hw mux_cable set state state peer = {} portid Ethernet port {}".format(peer, port)) request = linkmgr_grpc_driver_pb2.AdminRequest(portid=[curr_read_side], state=[state_req]) @@ -3131,7 +3146,7 @@ def handle_hw_mux_cable_table_grpc_notification(fvp, hw_mux_cable_tbl, asic_inde "stub was None for performing hw mux RPC port {}, setting it up again did not work".format(port)) return - ret, response = try_grpc(stub.SetAdminForwardingPortState, request, timeout=0.1) + ret, response = try_grpc(stub.SetAdminForwardingPortState, SET_ADMIN_FORWARDING_TIMEOUT, request) if response is not None: # Debug only, remove this section once Server side is Finalized hw_response_port_ids = response.portid @@ -3737,6 +3752,11 @@ def task_cli_worker(self): fvp_dict = dict(fvp_m) + (status, cable_type) = check_mux_cable_port_type(port, port_tbl, asic_index) + + if status is False or cable_type != "active-standby": + break + if "state" in fvp_dict: # check if xcvrd got a probe command port_instance = get_ycable_port_instance_from_logical_port(port)