From eba6a452406b780f60ceb7eb52a2de4a17b7b551 Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Tue, 3 Sep 2024 08:42:13 -0700 Subject: [PATCH] Bring back better findinmap resolution (#3579) * Bring back better findinmap resolution --- src/cfnlint/context/_mappings.py | 8 +- src/cfnlint/jsonschema/_resolvers_cfn.py | 156 +++++++++++++----- src/cfnlint/rules/functions/_BaseFn.py | 22 ++- test/unit/module/context/test_mappings.py | 13 ++ .../module/jsonschema/test_resolvers_cfn.py | 29 +++- test/unit/rules/functions/test_basefn.py | 30 ++++ 6 files changed, 193 insertions(+), 65 deletions(-) diff --git a/src/cfnlint/context/_mappings.py b/src/cfnlint/context/_mappings.py index b01fca50bb..7599aea010 100644 --- a/src/cfnlint/context/_mappings.py +++ b/src/cfnlint/context/_mappings.py @@ -32,7 +32,7 @@ def create_from_dict(cls, instance: Any) -> Mappings: for k, v in instance.items(): if k == "Fn::Transform": is_transform = True - else: + elif isinstance(k, str): result[k] = Map.create_from_dict(v) return cls(result, is_transform) except (ValueError, AttributeError) as e: @@ -65,10 +65,8 @@ def create_from_dict(cls, instance: Any) -> _MappingSecondaryKey: for k, v in instance.items(): if k == "Fn::Transform": is_transform = True - elif isinstance(v, (str, list, int, float)): + elif isinstance(k, str) and isinstance(v, (str, list, int, float)): keys[k] = v - else: - continue return cls(keys, is_transform) @@ -95,6 +93,6 @@ def create_from_dict(cls, instance: Any) -> Map: for k, v in instance.items(): if k == "Fn::Transform": is_transform = True - else: + elif isinstance(k, str): keys[k] = _MappingSecondaryKey.create_from_dict(v) return cls(keys, is_transform) diff --git a/src/cfnlint/jsonschema/_resolvers_cfn.py b/src/cfnlint/jsonschema/_resolvers_cfn.py index 9f890b5a20..b79ed57d29 100644 --- a/src/cfnlint/jsonschema/_resolvers_cfn.py +++ b/src/cfnlint/jsonschema/_resolvers_cfn.py @@ -14,6 +14,7 @@ from cfnlint.helpers import AVAILABILITY_ZONES, REGEX_SUB_PARAMETERS from cfnlint.jsonschema import ValidationError, Validator from cfnlint.jsonschema._typing import ResolutionResult +from cfnlint.jsonschema._utils import equal def unresolvable(validator: Validator, instance: Any) -> ResolutionResult: @@ -39,11 +40,12 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult: if len(instance) not in [3, 4]: return - default_value_found = None + default_value_found = False if len(instance) == 4: options = instance[3] if validator.is_type(options, "object"): if "DefaultValue" in options: + default_value_found = True for value, v, _ in validator.resolve_value(options["DefaultValue"]): yield value, v.evolve( context=v.context.evolve( @@ -52,7 +54,6 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult: ) ), ), None - default_value_found = True if not default_value_found and not validator.context.mappings.maps: if validator.context.mappings.is_transform: @@ -65,54 +66,121 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult: path=deque([0]), ) - if ( - validator.is_type(instance[0], "string") - and ( - validator.is_type(instance[1], "string") - or validator.is_type(instance[1], "integer") - ) - and validator.is_type(instance[2], "string") - ): - map = validator.context.mappings.maps.get(instance[0]) - if map is None: - if not default_value_found: - yield None, validator, ValidationError( - ( - f"{instance[0]!r} is not one of " - f"{list(validator.context.mappings.maps.keys())!r}" - ), - path=deque([0]), - ) - return + mappings = list(validator.context.mappings.maps.keys()) + results = [] + found_valid_combination = False + for map_name, map_v, _ in validator.resolve_value(instance[0]): + if not validator.is_type(map_name, "string"): + continue - top_key = map.keys.get(instance[1]) - if top_key is None: - if map.is_transform: - return + if all(not (equal(map_name, each)) for each in mappings): if not default_value_found: - yield None, validator, ValidationError( + results.append( ( - f"{instance[1]!r} is not one of " - f"{list(map.keys.keys())!r} for " - f"mapping {instance[0]!r}" - ), - path=deque([1]), + None, + map_v, + ValidationError( + f"{map_name!r} is not one of {mappings!r}", + path=deque([0]), + ), + ) ) - return + continue - value = top_key.keys.get(instance[2]) - if value is None: - if top_key.is_transform: - return - if not default_value_found: - yield value, validator, ValidationError( - ( - f"{instance[2]!r} is not one of " - f"{list(top_key.keys.keys())!r} for mapping " - f"{instance[0]!r} and key {instance[1]!r}" - ), - path=deque([2]), + if validator.context.mappings.maps[map_name].is_transform: + continue + + for top_level_key, top_v, _ in validator.resolve_value(instance[1]): + if validator.is_type(top_level_key, "integer"): + top_level_key = str(top_level_key) + if not validator.is_type(top_level_key, "string"): + continue + + top_level_keys = list(validator.context.mappings.maps[map_name].keys.keys()) + if all(not (equal(top_level_key, each)) for each in top_level_keys): + if not default_value_found: + results.append( + ( + None, + top_v, + ValidationError( + ( + f"{top_level_key!r} is not one of " + f"{top_level_keys!r} for mapping " + f"{map_name!r}" + ), + path=deque([1]), + ), + ) + ) + continue + + if ( + not top_level_key + or validator.context.mappings.maps[map_name] + .keys[top_level_key] + .is_transform + ): + continue + + for second_level_key, second_v, err in validator.resolve_value(instance[2]): + if validator.is_type(second_level_key, "integer"): + second_level_key = str(second_level_key) + if not validator.is_type(second_level_key, "string"): + continue + second_level_keys = list( + validator.context.mappings.maps[map_name] + .keys[top_level_key] + .keys.keys() ) + if all( + not (equal(second_level_key, each)) for each in second_level_keys + ): + if not default_value_found: + results.append( + ( + None, + second_v, + ValidationError( + ( + f"{second_level_key!r} is not " + f"one of {second_level_keys!r} " + f"for mapping {map_name!r} and " + f"key {top_level_key!r}" + ), + path=deque([2]), + ), + ) + ) + continue + + found_valid_combination = True + + for value in validator.context.mappings.maps[map_name].find_in_map( + top_level_key, + second_level_key, + ): + yield ( + value, + validator.evolve( + context=validator.context.evolve( + path=validator.context.path.evolve( + value_path=deque( + [ + "Mappings", + map_name, + top_level_key, + second_level_key, + ] + ) + ) + ) + ), + None, + ) + + if not found_valid_combination: + yield from iter(results) def get_azs(validator: Validator, instance: Any) -> ResolutionResult: diff --git a/src/cfnlint/rules/functions/_BaseFn.py b/src/cfnlint/rules/functions/_BaseFn.py index b2edd69fc3..d1777e25eb 100644 --- a/src/cfnlint/rules/functions/_BaseFn.py +++ b/src/cfnlint/rules/functions/_BaseFn.py @@ -56,6 +56,17 @@ def validator(self, validator: Validator) -> Validator: ), ) + def _clean_resolve_errors( + self, err: ValidationError, value: Any, instance: Any + ) -> ValidationError: + err.message = err.message.replace(f"{value!r}", f"{instance!r}") + err.message = f"{err.message} when {self.fn.name!r} is resolved" + if self.child_rules[self.resolved_rule]: + err.rule = self.child_rules[self.resolved_rule] + for i, err_ctx in enumerate(err.context): + err.context[i] = self._clean_resolve_errors(err_ctx, value, instance) + return err + def resolve( self, validator: Validator, @@ -92,14 +103,9 @@ def resolve( return for err in errs: - err.message = err.message.replace(f"{value!r}", f"{instance!r}") - err.message = f"{err.message} when {self.fn.name!r} is resolved" - all_errs.append(err) - - for err in all_errs: - if self.child_rules[self.resolved_rule]: - err.rule = self.child_rules[self.resolved_rule] - yield err + all_errs.append(self._clean_resolve_errors(err, value, instance)) + + yield from iter(all_errs) def _resolve_ref(self, validator, schema) -> Any: diff --git a/test/unit/module/context/test_mappings.py b/test/unit/module/context/test_mappings.py index 6769871ba9..510eaada9a 100644 --- a/test/unit/module/context/test_mappings.py +++ b/test/unit/module/context/test_mappings.py @@ -82,6 +82,19 @@ def test_transforms(): }, Mappings({}, True), ), + ( + "Invalid mappings with wrong types", + { + "A": {True: {"C": "foo"}}, + "1": {"2": {False: "foo"}}, + }, + Mappings( + { + "A": Map({}, False), + "1": Map({"2": _MappingSecondaryKey({}, False)}), + } + ), + ), ], ) def test_mapping_creation(name, mappings, expected): diff --git a/test/unit/module/jsonschema/test_resolvers_cfn.py b/test/unit/module/jsonschema/test_resolvers_cfn.py index dd647c8e37..ec98e2bcfe 100644 --- a/test/unit/module/jsonschema/test_resolvers_cfn.py +++ b/test/unit/module/jsonschema/test_resolvers_cfn.py @@ -18,10 +18,16 @@ def _resolve(name, instance, expected_results, **kwargs): resolutions = list(validator.resolve_value(instance)) + assert len(resolutions) == len( + expected_results + ), f"{name!r} got {len(resolutions)!r}" + for i, (instance, v, errors) in enumerate(resolutions): - assert instance == expected_results[i][0] - assert v.context.path.value_path == expected_results[i][1] - assert errors == expected_results[i][2] + assert instance == expected_results[i][0], f"{name!r} got {instance!r}" + assert ( + v.context.path.value_path == expected_results[i][1] + ), f"{name!r} got {v.context.path.value_path!r}" + assert errors == expected_results[i][2], f"{name!r} got {errors!r}" @pytest.mark.parametrize( @@ -153,7 +159,7 @@ def test_resolvers_ref(name, instance, response): ), ( "Invalid FindInMap with an invalid type for third element", - {"Fn::FindInMap": ["foo", "bar", ["value"]]}, + {"Fn::FindInMap": ["foo", "first", ["value"]]}, [], ), ( @@ -218,16 +224,16 @@ def test_invalid_functions(name, instance, response): ], ), ( - "Valid FindInMap with a default value", + "Valid FindInMap with bad keys and a default value", {"Fn::FindInMap": ["foo", "bar", "value", {"DefaultValue": "default"}]}, [("default", deque([4, "DefaultValue"]), None)], ), ( - "Valid FindInMap with a default value", + "Valid FindInMap with valid keys and a default value", {"Fn::FindInMap": ["foo", "first", "second", {"DefaultValue": "default"}]}, [ ("default", deque([4, "DefaultValue"]), None), - ("bar", deque([2]), None), + ("bar", deque(["Mappings", "foo", "first", "second"]), None), ], ), ( @@ -240,7 +246,8 @@ def test_invalid_functions(name, instance, response): ValidationError( ( "'bar' is not one of ['foo', " - "'transformFirstKey', 'transformSecondKey']" + "'transformFirstKey', 'transformSecondKey', " + "'integers']" ), path=deque(["Fn::FindInMap", 0]), ), @@ -300,6 +307,11 @@ def test_invalid_functions(name, instance, response): ) ], ), + ( + "Valid FindInMap with integer types", + {"Fn::FindInMap": ["integers", 1, 2]}, + [("Value", deque(["Mappings", "integers", "1", "2"]), None)], + ), ( "Valid FindInMap with a bad second key and default", {"Fn::FindInMap": ["foo", "first", "third", {"DefaultValue": "default"}]}, @@ -334,6 +346,7 @@ def test_valid_functions(name, instance, response): "foo": {"first": {"second": "bar"}}, "transformFirstKey": {"Fn::Transform": {"second": "bar"}}, "transformSecondKey": {"first": {"Fn::Transform": "bar"}}, + "integers": {"1": {"2": "Value"}}, } ) ) diff --git a/test/unit/rules/functions/test_basefn.py b/test/unit/rules/functions/test_basefn.py index 255b46652a..6363f47f06 100644 --- a/test/unit/rules/functions/test_basefn.py +++ b/test/unit/rules/functions/test_basefn.py @@ -56,6 +56,36 @@ def rule(): ) ], ), + ( + "Errors with context error", + {"Fn::Sub": "Bar"}, + {"anyOf": [{"enum": ["Foo"]}]}, + [ + ValidationError( + message=( + "{'Fn::Sub': 'Bar'} is not valid " + "under any of the given schemas " + "when '' is resolved" + ), + path=deque(["Fn::Sub"]), + validator="", + schema_path=deque(["anyOf"]), + rule=_ChildRule(), + context=[ + ValidationError( + message=( + "{'Fn::Sub': 'Bar'} is not one of " + "['Foo'] when '' is resolved" + ), + path=deque([]), + validator="enum", + schema_path=deque([0, "enum"]), + rule=_ChildRule(), + ) + ], + ) + ], + ), ], ) def test_resolve(name, instance, schema, expected, validator, rule):