Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for the meta API and update access.py #39

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
47dd28b
Update access.py: allow data response, allow url parameters on get
foreign-sub Sep 30, 2019
cc5d620
Add support for the meta api
foreign-sub Oct 1, 2019
21de65c
Update meta.py: fix search_meta_emissions
foreign-sub Oct 4, 2019
6cd7bbc
Update access.py, meta.py: pep257
foreign-sub Oct 4, 2019
573ca4d
Update access.py: delete blank line, explicit import
foreign-sub Oct 4, 2019
1a55eb4
Update access.py: cleanup, code comment
foreign-sub Oct 5, 2019
b65fb9e
Update meta.py: cleanup
foreign-sub Oct 5, 2019
f4ad354
Update access.py: avoid conflict
foreign-sub Oct 7, 2019
0a2d9a7
Merge branch 'aiofreepybox' into add-meta
foreign-sub Oct 7, 2019
ee5ec13
Update meta.py: more docstring
foreign-sub Oct 9, 2019
ae148fb
Update access.py: cleanup
foreign-sub Oct 10, 2019
51aa0c2
Update meta.py: docstring, format
foreign-sub Oct 20, 2019
0213f58
Merge branch 'aiofreepybox' into add-meta
foreign-sub Oct 21, 2019
6fa96ab
Merge branch 'aiofreepybox' into add-meta
foreign-sub Oct 27, 2019
b3a9604
Update access.py: format, cleanup
foreign-sub Nov 8, 2019
7da0750
Update access.py: Typing, catch possible timeout error
foreign-sub Nov 9, 2019
3c8e136
Update access.py: raise on access_denied
foreign-sub Nov 9, 2019
a46cf2d
Update access.py: fix return data if "error" not in resp
foreign-sub Nov 9, 2019
9d25656
Update access.py: return data error resp
foreign-sub Nov 10, 2019
f2091d8
Update meta.py: typing,update access.py: fix types
foreign-sub Nov 10, 2019
27e4c56
Update acces.py: cleanup
foreign-sub Nov 10, 2019
fab0536
Update meta.py: add missing types
foreign-sub Nov 11, 2019
53d3048
Update meta.py: add missing type
foreign-sub Nov 11, 2019
a9b5db3
Update meta.py: typing
foreign-sub Jan 22, 2020
df0a7c3
Update access.py: typing
foreign-sub Jan 22, 2020
eb9a72a
Update access.py: fix session_permissions type
foreign-sub Feb 7, 2020
3040996
Update access.py: fix typo
foreign-sub Feb 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 130 additions & 86 deletions aiofreepybox/access.py
Original file line number Diff line number Diff line change
@@ -1,147 +1,191 @@
from asyncio import TimeoutError
import hmac
import json
import logging
from typing import Any, Callable, Dict, Mapping, Optional, Tuple
from urllib.parse import urljoin
from aiofreepybox.exceptions import *

logger = logging.getLogger(__name__)
from aiohttp.client import ClientSession

from aiofreepybox.exceptions import (
AuthorizationError,
HttpRequestError,
InsufficientPermissionsError,
)

_DEFAULT_TIMEOUT = 10
_LOGGER = logging.getLogger(__name__)


class Access:
def __init__(self, session, base_url, app_token, app_id, http_timeout):
"""
Access
"""

def __init__(
self,
session: ClientSession,
base_url: str,
app_token: str,
app_id: str,
http_timeout: int,
) -> None:
self.session = session
self.base_url = base_url
self.app_token = app_token
self.app_id = app_id
self.timeout = http_timeout
self.session_token = None
self.session_permissions = None
self.session_token: Optional[str] = None
self.session_permissions: Optional[Dict[str, bool]] = None

async def _get_challenge(self, base_url, timeout=10):
'''
async def _get_challenge(
self, base_url: str, timeout: int = _DEFAULT_TIMEOUT
) -> str:
"""
Return challenge from freebox API
'''
url = urljoin(base_url, 'login')
r = await self.session.get(url, timeout=timeout)
resp = await r.json()

