Skip to content

Commit

Permalink
[ML][Pipelines] Support queue settings in pipeline job (#29209)
Browse files Browse the repository at this point in the history
* add queue_settings when to REST object of Command

* test: add unit test for queue settings

* test: serverless compute only has job_tier

* test: add E2E test & recording

* update test case for queue settings

* add assert to REST object back from service

* update breaking live test due to queue settings
  • Loading branch information
zhengfeiwang committed Mar 24, 2023
1 parent f02be13 commit a7ec3bc
Show file tree
Hide file tree
Showing 22 changed files with 23,045 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ def _to_job(self) -> CommandJob:

@classmethod
def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
return ["resources", "distribution", "limits", "environment_variables"]
return ["resources", "distribution", "limits", "environment_variables", "queue_settings"]

def _to_rest_object(self, **kwargs) -> dict:
rest_obj = super()._to_rest_object(**kwargs)
Expand Down
1 change: 1 addition & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/sweep.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
"objective",
"early_termination",
"search_space",
"queue_settings",
]

def _to_rest_object(self, **kwargs) -> dict:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1823,6 +1823,28 @@ def test_serverless_compute_in_pipeline(self, client: MLClient, test_path: str)
pipeline_job = load_job(yaml_path)
assert_job_cancel(pipeline_job, client)

def test_pipeline_job_serverless_compute_with_job_tier(self, client: MLClient) -> None:
yaml_path = "./tests/test_configs/pipeline_jobs/serverless_compute/job_tier/pipeline_with_job_tier.yml"
pipeline_job = load_job(yaml_path)
created_pipeline_job = assert_job_cancel(pipeline_job, client)
rest_obj = created_pipeline_job._to_rest_object()
assert rest_obj.properties.jobs["spot_job_tier"]["queue_settings"] == {"job_tier": "Spot"}
assert rest_obj.properties.jobs["standard_job_tier"]["queue_settings"] == {"job_tier": "Standard"}

def test_pipeline_job_serverless_compute_sweep_in_pipeline_with_job_tier(self, client: MLClient) -> None:
yaml_path = "./tests/test_configs/pipeline_jobs/serverless_compute/job_tier/sweep_in_pipeline/pipeline.yml"
pipeline_job = load_job(yaml_path)
created_pipeline_job = assert_job_cancel(pipeline_job, client)
rest_obj = created_pipeline_job._to_rest_object()
assert rest_obj.properties.jobs["node"]["queue_settings"] == {"job_tier": "standard"}

def test_pipeline_job_serverless_compute_automl_in_pipeline_with_job_tier(self, client: MLClient) -> None:
yaml_path = "./tests/test_configs/pipeline_jobs/serverless_compute/job_tier/automl_in_pipeline/pipeline.yml"
pipeline_job = load_job(yaml_path)
created_pipeline_job = assert_job_cancel(pipeline_job, client)
rest_obj = created_pipeline_job._to_rest_object()
assert rest_obj.properties.jobs["text_ner_node"]["queue_settings"] == {"job_tier": "spot"}

@pytest.mark.disable_mock_code_hash
def test_register_automl_output(self, client: MLClient, randstr: Callable[[str], str]):
register_pipeline_path = "./tests/test_configs/pipeline_jobs/jobs_with_automl_nodes/automl_regression_with_command_node_register_output.yml"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2070,3 +2070,25 @@ def empty_value_pipeline(integer: int, boolean: bool, number: float, str_param:
rest_obj = pipeline._to_rest_object()
expect_resource = {"instance_count": "${{parent.inputs.integer}}", "shm_size": "${{parent.inputs.shm_size}}"}
assert rest_obj.properties.jobs["component"]["resources"] == expect_resource

def test_pipeline_job_serverless_compute_with_job_tier(self) -> None:
yaml_path = "./tests/test_configs/pipeline_jobs/serverless_compute/job_tier/pipeline_with_job_tier.yml"
pipeline_job = load_job(yaml_path)
rest_obj = pipeline_job._to_rest_object()
assert rest_obj.properties.jobs["spot_job_tier"]["queue_settings"] == {"job_tier": "Spot"}
assert rest_obj.properties.jobs["standard_job_tier"]["queue_settings"] == {"job_tier": "Standard"}

def test_pipeline_job_sweep_with_job_tier_in_pipeline(self) -> None:
yaml_path = "./tests/test_configs/pipeline_jobs/serverless_compute/job_tier/sweep_in_pipeline/pipeline.yml"
pipeline_job = load_job(yaml_path)
# for sweep job, its job_tier value will be lowercase due to its implementation,
# and service side shall accept both capital and lowercase, so it is expected for now.
rest_obj = pipeline_job._to_rest_object()
assert rest_obj.properties.jobs["node"]["queue_settings"] == {"job_tier": "standard"}

def test_pipeline_job_automl_with_job_tier_in_pipeline(self) -> None:
yaml_path = "./tests/test_configs/pipeline_jobs/serverless_compute/job_tier/automl_in_pipeline/pipeline.yml"
pipeline_job = load_job(yaml_path)
# similar to sweep job, automl job job_tier value is also lowercase.
rest_obj = pipeline_job._to_rest_object()
assert rest_obj.properties.jobs["text_ner_node"]["queue_settings"] == {"job_tier": "spot"}
Loading

0 comments on commit a7ec3bc

Please sign in to comment.