diff --git a/sonic-ycabled/tests/test_y_cable_helper.py b/sonic-ycabled/tests/test_y_cable_helper.py index bf7d187e4ac9..bb66be89a987 100644 --- a/sonic-ycabled/tests/test_y_cable_helper.py +++ b/sonic-ycabled/tests/test_y_cable_helper.py @@ -4699,7 +4699,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_no_instance(self, mock_s @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-standby"))) @patch('os.path.isfile', MagicMock(return_value=True)) @patch('time.sleep', MagicMock(return_value=True)) - def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto(self, mock_swsscommon_table, platform_sfputil): + def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto_active_standby(self, mock_swsscommon_table, platform_sfputil): mock_table = MagicMock() mock_table.get = MagicMock( @@ -4747,6 +4747,66 @@ def get_mux_direction(self): fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) assert(rc == None) + @patch('swsscommon.swsscommon.Table') + @patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil') + @patch('ycable.ycable_utilities.y_cable_helper.get_ycable_physical_port_from_logical_port', MagicMock(return_value=(0))) + @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('ycable.ycable_utilities.y_cable_helper.y_cable_wrapper_get_presence', MagicMock(return_value=True)) + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-active"))) + def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto_active_active_none(self, mock_swsscommon_table, platform_sfputil): + + mock_table = MagicMock() + mock_table.get = MagicMock( + side_effect=[(True, (('state', "auto"), ("soc_ipv4", "192.168.0.1/32"))), (True, (('index', 2), ))]) + mock_swsscommon_table.return_value = mock_table + + xcvrd_config_hwmode_state_cmd_sts_tbl = mock_swsscommon_table + xcvrd_config_hwmode_state_rsp_tbl = mock_swsscommon_table + xcvrd_show_hwmode_dir_cmd_sts_tbl = mock_swsscommon_table + xcvrd_show_hwmode_dir_rsp_tbl = mock_swsscommon_table + xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table + asic_index = 0 + port = "Ethernet0" + platform_sfputil.get_asic_id_for_logical_port = 0 + fvp = {"state": "active"} + swsscommon.Table.return_value.get.return_value = ( + True, {"read_side": "2", "state": "active"}) + + rc = handle_show_hwmode_state_cmd_arg_tbl_notification( + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + assert(rc == None) + + @patch('swsscommon.swsscommon.Table') + @patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil') + @patch('ycable.ycable_utilities.y_cable_helper.get_ycable_physical_port_from_logical_port', MagicMock(return_value=(0))) + @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('ycable.ycable_utilities.y_cable_helper.y_cable_wrapper_get_presence', MagicMock(return_value=True)) + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-active"))) + def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto_active_active(self, mock_swsscommon_table, platform_sfputil): + + mock_table = MagicMock() + mock_table.get = MagicMock( + side_effect=[(True, (('state', "auto"), ("soc_ipv4", "192.168.0.1/32"))), (True, (('index', 2), ))]) + mock_swsscommon_table.return_value = mock_table + + xcvrd_config_hwmode_state_cmd_sts_tbl = mock_swsscommon_table + xcvrd_config_hwmode_state_rsp_tbl = mock_swsscommon_table + xcvrd_show_hwmode_dir_cmd_sts_tbl = mock_swsscommon_table + xcvrd_show_hwmode_dir_rsp_tbl = mock_swsscommon_table + xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table + asic_index = 0 + port = "Ethernet0" + platform_sfputil.get_asic_id_for_logical_port = 0 + fvp = {"state": "active"} + swsscommon.Table.return_value.get.return_value = ( + False, {"read_side": "2", "state": "active"}) + + rc = handle_show_hwmode_state_cmd_arg_tbl_notification( + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + assert(rc == -1) + def test_retry_setup_grpc_channel_for_port_incorrect(self): status = True diff --git a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py index da247650603a..027bcae851d4 100644 --- a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py +++ b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py @@ -29,6 +29,10 @@ SELECT_TIMEOUT = 1000 +#gRPC timeouts for RPC +QUERY_ADMIN_FORWARDING_TIMEOUT = 0.1 +SET_ADMIN_FORWARDING_TIMEOUT = 0.5 + y_cable_platform_sfputil = None y_cable_platform_chassis = None y_cable_is_platform_vs = None @@ -46,8 +50,8 @@ LOOPBACK_INTERFACE_T0 = "10.212.64.1/32" LOOPBACK_INTERFACE_LT0 = "10.212.64.2/32" -LOOPBACK_INTERFACE_T0_NIC = "10.1.0.37/32" -LOOPBACK_INTERFACE_LT0_NIC = "10.1.0.38/32" +LOOPBACK_INTERFACE_T0_NIC = "10.1.0.38/32" +LOOPBACK_INTERFACE_LT0_NIC = "10.1.0.39/32" # rename and put in right place # port id 0 -> maps to T0 # port id 1 -> maps to LT0 @@ -321,6 +325,9 @@ def wrapper(*args, **kwargs): def retry_setup_grpc_channel_for_port(port, asic_index): + global grpc_port_stubs + global grpc_port_channels + config_db, port_tbl = {}, {} namespaces = multi_asic.get_front_end_namespaces() for namespace in namespaces: @@ -354,7 +361,6 @@ def retry_setup_grpc_channel_for_port(port, asic_index): return True def setup_grpc_channel_for_port(port, soc_ip): - "TODO make these configurable like RESTAPI" """ root_cert = open('/etc/sonic/credentials/ca-chain-bundle.cert.pem', 'rb').read() key = open('/etc/sonic/credentials/client.key.pem', 'rb').read() @@ -386,9 +392,9 @@ def setup_grpc_channel_for_port(port, soc_ip): stub = None if stub is None: - helper_logger.log_warning("stub was not setup gRPC ip {} port {}, no gRPC server running ".format(soc_ip, port)) + helper_logger.log_warning("stub was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port)) if channel is None: - helper_logger.log_warning("channel was not setup gRPC ip {} port {}, no gRPC server running".format(soc_ip, port)) + helper_logger.log_warning("channel was not setup for gRPC soc ip {} port {}, no gRPC server running ?".format(soc_ip, port)) return channel, stub @@ -536,10 +542,11 @@ def setup_grpc_channels(stop_event): "Could not retreive port inside config_db PORT table {} for gRPC channel initiation".format(logical_port_name)) -def try_grpc(callback, *args, **kwargs): +def try_grpc(callback, rpc_timeout, *args, **kwargs): """ Handy function to invoke the callback and catch NotImplementedError :param callback: Callback to be invoked + :param rpc_timeout: timeout for RPC in seconds :param args: Arguments to be passed to callback :param kwargs: Default return value if exception occur :return: Default return value if exception occur else return value of the callback @@ -547,7 +554,11 @@ def try_grpc(callback, *args, **kwargs): return_val = True try: - resp = callback(*args) + if rpc_timeout is not None: + resp = callback(*args, timeout=rpc_timeout) + else: + resp = callback(*args) + if resp is None: return_val = False except grpc.RpcError as e: @@ -557,7 +568,9 @@ def try_grpc(callback, *args, **kwargs): elif e.code() == grpc.StatusCode.UNAVAILABLE: helper_logger.log_notice("rpc unavailable for port= {}".format(str(e.code()))) elif e.code() == grpc.StatusCode.INVALID_ARGUMENT: - helper_logger.log_notice("rpc invalid for port= {}".format(str(e.code()))) + helper_logger.log_notice("rpc invalid arguement for port= {}".format(str(e.code()))) + elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: + helper_logger.log_notice("rpc timeout exceeded for port= {} timeout = {}".format(str(e.code()), rpc_timeout)) else: helper_logger.log_notice("rpc exception error for port= {}".format(str(e.code()))) resp = None @@ -2919,9 +2932,15 @@ def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, port_tbl, xcvrd_show_ helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table while responding to cli cmd show mux status {}".format( port, hw_mux_cable_tbl[asic_index].getTableName())) set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return -1 mux_port_dict = dict(fv) read_side = mux_port_dict.get("read_side", None) + state = mux_port_dict.get("state", None) + if state is not None: + set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return + helper_logger.log_debug("Y_CABLE_DEBUG:before invoking RPC fwd_state read_side = {}".format(read_side)) # TODO state only for dummy value in this request MSG remove this request = linkmgr_grpc_driver_pb2.AdminRequest(portid=[int(read_side), 1 - int(read_side)], state=[0, 0]) @@ -2930,16 +2949,12 @@ def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, port_tbl, xcvrd_show_ stub = grpc_port_stubs.get(port, None) if stub is None: - helper_logger.log_notice("stub is None for getting forwarding state RPC port {}".format(port)) - retry_setup_grpc_channel_for_port(port, asic_index) - stub = grpc_port_stubs.get(port, None) - if stub is None: - helper_logger.log_warning( - "stub was None for performing cli fwd mux RPC port {}, setting it up again did not work".format(port)) + # no need to retry setup channels for mux cli hw mode command + helper_logger.log_warning("stub is None for getting forwarding state RPC port for cli query {}".format(port)) set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) return - ret, response = try_grpc(stub.QueryAdminForwardingPortState, request, timeout=0.1) + ret, response = try_grpc(stub.QueryAdminForwardingPortState, QUERY_ADMIN_FORWARDING_TIMEOUT , request) (self_state, peer_state) = parse_grpc_response_forwarding_state(ret, response, read_side) state = self_state @@ -3041,7 +3056,7 @@ def handle_fwd_state_command_grpc_notification(fvp_m, hw_mux_cable_tbl, fwd_stat peer_state = "unknown" stub = grpc_port_stubs.get(port, None) if stub is None: - helper_logger.log_notice("stub is None for getting forwarding state RPC port {}".format(port)) + helper_logger.log_notice("stub is None for getting admin port forwarding state RPC port {}".format(port)) retry_setup_grpc_channel_for_port(port, asic_index) stub = grpc_port_stubs.get(port, None) if stub is None: @@ -3052,7 +3067,7 @@ def handle_fwd_state_command_grpc_notification(fvp_m, hw_mux_cable_tbl, fwd_stat fwd_state_response_tbl[asic_index].set(port, fvs_updated) return - ret, response = try_grpc(stub.QueryAdminForwardingPortState, request, timeout=0.1) + ret, response = try_grpc(stub.QueryAdminForwardingPortState, QUERY_ADMIN_FORWARDING_TIMEOUT, request) (self_state, peer_state) = parse_grpc_response_forwarding_state(ret, response, read_side) if response is not None: @@ -3117,7 +3132,7 @@ def handle_hw_mux_cable_table_grpc_notification(fvp, hw_mux_cable_tbl, asic_inde state_req = 0 helper_logger.log_notice( - "calling RPC for hw mux_cable set state state peer = {} portid {} Ethernet port".format(peer, port)) + "calling RPC for hw mux_cable set state state peer = {} portid Ethernet port {}".format(peer, port)) request = linkmgr_grpc_driver_pb2.AdminRequest(portid=[curr_read_side], state=[state_req]) @@ -3131,7 +3146,7 @@ def handle_hw_mux_cable_table_grpc_notification(fvp, hw_mux_cable_tbl, asic_inde "stub was None for performing hw mux RPC port {}, setting it up again did not work".format(port)) return - ret, response = try_grpc(stub.SetAdminForwardingPortState, request, timeout=0.1) + ret, response = try_grpc(stub.SetAdminForwardingPortState, SET_ADMIN_FORWARDING_TIMEOUT, request) if response is not None: # Debug only, remove this section once Server side is Finalized hw_response_port_ids = response.portid @@ -3737,6 +3752,11 @@ def task_cli_worker(self): fvp_dict = dict(fvp_m) + (status, cable_type) = check_mux_cable_port_type(port, port_tbl, asic_index) + + if status is False or cable_type != "active-standby": + break + if "state" in fvp_dict: # check if xcvrd got a probe command port_instance = get_ycable_port_instance_from_logical_port(port)