From 0efd297fc4249c764997d9f128b04e12bc0eda65 Mon Sep 17 00:00:00 2001 From: "sabari@broadcom" <45473507+gitsabari@users.noreply.github.com> Date: Fri, 16 Jul 2021 10:07:36 -0700 Subject: [PATCH] mclag enhancements as per HLD at Azure/SONIC#596 (#1138) * mclagsyncd enhancements as per HLD at Azure/SONIC#596 * addressed LGTM alert * UT Fix unique IP configuration * modified ip address validate function for mclag config verication * Add soft-reboot reboot type (#1453) What I did Add a new reboot named as soft-reboot which can be performed by "kexec -e" How I did it Replace the platform reboot with "kexec -e" for the cold reboot case. How to verify it Verified the reboot on DUT and check the reboot-cause * [warm-reboot] Check if warm restart flag is set when issuing a warm-reboot (#1460) Check if any warm restart flag is set when issuing a warm-reboot. This check avoids starting a warm reboot while another warm restart is in progress. In the scenario where a warm reboot is issued with another warm restart in progress, the warm restart flag may be reset and part of the components have a risk of doing cold reboot. * Added mclag config commands * removed unwanted imports * added mclag tests * fixed build issue * corrected mclag test * corrected mclag test * corrected mclag test case * updated testcase for mclag * updated mclag config * updated mclag test cases * updated mclag test case * updated mclag test cases * fixed alert * updated mclag test cases * updated mclag test cases * updated mclag config * modified mclag test cases * updated mclag test case * updated mclag test case * updated mclag test cases * updated mclag test cases * updated mclag test cases * updated mclag test case * updated mclag test case * updated mclag test cases * updated mclag test cases * updated mclag test cases * updated mclag test cases * updated mclag test case * updated mclag test cases * updated mclag test cases * updated mclag test cases * updated mclag test cases * updated mclag test cases * updated mclag test cases * updated mclag test cases * updated mclag test cases * updated mclag test case * updated mclag test cases * updated mclag test case * updated mclag config to use swsscommon instead of swssdk * updated mclag config to use swsscommon * updated mclag config script file * fixed mclag test cases to verify config db * updated mclag test case with config db verify function * fixed build issue * updated test case * updated mclag test case * addressed review comments Co-authored-by: Tapash Das Co-authored-by: Tapash Das <48195098+tapashdas@users.noreply.github.com> Co-authored-by: Sujin Kang Co-authored-by: Shi Su <67605788+shi-su@users.noreply.github.com> --- config/main.py | 6 + config/mclag.py | 347 ++++++++++++++++++++++++ tests/mclag_test.py | 641 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 994 insertions(+) create mode 100644 config/mclag.py create mode 100644 tests/mclag_test.py diff --git a/config/main.py b/config/main.py index b4f65da6df1b..320ce72beed5 100644 --- a/config/main.py +++ b/config/main.py @@ -41,6 +41,7 @@ from . import vxlan from . import plugins from .config_mgmt import ConfigMgmtDPB +from . import mclag # mock masic APIs for unit test try: @@ -978,6 +979,11 @@ def config(ctx): config.add_command(vlan.vlan) config.add_command(vxlan.vxlan) +#add mclag commands +config.add_command(mclag.mclag) +config.add_command(mclag.mclag_member) +config.add_command(mclag.mclag_unique_ip) + @config.command() @click.option('-y', '--yes', is_flag=True, callback=_abort_if_false, expose_value=False, prompt='Existing files will be overwritten, continue?') diff --git a/config/mclag.py b/config/mclag.py new file mode 100644 index 000000000000..2ab0d0ca754a --- /dev/null +++ b/config/mclag.py @@ -0,0 +1,347 @@ + +import click +from swsscommon.swsscommon import ConfigDBConnector +import ipaddress + +CFG_PORTCHANNEL_PREFIX = "PortChannel" +CFG_PORTCHANNEL_PREFIX_LEN = 11 +CFG_PORTCHANNEL_MAX_VAL = 9999 +CFG_PORTCHANNEL_NAME_TOTAL_LEN_MAX = 15 +CFG_PORTCHANNEL_NO="<0-9999>" + +def mclag_domain_id_valid(domain_id): + """Check if the domain id is in acceptable range (between 1 and 4095) + """ + + if domain_id<1 or domain_id>4095: + return False + + return True + +def mclag_ka_session_dep_check(ka, session_tmout): + """Check if the MCLAG Keepalive timer and session timeout values are multiples of each other and keepalive is < session timeout value + """ + if not session_tmout >= ( 3 * ka): + return False, "MCLAG Keepalive:{} Session_timeout:{} values not satisfying session_timeout >= (3 * KA) ".format(session_tmout, ka) + + if session_tmout % ka: + return False, "MCLAG keepalive:{} Session_timeout:{} Values not satisfying session_timeout should be a multiple of KA".format(ka, session_tmout) + + return True, "" + + +def mclag_ka_interval_valid(ka): + """Check if the MCLAG Keepalive timer is in acceptable range (between 1 and 60) + """ + if ka < 1 or ka > 60: + return False, "Keepalive %s not in valid range[1-60]" % ka + return True, "" + +def mclag_session_timeout_valid(session_tmout): + """Check if the MCLAG session timeout in valid range (between 3 and 3600) + """ + if session_tmout < 3 or session_tmout > 3600: + return False, "Session timeout %s not in valid range[3-3600]" % session_tmout + return True, "" + + +def is_portchannel_name_valid(portchannel_name): + """Port channel name validation + """ + # Return True if Portchannel name is PortChannelXXXX (XXXX can be 0-9999) + if portchannel_name[:CFG_PORTCHANNEL_PREFIX_LEN] != CFG_PORTCHANNEL_PREFIX : + return False + if (portchannel_name[CFG_PORTCHANNEL_PREFIX_LEN:].isdigit() is False or + int(portchannel_name[CFG_PORTCHANNEL_PREFIX_LEN:]) > CFG_PORTCHANNEL_MAX_VAL) : + return False + if len(portchannel_name) > CFG_PORTCHANNEL_NAME_TOTAL_LEN_MAX: + return False + return True + +def is_ipv4_addr_valid(addr): + v4_invalid_list = [ipaddress.IPv4Address(str('0.0.0.0')), ipaddress.IPv4Address(str('255.255.255.255'))] + try: + ip = ipaddress.ip_address(str(addr)) + if (ip.version == 4): + if (ip.is_reserved): + click.echo ("{} Not Valid, Reason: IPv4 reserved address range.".format(addr)) + return False + elif (ip.is_multicast): + click.echo ("{} Not Valid, Reason: IPv4 Multicast address range.".format(addr)) + return False + elif (ip in v4_invalid_list): + click.echo ("{} Not Valid.".format(addr)) + return False + else: + return True + + else: + click.echo ("{} Not Valid, Reason: Not an IPv4 address".format(addr)) + return False + + except ValueError: + return False + + + +def check_if_interface_is_valid(db, interface_name): + from main import interface_name_is_valid + if interface_name_is_valid(db,interface_name) is False: + ctx.fail("Interface name is invalid. Please enter a valid interface name!!") + +def get_intf_vrf_bind_unique_ip(db, interface_name, interface_type): + intfvrf = db.get_table(interface_type) + if interface_name in intfvrf: + if 'vrf_name' in intfvrf[interface_name]: + return intfvrf[interface_name]['vrf_name'] + else: + return "" + else: + return "" + + +###### +# +# 'mclag' group ('config mclag ...') +# +@click.group() +@click.pass_context +def mclag(ctx): + config_db = ConfigDBConnector() + config_db.connect() + ctx.obj = {'db': config_db} + + +#mclag domain add +@mclag.command('add') +@click.argument('domain_id', metavar='', required=True, type=int) +@click.argument('source_ip_addr', metavar='', required=True) +@click.argument('peer_ip_addr', metavar='', required=True) +@click.argument('peer_ifname', metavar='', required=False) +@click.pass_context +def add_mclag_domain(ctx, domain_id, source_ip_addr, peer_ip_addr, peer_ifname): + """Add MCLAG Domain""" + + if not mclag_domain_id_valid(domain_id): + ctx.fail("{} invalid domain ID, valid range is 1 to 4095".format(domain_id)) + if not is_ipv4_addr_valid(source_ip_addr): + ctx.fail("{} invalid local ip address".format(source_ip_addr)) + if not is_ipv4_addr_valid(peer_ip_addr): + ctx.fail("{} invalid peer ip address".format(peer_ip_addr)) + + db = ctx.obj['db'] + fvs = {} + fvs['source_ip'] = str(source_ip_addr) + fvs['peer_ip'] = str(peer_ip_addr) + if peer_ifname is not None: + if (peer_ifname.startswith("Ethernet") is False) and (peer_ifname.startswith("PortChannel") is False): + ctx.fail("peer interface is invalid, should be Ethernet interface or portChannel !!") + if (peer_ifname.startswith("Ethernet") is True) and (check_if_interface_is_valid(db, peer_ifname) is False): + ctx.fail("peer Ethernet interface name is invalid. it is not present in port table of configDb!!") + if (peer_ifname.startswith("PortChannel")) and (is_portchannel_name_valid(peer_ifname) is False): + ctx.fail("peer PortChannel interface name is invalid !!") + fvs['peer_link'] = str(peer_ifname) + mclag_domain_keys = db.get_table('MCLAG_DOMAIN').keys() + if len(mclag_domain_keys) == 0: + db.set_entry('MCLAG_DOMAIN', domain_id, fvs) + else: + if domain_id in mclag_domain_keys: + db.mod_entry('MCLAG_DOMAIN', domain_id, fvs) + else: + ctx.fail("only one mclag Domain can be configured. Already one domain {} configured ".format(mclag_domain_keys[0])) + + +#mclag domain delete +#MCLAG Domain del involves deletion of associated MCLAG Ifaces also +@mclag.command('del') +@click.argument('domain_id', metavar='', required=True, type=int) +@click.pass_context +def del_mclag_domain(ctx, domain_id): + """Delete MCLAG Domain""" + + if not mclag_domain_id_valid(domain_id): + ctx.fail("{} invalid domain ID, valid range is 1 to 4095".format(domain_id)) + + db = ctx.obj['db'] + entry = db.get_entry('MCLAG_DOMAIN', domain_id) + if entry is None: + ctx.fail("MCLAG Domain {} not configured ".format(domain_id)) + return + + click.echo("MCLAG Domain delete takes care of deleting all associated MCLAG Interfaces") + + #get all MCLAG Interface associated with this domain and delete + interface_table_keys = db.get_table('MCLAG_INTERFACE').keys() + + #delete associated mclag interfaces + for iface_domain_id, iface_name in interface_table_keys: + if (int(iface_domain_id) == domain_id): + db.set_entry('MCLAG_INTERFACE', (iface_domain_id, iface_name), None ) + + #delete mclag domain + db.set_entry('MCLAG_DOMAIN', domain_id, None) + + +#keepalive timeout config +@mclag.command('keepalive-interval') +@click.argument('domain_id', metavar='', required=True) +@click.argument('time_in_secs', metavar='', required=True, type=int) +@click.pass_context +def config_mclag_keepalive_timer(ctx, domain_id, time_in_secs): + """Configure MCLAG Keepalive timer value in secs""" + db = ctx.obj['db'] + + entry = db.get_entry('MCLAG_DOMAIN', domain_id) + if len(entry) == 0: + ctx.fail("MCLAG Domain " + domain_id + " not configured, configure mclag domain first") + + status, error_info = mclag_ka_interval_valid(time_in_secs) + if status is not True: + ctx.fail(error_info) + + session_timeout_value = entry.get('session_timeout') + + if session_timeout_value is None: + # assign default value + int_sess_tmout = 15 + else: + int_sess_tmout = int(session_timeout_value) + + status, error_info = mclag_ka_session_dep_check(time_in_secs, int_sess_tmout) + if status is not True: + ctx.fail(error_info) + + fvs = {} + fvs['keepalive_interval'] = str(time_in_secs) + db.mod_entry('MCLAG_DOMAIN', domain_id, fvs) + + +#session timeout config +@mclag.command('session-timeout') +@click.argument('domain_id', metavar='', required=True) +@click.argument('time_in_secs', metavar='', required=True, type=int) +@click.pass_context +def config_mclag_session_timeout(ctx, domain_id, time_in_secs): + """Configure MCLAG Session timeout value in secs""" + db = ctx.obj['db'] + entry = db.get_entry('MCLAG_DOMAIN', domain_id) + if len(entry) == 0: + ctx.fail("MCLAG Domain " + domain_id + " not configured, configure mclag domain first") + + status, error_info = mclag_session_timeout_valid(time_in_secs) + if status is not True: + ctx.fail(error_info) + + ka = entry.get('keepalive_interval') + if ka is None: + # assign default value + int_ka = 1 + else: + int_ka = int(ka) + + status, error_info = mclag_ka_session_dep_check(int_ka, time_in_secs) + if status is not True: + ctx.fail(error_info) + + fvs = {} + fvs['session_timeout'] = str(time_in_secs) + db.mod_entry('MCLAG_DOMAIN', domain_id, fvs) + + +#mclag interface config +@mclag.group('member') +@click.pass_context +def mclag_member(ctx): + pass + +@mclag_member.command('add') +@click.argument('domain_id', metavar='', required=True) +@click.argument('portchannel_names', metavar='', required=True) +@click.pass_context +def add_mclag_member(ctx, domain_id, portchannel_names): + """Add member MCLAG interfaces from MCLAG Domain""" + db = ctx.obj['db'] + entry = db.get_entry('MCLAG_DOMAIN', domain_id) + if len(entry) == 0: + ctx.fail("MCLAG Domain " + domain_id + " not configured, configure mclag domain first") + + portchannel_list = portchannel_names.split(",") + for portchannel_name in portchannel_list: + if is_portchannel_name_valid(portchannel_name) != True: + ctx.fail("{} is invalid!, name should have prefix '{}' and suffix '{}'" .format(portchannel_name, CFG_PORTCHANNEL_PREFIX, CFG_PORTCHANNEL_NO)) + db.set_entry('MCLAG_INTERFACE', (domain_id, portchannel_name), {'if_type':"PortChannel"} ) + +@mclag_member.command('del') +@click.argument('domain_id', metavar='', required=True) +@click.argument('portchannel_names', metavar='', required=True) +@click.pass_context +def del_mclag_member(ctx, domain_id, portchannel_names): + """Delete member MCLAG interfaces from MCLAG Domain""" + db = ctx.obj['db'] + #split comma seperated portchannel names + portchannel_list = portchannel_names.split(",") + for portchannel_name in portchannel_list: + if is_portchannel_name_valid(portchannel_name) != True: + ctx.fail("{} is invalid!, name should have prefix '{}' and suffix '{}'" .format(portchannel_name, CFG_PORTCHANNEL_PREFIX, CFG_PORTCHANNEL_NO)) + db.set_entry('MCLAG_INTERFACE', (domain_id, portchannel_name), None ) + +#mclag unique ip config +@mclag.group('unique-ip') +@click.pass_context +def mclag_unique_ip(ctx): + """Configure Unique IP on MCLAG Vlan interface""" + pass + +@mclag_unique_ip.command('add') +@click.argument('interface_names', metavar='', required=True) +@click.pass_context +def add_mclag_unique_ip(ctx, interface_names): + """Add Unique IP on MCLAG Vlan interface""" + db = ctx.obj['db'] + mclag_domain_keys = db.get_table('MCLAG_DOMAIN').keys() + if len(mclag_domain_keys) == 0: + ctx.fail("MCLAG not configured. MCLAG should be configured.") + + #split comma seperated interface names + interface_list = interface_names.split(",") + for interface_name in interface_list: + if not interface_name.startswith("Vlan"): + ctx.fail("{} is invalid!, name should have prefix '{}' and suffix '{}'" .format(interface_name, "Vlan", "vlan id")) + #VRF should be configured after unique IP configuration + intf_vrf = get_intf_vrf_bind_unique_ip(db, interface_name, "VLAN_INTERFACE") + if intf_vrf: + ctx.fail("%s is configured with Non default VRF, remove the VRF configuration and reconfigure after enabling unique IP configuration."%(str(interface_name))) + + #IP should be configured after unique IP configuration + for k,v in db.get_table('VLAN_INTERFACE').items(): + if type(k) == tuple: + (intf_name, ip) = k + if intf_name == interface_name and ip != 0: + ctx.fail("%s is configured with IP %s, remove the IP configuration and reconfigure after enabling unique IP configuration."%(str(intf_name), str(ip))) + db.set_entry('MCLAG_UNIQUE_IP', (interface_name), {'unique_ip':"enable"} ) + +@mclag_unique_ip.command('del') +@click.argument('interface_names', metavar='', required=True) +@click.pass_context +def del_mclag_unique_ip(ctx, interface_names): + """Delete Unique IP from MCLAG Vlan interface""" + db = ctx.obj['db'] + #split comma seperated interface names + interface_list = interface_names.split(",") + for interface_name in interface_list: + if not interface_name.startswith("Vlan"): + ctx.fail("{} is invalid!, name should have prefix '{}' and suffix '{}'" .format(interface_name, "Vlan", "vlan id")) + #VRF should be configured after removing unique IP configuration + intf_vrf = get_intf_vrf_bind_unique_ip(db, interface_name, "VLAN_INTERFACE") + if intf_vrf: + ctx.fail("%s is configured with Non default VRF, remove the VRF configuration and reconfigure after disabling unique IP configuration."%(str(interface_name))) + #IP should be configured after removing unique IP configuration + for k,v in db.get_table('VLAN_INTERFACE').items(): + if type(k) == tuple: + (intf_name, ip) = k + if intf_name == interface_name and ip != 0: + ctx.fail("%s is configured with IP %s, remove the IP configuration and reconfigure after disabling unique IP configuration."%(str(intf_name), str(ip))) + db.set_entry('MCLAG_UNIQUE_IP', (interface_name), None ) + +####### + diff --git a/tests/mclag_test.py b/tests/mclag_test.py new file mode 100644 index 000000000000..2d10c0ae0779 --- /dev/null +++ b/tests/mclag_test.py @@ -0,0 +1,641 @@ +import os +import traceback + +from click.testing import CliRunner + +import config.main as config +import show.main as show +from utilities_common.db import Db + + +MCLAG_DOMAIN_ID = "123" +MCLAG_INVALID_DOMAIN_ID1 = "-1" +MCLAG_INVALID_DOMAIN_ID2 = "5000" +MCLAG_DOMAIN_ID2 = "500" +MCLAG_DOMAIN_ID3 = "1000" +MCLAG_SRC_IP = "12.1.1.1" +RESERVED_IP = "0.0.0.0" +INVALID_IP = "255.255.255.255" +NOT_IP = "abcd" +MCLAG_PEER_IP = "12.1.1.2" +MCLAG_KEEPALIVE_TIMER = "5" +MCLAG_SESSION_TIMEOUT = "20" +MCLAG_MEMBER_PO = "PortChannel10" +MCLAG_MEMBER_PO2 = "PortChannel20" +MCLAG_UNIQUE_IP_VLAN = "Vlan100" + +MCLAG_PEER_LINK = "PortChannel12" +MCLAG_PEER_LINK2 = "PortChannel13" +MCLAG_INVALID_SRC_IP1 = "12::1111" +MCLAG_INVALID_SRC_IP2 = "224.1.1.1" +MCLAG_INVALID_PEER_IP1 = "12::1112" +MCLAG_INVALID_PEER_IP2 = "224.1.1.2" +MCLAG_INVALID_PEER_LINK1 = "Eth1/3" +MCLAG_INVALID_PEER_LINK2 = "Ethernet257" +MCLAG_INVALID_PEER_LINK3 = "PortChannel123456" +MCLAG_INVALID_PEER_LINK4 = "Lag111" +MCLAG_INVALID_PEER_LINK5 = "Ethernet123456789" +MCLAG_INVALID_KEEPALIVE_TIMER = "11" +MCLAG_INVALID_SESSION_TIMEOUT = "31" +MCLAG_INVALID_KEEPALIVE_TIMER_LBOUND = "0" +MCLAG_INVALID_KEEPALIVE_TIMER_UBOUND = "61" +MCLAG_INVALID_SESSION_TMOUT_LBOUND = "2" +MCLAG_INVALID_SESSION_TMOUT_UBOUND = "4000" + +MCLAG_INVALID_MCLAG_MEMBER = "Ethernet4" +MCLAG_INVALID_PORTCHANNEL1 = "portchannel" +MCLAG_INVALID_PORTCHANNEL2 = "PortChannelabcd" +MCLAG_INVALID_PORTCHANNEL3 = "PortChannel10000" +MCLAG_INVALID_PORTCHANNEL4 = "PortChannel00111" + + +MCLAG_UNIQUE_IP_INTF_INVALID1 = "Ethernet100" +MCLAG_UNIQUE_IP_INTF_INVALID2 = "Ethernet100" + +class TestMclag(object): + @classmethod + def setup_class(cls): + os.environ['UTILITIES_UNIT_TESTING'] = "1" + print("SETUP") + + def verify_mclag_domain_cfg(self, db, domain_id, src_ip="", peer_ip="", peer_link=""): + mclag_entry = db.cfgdb.get_entry("MCLAG_DOMAIN", MCLAG_DOMAIN_ID) + if len(mclag_entry) == 0: + return False + + if src_ip is not None: + temp = mclag_entry.get("source_ip") + if temp is not None and temp != src_ip: + return False + if peer_ip is not None: + temp = mclag_entry.get("peer_ip") + if temp is not None and temp != peer_ip: + return False + if peer_link is not None: + temp = mclag_entry.get("peer_link") + if temp is not None and temp != peer_link: + return False + return True + + def verify_mclag_interface(self, db, domain_id, intf_str): + keys = db.cfgdb.get_entry('MCLAG_INTERFACE', (domain_id, intf_str)) + if len(keys) != 0: + return True + return False + + def test_add_mclag_with_invalid_src_ip(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add mclag with invalid src + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_SRC_IP1, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid src ip test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + def test_add_mclag_with_invalid_src_mcast_ip(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add mclag with invalid src + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_SRC_IP2, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid src ip test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, RESERVED_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid src ip test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert "" in result.output + + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, INVALID_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid src ip test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, NOT_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid src ip test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + def test_add_mclag_with_invalid_peer_ip(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add mclag with invalid peer ip + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_INVALID_PEER_IP1, MCLAG_PEER_LINK], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0, "mclag invalid peer ip test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, RESERVED_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid peer ip test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, INVALID_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid peer ip test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, NOT_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid peer ip test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + + def test_add_mclag_with_invalid_peer_mcast_ip(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add mclag with invalid peer ip mcast + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_INVALID_PEER_IP2, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid peer ip mcast test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + def test_add_mclag_with_invalid_peer_link(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add mclag with invalid peer link + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_INVALID_PEER_LINK1], obj=obj) + assert result.exit_code != 0, "mclag invalid peer link test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_INVALID_PEER_LINK2], obj=obj) + assert result.exit_code != 0, "mclag invalid peer link test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_INVALID_PEER_LINK3], obj=obj) + assert result.exit_code != 0, "mclag invalid peer link test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_INVALID_PEER_LINK4], obj=obj) + assert result.exit_code != 0, "mclag invalid peer link test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_INVALID_PEER_LINK5], obj=obj) + assert result.exit_code != 0, "mclag invalid peer link test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + def test_add_invalid_mclag_domain(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add invalid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [0, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid domain test case with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + # add invalid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [5000, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid domain test case with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + + + def test_add_mclag_domain(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add valid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code == 0, "mclag creation failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + # add valid mclag domain agai = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID2, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "test_mclag_domain_add_again with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + #verify config db for the mclag domain config + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK) == True, "mclag config not found" + + def test_add_invalid_mclag_domain(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add invalid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [0, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid domain test case with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + # add invalid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [5000, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid domain test case with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + + + def test_add_mclag_domain(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add valid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code == 0, "mclag creation failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + #verify config db for the mclag domain config + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK) == True, "mclag config not found" + + # add valid mclag domain again + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID2, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "test_mclag_domain_add_again with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + #verify config db for the mclag domain config + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK) == True, "mclag config not found" + + def test_mclag_invalid_keepalive_timer(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + + # add valid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code == 0, "mclag creation failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK) == True, "mclag config not found" + + # configure non multiple keepalive timer + result = runner.invoke(config.config.commands["mclag"].commands["keepalive-interval"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_KEEPALIVE_TIMER], obj=obj) + assert result.exit_code != 0, "failed testing of invalid keepalive timer with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + # add invalid keepalive values + result = runner.invoke(config.config.commands["mclag"].commands["keepalive-interval"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_KEEPALIVE_TIMER_LBOUND], obj=obj) + assert result.exit_code != 0, "mclag invalid keepalive failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + # add invalid keepalive values + result = runner.invoke(config.config.commands["mclag"].commands["keepalive-interval"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_KEEPALIVE_TIMER_UBOUND], obj=obj) + assert result.exit_code != 0, "mclag creation keepalive failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + def test_mclag_keepalive_timer(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + + # add valid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0, "mclag creation failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK) == True, "mclag config not found" + + # configure valid keepalive timer + result = runner.invoke(config.config.commands["mclag"].commands["keepalive-interval"], [MCLAG_DOMAIN_ID, MCLAG_KEEPALIVE_TIMER], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0, "failed test for setting valid keepalive timer with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + print(result.output) + assert result.exit_code == 0, "mclag creation failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + # configure valid keepalive timer + result = runner.invoke(config.config.commands["mclag"].commands["keepalive-interval"], [MCLAG_DOMAIN_ID, MCLAG_KEEPALIVE_TIMER], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0, "failed test for setting valid keepalive timer with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + mclag_entry = db.cfgdb.get_entry("MCLAG_DOMAIN", MCLAG_DOMAIN_ID) + temp = mclag_entry.get("keepalive_interval") + assert temp is not None, "session timeout not found" + assert temp == MCLAG_KEEPALIVE_TIMER, "keepalive timer value not set" + + # configure non multiple session timeout + result = runner.invoke(config.config.commands["mclag"].commands["session-timeout"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_SESSION_TIMEOUT], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0, "failed invalid session timeout setting case" + + result = runner.invoke(config.config.commands["mclag"].commands["session-timeout"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_SESSION_TMOUT_LBOUND], obj=obj) + assert result.exit_code != 0, "mclag session timeout invalid failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + result = runner.invoke(config.config.commands["mclag"].commands["session-timeout"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_SESSION_TMOUT_UBOUND], obj=obj) + assert result.exit_code != 0, "mclag session timeout invalid failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + def test_mclag_session_timeout(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add valid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0, "mclag creation failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + # configure valid session timeout + result = runner.invoke(config.config.commands["mclag"].commands["session-timeout"], [MCLAG_DOMAIN_ID, MCLAG_SESSION_TIMEOUT], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0, "failed test for setting valid session timeout with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + mclag_entry = db.cfgdb.get_entry("MCLAG_DOMAIN", MCLAG_DOMAIN_ID) + temp = mclag_entry.get("session_timeout") + assert temp is not None, "session timeout not found" + assert temp == MCLAG_SESSION_TIMEOUT, "keepalive timer value not set" + + + def test_mclag_add_mclag_member_to_nonexisting_domain(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + mclag_cfg = db.cfgdb.get_table('MCLAG_DOMAIN') + keys = [ (k, v) for k, v in mclag_cfg if k == MCLAG_DOMAIN_ID2 ] + assert len(keys) == 0, "found mclag domain which is not expected" + + # add mclag member to non existing domain + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["add"], [MCLAG_DOMAIN_ID2, MCLAG_MEMBER_PO], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0, "testing of adding mclag member to nonexisting domain failed" + + + def test_mclag_add_invalid_member(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add valid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0, "mclag creation failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK) == True, "mclag config not found" + + # add invaid mclag member Ethernet instead of PortChannel + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_MCLAG_MEMBER], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0, "testing of adding invalid member failed" + + # add invaid mclag member Ethernet instead of PortChannel + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_PORTCHANNEL1], obj=obj) + assert result.exit_code != 0, "mclag invalid member add case failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + # add invaid mclag member Ethernet instead of PortChannel + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_PORTCHANNEL2], obj=obj) + assert result.exit_code != 0, "mclag invalid member add case failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + # add invaid mclag member Ethernet instead of PortChannel + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_PORTCHANNEL3], obj=obj) + assert result.exit_code != 0, "mclag invalid member add case failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + # add invaid mclag member Ethernet instead of PortChannel + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_PORTCHANNEL4], obj=obj) + assert result.exit_code != 0, "mclag invalid member add case failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + def test_mclag_add_member(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + + # add valid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code == 0, "mclag creation failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK) == True, "mclag config not found" + + # add valid mclag member + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO], obj=obj) + assert result.exit_code == 0, "failed adding valid mclag member with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_interface(db, MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO) == True, "mclag member not present" + + # add mclag member again + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0, "testing of adding mclag member again failed" + + # delete mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["del"], [MCLAG_DOMAIN_ID], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0, "testing of delete of mclag domain failed" + assert self.verify_mclag_interface(db, MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO) == False, "mclag member not deleted" + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID) == False, "mclag domain not deleted" + + # add valid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code == 0, "mclag creation failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK) == True, "mclag config not found" + + # add valid mclag member + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO], obj=obj) + assert result.exit_code == 0, "mclag member add with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_interface(db, MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO) == True, "mclag member not present" + + # add valid mclag member2 + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO2], obj=obj) + assert result.exit_code == 0, "mclag member add with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_interface(db, MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO2) == True, "mclag member not present" + + + # del valid mclag member + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["del"], [MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO], obj=obj) + assert result.exit_code == 0, "mclag member deletion failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_interface(db, MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO) == False, "mclag member not deleted " + + # del mclag member + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["del"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_MCLAG_MEMBER], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0, "testing of deleting valid mclag member failed" + + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["del"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_PORTCHANNEL1], obj=obj) + assert result.exit_code != 0, "mclag invalid member del case failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["del"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_PORTCHANNEL2], obj=obj) + assert result.exit_code != 0, "mclag invalid member del case failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["del"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_PORTCHANNEL3], obj=obj) + + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["del"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_PORTCHANNEL4], obj=obj) + assert result.exit_code != 0, "mclag invalid member del case failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + + + def test_mclag_add_unique_ip(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add valid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code == 0, "mclag creation failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK) == True, "mclag config not found" + + # add mclag unique ip + result = runner.invoke(config.config.commands["mclag"].commands["unique-ip"].commands["add"], [MCLAG_UNIQUE_IP_VLAN], obj=obj) + assert result.exit_code == 0, "mclag unique ip add with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + keys = db.cfgdb.get_keys('MCLAG_UNIQUE_IP') + assert len(keys) != 0, "unique ip not conifgured" + + + # add mclag unique ip for vlan interface which already has ip + result = runner.invoke(config.config.commands["vlan"].commands["add"], ["111"], obj=db) + assert result.exit_code == 0, "add vlan for unique ip failed {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Vlan111", "111.11.11.1/24"], obj=obj) + assert result.exit_code != 0, "ip config for unique ip vlan failed {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["mclag"].commands["unique-ip"].commands["add"], ["Vlan111"], obj=obj) + assert result.exit_code == 0, "unique ip config for vlan with ip address case failed {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + keys = db.cfgdb.get_keys('MCLAG_UNIQUE_IP') + assert "Vlan111" in keys, "unique ip present config shouldn't be allowed" + + # delete mclag unique ip + result = runner.invoke(config.config.commands["mclag"].commands["unique-ip"].commands["del"], [MCLAG_UNIQUE_IP_VLAN], obj=obj) + assert result.exit_code == 0, "mclag unique ip delete case failed {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + keys = db.cfgdb.get_keys('MCLAG_UNIQUE_IP') + assert MCLAG_UNIQUE_IP_VLAN not in keys, "unique ip not conifgured" + + def test_mclag_add_unique_ip_non_default_vrf(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + result = runner.invoke(config.config.commands["vlan"].commands["add"], ["1001"], obj=db) + assert result.exit_code == 0, "add vlan for unique ip failed {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + db.cfgdb.set_entry("VLAN_INTERFACE", "Vlan1001", {"vrf_name": "vrf-red"}) + + # add mclag unique ip for non-default vrf + result = runner.invoke(config.config.commands["mclag"].commands["unique-ip"].commands["add"], ["Vlan1001"], obj=obj) + assert result.exit_code != 0, "mclag unique ip add with non default vlan interface{}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + keys = db.cfgdb.get_keys('MCLAG_UNIQUE_IP') + assert len(keys) == 0, "non default vrf unique ip goes through, config shouldn't be allowed" + + def test_mclag_not_present_domain(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # delete mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["del"], [MCLAG_DOMAIN_ID], obj=obj) + assert result.exit_code == 0, "testing non-existing domain deletion{}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + # delete invalid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["del"], [0], obj=obj) + assert result.exit_code != 0, "mclag invalid domain delete test case with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + # delete invalid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["del"], [5000], obj=obj) + assert result.exit_code != 0, "mclag invalid domain delete test case with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + + + def test_add_unique_ip_for_nonexisting_domain(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add unique_ip witout mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["unique-ip"].commands["add"], [MCLAG_UNIQUE_IP_VLAN], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0, "testing of adding uniqueip nonexisting mclag domain ailed" + + result = runner.invoke(config.config.commands["mclag"].commands["unique-ip"].commands["add"], [MCLAG_UNIQUE_IP_INTF_INVALID1], obj=obj) + assert result.exit_code != 0, "mclag invalid unique ip test case with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + runner.invoke(config.config.commands["mclag"].commands["unique-ip"].commands["add"], [MCLAG_UNIQUE_IP_INTF_INVALID2], obj=obj) + assert result.exit_code != 0, "mclag invalid unique ip test case with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + def test_add_mclag_with_invalid_domain_id(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add mclag with invalid domain_id + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_INVALID_DOMAIN_ID1, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid src ip test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_INVALID_DOMAIN_ID2, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag invalid src ip test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + def test_del_mclag_with_invalid_domain_id(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # del mclag with invalid domain_id + result = runner.invoke(config.config.commands["mclag"].commands["del"], [MCLAG_INVALID_DOMAIN_ID1], obj=obj) + assert result.exit_code != 0, "mclag invalid domain id test case with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + result = runner.invoke(config.config.commands["mclag"].commands["del"], [MCLAG_INVALID_DOMAIN_ID2], obj=obj) + assert result.exit_code != 0, "mclag invalid domain id test case with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + result = runner.invoke(config.config.commands["mclag"].commands["del"], [MCLAG_DOMAIN_ID3], obj=obj) + assert result.exit_code == 0, "mclag invalid domain id test case with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + + + def test_modify_mclag_domain(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add mclag domain entry in db + db.cfgdb.set_entry("MCLAG_DOMAIN", MCLAG_DOMAIN_ID, {"source_ip": MCLAG_SRC_IP}) + + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "mclag add domain peer ip test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK) == True, "mclag config not found" + + + # modify mclag config + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code != 0, "test_mclag_domain_add_again with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK2) == True, "mclag config not modified" + + + def test_add_mclag_domain_no_peer_link(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, ""], obj=obj) + assert result.exit_code != 0, "mclag add domain peer ip test caase with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP) == False, "mclag config not found" + + def test_del_mclag_domain_with_members(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add valid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code == 0, "mclag creation failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK) == True, "mclag config not found" + + # add valid mclag member + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO], obj=obj) + assert result.exit_code == 0, "mclag member add with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_interface(db, MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO) == True, "mclag member not present" + + # add valid mclag member2 + result = runner.invoke(config.config.commands["mclag"].commands["member"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO2], obj=obj) + assert result.exit_code == 0, "mclag member add with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_interface(db, MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO2) == True, "mclag member not present" + + # delete mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["del"], [MCLAG_DOMAIN_ID], obj=obj) + assert result.exit_code == 0, "testing non-existing domain deletion{}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + assert self.verify_mclag_interface(db, MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO2) == False, "mclag member not deleted" + assert self.verify_mclag_interface(db, MCLAG_DOMAIN_ID, MCLAG_MEMBER_PO) == False, "mclag member not deleted" + assert self.verify_mclag_domain_cfg(db, MCLAG_DOMAIN_ID) == False, "mclag domain not present" + + + def test_mclag_keepalive_for_non_existent_domain(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # configure keepalive timer for non-existing domain + result = runner.invoke(config.config.commands["mclag"].commands["keepalive-interval"], [MCLAG_DOMAIN_ID, MCLAG_INVALID_KEEPALIVE_TIMER], obj=obj) + assert result.exit_code != 0, "failed testing of keepalive timer for nonexisting domain {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + + def test_mclag_keepalive_config_with_nondefault_sess_tmout(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add valid mclag domain + result = runner.invoke(config.config.commands["mclag"].commands["add"], [MCLAG_DOMAIN_ID, MCLAG_SRC_IP, MCLAG_PEER_IP, MCLAG_PEER_LINK], obj=obj) + assert result.exit_code == 0, "mclag creation failed with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + #configure valid session timeout + result = runner.invoke(config.config.commands["mclag"].commands["session-timeout"], [MCLAG_DOMAIN_ID, MCLAG_SESSION_TIMEOUT], obj=obj) + assert result.exit_code == 0, "failed test for setting valid keepalive timer with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + # configure valid keepalive timer + result = runner.invoke(config.config.commands["mclag"].commands["keepalive-interval"], [MCLAG_DOMAIN_ID, MCLAG_KEEPALIVE_TIMER], obj=obj) + assert result.exit_code == 0, "failed test for setting valid keepalive timer with code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + + def test_mclag_session_tmout_for_nonexistent_domain(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + result = runner.invoke(config.config.commands["mclag"].commands["session-timeout"], [MCLAG_DOMAIN_ID, MCLAG_SESSION_TIMEOUT], obj=obj) + assert result.exit_code != 0, "failed test for session timeout with non existent dmain code {}:{} Output:{}".format(type(result.exit_code), result.exit_code, result.output) + + + @classmethod + def teardown_class(cls): + os.environ['UTILITIES_UNIT_TESTING'] = "0" + print("TEARDOWN")