From 84c20fcef86d9777b6ce6a4745b6415ec288ed93 Mon Sep 17 00:00:00 2001
From: Chenyu Li
Date: Mon, 12 Sep 2022 11:33:56 -0700
Subject: [PATCH] Enhancement/enable serverless (#303)

* Experiment with Dataproc Serverless
* add serverless as another submission method
* add changelog and run tests against target core branch
* fix syntax
* fix schema overwrite
* use 0.21 version of connector

Co-authored-by: Jeremy Cohen
---
 .../unreleased/Features-20220909-122924.yaml  |   7 +
 dbt/adapters/bigquery/impl.py                 | 128 +++------------
 dbt/adapters/bigquery/python_submissions.py   | 152 ++++++++++++++++++
 .../macros/materializations/table.sql         |   2 +-
 dev-requirements.txt                          |   4 +-
 tests/conftest.py                             |   2 +-
 tests/functional/adapter/test_python_model.py |   2 +-
 7 files changed, 187 insertions(+), 110 deletions(-)
 create mode 100644 .changes/unreleased/Features-20220909-122924.yaml
 create mode 100644 dbt/adapters/bigquery/python_submissions.py

diff --git a/.changes/unreleased/Features-20220909-122924.yaml b/.changes/unreleased/Features-20220909-122924.yaml
new file mode 100644
index 000000000..cde9bbb43
--- /dev/null
+++ b/.changes/unreleased/Features-20220909-122924.yaml
@@ -0,0 +1,7 @@
+kind: Features
+body: Add support for Dataproc Serverless
+time: 2022-09-09T12:29:24.993388-07:00
+custom:
+  Author: ChenyuLInx
+  Issue: "248"
+  PR: "303"
diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py
index dde6f865d..602aafd19 100644
--- a/dbt/adapters/bigquery/impl.py
+++ b/dbt/adapters/bigquery/impl.py
@@ -1,5 +1,5 @@
 from dataclasses import dataclass
-from typing import Dict, List, Optional, Any, Set, Union
+from typing import Dict, List, Optional, Any, Set, Union, Type
 
 from dbt.dataclass_schema import dbtClassMixin, ValidationError
 import dbt.deprecations
@@ -7,14 +7,24 @@ import dbt.clients.agate_helper
 
 from dbt import ui  # type: ignore
-from dbt.adapters.base import BaseAdapter, available, RelationType, SchemaSearchMap, AdapterConfig
-from dbt.adapters.base.impl import log_code_execution
+from dbt.adapters.base import (
+    BaseAdapter,
+    available,
+    RelationType,
+    SchemaSearchMap,
+    AdapterConfig,
+    PythonJobHelper,
+)
 from dbt.adapters.cache import _make_key
 
 from dbt.adapters.bigquery.relation import BigQueryRelation
 from dbt.adapters.bigquery import BigQueryColumn
 from dbt.adapters.bigquery import BigQueryConnectionManager
+from dbt.adapters.bigquery.python_submissions import (
+    ClusterDataprocHelper,
+    ServerlessDataProcHelper,
+)
 from dbt.adapters.bigquery.connections import BigQueryAdapterResponse
 from dbt.contracts.graph.manifest import Manifest
 from dbt.events import AdapterLogger
@@ -835,108 +845,16 @@ def run_sql_for_tests(self, sql, fetch, conn=None):
         else:
             return list(res)
 
-    @available.parse_none
-    @log_code_execution
-    def submit_python_job(self, parsed_model: dict, compiled_code: str):
-        # TODO improve the typing here. N.B. Jinja returns a `jinja2.runtime.Undefined` instead
-        # of `None` which evaluates to True!
-
-        # TODO limit this function to run only when doing the materialization of python nodes
-        # TODO should we also to timeout here?
-
-        # validate all additional stuff for python is set
-        schema = getattr(parsed_model, "schema", self.config.credentials.schema)
-        identifier = parsed_model["alias"]
-        python_required_configs = [
-            "dataproc_region",
-            "dataproc_cluster_name",
-            "gcs_bucket",
-        ]
-        for required_config in python_required_configs:
-            if not getattr(self.connections.profile.credentials, required_config):
-                raise ValueError(
-                    f"Need to supply {required_config} in profile to submit python job"
-                )
-        if not hasattr(self, "dataproc_helper"):
-            self.dataproc_helper = DataProcHelper(self.connections.profile.credentials)
-        model_file_name = f"{schema}/{identifier}.py"
-        # upload python file to GCS
-        self.dataproc_helper.upload_to_gcs(model_file_name, compiled_code)
-        # submit dataproc job
-        self.dataproc_helper.submit_dataproc_job(model_file_name)
-
-        # TODO proper result for this
-        message = "OK"
-        code = None
-        num_rows = None
-        bytes_processed = None
-        return BigQueryAdapterResponse(  # type: ignore[call-arg]
-            _message=message,
-            rows_affected=num_rows,
-            code=code,
-            bytes_processed=bytes_processed,
-        )
-
+    def generate_python_submission_response(self, submission_result) -> BigQueryAdapterResponse:
+        return BigQueryAdapterResponse(_message="OK")  # type: ignore[call-arg]
 
-class DataProcHelper:
-    def __init__(self, credential):
-        """_summary_
+    @property
+    def default_python_submission_method(self) -> str:
+        return "serverless"
 
-        Args:
-            credential (_type_): _description_
-        """
-        try:
-            # Library only needed for python models
-            from google.cloud import dataproc_v1
-            from google.cloud import storage
-        except ImportError:
-            raise RuntimeError(
-                "You need to install [dataproc] extras to run python model in dbt-bigquery"
-            )
-        self.credential = credential
-        self.GoogleCredentials = BigQueryConnectionManager.get_credentials(credential)
-        self.storage_client = storage.Client(
-            project=self.credential.database, credentials=self.GoogleCredentials
-        )
-        self.job_client = dataproc_v1.JobControllerClient(
-            client_options={
-                "api_endpoint": "{}-dataproc.googleapis.com:443".format(
-                    self.credential.dataproc_region
-                )
-            },
-            credentials=self.GoogleCredentials,
-        )
-
-    def upload_to_gcs(self, filename: str, compiled_code: str):
-        bucket = self.storage_client.get_bucket(self.credential.gcs_bucket)
-        blob = bucket.blob(filename)
-        blob.upload_from_string(compiled_code)
-
-    def submit_dataproc_job(self, filename: str):
-        # Create the job config.
-        job = {
-            "placement": {"cluster_name": self.credential.dataproc_cluster_name},
-            "pyspark_job": {
-                "main_python_file_uri": "gs://{}/{}".format(self.credential.gcs_bucket, filename)
-            },
+    @property
+    def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
+        return {
+            "cluster": ClusterDataprocHelper,
+            "serverless": ServerlessDataProcHelper,
         }
-        operation = self.job_client.submit_job_as_operation(
-            request={
-                "project_id": self.credential.database,
-                "region": self.credential.dataproc_region,
-                "job": job,
-            }
-        )
-        response = operation.result()
-        return response
-
-        # TODO: there might be useful results here that we can parse and return
-        # Dataproc job output is saved to the Cloud Storage bucket
-        # allocated to the job. Use regex to obtain the bucket and blob info.
-        # matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
-        # output = (
-        #     self.storage_client
-        #     .get_bucket(matches.group(1))
-        #     .blob(f"{matches.group(2)}.000000000")
-        #     .download_as_string()
-        # )
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
new file mode 100644
index 000000000..408984c2f
--- /dev/null
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -0,0 +1,152 @@
+from typing import Dict, Union
+
+from dbt.adapters.base import PythonJobHelper
+from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
+from google.api_core import retry
+from google.api_core.client_options import ClientOptions
+
+try:
+    # library only needed for python models
+    from google.cloud import storage, dataproc_v1  # type: ignore
+except ImportError:
+    _has_dataproc_lib = False
+else:
+    _has_dataproc_lib = True
+
+
+class BaseDataProcHelper(PythonJobHelper):
+    def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
+        """Validate python-model settings and set up the GCS and Dataproc clients.
+
+        Args:
+            parsed_model (Dict): the parsed python model node (provides schema, alias, and config)
+            credential (BigQueryCredentials): profile credentials used for GCS upload and job submission
+        """
+        if not _has_dataproc_lib:
+            raise RuntimeError(
+                "You need to install [dataproc] extras to run python model in dbt-bigquery"
+            )
+        # validate all additional stuff for python is set
+        schema = parsed_model["schema"]
+        identifier = parsed_model["alias"]
+        self.parsed_model = parsed_model
+        python_required_configs = [
+            "dataproc_region",
+            "gcs_bucket",
+        ]
+        for required_config in python_required_configs:
+            if not getattr(credential, required_config):
+                raise ValueError(
+                    f"Need to supply {required_config} in profile to submit python job"
+                )
+        self.model_file_name = f"{schema}/{identifier}.py"
+        self.credential = credential
+        self.GoogleCredentials = BigQueryConnectionManager.get_credentials(credential)
+        self.storage_client = storage.Client(
+            project=self.credential.database, credentials=self.GoogleCredentials
+        )
+        self.gcs_location = "gs://{}/{}".format(self.credential.gcs_bucket, self.model_file_name)
+
+        # set retry policy, default to timeout after 24 hours
+        self.timeout = self.parsed_model["config"].get(
+            "timeout", self.credential.job_execution_timeout_seconds or 60 * 60 * 24
+        )
+        self.retry = retry.Retry(maximum=10.0, deadline=self.timeout)
+        self.client_options = ClientOptions(
+            api_endpoint="{}-dataproc.googleapis.com:443".format(self.credential.dataproc_region)
+        )
+        self.job_client = self._get_job_client()
+
+    def _upload_to_gcs(self, filename: str, compiled_code: str) -> None:
+        bucket = self.storage_client.get_bucket(self.credential.gcs_bucket)
+        blob = bucket.blob(filename)
+        blob.upload_from_string(compiled_code)
+
+    def submit(self, compiled_code: str) -> dataproc_v1.types.jobs.Job:
+        # upload python file to GCS
+        self._upload_to_gcs(self.model_file_name, compiled_code)
+        # submit dataproc job
+        return self._submit_dataproc_job()
+
+    def _get_job_client(
+        self,
+    ) -> Union[dataproc_v1.JobControllerClient, dataproc_v1.BatchControllerClient]:
+        raise NotImplementedError("_get_job_client not implemented")
+
+    def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
+        raise NotImplementedError("_submit_dataproc_job not implemented")
+
+
+class ClusterDataprocHelper(BaseDataProcHelper):
+    def _get_job_client(self) -> dataproc_v1.JobControllerClient:
+        if not self._get_cluster_name():
+            raise ValueError(
+                "Need to supply dataproc_cluster_name in profile or config to submit python job with cluster submission method"
+            )
+        return dataproc_v1.JobControllerClient(  # type: ignore
+            client_options=self.client_options, credentials=self.GoogleCredentials
+        )
+
+    def _get_cluster_name(self) -> str:
+        return self.parsed_model["config"].get(
+            "dataproc_cluster_name", self.credential.dataproc_cluster_name
+        )
+
+    def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
+        job = {
+            "placement": {"cluster_name": self._get_cluster_name()},
+            "pyspark_job": {
+                "main_python_file_uri": self.gcs_location,
+            },
+        }
+        operation = self.job_client.submit_job_as_operation(  # type: ignore
+            request={
+                "project_id": self.credential.database,
+                "region": self.credential.dataproc_region,
+                "job": job,
+            }
+        )
+        response = operation.result(retry=self.retry)
+        return response
+
+
+class ServerlessDataProcHelper(BaseDataProcHelper):
+    def _get_job_client(self) -> dataproc_v1.BatchControllerClient:
+        return dataproc_v1.BatchControllerClient(
+            client_options=self.client_options, credentials=self.GoogleCredentials
+        )
+
+    def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
+        # create the Dataproc Serverless job config
+        batch = dataproc_v1.Batch()
+        batch.pyspark_batch.main_python_file_uri = self.gcs_location
+        # how to keep this up to date?
+        # we should probably also open this up to be configurable
+        batch.pyspark_batch.jar_file_uris = [
+            "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar"
+        ]
+        # should we make all of these spark/dataproc properties configurable?
+        # https://cloud.google.com/dataproc-serverless/docs/concepts/properties
+        # https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#runtimeconfig
+        batch.runtime_config.properties = {
+            "spark.executor.instances": "2",
+        }
+        parent = f"projects/{self.credential.database}/locations/{self.credential.dataproc_region}"
+        request = dataproc_v1.CreateBatchRequest(
+            parent=parent,
+            batch=batch,
+        )
+        # make the request
+        operation = self.job_client.create_batch(request=request)  # type: ignore
+        # this takes quite a while, waiting on GCP response to resolve
+        response = operation.result(retry=self.retry)
+        return response
+        # there might be useful results here that we can parse and return
+        # Dataproc job output is saved to the Cloud Storage bucket
+        # allocated to the job. Use regex to obtain the bucket and blob info.
+        # matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
+        # output = (
+        #     self.storage_client
+        #     .get_bucket(matches.group(1))
+        #     .blob(f"{matches.group(2)}.000000000")
+        #     .download_as_string()
+        # )
diff --git a/dbt/include/bigquery/macros/materializations/table.sql b/dbt/include/bigquery/macros/materializations/table.sql
index 75751f331..5ca735aa1 100644
--- a/dbt/include/bigquery/macros/materializations/table.sql
+++ b/dbt/include/bigquery/macros/materializations/table.sql
@@ -67,6 +67,6 @@ df = model(dbt, spark)
 df.write \
   .mode("overwrite") \
   .format("bigquery") \
-  .option("writeMethod", "direct") \
+  .option("writeMethod", "direct").option("writeDisposition", 'WRITE_TRUNCATE') \
   .save("{{target_relation}}")
 {% endmacro %}
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 129dbbe64..23418bf3f 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,7 +1,7 @@
 # install latest changes in dbt-core
 # TODO: how to automate switching from develop to version branches?
-git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
-git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter
+git+https://github.com/dbt-labs/dbt-core.git@enhancement/python_submission_helper#egg=dbt-core&subdirectory=core
+git+https://github.com/dbt-labs/dbt-core.git@enhancement/python_submission_helper#egg=dbt-tests-adapter&subdirectory=tests/adapter
 
 black==22.8.0
 bumpversion
diff --git a/tests/conftest.py b/tests/conftest.py
index 7b0c69fc3..e74fa424b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -44,6 +44,6 @@ def service_account_target():
         'keyfile_json': credentials,
         # following 3 for python model
         'dataproc_region': os.getenv("DATAPROC_REGION"),
-        'dataproc_cluster_name': os.getenv("DATAPROC_CLUSTER_NAME"),
+        'dataproc_cluster_name': os.getenv("DATAPROC_CLUSTER_NAME"),  # only needed for cluster submission method
         'gcs_bucket': os.getenv("GCS_BUCKET")
     }
diff --git a/tests/functional/adapter/test_python_model.py b/tests/functional/adapter/test_python_model.py
index 03ad871e2..68bb90e68 100644
--- a/tests/functional/adapter/test_python_model.py
+++ b/tests/functional/adapter/test_python_model.py
@@ -3,7 +3,7 @@ import pytest
 from dbt.tests.util import run_dbt, write_file
 import dbt.tests.adapter.python_model.test_python_model as dbt_tests
 
-@pytest.skip("cluster unstable", allow_module_level=True)
+
 class TestPythonIncrementalMatsDataproc(dbt_tests.BasePythonIncrementalTests):
     pass
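
How the pieces fit together: dbt-core looks up the configured method in python_submission_helpers, instantiates the helper with the parsed model and the profile credentials, and calls submit() with the compiled code; BaseDataProcHelper uploads {schema}/{alias}.py to gcs_bucket, and the subclass submits either a Dataproc job (cluster) or a batch (serverless) and waits for it. Below is a rough standalone sketch of that flow, assuming a parsed_model dict shaped like dbt's python model node; dbt-core normally drives this, so the driver function and the submission_method key lookup are assumptions for illustration only.

from dbt.adapters.bigquery.python_submissions import (
    ClusterDataprocHelper,
    ServerlessDataProcHelper,
)

# mirrors BigQueryAdapter.python_submission_helpers / default_python_submission_method
HELPERS = {"cluster": ClusterDataprocHelper, "serverless": ServerlessDataProcHelper}

def submit_compiled_python(parsed_model: dict, credentials, compiled_code: str):
    # pick the helper for the configured method, falling back to the new default
    method = parsed_model["config"].get("submission_method", "serverless")  # assumed config key
    helper = HELPERS[method](parsed_model, credentials)
    # uploads {schema}/{alias}.py to gcs_bucket, then submits and waits on the Dataproc job/batch
    return helper.submit(compiled_code)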
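
Usage sketch (illustrative, not part of the diff above): with "serverless" as the new default submission method, existing Python models should run on Dataproc Serverless without model changes, and opting a single model back onto a cluster is a per-model config. The hypothetical model below shows the knobs this patch reads from parsed_model["config"] (dataproc_cluster_name, timeout); the submission_method key itself is resolved by dbt-core against the mapping added in impl.py, so treat that exact key name as an assumption.

# models/my_python_model.py (hypothetical example)
def model(dbt, session):
    dbt.config(
        materialized="table",
        submission_method="cluster",       # assumed dbt-core config key; default is "serverless" per this patch
        dataproc_cluster_name="dbt-test",  # read from parsed_model["config"] by ClusterDataprocHelper
        timeout=60 * 60,                   # read from parsed_model["config"] by BaseDataProcHelper
    )
    # the returned DataFrame is written back via the table.sql macro above
    return dbt.ref("my_upstream_model")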