From c9fe627030591d434a8dcc46abeea34ed8bbf24a Mon Sep 17 00:00:00 2001 From: peppelinux Date: Tue, 16 Feb 2021 18:14:15 +0100 Subject: [PATCH] Admin metadata upload form validation --- testenv/server.py | 2 +- testenv/spmetadata.py | 2 +- testenv/storages.py | 16 +++++++++++++--- testenv/tests/test_parsers.py | 9 +++++---- testenv/utils.py | 13 ++++++++----- 5 files changed, 28 insertions(+), 14 deletions(-) diff --git a/testenv/server.py b/testenv/server.py index 9813ba22..7da6f626 100644 --- a/testenv/server.py +++ b/testenv/server.py @@ -922,7 +922,7 @@ def _wsgiconf(self): _cnf = { 'host': self._config.host, 'port': self._config.port, - 'debug': self._config.debug, + 'debug': self._config.debug } if self._config.https: key = self._config.https_key_file_path diff --git a/testenv/spmetadata.py b/testenv/spmetadata.py index 925316c9..9b32c44e 100644 --- a/testenv/spmetadata.py +++ b/testenv/spmetadata.py @@ -283,7 +283,7 @@ def __init__(self, conf): def load(self, entity_id): metadata = self._provider.get(entity_id) - if metadata is None: + if not metadata: raise MetadataNotFoundError(entity_id) return ServiceProviderMetadata(metadata, self, 'db') diff --git a/testenv/storages.py b/testenv/storages.py index cecd817f..efc51c43 100644 --- a/testenv/storages.py +++ b/testenv/storages.py @@ -11,7 +11,9 @@ from sqlalchemy import Column, Integer, String, create_engine from sqlalchemy.dialects.postgresql import JSON from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import sessionmaker, validates + +from .utils import saml_to_dict Base = declarative_base() @@ -228,8 +230,16 @@ def __init__(self, model, session, class DatabaseSPProvider(AbstractDatabaseProvider): class DatabaseSPRecord(Base): __tablename__ = 'spmetadata' - entity_id = Column(String, primary_key=True) - body = Column(String) + entity_id = Column(String, primary_key=True, nullable=False) + body = Column(String, nullable=False) + + @validates('body') + def validate_body(self, key, body): + try: + saml_to_dict(body) + except Exception as e: + raise AssertionError(f'Metadata invalid: {e}') + return body TABLE = DatabaseSPRecord diff --git a/testenv/tests/test_parsers.py b/testenv/tests/test_parsers.py index ed805c71..e93749dc 100644 --- a/testenv/tests/test_parsers.py +++ b/testenv/tests/test_parsers.py @@ -8,7 +8,8 @@ from lxml import objectify from testenv.exceptions import ( - DeserializationError, RequestParserError, SPIDValidationError, XMLFormatValidationError, XMLSchemaValidationError, + DeserializationError, RequestParserError, SPIDValidationError, ValidationError, XMLFormatValidationError, + XMLSchemaValidationError, ) from testenv.parser import HTTPPostRequestParser, HTTPRedirectRequestParser, HTTPRequestDeserializer, SAMLTree from testenv.tests.utils import FakeRequest @@ -220,10 +221,10 @@ def test_deserialization(self): class SAMLStupidMetadataTestCase(unittest.TestCase): def test_broken_metadata_xml_valuerror(self): with open("testenv/tests/data/sp-invalid-xml-starttag.xml") as xmlstr: - self.assertEqual({}, saml_to_dict(xmlstr)) + self.assertRaises(ValidationError, saml_to_dict, xmlstr) def test_stupid_metadata_xml_valuerror(self): - saml_to_dict('test.stupido') + self.assertRaises(ValidationError, saml_to_dict, 'test.stupido') def test_nullmetadata_xml_valuerror(self): - self.assertEqual({}, saml_to_dict(None)) + self.assertRaises(ValidationError, saml_to_dict, None) diff --git a/testenv/utils.py b/testenv/utils.py index ac053f5b..b05ccaf7 100644 --- a/testenv/utils.py +++ b/testenv/utils.py @@ -11,6 +11,8 @@ from testenv import log from testenv.settings import MULTIPLE_OCCURRENCES_TAGS, SPID_ERRORS +from .exceptions import ValidationError + TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ' TIME_FORMAT_WITH_FRAGMENT = re.compile( r'^(\d{4,4}-\d{2,2}-\d{2,2}T\d{2,2}:\d{2,2}:\d{2,2})(\.\d*)?Z?$') @@ -91,22 +93,23 @@ def saml_to_dict(xmlstr): if not xmlstr: logger.error(f'{_err_msg} [Null Value Error] on xmlstr') - return {} + raise ValidationError(_err_msg) # sometimes a bytes objects, sometimes a '_io.TextIOWrapper' object ... elif isinstance(xmlstr, io.TextIOWrapper): xmlstr = xmlstr.read() try: root = objectify.fromstring(xmlstr) - except ValueError: + except ValueError as e: logger.error(f'{_err_msg} [ValueError] on: ' f'{xmlstr[0:_trunc]}') - return {} + raise ValidationError(e) + # that's for resiliency ... - except Exception: + except Exception as e: logger.error(f'{_err_msg} [Unknown Error] on: ' f'{xmlstr[0:_trunc]}') - return {} + raise ValidationError(e) def _obj(elem): children = {}