Fix generate params regions lookup + refactor pipeline management (#584)
* Fix generate params regions lookup

**Why?**

With the changes introduced in PR #525, support was added for different
deployment map providers.

This moved the pipeline parameters so they are scoped by the deployment map
source. In the case of our S3 deployment map provider, that changed
`/deployment/${pipeline_name}/regions` into
`/deployment/S3/${pipeline_name}/regions`.

The `generate_params.py` helper, however, still looked for the regions at the
original SSM Parameter Store location.
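
For illustration, a minimal sketch (not the actual `generate_params.py` code)
of resolving the regions parameter under the new source-scoped path; the
helper name here is made up:

```python
import boto3


def fetch_pipeline_regions(pipeline_name, deployment_map_source="S3"):
    """Fetch the regions stored for this pipeline by pipeline management."""
    ssm = boto3.client("ssm")
    # New, source-scoped location (written since PR #525); the old location
    # was f"/deployment/{pipeline_name}/regions".
    parameter_name = (
        f"/deployment/{deployment_map_source}/{pipeline_name}/regions"
    )
    return ssm.get_parameter(Name=parameter_name)["Parameter"]["Value"]
```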

**What?**

* Refactored pipeline management to separate execution details from the
  pipeline definition, the pipeline input, and the SSM parameters retrieved.
  * Renamed `input` to `pipeline_input` to clearly indicate which input we
    are referring to.
* Fixed line-length issues spotted while changing the code to forward the
  deployment map source and name.
* Added the `deployment_map_source` and `deployment_map_name` context to the
  ADF Pipeline Management state machine.
* Fixed an issue in the pipeline generation where the EventBridge rule would
  be created whenever the `account_id` was set, even when CodeCommit was not
  used as the source provider.
* Removed
  `${bootstrap_repo}/adf-build/shared/cdk/generate_pipeline_inputs.py`, as it
  is replaced by the ADF Pipeline Management Step Functions state machine.
* Moved list flattening to the `list_utils` helper, making it easier to reuse
  this logic elsewhere. Since the lists had to be sorted and deduplicated each
  time as well, that logic was moved into the same helper, reducing repeated
  code and the different variations trying to achieve the same thing (see the
  sketch after this list).
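
A sketch of the kind of helper described in the last bullet; the exact
function name and signature in `list_utils` may differ:

```python
def flatten_to_unique_sorted(nested):
    """Flatten nested lists into a single sorted list of unique items."""
    flat = []
    for item in nested:
        if isinstance(item, list):
            flat.extend(flatten_to_unique_sorted(item))
        else:
            flat.append(item)
    return sorted(set(flat))


# Example:
# flatten_to_unique_sorted([["eu-west-1"], ["us-east-1", "eu-west-1"]])
# returns ["eu-west-1", "us-east-1"]
```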

* Support debug logs in `generate_params.py`

**Why?**

We need more debug information to be able to debug `generate_params.py`
efficiently.

**What?**

* Added debug log levels.
* End users can set the `ADF_LOG_LEVEL` environment variable to `DEBUG` to
  get the debug logs printed, as sketched below.
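
A minimal sketch of an environment-driven log level; the actual
`configure_logger` helper shipped with ADF may differ in its details:

```python
import logging
import os


def configure_logger(name):
    """Return a logger honouring the optional ADF_LOG_LEVEL variable."""
    logger = logging.getLogger(name)
    # Default to INFO; end users export ADF_LOG_LEVEL=DEBUG for more detail.
    level_name = os.environ.get("ADF_LOG_LEVEL", "INFO").upper()
    logger.setLevel(getattr(logging, level_name, logging.INFO))
    return logger
```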

* Merge in default deployment props into target specifications

**Why?**

The default provider, properties, and regions were not merged into
the target configuration yet.

This deferred the lookup of the defaults to the execution step of the
pipeline deployment.

Applying the defaults at the input stage instead makes it easier to see what
will be applied at execution time, as that input is captured in the new
Pipeline Management state machine.

**What?**

* Apply default deploy provider configuration to the target configuration at
  input generation time, as sketched below.
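
An illustrative sketch of that merge; the real logic lives in the `Pipeline`
class (`merge_in_deploy_defaults`, as used in the diff below) and handles
more cases:

```python
def merge_in_deploy_defaults(defaults, target):
    """Fill in the deploy provider, properties, and regions when not set."""
    merged = dict(target)
    merged.setdefault("provider", defaults.get("provider"))
    merged.setdefault("regions", defaults.get("regions"))
    merged["properties"] = {
        **defaults.get("properties", {}),
        # Explicit target properties win over the pipeline-wide defaults.
        **target.get("properties", {}),
    }
    return merged
```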

* S3 Deploy Extract should use the default `False` when not set
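
A hedged sketch of that default; the exact property and helper names in the
S3 deploy provider code may differ:

```python
def should_extract(s3_deploy_properties):
    """Return the extract setting, defaulting to False when it is not set."""
    return s3_deploy_properties.get("extract", False)
```
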
sbkok authored Jan 1, 2023
1 parent ada0648 commit a4de9ab
Showing 25 changed files with 986 additions and 742 deletions.
@@ -14,45 +14,70 @@


LOGGER = configure_logger(__name__)
DEPLOYMENT_ACCOUNT_REGION = os.environ["AWS_REGION"]
DEPLOYMENT_ACCOUNT_ID = os.environ["ACCOUNT_ID"]
PIPELINE_MANAGEMENT_STATEMACHINE = os.getenv("PIPELINE_MANAGEMENT_STATEMACHINE_ARN")
CLOUDWATCH = boto3.client("cloudwatch")
METRICS = ADFMetrics(CLOUDWATCH, "PIPELINE_MANAGEMENT/RULE")

_CACHE = None


def lambda_handler(event, _):
    """
    Main Lambda Entry point, creating the cross-account EventBridge rule
    if the source account of the CodeCommit repository is in another account
    than the deployment account.

    Such that a change in the source repository will trigger the pipeline.

    Args:
        event (dict): The ADF Pipeline Management State Machine execution
            input object.
    """
    # pylint: disable=W0603
    # Global variable here to cache across lambda execution runtimes.
    global _CACHE
    if not _CACHE:
        _CACHE = Cache()
        METRICS.put_metric_data(
            {"MetricName": "CacheInitialized", "Value": 1, "Unit": "Count"}
        )

    LOGGER.info(event)

    pipeline = event['pipeline_definition']

    source_provider = (
        pipeline.get("default_providers", {})
        .get("source", {})
        .get("provider", "codecommit")
    )
    source_account_id = (
        pipeline.get("default_providers", {})
        .get("source", {})
        .get("properties", {})
        .get("account_id")
    )
    if (
        source_provider == "codecommit"
        and source_account_id
        and int(source_account_id) != int(DEPLOYMENT_ACCOUNT_ID)
        and not _CACHE.exists(source_account_id)
    ):
        LOGGER.info(
            "Source is CodeCommit and the repository is hosted in the %s "
            "account instead of the deployment account (%s). Creating or "
            "updating EventBridge forward rule to forward change events "
            "from the source account to the deployment account in "
            "EventBridge.",
            source_account_id,
            DEPLOYMENT_ACCOUNT_ID,
        )
        rule = Rule(source_account_id)
        rule.create_update()
        _CACHE.add(source_account_id, True)
        METRICS.put_metric_data(
            {"MetricName": "CreateOrUpdate", "Value": 1, "Unit": "Count"}
        )

    return event
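
For reference, a trimmed example of the state machine input this handler
reads; real executions carry additional keys and the values below are
illustrative:

```python
example_event = {
    "deployment_map_source": "S3",
    "deployment_map_name": "deployment_map.yml",
    "pipeline_definition": {
        "name": "sample-pipeline",
        "default_providers": {
            "source": {
                "provider": "codecommit",
                "properties": {"account_id": "111111111111"},
            },
        },
    },
}
```
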
@@ -16,47 +16,74 @@
METRICS = ADFMetrics(CLOUDWATCH, "PIPELINE_MANAGEMENT/REPO")
LOGGER = configure_logger(__name__)
DEPLOYMENT_ACCOUNT_REGION = os.environ["AWS_REGION"]
DEPLOYMENT_ACCOUNT_ID = os.environ["ACCOUNT_ID"]


def lambda_handler(event, _):
    """
    Main Lambda Entry point, responsible for creating the CodeCommit
    repository if required.

    Args:
        event (dict): The ADF Pipeline Management Input event, holding the
            pipeline definition and event source details.

    Returns:
        dict: The input event.
    """
    pipeline = event.get('pipeline_definition')
    source_provider = (
        pipeline.get("default_providers", {})
        .get("source", {})
        .get("provider", "codecommit")
    )
    if source_provider != "codecommit":
        LOGGER.debug(
            "This pipeline is not a CodeCommit source provider. "
            "No actions required."
        )
        return event

    parameter_store = ParameterStore(DEPLOYMENT_ACCOUNT_REGION, boto3)
    auto_create_repositories = parameter_store.fetch_parameter(
        "auto_create_repositories"
    )
    LOGGER.debug("Auto create repositories is: %s", auto_create_repositories)
    if auto_create_repositories != "enabled":
        LOGGER.debug(
            "ADF is not configured to automatically create CodeCommit "
            "repositories if they don't exist yet."
        )
        return event

    code_account_id = (
        pipeline.get("default_providers", {})
        .get("source", {})
        .get("properties", {})
        .get("account_id")
    )
    has_custom_repo = (
        pipeline.get("default_providers", {})
        .get("source", {})
        .get("properties", {})
        .get("repository", {})
    )
    if (
        code_account_id
        and str(code_account_id).isdigit()
        and not has_custom_repo
    ):
        repo = Repo(
            code_account_id,
            pipeline.get("name"),
            pipeline.get("description"),
        )
        repo.create_update()
        METRICS.put_metric_data(
            {
                "MetricName": "CreateOrUpdate",
                "Value": 1,
                "Unit": "Count",
            }
        )

    return event
@@ -32,20 +32,28 @@ def store_regional_parameter_config(
    either as top level regions for a pipeline or stage specific regions
    These are only used to track pipelines.
    """
    parameter_store.put_parameter(
        f"/deployment/{deployment_map_source}/{pipeline.name}/regions",
        str(pipeline.get_all_regions()),
    )


def fetch_required_ssm_params(pipeline_input, regions):
    """
    Fetch the required SSM parameters for the regions of this pipeline.

    Args:
        pipeline_input (dict): The pipeline input dictionary.
        regions ([str]): The regions of the pipeline.

    Returns:
        dict[str, dict[str,str] | str]:
            The SSM parameters in a dictionary. Where the key is the region or
            a generic SSM parameter key for this pipeline. The value is either
            a dictionary holding the key/value pairs of SSM parameters, or the
            value of the generic SSM parameter.
    """
    output = {}
    for region in regions:
        parameter_store = ParameterStore(region, boto3)
@@ -61,43 +69,62 @@ def fetch_required_ssm_params(pipeline_input, regions):
            output[region]["modules"] = parameter_store.fetch_parameter(
                "deployment_account_bucket"
            )
            output["default_scm_branch"] = parameter_store.fetch_parameter(
                "default_scm_branch",
            )
            codestar_connection_path = (
                pipeline_input
                .get("default_providers", {})
                .get("source")
                .get("properties", {})
                .get("codestar_connection_path")
            )
            if codestar_connection_path:
                output["codestar_connection_arn"] = (
                    parameter_store.fetch_parameter(codestar_connection_path)
                )
    return output


def generate_pipeline_inputs(
    pipeline,
    deployment_map_source,
    organizations,
    parameter_store,
):
    """
    Generate the pipeline inputs for the given pipeline definition.

    Args:
        pipeline (dict): The pipeline definition, as specified in the
            deployment map that defines this pipeline.
        deployment_map_source (str): The deployment map source (i.e. "S3").
        organizations (Organizations): The Organizations class instance.
        parameter_store (ParameterStore): The Parameter Store class instance.
    """
    data = {}
    pipeline_object = Pipeline(pipeline)
    regions = []

    for target in pipeline.get("targets", []):
        target_structure = TargetStructure(target)
        for raw_step in target_structure.target:
            step = pipeline_object.merge_in_deploy_defaults(raw_step)
            paths_tags = []
            for path in step.get("path", []):
                paths_tags.append(path)
            if step.get("tags") is not None:
                paths_tags.append(step.get("tags", {}))
            for path_or_tag in paths_tags:
                pipeline_object.stage_regions.append(step.get("regions"))
                pipeline_target = Target(
                    path_or_tag,
                    target_structure,
                    organizations,
                    step,
                )
                pipeline_target.fetch_accounts_for_target()
                # Targets should be a list of lists.
@@ -116,43 +143,61 @@ def generate_pipeline_inputs(pipeline, organizations, parameter_store):
    if DEPLOYMENT_ACCOUNT_REGION not in regions:
        pipeline_object.stage_regions.append(DEPLOYMENT_ACCOUNT_REGION)

    data["pipeline_input"] = pipeline_object.generate_input()
    data["ssm_params"] = fetch_required_ssm_params(
        data["pipeline_input"],
        data["pipeline_input"]["regions"],
    )
    if "codestar_connection_arn" in data["ssm_params"]:
        data["pipeline_input"]["default_providers"]["source"]["properties"][
            "codestar_connection_arn"
        ] = data["ssm_params"]["codestar_connection_arn"]
    data["pipeline_input"]["default_scm_branch"] = data["ssm_params"].get(
        "default_scm_branch",
    )
    store_regional_parameter_config(
        pipeline_object,
        parameter_store,
        deployment_map_source,
    )
    return data


def lambda_handler(event, _):
    """
    Main Lambda Entry point, responsible to generate the pipeline input
    data based on the pipeline definition.

    Args:
        event (dict): The ADF Pipeline Management State Machine input object,
            holding the pipeline definition.

    Returns:
        dict: The input event enriched with the pipeline inputs and ssm
            parameter values retrieved.
    """
    parameter_store = ParameterStore(DEPLOYMENT_ACCOUNT_REGION, boto3)
    sts = STS()
    cross_account_role_name = parameter_store.fetch_parameter(
        "cross_account_access_role",
    )
    role = sts.assume_cross_account_role(
        (
            f"arn:{get_partition(DEPLOYMENT_ACCOUNT_REGION)}:iam::"
            f"{ROOT_ACCOUNT_ID}:role/{cross_account_role_name}-readonly"
        ),
        "pipeline",
    )
    organizations = Organizations(role)

    pipeline_input_data = generate_pipeline_inputs(
        event.get("pipeline_definition"),
        event.get("deployment_map_source"),
        organizations,
        parameter_store,
    )

    return {
        **event,
        **pipeline_input_data,
    }
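
For reference, a sketch of the enriched event returned by this handler; the
keys shown are the ones produced above, the values are illustrative
placeholders:

```python
enriched_event = {
    # ...all original event keys (pipeline_definition, deployment_map_source,
    # deployment_map_name, ...) are passed through unchanged...
    "pipeline_input": {
        "name": "sample-pipeline",
        "regions": ["eu-west-1"],
        "default_scm_branch": "main",
        "default_providers": {"...": "..."},
    },
    "ssm_params": {
        "eu-west-1": {"modules": "sample-deployment-bucket"},
        "default_scm_branch": "main",
    },
}
```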