diff --git a/src/ax_interface/util.py b/src/ax_interface/util.py index e5122328b927..afe369115de6 100644 --- a/src/ax_interface/util.py +++ b/src/ax_interface/util.py @@ -1,3 +1,4 @@ +import ipaddress import re from ax_interface import constants @@ -98,10 +99,12 @@ def mac_decimals(mac): """ return tuple(int(h, 16) for h in mac.split(":")) -def ip2tuple_v4(ip): +def ip2byte_tuple(ip): """ - >>> ip2tuple_v4("192.168.1.253") + >>> ip2byte_tuple("192.168.1.253") (192, 168, 1, 253) + >>> ip2byte_tuple("2001:db8::3") + (32, 1, 13, 184, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3) """ - return tuple(int(bs) for bs in str(ip).split('.')) + return tuple(i for i in ipaddress.ip_address(ip).packed) diff --git a/src/sonic_ax_impl/mibs/ieee802_1ab.py b/src/sonic_ax_impl/mibs/ieee802_1ab.py index ac1b3b642db8..852635236a2e 100644 --- a/src/sonic_ax_impl/mibs/ieee802_1ab.py +++ b/src/sonic_ax_impl/mibs/ieee802_1ab.py @@ -8,6 +8,7 @@ from swsssdk import port_util from sonic_ax_impl import mibs, logger from sonic_ax_impl.mibs import Namespace +from ax_interface.util import ip2byte_tuple from ax_interface import MIBMeta, SubtreeMIBEntry, MIBEntry, MIBUpdater, ValueType @@ -317,7 +318,8 @@ def reinit_data(self): mgmt_ip_sub_oid = None for mgmt_ip in self.mgmt_ip_str.split(','): if '.' in mgmt_ip: - mgmt_ip_sub_oid = (addr_subtype_sub_oid, *[int(i) for i in mgmt_ip.split('.')]) + mgmt_ip_tuple = ip2byte_tuple(mgmt_ip) + mgmt_ip_sub_oid = (addr_subtype_sub_oid, *mgmt_ip_tuple) break else: logger.error("Could not find IPv4 address in lldp_loc_man_addr") @@ -494,10 +496,8 @@ def __init__(self): # establish connection to application database. Namespace.connect_all_dbs(self.db_conn, mibs.APPL_DB) self.if_range = [] - self.mgmt_ips = {} self.oid_name_map = {} self.mgmt_oid_name_map = {} - self.mgmt_ip_str = None self.pubsub = [None] * len(self.db_conn) def update_rem_if_mgmt(self, if_oid, if_name): @@ -511,28 +511,28 @@ def update_rem_if_mgmt(self, if_oid, if_name): if len(mgmt_ip_str) == 0: # the peer advertise an emtpy mgmt address return - time_mark = int(lldp_kvs['lldp_rem_time_mark']) - remote_index = int(lldp_kvs['lldp_rem_index']) - subtype = self.get_subtype(mgmt_ip_str) - ip_hex = self.get_ip_hex(mgmt_ip_str, subtype) - if subtype == ManAddrConst.man_addr_subtype_ipv4: - addr_subtype_sub_oid = 4 - mgmt_ip_sub_oid = (addr_subtype_sub_oid, *[int(i) for i in mgmt_ip_str.split('.')]) - elif subtype == ManAddrConst.man_addr_subtype_ipv6: - addr_subtype_sub_oid = 6 - mgmt_ip_sub_oid = (addr_subtype_sub_oid, *[int(i, 16) if i else 0 for i in mgmt_ip_str.split(':')]) - else: - logger.warning("Invalid management IP {}".format(mgmt_ip_str)) - return - self.if_range.append((time_mark, - if_oid, - remote_index, - subtype, - *mgmt_ip_sub_oid)) - - self.mgmt_ips.update({if_name: {"ip_str": mgmt_ip_str, - "addr_subtype": subtype, - "addr_hex": ip_hex}}) + mgmt_ip_set=set() + for mgmt_ip in mgmt_ip_str.split(','): + time_mark = int(lldp_kvs['lldp_rem_time_mark']) + remote_index = int(lldp_kvs['lldp_rem_index']) + subtype = self.get_subtype(mgmt_ip) + if not subtype: + logger.warning("Invalid management IP {}".format(mgmt_ip)) + continue + mgmt_ip_tuple = ip2byte_tuple(mgmt_ip) + if mgmt_ip_tuple in mgmt_ip_set: + continue + elif subtype == ManAddrConst.man_addr_subtype_ipv4: + addr_subtype_sub_oid = 4 + else: + addr_subtype_sub_oid = 16 + mgmt_ip_set.add(mgmt_ip_tuple) + mgmt_ip_sub_oid = (addr_subtype_sub_oid, *mgmt_ip_tuple) + self.if_range.append((time_mark, + if_oid, + remote_index, + subtype, + *mgmt_ip_sub_oid)) except (KeyError, AttributeError) as e: logger.warning("Error updating remote mgmt addr: {}".format(e)) return @@ -562,7 +562,6 @@ def update_data(self): self.pubsub[i] = mibs.get_redis_pubsub(self.db_conn[i], self.db_conn[i].APPL_DB, pattern) self._update_per_namespace_data(self.pubsub[i]) - def reinit_data(self): """ Subclass reinit data routine. @@ -577,7 +576,6 @@ def reinit_data(self): Namespace.connect_all_dbs(self.db_conn, mibs.APPL_DB) self.if_range = [] - self.mgmt_ips = {} for if_oid, if_name in self.oid_name_map.items(): self.update_rem_if_mgmt(if_oid, if_name) @@ -588,25 +586,9 @@ def get_next(self, sub_id): return self.if_range[right] def lookup(self, sub_id, callable): - if len(sub_id) == 0: - return None - sub_id = sub_id[1] - if sub_id not in self.oid_name_map: - return None - if_name = self.oid_name_map[sub_id] - if if_name not in self.mgmt_ips: - # no data for this interface + if sub_id not in self.if_range: return None - return callable(sub_id, if_name) - - def get_ip_hex(self, mgmt_ip_str, subtype): - if subtype == ManAddrConst.man_addr_subtype_ipv4: - hex_ip = " ".join([format(int(i), '02X') for i in mgmt_ip_str.split('.')]) - elif subtype == ManAddrConst.man_addr_subtype_ipv6: - hex_ip = " ".join([format(int(i, 16), 'x') if i else "0" for i in mgmt_ip_str.split(':')]) - else: - hex_ip = None - return hex_ip + return callable(sub_id) def get_subtype(self, ip_str): try: @@ -623,24 +605,14 @@ def get_subtype(self, ip_str): logger.warning("Invalid mgmt IP {}".format(ip_str)) return None - def man_addr_subtype(self, sub_id, if_name): - return self.mgmt_ips[if_name]['addr_subtype'] - - def man_addr(self, sub_id, if_name): - """ - :param sub_id: - :return: MGMT IP in HEX - """ - return self.mgmt_ips[if_name]['addr_hex'] - @staticmethod - def man_addr_if_subtype(sub_id, _): return ManAddrConst.man_addr_if_subtype + def man_addr_if_subtype(sub_id): return ManAddrConst.man_addr_if_subtype @staticmethod - def man_addr_if_id(sub_id, _): return ManAddrConst.man_addr_if_id + def man_addr_if_id(sub_id): return ManAddrConst.man_addr_if_id @staticmethod - def man_addr_OID(sub_id, _): return ManAddrConst.man_addr_oid + def man_addr_OID(sub_id): return ManAddrConst.man_addr_oid class LLDPLocalSystemData(metaclass=MIBMeta, prefix='.1.0.8802.1.1.2.1.3'): diff --git a/src/sonic_ax_impl/mibs/ietf/rfc1213.py b/src/sonic_ax_impl/mibs/ietf/rfc1213.py index 75238b054e22..35c6e1f0c2a7 100644 --- a/src/sonic_ax_impl/mibs/ietf/rfc1213.py +++ b/src/sonic_ax_impl/mibs/ietf/rfc1213.py @@ -8,7 +8,7 @@ from sonic_ax_impl.mibs import Namespace from ax_interface.mib import MIBMeta, ValueType, MIBUpdater, MIBEntry, SubtreeMIBEntry, OverlayAdpaterMIBEntry, OidMIBEntry from ax_interface.encodings import ObjectIdentifier -from ax_interface.util import mac_decimals, ip2tuple_v4 +from ax_interface.util import mac_decimals, ip2byte_tuple @unique class DbTables(int, Enum): @@ -99,7 +99,7 @@ def _update_arp_info(self, dev, mac, ip): # if MAC is all zero #if not any(mac): continue - iptuple = ip2tuple_v4(ip) + iptuple = ip2byte_tuple(ip) subid = (if_index,) + iptuple self.arp_dest_map[subid] = machex @@ -154,7 +154,7 @@ def update_data(self): nexthops = ent["nexthop"] for nh in nexthops.split(','): # TODO: if ipn contains IP range, create more sub_id here - sub_id = ip2tuple_v4(ipn.network_address) + sub_id = ip2byte_tuple(ipn.network_address) self.route_list.append(sub_id) self.nexthop_map[sub_id] = ipaddress.ip_address(nh).packed break # Just need the first nexthop diff --git a/src/sonic_ax_impl/mibs/ietf/rfc4292.py b/src/sonic_ax_impl/mibs/ietf/rfc4292.py index 38ce35c6749d..de75c05cf2f4 100644 --- a/src/sonic_ax_impl/mibs/ietf/rfc4292.py +++ b/src/sonic_ax_impl/mibs/ietf/rfc4292.py @@ -3,7 +3,7 @@ from sonic_ax_impl import mibs from sonic_ax_impl.mibs import Namespace from ax_interface import MIBMeta, ValueType, MIBUpdater, SubtreeMIBEntry -from ax_interface.util import ip2tuple_v4 +from ax_interface.util import ip2byte_tuple from bisect import bisect_right from sonic_py_common import multi_asic @@ -46,7 +46,7 @@ def update_data(self): ## The nexthop for loopbacks should be all zero for loip in self.loips: - sub_id = ip2tuple_v4(loip) + (255, 255, 255, 255) + (self.tos,) + (0, 0, 0, 0) + sub_id = ip2byte_tuple(loip) + (255, 255, 255, 255) + (self.tos,) + (0, 0, 0, 0) self.route_dest_list.append(sub_id) self.route_dest_map[sub_id] = self.loips[loip].packed @@ -84,7 +84,7 @@ def update_data(self): port_table[ifn][multi_asic.PORT_ROLE] == multi_asic.INTERNAL_PORT): continue - sub_id = ip2tuple_v4(ipn.network_address) + ip2tuple_v4(ipn.netmask) + (self.tos,) + ip2tuple_v4(nh) + sub_id = ip2byte_tuple(ipn.network_address) + ip2byte_tuple(ipn.netmask) + (self.tos,) + ip2byte_tuple(nh) self.route_dest_list.append(sub_id) self.route_dest_map[sub_id] = ipn.network_address.packed diff --git a/src/sonic_ax_impl/mibs/vendor/cisco/bgp4.py b/src/sonic_ax_impl/mibs/vendor/cisco/bgp4.py index f5f3e3724cb9..87214707bf43 100644 --- a/src/sonic_ax_impl/mibs/vendor/cisco/bgp4.py +++ b/src/sonic_ax_impl/mibs/vendor/cisco/bgp4.py @@ -1,6 +1,7 @@ from bisect import bisect_right from sonic_ax_impl import mibs from ax_interface import MIBMeta, ValueType, MIBUpdater, SubtreeMIBEntry +from ax_interface.util import ip2byte_tuple from sonic_ax_impl.mibs import Namespace import ipaddress @@ -43,7 +44,7 @@ def update_data(self): oid_head = (1, 4) else: oid_head = (2, 16) - oid_ip = tuple(i for i in ip.packed) + oid_ip = ip2byte_tuple(neigh_str) if state.isdigit(): status = 6 diff --git a/tests/mock_tables/appl_db.json b/tests/mock_tables/appl_db.json index 784a5212e36a..69bdbf1ad3a8 100644 --- a/tests/mock_tables/appl_db.json +++ b/tests/mock_tables/appl_db.json @@ -11,7 +11,7 @@ "lldp_rem_chassis_id_subtype": "4", "lldp_rem_sys_name": "switch13", "lldp_rem_port_id": "Ethernet1", - "lldp_rem_man_addr": "10.224.25.100" + "lldp_rem_man_addr": "10.224.25.100,2603:10e2:290:5016::" }, "LLDP_ENTRY_TABLE:Ethernet4": { "lldp_rem_port_id_subtype": "5", @@ -25,7 +25,7 @@ "lldp_rem_chassis_id_subtype": "4", "lldp_rem_sys_name": "switch13", "lldp_rem_port_id": "Ethernet2", - "lldp_rem_man_addr": "fe80::268a:7ff:fe3f:834c" + "lldp_rem_man_addr": "fe80::268a::7ff:fe3f:834c,10.224.25.101" }, "LLDP_ENTRY_TABLE:Ethernet8": { "lldp_rem_port_id_subtype": "5", @@ -53,7 +53,7 @@ "lldp_rem_chassis_id_subtype": "4", "lldp_rem_sys_name": "switch13", "lldp_rem_port_id": "Ethernet4", - "lldp_rem_man_addr": "10.224.25.103" + "lldp_rem_man_addr": "fe80:268a::7ff:fe3f:834c" }, "LLDP_ENTRY_TABLE:Ethernet16": { "lldp_rem_port_id_subtype": "5", @@ -67,7 +67,7 @@ "lldp_rem_chassis_id_subtype": "4", "lldp_rem_sys_name": "switch13", "lldp_rem_port_id": "Ethernet5", - "lldp_rem_man_addr": "10.224.25.104" + "lldp_rem_man_addr": "" }, "LLDP_ENTRY_TABLE:Ethernet20": { "lldp_rem_port_id_subtype": "5", @@ -333,7 +333,7 @@ "lldp_rem_chassis_id_subtype": "4", "lldp_rem_sys_name": "switch13", "lldp_rem_port_id": "Ethernet24", - "lldp_rem_man_addr": "10.224.25.123" + "lldp_rem_man_addr": "10.224.25.123,2603:10e2:290:5016::" }, "LLDP_ENTRY_TABLE:Ethernet96": { "lldp_rem_port_id_subtype": "5", diff --git a/tests/mock_tables/asic0/appl_db.json b/tests/mock_tables/asic0/appl_db.json index af2167338896..d741bf6f8e57 100644 --- a/tests/mock_tables/asic0/appl_db.json +++ b/tests/mock_tables/asic0/appl_db.json @@ -5,13 +5,13 @@ "lldp_rem_index": "1", "lldp_rem_chassis_id": "00:11:22:33:44:55", "lldp_rem_sys_desc": "I'm a little teapot.", - "lldp_rem_time_mark": "18545", + "lldp_rem_time_mark": "0", "lldp_rem_sys_cap_enabled": "28 00", "lldp_rem_port_desc": " ", "lldp_rem_chassis_id_subtype": "4", "lldp_rem_sys_name": "switch13", "lldp_rem_port_id": "Ethernet1", - "lldp_rem_man_addr": "10.224.25.100" + "lldp_rem_man_addr": "10.224.25.123,2603:10e2:290:5016::" }, "LLDP_ENTRY_TABLE:Ethernet4": { "lldp_rem_port_id_subtype": "5", @@ -25,7 +25,7 @@ "lldp_rem_chassis_id_subtype": "4", "lldp_rem_sys_name": "switch13", "lldp_rem_port_id": "Ethernet2", - "lldp_rem_man_addr": "fe80::268a:7ff:fe3f:834c" + "lldp_rem_man_addr": "fe80::268a::7ff:fe3f:834c,10.224.25.102" }, "LLDP_LOC_CHASSIS": { "lldp_loc_chassis_id_subtype": "5", diff --git a/tests/mock_tables/asic1/appl_db.json b/tests/mock_tables/asic1/appl_db.json index 3c03bcc12a3c..b1389f441acd 100644 --- a/tests/mock_tables/asic1/appl_db.json +++ b/tests/mock_tables/asic1/appl_db.json @@ -11,7 +11,7 @@ "lldp_rem_chassis_id_subtype": "4", "lldp_rem_sys_name": "switch13", "lldp_rem_port_id": "Ethernet3", - "lldp_rem_man_addr": "fe80::268a:7ff:fe3f:834c" + "lldp_rem_man_addr": "10.224.25.102" }, "LLDP_ENTRY_TABLE:Ethernet12": { "lldp_rem_port_id_subtype": "5", @@ -25,7 +25,7 @@ "lldp_rem_chassis_id_subtype": "4", "lldp_rem_sys_name": "switch13", "lldp_rem_port_id": "Ethernet4", - "lldp_rem_man_addr": "10.224.25.102" + "lldp_rem_man_addr": "fe80:268a::7ff:fe3f:834c" }, "LLDP_LOC_CHASSIS": { "lldp_loc_chassis_id_subtype": "5", diff --git a/tests/mock_tables/asic2/appl_db.json b/tests/mock_tables/asic2/appl_db.json index 87a07cf314eb..c1cb8b996ccb 100644 --- a/tests/mock_tables/asic2/appl_db.json +++ b/tests/mock_tables/asic2/appl_db.json @@ -1,4 +1,18 @@ { + "LLDP_ENTRY_TABLE:Ethernet16": { + "lldp_rem_port_id_subtype": "5", + "lldp_rem_sys_cap_supported": "28 00", + "lldp_rem_index": "1", + "lldp_rem_chassis_id": "00:11:22:33:44:55", + "lldp_rem_sys_desc": "I'm a little teapot.", + "lldp_rem_time_mark": "18543", + "lldp_rem_sys_cap_enabled": "28 00", + "lldp_rem_port_desc": " ", + "lldp_rem_chassis_id_subtype": "4", + "lldp_rem_sys_name": "switch13", + "lldp_rem_port_id": "Ethernet5", + "lldp_rem_man_addr": "" + }, "LLDP_LOC_CHASSIS": { "lldp_loc_chassis_id_subtype": "5", "lldp_loc_chassis_id": "00:11:22:AB:CD:EF", @@ -8,6 +22,11 @@ "lldp_loc_sys_cap_supported": "28 00", "lldp_loc_man_addr": "fe80::ce37:abff:feec:de9c" }, + "PORT_TABLE:Ethernet16": { + "description": "snowflake", + "alias": "etp5", + "speed": 100000 + }, "PORT_TABLE:Ethernet-BP16": { "description": "backplane", "alias": "etp9", diff --git a/tests/namespace/test_lldp.py b/tests/namespace/test_lldp.py index 96e940fa703d..fbfd4e26026e 100644 --- a/tests/namespace/test_lldp.py +++ b/tests/namespace/test_lldp.py @@ -124,11 +124,96 @@ def test_subtype_lldp_loc_man_addr_table(self): def test_subtype_lldp_rem_man_addr_table(self): + + # Get the first entry of walk. We will get IPv4 Address associated with Ethernet0 Port + # Verfiy both valid ipv4 and ipv6 address exist for entry in range(3, 6): - mib_entry = self.lut[(1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, entry)] - ret = mib_entry(sub_id=(1, 1)) - self.assertIsNotNone(ret) - print(ret) + oid = ObjectIdentifier(11, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, entry)) + get_pdu = GetNextPDU( + header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), + oids=[oid] + ) + response = get_pdu.make_response(self.lut) + value0 = response.values[0] + self.assertEqual(str(value0.name), str(ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, entry, 0, 1, 1, 1, 4, 10, 224, 25, 123)))) + if entry == 3: + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 2) + elif entry == 4: + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 0) + else: + self.assertEqual(value0.type_, ValueType.OBJECT_IDENTIFIER) + self.assertEqual(str(value0.data), str(ObjectIdentifier(5, 2, 0, 0, (1, 2, 2, 1, 1)))) + + # Get next on above to get IPv6 Entry. We will get IPv6 Address associated with Ethernet0 Port + oid = ObjectIdentifier(16, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, entry, 0, 1, 1, 1, 16)) + get_pdu = GetNextPDU( + header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), + oids=[oid] + ) + response = get_pdu.make_response(self.lut) + value0 = response.values[0] + self.assertEqual(str(value0.name), str(ObjectIdentifier(20, 0, 1, 0, + (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, entry, 0, 1, 1, 2, 16, 38, 3, 16, 226, 2, 144, 80, 22, 0, 0, 0, 0, 0, 0, 0, 0)))) + if entry == 3: + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 2) + elif entry == 4: + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 0) + else: + self.assertEqual(value0.type_, ValueType.OBJECT_IDENTIFIER) + self.assertEqual(str(value0.data), str(ObjectIdentifier(5, 2, 0, 0, (1, 2, 2, 1, 1)))) + + + # Verfiy both valid ipv4 and invalid ipv6 address exist. Ethernet5 has this config. + oid = ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18543, 5, 1, 1, 4, 10, 224, 25, 102)) + get_pdu = GetPDU( + header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), + oids=[oid] + ) + response = get_pdu.make_response(self.lut) + value0 = response.values[0] + self.assertEqual(str(value0.name), str(ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18543, 5, 1, 1, 4, 10, 224, 25, 102)))) + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 2) + + # Verfiy only valid ipv4 address exist. Ethernet8 has this config. + oid = ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18543, 9, 1, 1, 4, 10, 224, 25, 102)) + get_pdu = GetPDU( + header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), + oids=[oid] + ) + response = get_pdu.make_response(self.lut) + value0 = response.values[0] + self.assertEqual(str(value0.name), str(ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18543, 9, 1, 1, 4, 10, 224, 25, 102)))) + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 2) + + # Verfiy only valid ipv6 address exist. Ethernet12 has this config. + oid = ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18543, 13, 1, 2, 16, 254, 128, 38, 138, 0, 0, 0, 0, 0, 0, 7, 255, 254, 63, 131, 76)) + get_pdu = GetPDU( + header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), + oids=[oid] + ) + response = get_pdu.make_response(self.lut) + value0 = response.values[0] + self.assertEqual(str(value0.name), str(ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18543, + 13, 1, 2, 16, 254, 128, 38, 138, 0, 0, 0, 0, 0, 0, 7, 255, 254, 63, 131, 76)))) + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 2) + + # Verfiy no mgmt address exit. Ethernet16 has this config. + oid = ObjectIdentifier(11, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18545, 17)) + get_pdu = GetNextPDU( + header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), + oids=[oid] + ) + response = get_pdu.make_response(self.lut) + value0 = response.values[0] + # Should result End of Mib View + self.assertEqual(value0.type_, ValueType.END_OF_MIB_VIEW) def test_local_port_identification(self): mib_entry = self.lut[(1, 0, 8802, 1, 1, 2, 1, 3, 7, 1, 3)] diff --git a/tests/test_lldp.py b/tests/test_lldp.py index a9ee5f2feee1..0ff3e10594bf 100644 --- a/tests/test_lldp.py +++ b/tests/test_lldp.py @@ -106,11 +106,96 @@ def test_subtype_lldp_loc_man_addr_table(self): def test_subtype_lldp_rem_man_addr_table(self): + # Get the first entry of walk. We will get IPv4 Address associated with Ethernet92 Port + # Verfiy both valid ipv4 and ipv6 address exist for entry in range(3, 6): - mib_entry = self.lut[(1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, entry)] - ret = mib_entry(sub_id=(1, 1)) - self.assertIsNotNone(ret) - print(ret) + oid = ObjectIdentifier(11, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, entry)) + get_pdu = GetNextPDU( + header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), + oids=[oid] + ) + response = get_pdu.make_response(self.lut) + value0 = response.values[0] + self.assertEqual(str(value0.name), str(ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, entry, 32, 93, 1, 1, 4, 10, 224, 25, 123)))) + if entry == 3: + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 2) + elif entry == 4: + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 0) + else: + self.assertEqual(value0.type_, ValueType.OBJECT_IDENTIFIER) + self.assertEqual(str(value0.data), str(ObjectIdentifier(5, 2, 0, 0, (1, 2, 2, 1, 1)))) + + # Get next on above to get IPv6 Entry. We will get IPv6 Address associated with Ethernet92 Port + oid = ObjectIdentifier(16, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, entry, 32, 93, 1, 1, 16)) + get_pdu = GetNextPDU( + header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), + oids=[oid] + ) + response = get_pdu.make_response(self.lut) + value0 = response.values[0] + self.assertEqual(str(value0.name), str(ObjectIdentifier(20, 0, 1, 0, + (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, entry, 32, 93, 1, 2, 16, 38, 3, 16, 226, 2, 144, 80, 22, 0, 0, 0, 0, 0, 0, 0, 0)))) + if entry == 3: + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 2) + elif entry == 4: + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 0) + else: + self.assertEqual(value0.type_, ValueType.OBJECT_IDENTIFIER) + self.assertEqual(str(value0.data), str(ObjectIdentifier(5, 2, 0, 0, (1, 2, 2, 1, 1)))) + + # Verfiy both valid ipv4 and invalid ipv6 address exist. Ethernet5 has this config. + oid = ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18543, 5, 1, 1, 4, 10, 224, 25, 101)) + get_pdu = GetPDU( + header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), + oids=[oid] + ) + response = get_pdu.make_response(self.lut) + value0 = response.values[0] + self.assertEqual(str(value0.name), str(ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18543, 5, 1, 1, 4, 10, 224, 25, 101)))) + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 2) + + # Verfiy only valid ipv4 address exist. Ethernet8 has this config. + oid = ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18543, 9, 1, 1, 4, 10, 224, 25, 102)) + get_pdu = GetPDU( + header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), + oids=[oid] + ) + response = get_pdu.make_response(self.lut) + value0 = response.values[0] + self.assertEqual(str(value0.name), str(ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18543, 9, 1, 1, 4, 10, 224, 25, 102)))) + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 2) + + # Verfiy only valid ipv6 address exist. Ethernet12 has this config. + oid = ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18545, 13, 1, 2, 16, 254, 128, 38, 138, 0, 0, 0, 0, 0, 0, 7, 255, 254, 63, 131, 76)) + get_pdu = GetPDU( + header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), + oids=[oid] + ) + response = get_pdu.make_response(self.lut) + value0 = response.values[0] + self.assertEqual(str(value0.name), str(ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18545, + 13, 1, 2, 16, 254, 128, 38, 138, 0, 0, 0, 0, 0, 0, 7, 255, 254, 63, 131, 76)))) + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 2) + + # Verfiy no mgmt address exist. Ethernet16 has this config. + oid = ObjectIdentifier(11, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18545, 17)) + get_pdu = GetNextPDU( + header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0), + oids=[oid] + ) + response = get_pdu.make_response(self.lut) + value0 = response.values[0] + # Should move to other interface. Ethernet22 + self.assertEqual(str(value0.name), str(ObjectIdentifier(20, 0, 0, 0, (1, 0, 8802, 1, 1, 2, 1, 4, 2, 1, 3, 18545, 21, 1, 1, 4, 10, 224, 25, 105)))) + self.assertEqual(value0.type_, ValueType.INTEGER) + self.assertEqual(value0.data, 2) def test_local_port_identification(self): mib_entry = self.lut[(1, 0, 8802, 1, 1, 2, 1, 3, 7, 1, 3)]