Skip to content

Commit

Permalink
fix: CI
Browse files Browse the repository at this point in the history
  • Loading branch information
peppelinux committed Nov 21, 2023
1 parent 70d85bd commit 978bcab
Show file tree
Hide file tree
Showing 16 changed files with 289 additions and 5 deletions.
28 changes: 28 additions & 0 deletions example/cryptotools/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import uuid

from copy import deepcopy
from django.conf import settings

from . tools import iat_now, exp_from_now, create_jws


def issue_security_jwts(audiences :list, subject :str, **kwargs) -> dict:
jwk = settings.PRIVATE_SIGNATURE_JWKS["keys"][0]
payloads = {}
data = {
"iss": getattr(settings, "BASE", "https://xdrplus-iam.acsia.org"),
"jti": str(uuid.uuid4()),
"iat": iat_now(),
"exp": exp_from_now(
minutes = getattr(settings, "DEFAULT_EXP", 3)
),
"sub": subject
}
data.update(kwargs)

for aud in audiences:
_d = deepcopy(data)
_d["aud"] = aud
payloads[aud] = create_jws(payload = _d, jwk_dict = jwk)

return payloads
3 changes: 3 additions & 0 deletions example/cryptotools/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from django.contrib import admin

# Register your models here.
6 changes: 6 additions & 0 deletions example/cryptotools/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class CryptotoolsConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'cryptotools'
46 changes: 46 additions & 0 deletions example/cryptotools/jwt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import base64
import json
import re
from typing import Union

JWT_REGEXP = r'^[\w\-]+\.[\w\-]+\.[\w\-]+'


def unpad_jwt_element(jwt: str, position: int) -> dict:
if isinstance(jwt, bytes):
jwt = jwt.decode()
b = jwt.split(".")[position]
padded = f"{b}{'=' * divmod(len(b), 4)[1]}"
data = json.loads(base64.urlsafe_b64decode(padded))
return data


def unpad_jwt_header(jwt: str) -> dict:
return unpad_jwt_element(jwt, position=0)


def unpad_jwt_payload(jwt: str) -> dict:
return unpad_jwt_element(jwt, position=1)


def get_jwk_from_jwt(jwt: Union[str, dict], provider_jwks: dict) -> dict:
"""
docs here
"""
if isinstance(jwt, str):
head = unpad_jwt_header(jwt)
elif isinstance(jwt, dict):
head = jwt

kid = head["kid"]
if isinstance(provider_jwks, dict) and provider_jwks.get('keys'):
provider_jwks = provider_jwks['keys']
for jwk in provider_jwks:
if jwk["kid"] == kid:
return jwk
return {}


def is_jwt_format(jwt: str) -> bool:
res = re.match(JWT_REGEXP, jwt)
return bool(res)
Empty file.
3 changes: 3 additions & 0 deletions example/cryptotools/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from django.db import models

# Create your models here.
3 changes: 3 additions & 0 deletions example/cryptotools/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from django.test import TestCase

# Create your tests here.
185 changes: 185 additions & 0 deletions example/cryptotools/tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
from django.utils import timezone
from django.utils.timezone import make_aware

from secrets import token_hex

import datetime
import binascii
import json
import cryptojwt
import logging
import re

from cryptojwt.jwe.jwe import factory
from cryptojwt.jwe.jwe_ec import JWE_EC
from cryptojwt.jwe.jwe_rsa import JWE_RSA
from cryptojwt.jwk.ec import new_ec_key
from cryptojwt.jwk.jwk import key_from_jwk_dict
from cryptojwt.jws.jws import JWS
from django.conf import settings

from . jwt import unpad_jwt_header

from typing import Union

DEFAULT_EC_CRV = getattr(settings, "DEFAULT_EC_CRV", "P-256")
DEFAULT_HASH_FUNC = getattr(settings, "DEFAULT_HASH_FUNC", "P256")
DEFAULT_JWE_ALG = getattr(settings, "DEFAULT_JWE_ALG", "RSA-OAEP")
DEFAULT_JWE_ENC = getattr(settings, "DEFAULT_JWE_ENC", "A256CBC-HS512")
ENCRYPTION_ENC_SUPPORTED = getattr(
settings,
"ENCRYPTION_ENC_SUPPORTED", [
"A128CBC-HS256",
"A192CBC-HS384",
"A256CBC-HS512",
"A128GCM",
"A192GCM",
"A256GCM",
]
)
SIGNING_ALG_VALUES_SUPPORTED = getattr(
settings,
"SIGNING_ALG_VALUES_SUPPORTED",
["RS256", "RS384", "RS512", "ES256", "ES384", "ES512"],
)
ENCRYPTION_ALG_VALUES_SUPPORTED = getattr(
settings,
"ENCRYPTION_ALG_VALUES_SUPPORTED",
[
"RSA-OAEP",
"RSA-OAEP-256",
"ECDH-ES",
"ECDH-ES+A128KW",
"ECDH-ES+A192KW",
"ECDH-ES+A256KW",
],
)

logger = logging.getLogger(__name__)


def iat_now() -> int:
return int(datetime.datetime.now().timestamp())


def exp_from_now(minutes: int = 33) -> int:
_now = timezone.localtime()
return int((_now + datetime.timedelta(minutes=minutes)).timestamp())


def secparams_check(payload :dict, aud : Union [str, list], allowed_clients :list) -> bool:
if not all(
(
payload.get('iss', None),
payload.get('iat', None) < iat_now(),
payload.get('exp', None) > iat_now(),
payload.get('aud', None) in (aud, re.sub("^https?://", "", aud))
)
):
return False


