From 280c1601a2d06af3394c37588a539b3a018785a9 Mon Sep 17 00:00:00 2001 From: kennedyshead Date: Fri, 26 Jan 2018 19:30:48 +0100 Subject: [PATCH] fixes #11848 (#11915) Adding tests to check the component after latest patch --- .coveragerc | 1 - .../components/device_tracker/asuswrt.py | 61 ++-- .../components/device_tracker/test_asuswrt.py | 282 +++++++++++++++++- 3 files changed, 296 insertions(+), 48 deletions(-) diff --git a/.coveragerc b/.coveragerc index c1a9fa291fe90..84ca187fb3a93 100644 --- a/.coveragerc +++ b/.coveragerc @@ -341,7 +341,6 @@ omit = homeassistant/components/cover/scsgate.py homeassistant/components/device_tracker/actiontec.py homeassistant/components/device_tracker/aruba.py - homeassistant/components/device_tracker/asuswrt.py homeassistant/components/device_tracker/automatic.py homeassistant/components/device_tracker/bbox.py homeassistant/components/device_tracker/bluetooth_le_tracker.py diff --git a/homeassistant/components/device_tracker/asuswrt.py b/homeassistant/components/device_tracker/asuswrt.py index f49f54b3622d9..0d27c4b5efde7 100644 --- a/homeassistant/components/device_tracker/asuswrt.py +++ b/homeassistant/components/device_tracker/asuswrt.py @@ -25,9 +25,7 @@ CONF_PUB_KEY = 'pub_key' CONF_SSH_KEY = 'ssh_key' - DEFAULT_SSH_PORT = 22 - SECRET_GROUP = 'Password or SSH Key' PLATFORM_SCHEMA = vol.All( @@ -118,20 +116,10 @@ def __init__(self, config): self.port = config[CONF_PORT] if self.protocol == 'ssh': - if not (self.ssh_key or self.password): - _LOGGER.error("No password or private key specified") - self.success_init = False - return - self.connection = SshConnection( self.host, self.port, self.username, self.password, self.ssh_key, self.mode == 'ap') else: - if not self.password: - _LOGGER.error("No password specified") - self.success_init = False - return - self.connection = TelnetConnection( self.host, self.port, self.username, self.password, self.mode == 'ap') @@ -177,11 +165,16 @@ def get_asuswrt_data(self): """ devices = {} devices.update(self._get_wl()) - devices = self._get_arp(devices) - devices = self._get_neigh(devices) + devices.update(self._get_arp()) + devices.update(self._get_neigh(devices)) if not self.mode == 'ap': devices.update(self._get_leases(devices)) - return devices + + ret_devices = {} + for key in devices: + if devices[key].ip is not None: + ret_devices[key] = devices[key] + return ret_devices def _get_wl(self): lines = self.connection.run_command(_WL_CMD) @@ -219,18 +212,13 @@ def _get_neigh(self, cur_devices): result = _parse_lines(lines, _IP_NEIGH_REGEX) devices = {} for device in result: - if device['mac']: + if device['mac'] is not None: mac = device['mac'].upper() - devices[mac] = Device(mac, None, None) - else: - cur_devices = { - k: v for k, v in - cur_devices.items() if v.ip != device['ip'] - } - cur_devices.update(devices) - return cur_devices - - def _get_arp(self, cur_devices): + old_ip = cur_devices.get(mac, {}).ip or None + devices[mac] = Device(mac, device.get('ip', old_ip), None) + return devices + + def _get_arp(self): lines = self.connection.run_command(_ARP_CMD) if not lines: return {} @@ -240,13 +228,7 @@ def _get_arp(self, cur_devices): if device['mac']: mac = device['mac'].upper() devices[mac] = Device(mac, device['ip'], None) - else: - cur_devices = { - k: v for k, v in - cur_devices.items() if v.ip != device['ip'] - } - cur_devices.update(devices) - return cur_devices + return devices class _Connection: @@ -272,7 +254,7 @@ class SshConnection(_Connection): def __init__(self, host, port, username, password, ssh_key, ap): """Initialize the SSH connection properties.""" - super(SshConnection, self).__init__() + super().__init__() self._ssh = None self._host = host @@ -322,7 +304,7 @@ def connect(self): self._ssh.login(self._host, self._username, password=self._password, port=self._port) - super(SshConnection, self).connect() + super().connect() def disconnect(self): \ # pylint: disable=broad-except @@ -334,7 +316,7 @@ def disconnect(self): \ finally: self._ssh = None - super(SshConnection, self).disconnect() + super().disconnect() class TelnetConnection(_Connection): @@ -342,7 +324,7 @@ class TelnetConnection(_Connection): def __init__(self, host, port, username, password, ap): """Initialize the Telnet connection properties.""" - super(TelnetConnection, self).__init__() + super().__init__() self._telnet = None self._host = host @@ -361,7 +343,6 @@ def run_command(self, command): try: if not self.connected: self.connect() - self._telnet.write('{}\n'.format(command).encode('ascii')) data = (self._telnet.read_until(self._prompt_string). split(b'\n')[1:-1]) @@ -392,7 +373,7 @@ def connect(self): self._telnet.write((self._password + '\n').encode('ascii')) self._prompt_string = self._telnet.read_until(b'#').split(b'\n')[-1] - super(TelnetConnection, self).connect() + super().connect() def disconnect(self): \ # pylint: disable=broad-except @@ -402,4 +383,4 @@ def disconnect(self): \ except Exception: pass - super(TelnetConnection, self).disconnect() + super().disconnect() diff --git a/tests/components/device_tracker/test_asuswrt.py b/tests/components/device_tracker/test_asuswrt.py index 0159eec2effe0..808d3569b8bf3 100644 --- a/tests/components/device_tracker/test_asuswrt.py +++ b/tests/components/device_tracker/test_asuswrt.py @@ -5,6 +5,7 @@ from unittest import mock import voluptuous as vol +from future.backports import socket from homeassistant.setup import setup_component from homeassistant.components import device_tracker @@ -12,8 +13,9 @@ CONF_CONSIDER_HOME, CONF_TRACK_NEW, CONF_NEW_DEVICE_DEFAULTS, CONF_AWAY_HIDE) from homeassistant.components.device_tracker.asuswrt import ( - CONF_PROTOCOL, CONF_MODE, CONF_PUB_KEY, DOMAIN, - CONF_PORT, PLATFORM_SCHEMA) + CONF_PROTOCOL, CONF_MODE, CONF_PUB_KEY, DOMAIN, _ARP_REGEX, + CONF_PORT, PLATFORM_SCHEMA, Device, get_scanner, AsusWrtDeviceScanner, + _parse_lines, SshConnection, TelnetConnection) from homeassistant.const import (CONF_PLATFORM, CONF_PASSWORD, CONF_USERNAME, CONF_HOST) @@ -23,6 +25,84 @@ FAKEFILE = None +VALID_CONFIG_ROUTER_SSH = {DOMAIN: { + CONF_PLATFORM: 'asuswrt', + CONF_HOST: 'fake_host', + CONF_USERNAME: 'fake_user', + CONF_PROTOCOL: 'ssh', + CONF_MODE: 'router', + CONF_PORT: '22' +} +} + +WL_DATA = [ + 'assoclist 01:02:03:04:06:08\r', + 'assoclist 08:09:10:11:12:14\r', + 'assoclist 08:09:10:11:12:15\r' +] + +WL_DEVICES = { + '01:02:03:04:06:08': Device( + mac='01:02:03:04:06:08', ip=None, name=None), + '08:09:10:11:12:14': Device( + mac='08:09:10:11:12:14', ip=None, name=None), + '08:09:10:11:12:15': Device( + mac='08:09:10:11:12:15', ip=None, name=None) +} + +ARP_DATA = [ + '? (123.123.123.125) at 01:02:03:04:06:08 [ether] on eth0\r', + '? (123.123.123.126) at 08:09:10:11:12:14 [ether] on br0\r' + '? (123.123.123.127) at on br0\r' +] + +ARP_DEVICES = { + '01:02:03:04:06:08': Device( + mac='01:02:03:04:06:08', ip='123.123.123.125', name=None), + '08:09:10:11:12:14': Device( + mac='08:09:10:11:12:14', ip='123.123.123.126', name=None) +} + +NEIGH_DATA = [ + '123.123.123.125 dev eth0 lladdr 01:02:03:04:06:08 REACHABLE\r', + '123.123.123.126 dev br0 lladdr 08:09:10:11:12:14 STALE\r' + '123.123.123.127 dev br0 FAILED\r' +] + +NEIGH_DEVICES = { + '01:02:03:04:06:08': Device( + mac='01:02:03:04:06:08', ip='123.123.123.125', name=None), + '08:09:10:11:12:14': Device( + mac='08:09:10:11:12:14', ip='123.123.123.126', name=None) +} + +LEASES_DATA = [ + '51910 01:02:03:04:06:08 123.123.123.125 TV 01:02:03:04:06:08\r', + '79986 01:02:03:04:06:10 123.123.123.127 android 01:02:03:04:06:15\r', + '23523 08:09:10:11:12:14 123.123.123.126 * 08:09:10:11:12:14\r', +] + +LEASES_DEVICES = { + '01:02:03:04:06:08': Device( + mac='01:02:03:04:06:08', ip='123.123.123.125', name='TV'), + '08:09:10:11:12:14': Device( + mac='08:09:10:11:12:14', ip='123.123.123.126', name='') +} + +WAKE_DEVICES = { + '01:02:03:04:06:08': Device( + mac='01:02:03:04:06:08', ip='123.123.123.125', name='TV'), + '08:09:10:11:12:14': Device( + mac='08:09:10:11:12:14', ip='123.123.123.126', name='') +} + +WAKE_DEVICES_AP = { + '01:02:03:04:06:08': Device( + mac='01:02:03:04:06:08', ip='123.123.123.125', name=None), + '08:09:10:11:12:14': Device( + mac='08:09:10:11:12:14', ip='123.123.123.126', name=None) +} + def setup_module(): """Setup the test module.""" @@ -55,6 +135,24 @@ def teardown_method(self, _): except FileNotFoundError: pass + def test_parse_lines_wrong_input(self): + """Testing parse lines.""" + output = _parse_lines("asdf asdfdfsafad", _ARP_REGEX) + self.assertEqual(output, []) + + def test_get_device_name(self): + """Test for getting name.""" + scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) + scanner.last_results = WAKE_DEVICES + self.assertEqual('TV', scanner.get_device_name('01:02:03:04:06:08')) + self.assertEqual(None, scanner.get_device_name('01:02:03:04:08:08')) + + def test_scan_devices(self): + """Test for scan devices.""" + scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) + scanner.last_results = WAKE_DEVICES + self.assertEqual(list(WAKE_DEVICES), scanner.scan_devices()) + def test_password_or_pub_key_required(self): \ # pylint: disable=invalid-name """Test creating an AsusWRT scanner without a pass or pubkey.""" @@ -63,13 +161,14 @@ def test_password_or_pub_key_required(self): \ self.hass, DOMAIN, {DOMAIN: { CONF_PLATFORM: 'asuswrt', CONF_HOST: 'fake_host', - CONF_USERNAME: 'fake_user' + CONF_USERNAME: 'fake_user', + CONF_PROTOCOL: 'ssh' }}) @mock.patch( 'homeassistant.components.device_tracker.asuswrt.AsusWrtDeviceScanner', return_value=mock.MagicMock()) - def test_get_scanner_with_password_no_pubkey(self, asuswrt_mock): \ + def test_get_scanner_with_password_no_pubkey(self, asuswrt_mock): \ # pylint: disable=invalid-name """Test creating an AsusWRT scanner with a password and no pubkey.""" conf_dict = { @@ -99,7 +198,7 @@ def test_get_scanner_with_password_no_pubkey(self, asuswrt_mock): \ @mock.patch( 'homeassistant.components.device_tracker.asuswrt.AsusWrtDeviceScanner', return_value=mock.MagicMock()) - def test_get_scanner_with_pubkey_no_password(self, asuswrt_mock): \ + def test_get_scanner_with_pubkey_no_password(self, asuswrt_mock): \ # pylint: disable=invalid-name """Test creating an AsusWRT scanner with a pubkey and no password.""" conf_dict = { @@ -178,7 +277,7 @@ def test_ssh_login_with_password(self): password='fake_pass', port=22) ) - def test_ssh_login_without_password_or_pubkey(self): \ + def test_ssh_login_without_password_or_pubkey(self): \ # pylint: disable=invalid-name """Test that login is not called without password or pub_key.""" ssh = mock.MagicMock() @@ -249,7 +348,7 @@ def test_telnet_login_with_password(self): mock.call(b'#') ) - def test_telnet_login_without_password(self): \ + def test_telnet_login_without_password(self): \ # pylint: disable=invalid-name """Test that login is not called without password or pub_key.""" telnet = mock.MagicMock() @@ -277,3 +376,172 @@ def test_telnet_login_without_password(self): \ assert setup_component(self.hass, DOMAIN, {DOMAIN: conf_dict}) telnet.login.assert_not_called() + + def test_get_asuswrt_data(self): + """Test aususwrt data fetch.""" + scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) + scanner._get_wl = mock.Mock() + scanner._get_arp = mock.Mock() + scanner._get_neigh = mock.Mock() + scanner._get_leases = mock.Mock() + scanner._get_wl.return_value = WL_DEVICES + scanner._get_arp.return_value = ARP_DEVICES + scanner._get_neigh.return_value = NEIGH_DEVICES + scanner._get_leases.return_value = LEASES_DEVICES + self.assertEqual(WAKE_DEVICES, scanner.get_asuswrt_data()) + + def test_get_asuswrt_data_ap(self): + """Test for get asuswrt_data in ap mode.""" + conf = VALID_CONFIG_ROUTER_SSH.copy()[DOMAIN] + conf[CONF_MODE] = 'ap' + scanner = AsusWrtDeviceScanner(conf) + scanner._get_wl = mock.Mock() + scanner._get_arp = mock.Mock() + scanner._get_neigh = mock.Mock() + scanner._get_leases = mock.Mock() + scanner._get_wl.return_value = WL_DEVICES + scanner._get_arp.return_value = ARP_DEVICES + scanner._get_neigh.return_value = NEIGH_DEVICES + scanner._get_leases.return_value = LEASES_DEVICES + self.assertEqual(WAKE_DEVICES_AP, scanner.get_asuswrt_data()) + + def test_update_info(self): + """Test for update info.""" + scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) + scanner.get_asuswrt_data = mock.Mock() + scanner.get_asuswrt_data.return_value = WAKE_DEVICES + self.assertTrue(scanner._update_info()) + self.assertTrue(scanner.last_results, WAKE_DEVICES) + scanner.success_init = False + self.assertFalse(scanner._update_info()) + + @mock.patch( + 'homeassistant.components.device_tracker.asuswrt.SshConnection') + def test_get_wl(self, mocked_ssh): + """Testing wl.""" + mocked_ssh.run_command.return_value = WL_DATA + scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) + scanner.connection = mocked_ssh + self.assertEqual(WL_DEVICES, scanner._get_wl()) + mocked_ssh.run_command.return_value = '' + self.assertEqual({}, scanner._get_wl()) + + @mock.patch( + 'homeassistant.components.device_tracker.asuswrt.SshConnection') + def test_get_arp(self, mocked_ssh): + """Testing arp.""" + mocked_ssh.run_command.return_value = ARP_DATA + + scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) + scanner.connection = mocked_ssh + self.assertEqual(ARP_DEVICES, scanner._get_arp()) + mocked_ssh.run_command.return_value = '' + self.assertEqual({}, scanner._get_arp()) + + @mock.patch( + 'homeassistant.components.device_tracker.asuswrt.SshConnection') + def test_get_neigh(self, mocked_ssh): + """Testing neigh.""" + mocked_ssh.run_command.return_value = NEIGH_DATA + + scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) + scanner.connection = mocked_ssh + self.assertEqual(NEIGH_DEVICES, scanner._get_neigh(ARP_DEVICES.copy())) + mocked_ssh.run_command.return_value = '' + self.assertEqual({}, scanner._get_neigh(ARP_DEVICES.copy())) + + @mock.patch( + 'homeassistant.components.device_tracker.asuswrt.SshConnection') + def test_get_leases(self, mocked_ssh): + """Testing leases.""" + mocked_ssh.run_command.return_value = LEASES_DATA + + scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) + scanner.connection = mocked_ssh + self.assertEqual( + LEASES_DEVICES, scanner._get_leases(NEIGH_DEVICES.copy())) + mocked_ssh.run_command.return_value = '' + self.assertEqual({}, scanner._get_leases(NEIGH_DEVICES.copy())) + + +class TestSshConnection(unittest.TestCase): + """Testing SshConnection.""" + + def setUp(self): + """Setup test env.""" + self.connection = SshConnection( + 'fake', 'fake', 'fake', 'fake', 'fake', 'fake') + self.connection._connected = True + + def test_run_command_exception_eof(self): + """Testing exception in run_command.""" + from pexpect import exceptions + self.connection._ssh = mock.Mock() + self.connection._ssh.sendline = mock.Mock() + self.connection._ssh.sendline.side_effect = exceptions.EOF('except') + self.connection.run_command('test') + self.assertFalse(self.connection._connected) + self.assertIsNone(self.connection._ssh) + + def test_run_command_exception_pxssh(self): + """Testing exception in run_command.""" + from pexpect import pxssh + self.connection._ssh = mock.Mock() + self.connection._ssh.sendline = mock.Mock() + self.connection._ssh.sendline.side_effect = pxssh.ExceptionPxssh( + 'except') + self.connection.run_command('test') + self.assertFalse(self.connection._connected) + self.assertIsNone(self.connection._ssh) + + def test_run_command_assertion_error(self): + """Testing exception in run_command.""" + self.connection._ssh = mock.Mock() + self.connection._ssh.sendline = mock.Mock() + self.connection._ssh.sendline.side_effect = AssertionError('except') + self.connection.run_command('test') + self.assertFalse(self.connection._connected) + self.assertIsNone(self.connection._ssh) + + +class TestTelnetConnection(unittest.TestCase): + """Testing TelnetConnection.""" + + def setUp(self): + """Setup test env.""" + self.connection = TelnetConnection( + 'fake', 'fake', 'fake', 'fake', 'fake') + self.connection._connected = True + + def test_run_command_exception_eof(self): + """Testing EOFException in run_command.""" + self.connection._telnet = mock.Mock() + self.connection._telnet.write = mock.Mock() + self.connection._telnet.write.side_effect = EOFError('except') + self.connection.run_command('test') + self.assertFalse(self.connection._connected) + + def test_run_command_exception_connection_refused(self): + """Testing ConnectionRefusedError in run_command.""" + self.connection._telnet = mock.Mock() + self.connection._telnet.write = mock.Mock() + self.connection._telnet.write.side_effect = ConnectionRefusedError( + 'except') + self.connection.run_command('test') + self.assertFalse(self.connection._connected) + + def test_run_command_exception_gaierror(self): + """Testing socket.gaierror in run_command.""" + self.connection._telnet = mock.Mock() + self.connection._telnet.write = mock.Mock() + self.connection._telnet.write.side_effect = socket.gaierror('except') + self.connection.run_command('test') + self.assertFalse(self.connection._connected) + + def test_run_command_exception_oserror(self): + """Testing OSError in run_command.""" + self.connection._telnet = mock.Mock() + self.connection._telnet.write = mock.Mock() + self.connection._telnet.write.side_effect = OSError('except') + self.connection.run_command('test') + self.assertFalse(self.connection._connected)