Skip to content

Commit

Permalink
Fix lock encryption key retrieval (#236)
Browse files Browse the repository at this point in the history
Co-authored-by: J. Nick Koston <nick@koston.org>
  • Loading branch information
dsypniewski and bdraco committed May 23, 2024
1 parent 343f6cc commit 067989c
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 75 deletions.
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
aiohttp>=3.9.5
bleak>=0.17.0
bleak-retry-connector>=2.9.0
cryptography>=38.0.3
boto3>=1.20.24
requests>=2.28.1
3 changes: 1 addition & 2 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pytest-asyncio
pytest-cov
aiohttp>=3.9.5
bleak>=0.17.0
bleak-retry-connector>=3.4.0
cryptography>=38.0.3
boto3>=1.20.24
requests>=2.28.1
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
name="PySwitchbot",
packages=["switchbot", "switchbot.devices", "switchbot.adv_parsers"],
install_requires=[
"aiohttp>=3.9.5",
"bleak>=0.19.0",
"bleak-retry-connector>=3.4.0",
"cryptography>=39.0.0",
"pyOpenSSL>=23.0.0",
"boto3>=1.20.24",
"requests>=2.28.1",
],
version="0.44.1",
description="A library to communicate with Switchbot",
Expand Down
11 changes: 2 additions & 9 deletions switchbot/api_config.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
# Those values have been obtained from the following files in SwitchBot Android app
# That's how you can verify them yourself
# /assets/switchbot_config.json
# /res/raw/amplifyconfiguration.json
# /res/raw/awsconfiguration.json

SWITCHBOT_APP_API_BASE_URL = "https://l9ren7efdj.execute-api.us-east-1.amazonaws.com"
SWITCHBOT_APP_COGNITO_POOL = {
"PoolId": "us-east-1_x1fixo5LC",
"AppClientId": "66r90hdllaj4nnlne4qna0muls",
"AppClientSecret": "1v3v7vfjsiggiupkeuqvsovg084e3msbefpj9rgh611u30uug6t8",
"Region": "us-east-1",
}
SWITCHBOT_APP_API_BASE_URL = "api.switchbot.net"
SWITCHBOT_APP_CLIENT_ID = "5nnwmhmsa9xxskm14hd85lm9bm"
8 changes: 8 additions & 0 deletions switchbot/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
DEFAULT_SCAN_TIMEOUT = 5


class SwitchbotApiError(RuntimeError):
"""Raised when API call fails.
This exception inherits from RuntimeError to avoid breaking existing code
but will be changed to Exception in a future release.
"""


class SwitchbotAuthenticationError(RuntimeError):
"""Raised when authentication fails.
Expand Down
137 changes: 77 additions & 60 deletions switchbot/devices/lock.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
"""Library to handle connection with Switchbot Lock."""
from __future__ import annotations

import base64
import hashlib
import hmac
import json
import asyncio
import logging
import time
from typing import Any

import boto3
import requests
import aiohttp
from bleak.backends.device import BLEDevice
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

from ..api_config import SWITCHBOT_APP_API_BASE_URL, SWITCHBOT_APP_COGNITO_POOL
from ..api_config import SWITCHBOT_APP_API_BASE_URL, SWITCHBOT_APP_CLIENT_ID
from ..const import (
LockStatus,
SwitchbotAccountConnectionError,
SwitchbotApiError,
SwitchbotAuthenticationError,
)
from .device import SwitchbotDevice, SwitchbotOperationError
Expand Down Expand Up @@ -86,77 +83,97 @@ async def verify_encryption_key(

return lock_info is not None

@staticmethod
async def api_request(
session: aiohttp.ClientSession,
subdomain: str,
path: str,
data: dict = None,
headers: dict = None,
) -> dict:
url = f"https://{subdomain}.{SWITCHBOT_APP_API_BASE_URL}/{path}"
async with session.post(url, json=data, headers=headers) as result:
if result.status > 299:
raise SwitchbotApiError(
f"Unexpected status code returned by SwitchBot API: {result.status}"
)

response = await result.json()
if response["statusCode"] != 100:
raise SwitchbotApiError(
f"{response['message']}, status code: {response['statusCode']}"
)

return response["body"]

# Old non-async method preserved for backwards compatibility
@staticmethod
def retrieve_encryption_key(device_mac: str, username: str, password: str):
async def async_fn():
async with aiohttp.ClientSession() as session:
return await SwitchbotLock.async_retrieve_encryption_key(
session, device_mac, username, password
)

return asyncio.run(async_fn())

@staticmethod
async def async_retrieve_encryption_key(
session: aiohttp.ClientSession, device_mac: str, username: str, password: str
) -> dict:
"""Retrieve lock key from internal SwitchBot API."""
device_mac = device_mac.replace(":", "").replace("-", "").upper()
msg = bytes(username + SWITCHBOT_APP_COGNITO_POOL["AppClientId"], "utf-8")
secret_hash = base64.b64encode(
hmac.new(
SWITCHBOT_APP_COGNITO_POOL["AppClientSecret"].encode(),
msg,
digestmod=hashlib.sha256,
).digest()
).decode()

cognito_idp_client = boto3.client(
"cognito-idp", region_name=SWITCHBOT_APP_COGNITO_POOL["Region"]
)

try:
auth_response = cognito_idp_client.initiate_auth(
ClientId=SWITCHBOT_APP_COGNITO_POOL["AppClientId"],
AuthFlow="USER_PASSWORD_AUTH",
AuthParameters={
"USERNAME": username,
"PASSWORD": password,
"SECRET_HASH": secret_hash,
auth_result = await SwitchbotLock.api_request(
session,
"account",
"account/api/v1/user/login",
{
"clientId": SWITCHBOT_APP_CLIENT_ID,
"username": username,
"password": password,
"grantType": "password",
"verifyCode": "",
},
)
except cognito_idp_client.exceptions.NotAuthorizedException as err:
raise SwitchbotAuthenticationError(
f"Failed to authenticate: {err}"
) from err
auth_headers = {"authorization": auth_result["access_token"]}
except Exception as err:
raise SwitchbotAuthenticationError(
f"Unexpected error during authentication: {err}"
) from err
raise SwitchbotAuthenticationError(f"Authentication failed: {err}") from err

if (
auth_response is None
or "AuthenticationResult" not in auth_response
or "AccessToken" not in auth_response["AuthenticationResult"]
):
raise SwitchbotAuthenticationError("Unexpected authentication response")
try:
userinfo = await SwitchbotLock.api_request(
session, "account", "account/api/v1/user/userinfo", {}, auth_headers
)
if "botRegion" in userinfo and userinfo["botRegion"] != "":
region = userinfo["botRegion"]
else:
region = "us"
except Exception as err:
raise SwitchbotAccountConnectionError(
f"Failed to retrieve SwitchBot Account user details: {err}"
) from err

access_token = auth_response["AuthenticationResult"]["AccessToken"]
try:
key_response = requests.post(
url=SWITCHBOT_APP_API_BASE_URL + "/developStage/keys/v1/communicate",
headers={"authorization": access_token},
json={
device_info = await SwitchbotLock.api_request(
session,
f"wonderlabs.{region}",
"wonder/keys/v1/communicate",
{
"device_mac": device_mac,
"keyType": "user",
},
timeout=10,
auth_headers,
)
except requests.exceptions.RequestException as err:

return {
"key_id": device_info["communicationKey"]["keyId"],
"encryption_key": device_info["communicationKey"]["key"],
}
except Exception as err:
raise SwitchbotAccountConnectionError(
f"Failed to retrieve encryption key from SwitchBot Account: {err}"
) from err
if key_response.status_code > 299:
raise SwitchbotAuthenticationError(
f"Unexpected status code returned by SwitchBot Account API: {key_response.status_code}"
)
key_response_content = json.loads(key_response.content)
if key_response_content["statusCode"] != 100:
raise SwitchbotAuthenticationError(
f"Unexpected status code returned by SwitchBot API: {key_response_content['statusCode']}"
)

return {
"key_id": key_response_content["body"]["communicationKey"]["keyId"],
"encryption_key": key_response_content["body"]["communicationKey"]["key"],
}

async def lock(self) -> bool:
"""Send lock command."""
Expand Down

0 comments on commit 067989c

Please sign in to comment.