if isinstance(allowed_clients, str):
allowed_clients = [allowed_clients]


for i in allowed_clients:
if i == payload['iss']:
return True

return False


def datetime_from_timestamp(value) -> datetime.datetime:
return make_aware(datetime.datetime.fromtimestamp(value))


def create_jwk(key = None, hash_func=None, crv = None) -> tuple:
key = key or new_ec_key(crv = crv or DEFAULT_EC_CRV)
key.add_kid()
return key.serialize(private = True), key.serialize()


def key_from_jwk(jwk_dict: dict) -> tuple:
key = key_from_jwk_dict(jwk_dict)
return key.serialize(private = True), key.serialize()


def create_jws(payload: dict, jwk_dict: dict, alg: str = "ES256", protected:dict = {}, **kwargs) -> str:
_key = key_from_jwk_dict(jwk_dict)
_signer = JWS(payload, alg=alg, **kwargs)

jwt = _signer.sign_compact([_key], protected=protected, **kwargs)
return jwt


def verify_jws(jws: str, pub_jwk: dict, **kwargs) -> str:
_key = key_from_jwk_dict(pub_jwk)

_head = unpad_jwt_header(jws)
if _head.get("kid") != pub_jwk["kid"]: # pragma: no cover
raise Exception(
f"kid error: {_head.get('kid')} != {pub_jwk['kid']}"
)

_alg = _head["alg"]
if _alg not in SIGNING_ALG_VALUES_SUPPORTED or not _alg: # pragma: no cover
raise UnsupportedAlgorithm(f"{_alg} has beed disabled for security reason")

verifier = JWS(alg=_head["alg"], **kwargs)
msg = verifier.verify_compact(jws, [_key])
return msg


def create_jwe(plain_dict: Union[dict, str, int, None], jwk_dict: dict, **kwargs) -> str:
logger.debug(f"Encrypting dict as JWE: " f"{plain_dict}")
_key = key_from_jwk_dict(jwk_dict)

if isinstance(_key, cryptojwt.jwk.rsa.RSAKey):
JWE_CLASS = JWE_RSA
elif isinstance(_key, cryptojwt.jwk.ec.ECKey):
JWE_CLASS = JWE_EC

if isinstance(plain_dict, dict):
_payload = json.dumps(plain_dict).encode()
elif not plain_dict:
logger.warning(f"create_jwe with null payload!")
_payload = ""
elif isinstance(plain_dict, (str, int)):
_payload = plain_dict
else:
logger.error(f"create_jwe with unsupported payload type!")
_payload = ""

_keyobj = JWE_CLASS(
_payload,
alg=DEFAULT_JWE_ALG,
enc=DEFAULT_JWE_ENC,
kid=_key.kid,
**kwargs
)

jwe = _keyobj.encrypt(_key.public_key())
logger.debug(f"Encrypted dict as JWE: {jwe}")
return jwe


def decrypt_jwe(jwe: str, jwk_dict: dict) -> dict:
# get header
try:
jwe_header = unpad_jwt_header(jwe)
except (binascii.Error, Exception) as e: # pragma: no cover
logger.error(f"Failed to extract JWT header: {e}")
raise Exception("The JWT is not valid")

_alg = jwe_header.get("alg", DEFAULT_JWE_ALG)
_enc = jwe_header.get("enc", DEFAULT_JWE_ENC)
# jwe_header.get("kid")

if _alg not in ENCRYPTION_ALG_VALUES_SUPPORTED: # pragma: no cover
raise UnsupportedAlgorithm(f"{_alg} has beed disabled for security reason")

_decryptor = factory(jwe, alg=_alg, enc=_enc)
_dkey = key_from_jwk_dict(jwk_dict)
msg = _decryptor.decrypt(jwe, [_dkey])
return msg
13 changes: 13 additions & 0 deletions example/cryptotools/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import copy
from django.conf import settings
from django.http import JsonResponse
from django.shortcuts import render


def public_jwk(request):
keys = copy.deepcopy(settings.PUBLIC_SIGNATURE_JWKS)
keys['keys'].extend(settings.PUBLIC_ENCRYPTION_JWKS['keys'])
return JsonResponse(
keys, safe = False
)

1 change: 0 additions & 1 deletion example/django_idp/settingslocal.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
'allauth.account',
'allauth.socialaccount',
"allauth.socialaccount.providers.google",
"allauth.socialaccount.providers.azure",
"allauth.socialaccount.providers.microsoft",

# custom
Expand Down
2 changes: 0 additions & 2 deletions example/django_idp/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
from django.urls import include, path
# from uniauth_saml2_idp.views import LoginAuthView

from cryptotools.views import public_jwk

import os

admin.site.site_title = os.getenv("ADMIN_UI_HEADER", 'uniAuth-IAM')
Expand Down
Empty file added example/logs/django.log
Empty file.
Empty file added example/logs/uniauth.log
Empty file.
1 change: 1 addition & 0 deletions example/uniauth_saml2_idp
2 changes: 1 addition & 1 deletion requirements-customizations.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ uritemplate
django-cors-headers # for rest api

# proxy
django-allauth
django-allauth>0.58,<0.59

cryptojwt>=1.8,<=1.9

Expand Down
1 change: 0 additions & 1 deletion uniauth_saml2_idp/uniauth_saml2_idp

This file was deleted.

0 comments on commit 978bcab

Please sign in to comment.