Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[202205]Use device loopback IP address to send SNMP query from neighboring ceos or vsonic (#8802) #8972

Merged
merged 2 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions tests/common/helpers/snmp_helpers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
import ipaddress

from tests.common.utilities import wait_until
from tests.common.errors import RunAnsibleModuleFail
from tests.common.helpers.assertions import pytest_assert
from tests.common.devices.eos import EosHost

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,3 +42,46 @@ def get_snmp_facts(localhost, host, version, community, is_dell=False, module_ig
pytest_assert(wait_until(timeout, interval, 0, _update_snmp_facts, localhost, host, version,
community, is_dell, include_swap), "Timeout waiting for SNMP facts")
return global_snmp_facts


def get_snmp_output(ip, duthost, nbr, creds_all_duts, oid='.1.3.6.1.2.1.1.1.0'):
"""
Get snmp output from duthost using specific ip to query
snmp query is sent from neighboring ceos/vsonic

Args:
ip(str): IP of dut to be used to send SNMP query
duthost: duthost
nbr: from where the snmp query should be executed
creds_all_duts: creds to get snmp_rocommunity of duthost
oid: to query

Returns:
SNMP result
"""
ipaddr = ipaddress.ip_address(ip)
iptables_cmd = "iptables"

# TODO : Fix snmp query over loopback v6 and remove this check and add IPv6 ACL table/rule.
if isinstance(ipaddr, ipaddress.IPv6Address):
iptables_cmd = "ip6tables"
return None

ip_tbl_rule_add = "sudo {} -I INPUT 1 -p udp --dport 161 -d {} -j ACCEPT".format(
iptables_cmd, ip)
duthost.shell(ip_tbl_rule_add)

if isinstance(nbr["host"], EosHost):
eos_snmpget = "bash snmpget -v2c -c {} {} {}".format(
creds_all_duts[duthost.hostname]['snmp_rocommunity'], ip, oid)
out = nbr['host'].eos_command(commands=[eos_snmpget])
else:
command = "docker exec snmp snmpwalk -v 2c -c {} {} {}".format(
creds_all_duts[duthost.hostname]['snmp_rocommunity'], ip, oid)
out = nbr['host'].command(command)

ip_tbl_rule_del = "sudo {} -D INPUT -p udp --dport 161 -d {} -j ACCEPT".format(
iptables_cmd, ip)
duthost.shell(ip_tbl_rule_del)

return out
36 changes: 20 additions & 16 deletions tests/macsec/test_interop_protocol.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from time import sleep
import pytest
import logging
import re
import scapy.all as scapy
import ptf.testutils as testutils
from collections import Counter
import ipaddress

from tests.common.utilities import wait_until
from tests.common.devices.eos import EosHost
from tests.common import config_reload
from macsec_helper import *
from macsec_config_helper import *
from macsec_platform_helper import *
from tests.common.helpers.snmp_helpers import get_snmp_output

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -121,24 +121,28 @@ def check_bgp_established(ctrl_port, up_link):
assert wait_until(BGP_TIMEOUT, BGP_KEEPALIVE, BGP_HOLDTIME,
check_bgp_established, ctrl_port, upstream_links[ctrl_port])

def test_snmp(self, duthost, ctrl_links, upstream_links, creds, wait_mka_establish):
def test_snmp(self, duthost, ctrl_links, upstream_links, creds_all_duts, wait_mka_establish):
'''
Verify SNMP request/response works across interface with macsec configuration
'''
if duthost.is_multi_asic:
pytest.skip("The test is for Single ASIC devices")

for ctrl_port, nbr in ctrl_links.items():
if isinstance(nbr["host"], EosHost):
result = nbr["host"].eos_command(
commands=['show snmp community | include name'])
community = re.search(r'Community name: (\S+)',
result['stdout'][0]).groups()[0]
else: # vsonic neighbour
community = creds['snmp_rocommunity']

up_link = upstream_links[ctrl_port]
loopback0_ips = duthost.config_facts(host=duthost.hostname,
source="running")[
"ansible_facts"].get(
"LOOPBACK_INTERFACE",
{}).get('Loopback0', {})
for ip in loopback0_ips:
if isinstance(ipaddress.ip_network(ip),
ipaddress.IPv4Network):
dut_loip = ip.split('/')[0]
break
else:
pytest.fail("No Loopback0 IPv4 address for {}".
format(duthost.hostname))
for ctrl_port, nbr in list(ctrl_links.items()):
sysDescr = ".1.3.6.1.2.1.1.1.0"
command = "docker exec snmp snmpwalk -v 2c -c {} {} {}".format(
community, up_link["local_ipv4_addr"], sysDescr)
assert not duthost.command(command)["failed"]
result = get_snmp_output(dut_loip, duthost, nbr,
creds_all_duts, sysDescr)
assert not result["failed"]
40 changes: 11 additions & 29 deletions tests/snmp/test_snmp_loopback.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,14 @@
import pytest
import ipaddress
from tests.common.helpers.snmp_helpers import get_snmp_facts
try: # python3
from shlex import quote
except ImportError: # python2
from pipes import quote
from tests.common.helpers.snmp_helpers import get_snmp_facts, get_snmp_output
from tests.common.devices.eos import EosHost

pytestmark = [
pytest.mark.topology('any'),
pytest.mark.device_type('vs')
]


def get_snmp_output(ip, duthost, nbr, creds_all_duts):
ipaddr = ipaddress.ip_address(ip)
iptables_cmd = "iptables"

# TODO : Fix snmp query over loopback v6 and remove this check and add IPv6 ACL table/rule.
if isinstance(ipaddr, ipaddress.IPv6Address):
iptables_cmd = "ip6tables"
return None

ip_tbl_rule_add = "sudo {} -I INPUT 1 -p udp --dport 161 -d {} -j ACCEPT".format(iptables_cmd, ip)
duthost.shell(ip_tbl_rule_add)

eos_snmpget = "bash snmpget -v2c -c {} {} 1.3.6.1.2.1.1.1.0".format(quote(creds_all_duts[duthost.hostname]['snmp_rocommunity']), ip)
out = nbr['host'].eos_command(commands=[eos_snmpget])

ip_tbl_rule_del = "sudo {} -D INPUT -p udp --dport 161 -d {} -j ACCEPT".format(iptables_cmd, ip)
duthost.shell(ip_tbl_rule_del)

return out


@pytest.mark.bsl
def test_snmp_loopback(duthosts, enum_rand_one_per_hwsku_frontend_hostname, nbrhosts, tbinfo, localhost, creds_all_duts):
"""
Expand All @@ -58,6 +34,12 @@ def test_snmp_loopback(duthosts, enum_rand_one_per_hwsku_frontend_hostname, nbrh
continue
result = get_snmp_output(loip, duthost, nbr, creds_all_duts)
assert result is not None, 'No result from snmpget'
assert len(result[u'stdout_lines']) > 0, 'No result from snmpget'
assert "SONiC Software Version" in result[u'stdout_lines'][0][0], "Sysdescr not found in SNMP result from IP {}".format(ip)
assert snmp_facts['ansible_sysdescr'] in result[u'stdout_lines'][0][0], "Sysdescr from IP{} not matching with result from Mgmt IPv4.".format(ip)
assert len(result['stdout_lines']) > 0, 'No result from snmpget'
if isinstance(nbr["host"], EosHost):
stdout_lines = result['stdout_lines'][0][0]
else:
stdout_lines = result['stdout_lines'][0]
assert "SONiC Software Version" in stdout_lines,\
"Sysdescr not found in SNMP result from IP {}".format(ip)
assert snmp_facts['ansible_sysdescr'] in stdout_lines,\
"Sysdescr from IP{} not matching with result from Mgmt IPv4.".format(ip)