Enhancement/enable serverless (#303)
* Experiment with Dataproc Serverless

* add serverless as another submission method

* add changelog and run tests against target core branch

* fix syntax

* fix schema overwrite

* use 0.21 version of connector

Co-authored-by: Jeremy Cohen <jeremy@dbtlabs.com>
ChenyuLInx and jtcohen6 committed Sep 12, 2022
1 parent 53f8b90 commit 84c20fc
Showing 7 changed files with 187 additions and 110 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20220909-122924.yaml
@@ -0,0 +1,7 @@
kind: Features
body: Add support for Dataproc Serverless
time: 2022-09-09T12:29:24.993388-07:00
custom:
  Author: ChenyuLInx
  Issue: "248"
  PR: "303"
128 changes: 23 additions & 105 deletions dbt/adapters/bigquery/impl.py
@@ -1,20 +1,30 @@
 from dataclasses import dataclass
-from typing import Dict, List, Optional, Any, Set, Union
+from typing import Dict, List, Optional, Any, Set, Union, Type
 from dbt.dataclass_schema import dbtClassMixin, ValidationError
 
 import dbt.deprecations
 import dbt.exceptions
 import dbt.clients.agate_helper
 
 from dbt import ui  # type: ignore
-from dbt.adapters.base import BaseAdapter, available, RelationType, SchemaSearchMap, AdapterConfig
-from dbt.adapters.base.impl import log_code_execution
+from dbt.adapters.base import (
+    BaseAdapter,
+    available,
+    RelationType,
+    SchemaSearchMap,
+    AdapterConfig,
+    PythonJobHelper,
+)
 
 from dbt.adapters.cache import _make_key
 
 from dbt.adapters.bigquery.relation import BigQueryRelation
 from dbt.adapters.bigquery import BigQueryColumn
 from dbt.adapters.bigquery import BigQueryConnectionManager
+from dbt.adapters.bigquery.python_submissions import (
+    ClusterDataprocHelper,
+    ServerlessDataProcHelper,
+)
 from dbt.adapters.bigquery.connections import BigQueryAdapterResponse
 from dbt.contracts.graph.manifest import Manifest
 from dbt.events import AdapterLogger
@@ -835,108 +845,16 @@ def run_sql_for_tests(self, sql, fetch, conn=None):
         else:
             return list(res)
 
-    @available.parse_none
-    @log_code_execution
-    def submit_python_job(self, parsed_model: dict, compiled_code: str):
-        # TODO improve the typing here. N.B. Jinja returns a `jinja2.runtime.Undefined` instead
-        # of `None` which evaluates to True!
-
-        # TODO limit this function to run only when doing the materialization of python nodes
-        # TODO should we also to timeout here?
-
-        # validate all additional stuff for python is set
-        schema = getattr(parsed_model, "schema", self.config.credentials.schema)
-        identifier = parsed_model["alias"]
-        python_required_configs = [
-            "dataproc_region",
-            "dataproc_cluster_name",
-            "gcs_bucket",
-        ]
-        for required_config in python_required_configs:
-            if not getattr(self.connections.profile.credentials, required_config):
-                raise ValueError(
-                    f"Need to supply {required_config} in profile to submit python job"
-                )
-        if not hasattr(self, "dataproc_helper"):
-            self.dataproc_helper = DataProcHelper(self.connections.profile.credentials)
-        model_file_name = f"{schema}/{identifier}.py"
-        # upload python file to GCS
-        self.dataproc_helper.upload_to_gcs(model_file_name, compiled_code)
-        # submit dataproc job
-        self.dataproc_helper.submit_dataproc_job(model_file_name)
-
-        # TODO proper result for this
-        message = "OK"
-        code = None
-        num_rows = None
-        bytes_processed = None
-        return BigQueryAdapterResponse(  # type: ignore[call-arg]
-            _message=message,
-            rows_affected=num_rows,
-            code=code,
-            bytes_processed=bytes_processed,
-        )
+    def generate_python_submission_response(self, submission_result) -> BigQueryAdapterResponse:
+        return BigQueryAdapterResponse(_message="OK")  # type: ignore[call-arg]
 
-class DataProcHelper:
-    def __init__(self, credential):
-        """_summary_
+    @property
+    def default_python_submission_method(self) -> str:
+        return "serverless"
 
-        Args:
-            credential (_type_): _description_
-        """
-        try:
-            # Library only needed for python models
-            from google.cloud import dataproc_v1
-            from google.cloud import storage
-        except ImportError:
-            raise RuntimeError(
-                "You need to install [dataproc] extras to run python model in dbt-bigquery"
-            )
-        self.credential = credential
-        self.GoogleCredentials = BigQueryConnectionManager.get_credentials(credential)
-        self.storage_client = storage.Client(
-            project=self.credential.database, credentials=self.GoogleCredentials
-        )
-        self.job_client = dataproc_v1.JobControllerClient(
-            client_options={
-                "api_endpoint": "{}-dataproc.googleapis.com:443".format(
-                    self.credential.dataproc_region
-                )
-            },
-            credentials=self.GoogleCredentials,
-        )
-
-    def upload_to_gcs(self, filename: str, compiled_code: str):
-        bucket = self.storage_client.get_bucket(self.credential.gcs_bucket)
-        blob = bucket.blob(filename)
-        blob.upload_from_string(compiled_code)
-
-    def submit_dataproc_job(self, filename: str):
-        # Create the job config.
-        job = {
-            "placement": {"cluster_name": self.credential.dataproc_cluster_name},
-            "pyspark_job": {
-                "main_python_file_uri": "gs://{}/{}".format(self.credential.gcs_bucket, filename)
-            },
-        }
+    @property
+    def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
+        return {
+            "cluster": ClusterDataprocHelper,
+            "serverless": ServerlessDataProcHelper,
+        }
-        operation = self.job_client.submit_job_as_operation(
-            request={
-                "project_id": self.credential.database,
-                "region": self.credential.dataproc_region,
-                "job": job,
-            }
-        )
-        response = operation.result()
-        return response
-
-        # TODO: there might be useful results here that we can parse and return
-        # Dataproc job output is saved to the Cloud Storage bucket
-        # allocated to the job. Use regex to obtain the bucket and blob info.
-        # matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
-        # output = (
-        #     self.storage_client
-        #     .get_bucket(matches.group(1))
-        #     .blob(f"{matches.group(2)}.000000000")
-        #     .download_as_string()
-        # )
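
Editor's note (not part of the diff): the adapter now only declares which helpers exist and how to turn their result into a response; the generic Python-model flow on dbt-core's enhancement/python_submission_helper branch (pinned in dev-requirements.txt below) is what selects and runs a helper. A rough sketch of that caller side, under stated assumptions — names such as submission_method are not shown in this diff:

# Sketch of the dbt-core side of the new hooks; assumption-laden, not the actual implementation.
def submit_python_job(adapter, parsed_model: dict, compiled_code: str):
    method = parsed_model["config"].get(
        "submission_method", adapter.default_python_submission_method  # assumed config key
    )
    helper_cls = adapter.python_submission_helpers[method]  # ClusterDataprocHelper or ServerlessDataProcHelper
    helper = helper_cls(parsed_model, adapter.connections.profile.credentials)
    result = helper.submit(compiled_code)  # upload to GCS, then run the Dataproc job/batch
    return adapter.generate_python_submission_response(result)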
152 changes: 152 additions & 0 deletions dbt/adapters/bigquery/python_submissions.py
@@ -0,0 +1,152 @@
from typing import Dict, Union

from dbt.adapters.base import PythonJobHelper
from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
from google.api_core import retry
from google.api_core.client_options import ClientOptions

try:
    # library only needed for python models
    from google.cloud import storage, dataproc_v1  # type: ignore
except ImportError:
    _has_dataproc_lib = False
else:
    _has_dataproc_lib = True


class BaseDataProcHelper(PythonJobHelper):
    def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
        """_summary_
        Args:
            credential (_type_): _description_
        """
        if not _has_dataproc_lib:
            raise RuntimeError(
                "You need to install [dataproc] extras to run python model in dbt-bigquery"
            )
        # validate all additional stuff for python is set
        schema = parsed_model["schema"]
        identifier = parsed_model["alias"]
        self.parsed_model = parsed_model
        python_required_configs = [
            "dataproc_region",
            "gcs_bucket",
        ]
        for required_config in python_required_configs:
            if not getattr(credential, required_config):
                raise ValueError(
                    f"Need to supply {required_config} in profile to submit python job"
                )
        self.model_file_name = f"{schema}/{identifier}.py"
        self.credential = credential
        self.GoogleCredentials = BigQueryConnectionManager.get_credentials(credential)
        self.storage_client = storage.Client(
            project=self.credential.database, credentials=self.GoogleCredentials
        )
        self.gcs_location = "gs://{}/{}".format(self.credential.gcs_bucket, self.model_file_name)

        # set retry policy, default to timeout after 24 hours
        self.timeout = self.parsed_model["config"].get(
            "timeout", self.credential.job_execution_timeout_seconds or 60 * 60 * 24
        )
        self.retry = retry.Retry(maximum=10.0, deadline=self.timeout)
        self.client_options = ClientOptions(
            api_endpoint="{}-dataproc.googleapis.com:443".format(self.credential.dataproc_region)
        )
        self.job_client = self._get_job_client()

    def _upload_to_gcs(self, filename: str, compiled_code: str) -> None:
        bucket = self.storage_client.get_bucket(self.credential.gcs_bucket)
        blob = bucket.blob(filename)
        blob.upload_from_string(compiled_code)

    def submit(self, compiled_code: str) -> dataproc_v1.types.jobs.Job:
        # upload python file to GCS
        self._upload_to_gcs(self.model_file_name, compiled_code)
        # submit dataproc job
        return self._submit_dataproc_job()

    def _get_job_client(
        self,
    ) -> Union[dataproc_v1.JobControllerClient, dataproc_v1.BatchControllerClient]:
        raise NotImplementedError("_get_job_client not implemented")

    def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
        raise NotImplementedError("_submit_dataproc_job not implemented")


class ClusterDataprocHelper(BaseDataProcHelper):
    def _get_job_client(self) -> dataproc_v1.JobControllerClient:
        if not self._get_cluster_name():
            raise ValueError(
                "Need to supply dataproc_cluster_name in profile or config to submit python job with cluster submission method"
            )
        return dataproc_v1.JobControllerClient(  # type: ignore
            client_options=self.client_options, credentials=self.GoogleCredentials
        )

    def _get_cluster_name(self) -> str:
        return self.parsed_model["config"].get(
            "dataproc_cluster_name", self.credential.dataproc_cluster_name
        )

    def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
        job = {
            "placement": {"cluster_name": self._get_cluster_name()},
            "pyspark_job": {
                "main_python_file_uri": self.gcs_location,
            },
        }
        operation = self.job_client.submit_job_as_operation(  # type: ignore
            request={
                "project_id": self.credential.database,
                "region": self.credential.dataproc_region,
                "job": job,
            }
        )
        response = operation.result(retry=self.retry)
        return response


class ServerlessDataProcHelper(BaseDataProcHelper):
    def _get_job_client(self) -> dataproc_v1.BatchControllerClient:
        return dataproc_v1.BatchControllerClient(
            client_options=self.client_options, credentials=self.GoogleCredentials
        )

    def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
        # create the Dataproc Serverless job config
        batch = dataproc_v1.Batch()
        batch.pyspark_batch.main_python_file_uri = self.gcs_location
        # how to keep this up to date?
        # we should probably also open this up to be configurable
        batch.pyspark_batch.jar_file_uris = [
            "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar"
        ]
        # should we make all of these spark/dataproc properties configurable?
        # https://cloud.google.com/dataproc-serverless/docs/concepts/properties
        # https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#runtimeconfig
        batch.runtime_config.properties = {
            "spark.executor.instances": "2",
        }
        parent = f"projects/{self.credential.database}/locations/{self.credential.dataproc_region}"
        request = dataproc_v1.CreateBatchRequest(
            parent=parent,
            batch=batch,
        )
        # make the request
        operation = self.job_client.create_batch(request=request)  # type: ignore
        # this takes quite a while, waiting on GCP response to resolve
        response = operation.result(retry=self.retry)
        return response
        # there might be useful results here that we can parse and return
        # Dataproc job output is saved to the Cloud Storage bucket
        # allocated to the job. Use regex to obtain the bucket and blob info.
        # matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
        # output = (
        #     self.storage_client
        #     .get_bucket(matches.group(1))
        #     .blob(f"{matches.group(2)}.000000000")
        #     .download_as_string()
        # )
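
Editor's note (not part of the diff): in normal use dbt instantiates these helpers for you; the snippet below only illustrates the new class's surface area, with made-up schema/alias values and a credentials object assumed to carry dataproc_region and gcs_bucket.

# Hypothetical direct use of the serverless helper added above.
parsed_model = {"schema": "analytics", "alias": "my_model", "config": {}}  # made-up values
compiled_code = "def model(dbt, session): ..."  # placeholder for the compiled PySpark model code
helper = ServerlessDataProcHelper(parsed_model, credentials)  # credentials: a configured BigQueryCredentials
job = helper.submit(compiled_code)  # uploads analytics/my_model.py to gs://<gcs_bucket>/ and runs a Dataproc batch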
2 changes: 1 addition & 1 deletion dbt/include/bigquery/macros/materializations/table.sql
@@ -67,6 +67,6 @@ df = model(dbt, spark)
 df.write \
   .mode("overwrite") \
   .format("bigquery") \
-  .option("writeMethod", "direct") \
+  .option("writeMethod", "direct").option("writeDisposition", 'WRITE_TRUNCATE') \
   .save("{{target_relation}}")
 {% endmacro %}
4 changes: 2 additions & 2 deletions dev-requirements.txt
@@ -1,7 +1,7 @@
 # install latest changes in dbt-core
 # TODO: how to automate switching from develop to version branches?
-git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
-git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter
+git+https://github.com/dbt-labs/dbt-core.git@enhancement/python_submission_helper#egg=dbt-core&subdirectory=core
+git+https://github.com/dbt-labs/dbt-core.git@enhancement/python_submission_helper#egg=dbt-tests-adapter&subdirectory=tests/adapter
 
 black==22.8.0
 bumpversion
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -44,6 +44,6 @@ def service_account_target():
         'keyfile_json': credentials,
         # following 3 for python model
         'dataproc_region': os.getenv("DATAPROC_REGION"),
-        'dataproc_cluster_name': os.getenv("DATAPROC_CLUSTER_NAME"),
+        'dataproc_cluster_name': os.getenv("DATAPROC_CLUSTER_NAME"),  # only needed for cluster submission method
         'gcs_bucket': os.getenv("GCS_BUCKET")
     }
2 changes: 1 addition & 1 deletion tests/functional/adapter/test_python_model.py
@@ -3,7 +3,7 @@
 from dbt.tests.util import run_dbt, write_file
 import dbt.tests.adapter.python_model.test_python_model as dbt_tests
 
-@pytest.skip("cluster unstable", allow_module_level=True)
+
 class TestPythonIncrementalMatsDataproc(dbt_tests.BasePythonIncrementalTests):
     pass
 
