Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use device loopback IP address to send SNMP query from neighboring ceos or vsonic #8802

Merged
merged 13 commits into from
Jul 10, 2023
Merged
45 changes: 45 additions & 0 deletions tests/common/helpers/snmp_helpers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
import ipaddress

from tests.common.utilities import wait_until
from tests.common.errors import RunAnsibleModuleFail
from tests.common.helpers.assertions import pytest_assert
from tests.common.devices.eos import EosHost

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -42,3 +44,46 @@ def get_snmp_facts(localhost, host, version, community, is_dell=False, module_ig
pytest_assert(wait_until(timeout, interval, 0, _update_snmp_facts, localhost, host, version,
community, is_dell, include_swap), "Timeout waiting for SNMP facts")
return global_snmp_facts


def get_snmp_output(ip, duthost, nbr, creds_all_duts, oid='.1.3.6.1.2.1.1.1.0'):
"""
Get snmp output from duthost using specific ip to query
snmp query is sent from neighboring ceos/vsonic

Args:
ip(str): IP of dut to be used to send SNMP query
duthost: duthost
nbr: from where the snmp query should be executed
creds_all_duts: creds to get snmp_rocommunity of duthost
oid: to query

Returns:
SNMP result
"""
ipaddr = ipaddress.ip_address(ip)
iptables_cmd = "iptables"

# TODO : Fix snmp query over loopback v6 and remove this check and add IPv6 ACL table/rule.
if isinstance(ipaddr, ipaddress.IPv6Address):
iptables_cmd = "ip6tables"
return None

ip_tbl_rule_add = "sudo {} -I INPUT 1 -p udp --dport 161 -d {} -j ACCEPT".format(
iptables_cmd, ip)
duthost.shell(ip_tbl_rule_add)

if isinstance(nbr["host"], EosHost):
eos_snmpget = "bash snmpget -v2c -c {} {} {}".format(
creds_all_duts[duthost.hostname]['snmp_rocommunity'], ip, oid)
out = nbr['host'].eos_command(commands=[eos_snmpget])
else:
command = "docker exec snmp snmpwalk -v 2c -c {} {} {}".format(
creds_all_duts[duthost.hostname]['snmp_rocommunity'], ip, oid)
out = nbr['host'].command(command)

ip_tbl_rule_del = "sudo {} -D INPUT -p udp --dport 161 -d {} -j ACCEPT".format(
iptables_cmd, ip)
duthost.shell(ip_tbl_rule_del)

return out
34 changes: 19 additions & 15 deletions tests/macsec/test_interop_protocol.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import pytest
import logging
import re
import ipaddress

from tests.common.utilities import wait_until
from tests.common.devices.eos import EosHost
from .macsec_helper import getns_prefix
from .macsec_config_helper import disable_macsec_port, enable_macsec_port
from .macsec_platform_helper import find_portchannel_from_member, get_portchannel, get_lldp_list, sonic_db_cli
from tests.common.helpers.snmp_helpers import get_snmp_output

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -119,24 +119,28 @@ def check_bgp_established(ctrl_port, up_link):
assert wait_until(BGP_TIMEOUT, BGP_KEEPALIVE, BGP_HOLDTIME,
check_bgp_established, ctrl_port, upstream_links[ctrl_port])

def test_snmp(self, duthost, ctrl_links, upstream_links, creds, wait_mka_establish):
def test_snmp(self, duthost, ctrl_links, upstream_links, creds_all_duts, wait_mka_establish):
'''
Verify SNMP request/response works across interface with macsec configuration
'''
if duthost.is_multi_asic:
pytest.skip("The test is for Single ASIC devices")

loopback0_ips = duthost.config_facts(host=duthost.hostname,
source="running")[
"ansible_facts"].get(
"LOOPBACK_INTERFACE",
{}).get('Loopback0', {})
for ip in loopback0_ips:
if isinstance(ipaddress.ip_network(ip),
ipaddress.IPv4Network):
dut_loip = ip.split('/')[0]
break
else:
pytest.fail("No Loopback0 IPv4 address for {}".
format(duthost.hostname))
for ctrl_port, nbr in list(ctrl_links.items()):
if isinstance(nbr["host"], EosHost):
result = nbr["host"].eos_command(
commands=['show snmp community | include name'])
community = re.search(r'Community name: (\S+)',
result['stdout'][0]).groups()[0]
else: # vsonic neighbour
community = creds['snmp_rocommunity']

up_link = upstream_links[ctrl_port]
sysDescr = ".1.3.6.1.2.1.1.1.0"
command = "docker exec snmp snmpwalk -v 2c -c {} {} {}".format(
community, up_link["local_ipv4_addr"], sysDescr)
assert not duthost.command(command)["failed"]
result = get_snmp_output(dut_loip, duthost, nbr,
creds_all_duts, sysDescr)
assert not result["failed"]
41 changes: 9 additions & 32 deletions tests/snmp/test_snmp_loopback.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,14 @@
import pytest
import ipaddress
from tests.common.helpers.snmp_helpers import get_snmp_facts
try: # python3
from shlex import quote
except ImportError: # python2
from pipes import quote
from tests.common.helpers.snmp_helpers import get_snmp_facts, get_snmp_output
from tests.common.devices.eos import EosHost

pytestmark = [
pytest.mark.topology('t0', 't1', 't2', 'm0', 'mx'),
pytest.mark.device_type('vs')
]


def get_snmp_output(ip, duthost, nbr, creds_all_duts):
ipaddr = ipaddress.ip_address(ip)
iptables_cmd = "iptables"

# TODO : Fix snmp query over loopback v6 and remove this check and add IPv6 ACL table/rule.
if isinstance(ipaddr, ipaddress.IPv6Address):
iptables_cmd = "ip6tables"
return None

ip_tbl_rule_add = "sudo {} -I INPUT 1 -p udp --dport 161 -d {} -j ACCEPT".format(
iptables_cmd, ip)
duthost.shell(ip_tbl_rule_add)

eos_snmpget = "bash snmpget -v2c -c {} {} 1.3.6.1.2.1.1.1.0".format(
quote(creds_all_duts[duthost.hostname]['snmp_rocommunity']), ip)
out = nbr['host'].eos_command(commands=[eos_snmpget])

ip_tbl_rule_del = "sudo {} -D INPUT -p udp --dport 161 -d {} -j ACCEPT".format(
iptables_cmd, ip)
duthost.shell(ip_tbl_rule_del)

return out


@pytest.mark.bsl
def test_snmp_loopback(duthosts, enum_rand_one_per_hwsku_frontend_hostname,
nbrhosts, tbinfo, localhost, creds_all_duts):
Expand Down Expand Up @@ -67,7 +40,11 @@ def test_snmp_loopback(duthosts, enum_rand_one_per_hwsku_frontend_hostname,
result = get_snmp_output(loip, duthost, nbr, creds_all_duts)
assert result is not None, 'No result from snmpget'
assert len(result['stdout_lines']) > 0, 'No result from snmpget'
assert "SONiC Software Version" in result['stdout_lines'][0][0],\
if isinstance(nbr["host"], EosHost):
stdout_lines = result['stdout_lines'][0][0]
else:
stdout_lines = result['stdout_lines'][0]
assert "SONiC Software Version" in stdout_lines,\
"Sysdescr not found in SNMP result from IP {}".format(ip)
assert snmp_facts['ansible_sysdescr'] in result['stdout_lines'][0][
0], "Sysdescr from IP{} not matching with result from Mgmt IPv4.".format(ip)
assert snmp_facts['ansible_sysdescr'] in stdout_lines,\
"Sysdescr from IP{} not matching with result from Mgmt IPv4.".format(ip)