# raise exception if resp.success != True
if not resp.get('success'):
raise AuthorizationError('Getting challenge failed (APIResponse: {})'
.format(json.dumps(resp)))
"""

return resp['result']['challenge']
url = urljoin(base_url, "login")
async with self.session.get(url, timeout=timeout) as r:
resp = await r.json()

async def _get_session_token(self, base_url, app_token, app_id, timeout=10):
# raise exception if resp.success != True
if not resp.get("success"):
raise AuthorizationError(
"Getting challenge failed (APIResponse: {})".format(json.dumps(resp))
)

return resp["result"]["challenge"]

async def _get_session_token(
self,
base_url: str,
app_token: str,
app_id: str,
timeout: int = _DEFAULT_TIMEOUT,
) -> Tuple[str, Dict[str, bool]]:
"""
Get session token from freebox.
Returns (session_token, session_permissions)
"""

# Get challenge from API
challenge = await self._get_challenge(base_url, timeout)

# Hash app_token with chalenge key to get the password
h = hmac.new(app_token.encode(), challenge.encode(), 'sha1')
# Hash app_token with challenge key to get the password
h = hmac.new(app_token.encode(), challenge.encode(), "sha1")
password = h.hexdigest()

url = urljoin(base_url, 'login/session/')
data = json.dumps({'app_id': app_id, 'password': password})
r = await self.session.post(url, data=data, timeout=timeout)
resp = await r.json()
url = urljoin(base_url, "login/session/")
data = json.dumps({"app_id": app_id, "password": password})
async with await self.session.post(url, data=data, timeout=timeout) as r:
resp = await r.json()

# raise exception if resp.success != True
if not resp.get('success'):
raise AuthorizationError('Starting session failed (APIResponse: {})'
.format(json.dumps(resp)))
if not resp.get("success"):
raise AuthorizationError(
"Starting session failed (APIResponse: {})".format(json.dumps(resp))
)

session_token = resp.get('result').get('session_token')
session_permissions = resp.get('result').get('permissions')
session_token, session_permissions = (
resp.get("result").get("session_token"),
resp.get("result").get("permissions"),
)
return session_token, session_permissions

return(session_token, session_permissions)
async def _refresh_session_token(self) -> None:
"""Refresh session token"""

async def _refresh_session_token(self):
# Get token for the current session
session_token, session_permissions = await self._get_session_token(
self.base_url,
self.app_token,
self.app_id,
self.timeout)

logger.info('Session opened')
logger.info('Permissions: ' + str(session_permissions))
self.session_token = session_token
self.session_permissions = session_permissions

def _get_headers(self):
return {'X-Fbx-App-Auth': self.session_token}

async def _perform_request(self, verb, end_url, **kwargs):
'''
self.session_token, self.session_permissions = await self._get_session_token(
self.base_url, self.app_token, self.app_id, self.timeout
)
_LOGGER.info("Session opened")
_LOGGER.debug("Permissions: " + str(self.session_permissions))

def _get_headers(self) -> Dict[str, Optional[str]]:
"""Get headers"""
return {"X-Fbx-App-Auth": self.session_token}

async def _perform_request(self, verb: Callable, end_url: str, **kwargs) -> Any:
"""
Perform the given request, refreshing the session token if needed
'''
"""
if not self.session_token:
await self._refresh_session_token()

url = urljoin(self.base_url, end_url)
request_params = {
**kwargs,
"headers": self._get_headers(),
"timeout": self.timeout
"timeout": self.timeout,
}
r = await verb(url, **request_params)
try:
r = await verb(url, **request_params)
except TimeoutError as e:
raise HttpRequestError(e)

# Return response if content is not json
if r.content_type != 'application/json':
if r.content_type != "application/json":
return r
else:
resp = await r.json()

if resp.get('error_code') in ['auth_required', "invalid_session"]:
logger.debug('Invalid session')
await self._refresh_session_token()
request_params["headers"] = self._get_headers()
r = await verb(url, **request_params)
resp = await r.json()

if not resp['success']:
errMsg = 'Request failed (APIResponse: {})'.format(json.dumps(resp))
if resp.get('error_code') == 'insufficient_rights':
raise InsufficientPermissionsError(errMsg)
else:
raise HttpRequestError(errMsg)

return resp['result'] if 'result' in resp else None
resp = await r.json()

async def get(self, end_url):
'''
if resp.get("error_code") in ["auth_required", "invalid_session"]:
_LOGGER.debug("Invalid session")
self.session_token = None
return await self._perform_request(verb, end_url, **kwargs)

# Check for 'result' response success
if not resp["success"] if "success" in resp else True:
# Check for 'data' response success
if (not resp["error"] if "error" in resp else True) and resp["data"]:
# Return 'data' response
return resp.get("data", None)
elif isinstance(resp["error"], dict):
return resp.get("error", None)

error_message = "Request failed (APIResponse: {})".format(json.dumps(resp))
if resp.get("error_code") in ["insufficient_rights", "access_denied"]:
raise InsufficientPermissionsError(error_message)
raise HttpRequestError(error_message)

# Return 'result' response
return resp.get("result", None)

async def get(
self, end_url: str, params_url: Optional[Mapping[str, str]] = None
) -> Any:
"""
Send get request and return results
'''
return await self._perform_request(self.session.get, end_url)
"""
params = params_url if params_url is not None else None
return await self._perform_request(self.session.get, end_url, params=params)

async def post(self, end_url, payload=None):
'''
async def post(self, end_url: str, payload: Optional[Any] = None) -> Any:
"""
Send post request and return results
'''
"""
data = json.dumps(payload) if payload is not None else None
return await self._perform_request(self.session.post, end_url, data=data)

async def put(self, end_url, payload=None):
'''
async def put(self, end_url: str, payload: Optional[Any] = None) -> Any:
"""
Send post request and return results
'''
"""
data = json.dumps(payload) if payload is not None else None
return await self._perform_request(self.session.put, end_url, data=data)

async def delete(self, end_url, payload=None):
'''
async def delete(self, end_url: str, payload: Optional[Any] = None) -> Any:
"""
Send delete request and return results
'''
"""
data = json.dumps(payload) if payload is not None else None
return await self._perform_request(self.session.delete, end_url, data=data)

async def get_permissions(self):
'''
Returns the permissions for this session/app.
'''
async def get_permissions(self) -> Optional[Dict[str, bool]]:
"""
Returns the permissions for this session/app
"""
if not self.session_permissions:
await self._refresh_session_token()
return self.session_permissions
2 changes: 2 additions & 0 deletions aiofreepybox/aiofreepybox.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from aiofreepybox.api.connection import Connection
from aiofreepybox.api.home import Home
from aiofreepybox.api.parental import Parental
from aiofreepybox.api.meta import Meta
from aiofreepybox.api.netshare import Netshare
from aiofreepybox.api.notifications import Notifications
from aiofreepybox.api.rrd import Rrd
Expand Down Expand Up @@ -95,6 +96,7 @@ async def open(self, host, port):
self.connection = Connection(self._access)
self.home = Home(self._access)
self.parental = Parental(self._access)
self.meta = Meta(self._access)
self.netshare = Netshare(self._access)
self.notifications = Notifications(self._access)
self.rrd = Rrd(self._access)
Expand Down
Loading