From 8178f6dd74dab47b993ba532dd12f0cfdb5799f1 Mon Sep 17 00:00:00 2001 From: gemerden Date: Thu, 9 Apr 2020 11:45:07 +0200 Subject: [PATCH] Role authorization --- flask_httpauth.py | 128 ++++++++++++++++++++++++++++++-------------- tests/test_multi.py | 32 ++++++++++- tests/test_roles.py | 121 +++++++++++++++++++++++++++++++++++++++++ tox.ini | 5 +- 4 files changed, 245 insertions(+), 41 deletions(-) create mode 100644 tests/test_roles.py diff --git a/flask_httpauth.py b/flask_httpauth.py index 41beff8..79143cf 100644 --- a/flask_httpauth.py +++ b/flask_httpauth.py @@ -23,6 +23,7 @@ def __init__(self, scheme=None, realm=None): self.scheme = scheme self.realm = realm or "Authentication Required" self.get_password_callback = None + self.get_user_roles_callback = None self.auth_error_callback = None def default_get_password(username): @@ -38,6 +39,10 @@ def get_password(self, f): self.get_password_callback = f return f + def get_user_roles(self, f): + self.get_user_roles_callback = f + return f + def error_handler(self, f): @wraps(f) def decorated(*args, **kwargs): @@ -85,25 +90,61 @@ def get_auth_password(self, auth): return password - def login_required(self, f): - @wraps(f) - def decorated(*args, **kwargs): - auth = self.get_auth() - - # Flask normally handles OPTIONS requests on its own, but in the - # case it is configured to forward those to the application, we - # need to ignore authentication headers and let the request through - # to avoid unwanted interactions with CORS. - if request.method != 'OPTIONS': # pragma: no cover - password = self.get_auth_password(auth) - - if not self.authenticate(auth, password): - # Clear TCP receive buffer of any pending data - request.data - return self.auth_error_callback() - - return f(*args, **kwargs) - return decorated + def authorize(self, role, user, auth): + if role is None: + return True + if isinstance(role, (list, tuple)): + roles = role + else: + roles = [role] + if user is True: + user = auth + if self.get_user_roles_callback is None: # pragma: no cover + raise ValueError('get_user_roles callback is not defined') + user_roles = self.get_user_roles_callback(user) + if user_roles is None: + user_roles = {} + elif not isinstance(user_roles, (list, tuple)): + user_roles = {user_roles} + else: + user_roles = set(user_roles) + for role in roles: + if isinstance(role, (list, tuple)): + role = set(role) + if role & user_roles == role: + return True + elif role in user_roles: + return True + + def login_required(self, f=None, role=None): + if f is not None and role is not None: # pragma: no cover + raise ValueError('role is the only supported argument') + + def login_required_internal(f): + @wraps(f) + def decorated(*args, **kwargs): + auth = self.get_auth() + + # Flask normally handles OPTIONS requests on its own, but in + # the case it is configured to forward those to the + # application, we need to ignore authentication headers and + # let the request through to avoid unwanted interactions with + # CORS. + if request.method != 'OPTIONS': # pragma: no cover + password = self.get_auth_password(auth) + + user = self.authenticate(auth, password) + if not user or not self.authorize(role, user, auth): + # Clear TCP receive buffer of any pending data + request.data + return self.auth_error_callback() + + return f(*args, **kwargs) + return decorated + + if f: + return login_required_internal(f) + return login_required_internal def username(self): if not request.authorization: @@ -271,23 +312,32 @@ def __init__(self, main_auth, *args): self.main_auth = main_auth self.additional_auth = args - def login_required(self, f): - @wraps(f) - def decorated(*args, **kwargs): - selected_auth = None - if 'Authorization' in request.headers: - try: - scheme, creds = request.headers['Authorization'].split( - None, 1) - except ValueError: - # malformed Authorization header - pass - else: - for auth in self.additional_auth: - if auth.scheme == scheme: - selected_auth = auth - break - if selected_auth is None: - selected_auth = self.main_auth - return selected_auth.login_required(f)(*args, **kwargs) - return decorated + def login_required(self, f=None, role=None): + if f is not None and role is not None: # pragma: no cover + raise ValueError('role is the only supported argument') + + def login_required_internal(f): + @wraps(f) + def decorated(*args, **kwargs): + selected_auth = None + if 'Authorization' in request.headers: + try: + scheme, creds = request.headers[ + 'Authorization'].split(None, 1) + except ValueError: + # malformed Authorization header + pass + else: + for auth in self.additional_auth: + if auth.scheme == scheme: + selected_auth = auth + break + if selected_auth is None: + selected_auth = self.main_auth + return selected_auth.login_required(role=role)(f)( + *args, **kwargs) + return decorated + + if f: + return login_required_internal(f) + return login_required_internal diff --git a/tests/test_multi.py b/tests/test_multi.py index 175bc01..d660267 100644 --- a/tests/test_multi.py +++ b/tests/test_multi.py @@ -15,12 +15,24 @@ def setUp(self): @basic_auth.verify_password def verify_password(username, password): - return username == 'john' and password == 'hello' + if username == 'john' and password == 'hello': + return 'john' + + @basic_auth.get_user_roles + def get_basic_role(username): + if username == 'john': + return ['foo', 'bar'] @token_auth.verify_token def verify_token(token): return token == 'this-is-the-token!' + @token_auth.get_user_roles + def get_token_role(auth): + if auth['token'] == 'this-is-the-token!': + return 'foo' + return + @token_auth.error_handler def error_handler(): return 'error', 401, {'WWW-Authenticate': 'MyToken realm="Foo"'} @@ -34,6 +46,11 @@ def index(): def auth_route(): return 'access granted' + @app.route('/protected-with-role') + @multi_auth.login_required(role='foo') + def auth_role_route(): + return 'role access granted' + self.app = app self.client = app.test_client() @@ -86,3 +103,16 @@ def test_multi_malformed_header(self): response = self.client.get( '/protected', headers={'Authorization': 'token-without-scheme'}) self.assertEqual(response.status_code, 401) + + def test_multi_auth_login_valid_basic_role(self): + creds = base64.b64encode(b'john:hello').decode('utf-8') + response = self.client.get( + '/protected-with-role', headers={'Authorization': + 'Basic ' + creds}) + self.assertEqual(response.data.decode('utf-8'), 'role access granted') + + def test_multi_auth_login_valid_token_role(self): + response = self.client.get( + '/protected-with-role', headers={'Authorization': + 'MyToken this-is-the-token!'}) + self.assertEqual(response.data.decode('utf-8'), 'role access granted') diff --git a/tests/test_roles.py b/tests/test_roles.py new file mode 100644 index 0000000..06bf4f7 --- /dev/null +++ b/tests/test_roles.py @@ -0,0 +1,121 @@ +import unittest +import base64 +from flask import Flask, g +from flask_httpauth import HTTPBasicAuth + + +class HTTPAuthTestCase(unittest.TestCase): + def setUp(self): + app = Flask(__name__) + app.config['SECRET_KEY'] = 'my secret' + + roles_auth = HTTPBasicAuth() + + @roles_auth.verify_password + def roles_auth_verify_password(username, password): + g.anon = False + if username == 'john': + return password == 'hello' + elif username == 'susan': + return password == 'bye' + elif username == '': + g.anon = True + return True + return False + + @roles_auth.get_user_roles + def get_user_roles(auth): + username = auth.username + if username == 'john': + return 'normal' + elif username == 'susan': + return ('normal', 'special') + + @roles_auth.error_handler + def error_handler(): + return 'error', 403 # use a custom error status + + @app.route('/') + def index(): + return 'index' + + @app.route('/normal') + @roles_auth.login_required(role='normal') + def roles_auth_route_normal(): + return 'normal:' + roles_auth.username() + + @app.route('/special') + @roles_auth.login_required(role='special') + def roles_auth_route_special(): + return 'special:' + roles_auth.username() + + @app.route('/normal-or-special') + @roles_auth.login_required(role=('normal', 'special')) + def roles_auth_route_normal_or_special(): + return 'normal_or_special:' + roles_auth.username() + + @app.route('/normal-and-special') + @roles_auth.login_required(role=(('normal', 'special'),)) + def roles_auth_route_normal_and_special(): + return 'normal_and_special:' + roles_auth.username() + + self.app = app + self.roles_auth = roles_auth + self.client = app.test_client() + + def test_verify_roles_valid_normal_1(self): + creds = base64.b64encode(b'susan:bye').decode('utf-8') + response = self.client.get( + '/normal', headers={'Authorization': 'Basic ' + creds}) + self.assertEqual(response.data, b'normal:susan') + + def test_verify_roles_valid_normal_2(self): + creds = base64.b64encode(b'john:hello').decode('utf-8') + response = self.client.get( + '/normal', headers={'Authorization': 'Basic ' + creds}) + self.assertEqual(response.data, b'normal:john') + + def test_verify_auth_login_valid_special(self): + creds = base64.b64encode(b'susan:bye').decode('utf-8') + response = self.client.get( + '/special', headers={'Authorization': 'Basic ' + creds}) + self.assertEqual(response.data, b'special:susan') + + def test_verify_auth_login_invalid_special(self): + creds = base64.b64encode(b'john:hello').decode('utf-8') + response = self.client.get( + '/special', headers={'Authorization': 'Basic ' + creds}) + self.assertEqual(response.status_code, 403) + self.assertTrue('WWW-Authenticate' in response.headers) + + def test_verify_auth_login_valid_normal_or_special_1(self): + creds = base64.b64encode(b'susan:bye').decode('utf-8') + response = self.client.get( + '/normal-or-special', headers={'Authorization': 'Basic ' + creds}) + self.assertEqual(response.data, b'normal_or_special:susan') + + def test_verify_auth_login_valid_normal_or_special_2(self): + creds = base64.b64encode(b'john:hello').decode('utf-8') + response = self.client.get( + '/normal-or-special', headers={'Authorization': 'Basic ' + creds}) + self.assertEqual(response.data, b'normal_or_special:john') + + def test_verify_auth_login_valid_normal_and_special_1(self): + creds = base64.b64encode(b'susan:bye').decode('utf-8') + response = self.client.get( + '/normal-and-special', headers={'Authorization': 'Basic ' + creds}) + self.assertEqual(response.data, b'normal_and_special:susan') + + def test_verify_auth_login_valid_normal_and_special_2(self): + creds = base64.b64encode(b'john:hello').decode('utf-8') + response = self.client.get( + '/normal-and-special', headers={'Authorization': 'Basic ' + creds}) + self.assertEqual(response.status_code, 403) + self.assertTrue('WWW-Authenticate' in response.headers) + + def test_verify_auth_login_invalid_password(self): + creds = base64.b64encode(b'john:bye').decode('utf-8') + response = self.client.get( + '/normal', headers={'Authorization': 'Basic ' + creds}) + self.assertEqual(response.status_code, 403) + self.assertTrue('WWW-Authenticate' in response.headers) diff --git a/tox.ini b/tox.ini index 0fc21f1..590ef1d 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist=flake8,py27,py35,py36,py37,pypy,pypy3,docs,coverage +envlist=flake8,py27,py35,py36,py37,py38,pypy,pypy3,docs,coverage skip_missing_interpreters=True [testenv] @@ -29,6 +29,9 @@ basepython=python3.6 [testenv:py37] basepython=python3.7 +[testenv:py38] +basepython=python3.8 + [testenv:pypy] basepython=pypy