From 2e462ef6220fc901acaf1a7e06cb3d50bf75a4a0 Mon Sep 17 00:00:00 2001 From: Prince George <45705344+prgeor@users.noreply.github.com> Date: Tue, 7 Dec 2021 08:53:34 +0530 Subject: [PATCH] [sfputil] Firmware download/upgrade CLI support for QSFP-DD (#1947) * Firmware download/upgrade CLI support for QSFP-DD Signed-off-by: Prince George --- sfputil/main.py | 324 ++++++++++++++++++++++++++++++++++++++++++ tests/sfputil_test.py | 52 +++++++ 2 files changed, 376 insertions(+) diff --git a/sfputil/main.py b/sfputil/main.py index 55818d15ccf2..5eb72195d729 100644 --- a/sfputil/main.py +++ b/sfputil/main.py @@ -9,6 +9,8 @@ import sys import natsort import ast +import time +import datetime import subprocess import click @@ -26,12 +28,20 @@ PLATFORM_JSON = 'platform.json' PORT_CONFIG_INI = 'port_config.ini' +EXIT_FAIL = -1 +EXIT_SUCCESS = 0 ERROR_PERMISSIONS = 1 ERROR_CHASSIS_LOAD = 2 ERROR_SFPUTILHELPER_LOAD = 3 ERROR_PORT_CONFIG_LOAD = 4 ERROR_NOT_IMPLEMENTED = 5 ERROR_INVALID_PORT = 6 +SMBUS_BLOCK_WRITE_SIZE = 32 +# Default host password as per CMIS spec: +# http://www.qsfp-dd.com/wp-content/uploads/2021/05/CMIS5p0.pdf +CDB_DEFAULT_HOST_PASSWORD = 0x00001011 + +MAX_LPL_FIRMWARE_BLOCK_SIZE = 116 #Bytes # TODO: We should share these maps and the formatting functions between sfputil and sfpshow QSFP_DATA_MAP = { @@ -225,6 +235,17 @@ # Global logger instance log = logger.Logger(SYSLOG_IDENTIFIER) +def is_sfp_present(port_name): + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + presence = sfp.get_presence() + except NotImplementedError: + click.echo("sfp get_presence() NOT implemented!", err=True) + sys.exit(ERROR_NOT_IMPLEMENTED) + + return bool(presence) # ========================== Methods for formatting output ========================== @@ -409,6 +430,19 @@ def logical_port_name_to_physical_port_list(port_name): else: return [int(port_name)] +def logical_port_to_physical_port_index(port_name): + if not platform_sfputil.is_logical_port(port_name): + click.echo("Error: invalid port '{}'\n".format(port_name)) + print_all_valid_port_values() + sys.exit(ERROR_INVALID_PORT) + + physical_port = logical_port_name_to_physical_port_list(port_name)[0] + if physical_port is None: + click.echo("Error: No physical port found for logical port '{}'".format(port_name)) + sys.exit(EXIT_FAIL) + + return physical_port + def print_all_valid_port_values(): click.echo("Valid values for port: {}\n".format(str(platform_sfputil.logical))) @@ -806,6 +840,37 @@ def lpmode(port): click.echo(tabulate(output_table, table_header, tablefmt='simple')) +def show_firmware_version(physical_port): + try: + sfp = platform_chassis.get_sfp(physical_port) + api = sfp.get_xcvr_api() + out = api.get_module_fw_info() + click.echo(out['info']) + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + +# 'fwversion' subcommand +@show.command() +@click.argument('port_name', metavar='', required=True) +def fwversion(port_name): + """Show firmware version of the transceiver""" + + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + presence = sfp.get_presence() + except NotImplementedError: + click.echo("sfp get_presence() NOT implemented!") + sys.exit(EXIT_FAIL) + + if not presence: + click.echo("{}: SFP EEPROM not detected\n".format(port_name)) + sys.exit(EXIT_FAIL) + + show_firmware_version(physical_port) + sys.exit(EXIT_SUCCESS) # 'lpmode' subgroup @cli.group() @@ -904,6 +969,265 @@ def reset(port_name): i += 1 +# 'firmware' subgroup +@cli.group() +def firmware(): + """Download/Upgrade firmware on the transceiver""" + pass + +def run_firmware(port_name, mode): + """ + Make the inactive firmware as the current running firmware + @port_name: + @mode: 0, 1, 2, 3 different modes to run the firmware + Returns 1 on success, and exit_code = -1 on failure + """ + status = 0 + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + api = sfp.get_xcvr_api() + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + + if mode == 0: + click.echo("Running firmware: Non-hitless Reset to Inactive Image") + elif mode == 1: + click.echo("Running firmware: Hitless Reset to Inactive Image") + elif mode == 2: + click.echo("Running firmware: Attempt non-hitless Reset to Running Image") + elif mode == 3: + click.echo("Running firmware: Attempt Hitless Reset to Running Image") + else: + click.echo("Running firmware: Unknown mode {}".format(mode)) + sys.exit(EXIT_FAIL) + + try: + status = api.cdb_run_firmware(mode) + except NotImplementedError: + click.echo("This functionality is not applicable for this transceiver") + sys.exit(EXIT_FAIL) + + return status + +def commit_firmware(port_name): + status = 0 + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + api = sfp.get_xcvr_api() + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + + try: + status = api.cdb_commit_firmware() + except NotImplementedError: + click.echo("This functionality is not applicable for this transceiver") + + return status + +def download_firmware(port_name, filepath): + """Download firmware on the transceiver""" + try: + fd = open(filepath, 'rb') + fd.seek(0, 2) + file_size = fd.tell() + fd.seek(0, 0) + except FileNotFoundError: + click.echo("Firmware file {} NOT found".format(filepath)) + sys.exit(EXIT_FAIL) + + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + try: + api = sfp.get_xcvr_api() + except NotImplementedError: + click.echo("This functionality is NOT applicable to this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + + try: + fwinfo = api.get_module_fw_mgmt_feature() + if fwinfo['status'] == True: + startLPLsize, maxblocksize, lplonly_flag, autopaging_flag, writelength = fwinfo['feature'] + else: + click.echo("Failed to fetch CDB Firmware management features") + sys.exit(EXIT_FAIL) + except NotImplementedError: + click.echo("This functionality is NOT applicable for this transceiver") + sys.exit(ERROR_NOT_IMPLEMENTED) + + click.echo('CDB: Starting firmware download') + startdata = fd.read(startLPLsize) + status = api.cdb_start_firmware_download(startLPLsize, startdata, file_size) + if status != 1: + click.echo('CDB: Start firmware download failed - status {}'.format(status)) + sys.exit(EXIT_FAIL) + + # Increase the optoe driver's write max to speed up firmware download + sfp.set_optoe_write_max(SMBUS_BLOCK_WRITE_SIZE) + + with click.progressbar(length=file_size, label="Downloading ...") as bar: + address = 0 + BLOCK_SIZE = MAX_LPL_FIRMWARE_BLOCK_SIZE if lplonly_flag else maxblocksize + remaining = file_size - startLPLsize + while remaining > 0: + count = BLOCK_SIZE if remaining >= BLOCK_SIZE else remaining + data = fd.read(count) + if len(data) != count: + click.echo("Firmware file read failed!") + sys.exit(EXIT_FAIL) + + if lplonly_flag: + status = api.cdb_lpl_block_write(address, data) + else: + status = api.cdb_epl_block_write(address, data, autopaging_flag, writelength) + if (status != 1): + click.echo("CDB: firmware download failed! - status {}".format(status)) + sys.exit(EXIT_FAIL) + + bar.update(count) + address += count + remaining -= count + + # Restore the optoe driver's write max to '1' (default value) + sfp.set_optoe_write_max(1) + + status = api.cdb_firmware_download_complete() + click.echo('CDB: firmware download complete') + return status + +# 'run' subcommand +@firmware.command() +@click.argument('port_name', required=True, default=None) +@click.option('--mode', default="1", type=click.Choice(["0", "1", "2", "3"]), show_default=True, + help="0 = Non-hitless Reset to Inactive Image\n \ + 1 = Hitless Reset to Inactive Image (Default)\n \ + 2 = Attempt non-hitless Reset to Running Image\n \ + 3 = Attempt Hitless Reset to Running Image\n") +def run(port_name, mode): + """Run the firmware with default mode=1""" + + if not is_sfp_present(port_name): + click.echo("{}: SFP EEPROM not detected\n".format(port_name)) + sys.exit(EXIT_FAIL) + + status = run_firmware(port_name, int(mode)) + if status != 1: + click.echo('Failed to run firmware in mode={}! CDB status: {}'.format(mode, status)) + sys.exit(EXIT_FAIL) + + click.echo("Firmware run in mode={} success".format(mode)) + +# 'commit' subcommand +@firmware.command() +@click.argument('port_name', required=True, default=None) +def commit(port_name): + """Commit the running firmware""" + + if not is_sfp_present(port_name): + click.echo("{}: SFP EEPROM not detected\n".format(port_name)) + sys.exit(EXIT_FAIL) + + status = commit_firmware(port_name) + if status != 1: + click.echo('Failed to commit firmware! CDB status: {}'.format(status)) + sys.exit(EXIT_FAIL) + + click.echo("Firmware commit successful") + +# 'upgrade' subcommand +@firmware.command() +@click.argument('port_name', required=True, default=None) +@click.argument('filepath', required=True, default=None) +def upgrade(port_name, filepath): + """Upgrade firmware on the transceiver""" + + physical_port = logical_port_to_physical_port_index(port_name) + + if not is_sfp_present(port_name): + click.echo("{}: SFP EEPROM not detected\n".format(port_name)) + sys.exit(EXIT_FAIL) + + show_firmware_version(physical_port) + + status = download_firmware(port_name, filepath) + if status == 1: + click.echo("Firmware download complete success") + else: + click.echo("Firmware download complete failed! CDB status = {}".format(status)) + sys.exit(EXIT_FAIL) + + status = run_firmware(port_name, 1) + if status != 1: + click.echo('Failed to run firmware in mode=1 ! CDB status: {}'.format(status)) + sys.exit(EXIT_FAIL) + + click.echo("Firmware run in mode 1 successful") + + status = commit_firmware(port_name) + if status != 1: + click.echo('Failed to commit firmware! CDB status: {}'.format(status)) + sys.exit(EXIT_FAIL) + + click.echo("Firmware commit successful") + +# 'download' subcommand +@firmware.command() +@click.argument('port_name', required=True, default=None) +@click.argument('filepath', required=True, default=None) +def download(port_name, filepath): + """Download firmware on the transceiver""" + + if not is_sfp_present(port_name): + click.echo("{}: SFP EEPROM not detected\n".format(port_name)) + sys.exit(EXIT_FAIL) + + start = time.time() + status = download_firmware(port_name, filepath) + if status == 1: + click.echo("Firmware download complete success") + else: + click.echo("Firmware download complete failed! status = {}".format(status)) + sys.exit(EXIT_FAIL) + end = time.time() + click.echo("Total download Time: {}".format(str(datetime.timedelta(seconds=end-start)))) + + +# 'unlock' subcommand +@firmware.command() +@click.argument('port_name', required=True, default=None) +@click.option('--password', type=click.INT, help="Password in integer\n") +def unlock(port_name, password): + """Unlock the firmware download feature via CDB host password""" + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + if not is_sfp_present(port_name): + click.echo("{}: SFP EEPROM not detected\n".format(port_name)) + sys.exit(EXIT_FAIL) + + try: + api = sfp.get_xcvr_api() + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + + if password is None: + password = CDB_DEFAULT_HOST_PASSWORD + try: + status = api.cdb_enter_host_password(int(password)) + except NotImplementedError: + click.echo("This functionality is not applicable for this transceiver") + sys.exit(EXIT_FAIL) + + if status == 1: + click.echo("CDB: Host password accepted") + else: + click.echo("CDB: Host password NOT accepted! status = {}".format(status)) # 'version' subcommand @cli.command() diff --git a/tests/sfputil_test.py b/tests/sfputil_test.py index 2ef8aa6f2434..c3b6b96a9e67 100644 --- a/tests/sfputil_test.py +++ b/tests/sfputil_test.py @@ -1,6 +1,7 @@ import sys import os from unittest import mock +from unittest.mock import MagicMock, patch from .mock_tables import dbconnector @@ -189,3 +190,54 @@ def test_error_status_from_db(self): expected_output_ethernet0 = expected_output[:1] output = sfputil.fetch_error_status_from_state_db('Ethernet0', db.db) assert output == expected_output_ethernet0 + + @patch('sfputil.main.platform_chassis') + @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) + def test_show_firmware_version(self, mock_chassis): + mock_sfp = MagicMock() + mock_api = MagicMock() + mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api) + mock_sfp.get_presence.return_value = True + mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) + mock_api.get_module_fw_info.return_value = {'info' : ""} + runner = CliRunner() + result = runner.invoke(sfputil.cli.commands['show'].commands['fwversion'], ["Ethernet0"]) + assert result.exit_code == 0 + + @patch('sfputil.main.platform_chassis') + @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) + def test_unlock_firmware(self, mock_chassis): + mock_sfp = MagicMock() + mock_api = MagicMock() + mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api) + mock_sfp.get_presence.return_value = True + mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) + mock_api.cdb_enter_host_password.return_value = 0 + runner = CliRunner() + result = runner.invoke(sfputil.cli.commands['firmware'].commands['unlock'], ["Ethernet0"]) + assert result.exit_code == 0 + + + @patch('sfputil.main.platform_chassis') + @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) + def test_run_firmwre(self, mock_chassis): + mock_sfp = MagicMock() + mock_api = MagicMock() + mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api) + mock_sfp.get_presence.return_value = True + mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) + mock_api.cdb_run_firmware.return_value = 1 + status = sfputil.run_firmware("Ethernet0", 1) + assert status == 1 + + @patch('sfputil.main.platform_chassis') + @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) + def test_commit_firmwre(self, mock_chassis): + mock_sfp = MagicMock() + mock_api = MagicMock() + mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api) + mock_sfp.get_presence.return_value = True + mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) + mock_api.cdb_commit_firmware.return_value = 1 + status = sfputil.commit_firmware("Ethernet0") + assert status == 1