Skip to content
This repository has been archived by the owner on Jul 30, 2020. It is now read-only.

Commit

Permalink
[AIRFLOW-4731] Fix GCS hook with google-storage-client 1.16 (apache#5368)
Browse files Browse the repository at this point in the history

google-cloud-storage 1.16 introduced a breaking change where the
signature of client.get_bucket changed from (bucket_name) to
(bucket_or_name). Calls that pass the bucket as a named argument
(bucket_name=...) to this method now fail.
This commit makes all calls positional to work around this.
  • Loading branch information
Gordon Ball authored and ashb committed Jun 10, 2019
1 parent 3b8e991 commit 201e671
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 11 deletions.
4 changes: 4 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ assists users migrating to a new version.

The `elasticsearch_` prefix has been removed from all config items under the `[elasticsearch]` section. For example `elasticsearch_host` is now just `host`.

### Changes to the Google Cloud Storage Hook

Updating to `google-cloud-storage >= 1.16` changes the signature of the upstream `client.get_bucket()` method from `get_bucket(bucket_name: str)` to `get_bucket(bucket_or_name: Union[str, Bucket])`, so calls that pass the bucket as the keyword argument `bucket_name=` now fail. This method is not directly exposed by the Airflow hook, but any code accessing the connection directly (`GoogleCloudStorageHook().get_conn().get_bucket(...)` or similar) will need to be updated to pass the bucket name positionally, for example `get_bucket("my-bucket")`.

### Removal of Mesos Executor
The Mesos Executor is removed from the code base as it was not widely used and not maintained. [Mailing List Discussion on deleting it](https://lists.apache.org/list.html?dev@airflow.apache.org:lte=1M:mesos).

Expand Down
18 changes: 9 additions & 9 deletions airflow/contrib/hooks/gcs_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ def rewrite(self, source_bucket, source_object, destination_bucket,
raise ValueError('source_bucket and source_object cannot be empty.')

client = self.get_conn()
source_bucket = client.get_bucket(bucket_name=source_bucket)
source_bucket = client.get_bucket(source_bucket)
source_object = source_bucket.blob(blob_name=source_object)
destination_bucket = client.get_bucket(bucket_name=destination_bucket)
destination_bucket = client.get_bucket(destination_bucket)

token, bytes_rewritten, total_bytes = destination_bucket.blob(
blob_name=destination_object).rewrite(
Expand Down Expand Up @@ -196,7 +196,7 @@ def upload(self, bucket_name, object_name, filename,
filename = filename_gz

client = self.get_conn()
bucket = client.get_bucket(bucket_name=bucket_name)
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(blob_name=object_name)
blob.upload_from_filename(filename=filename,
content_type=mime_type)
Expand All @@ -216,7 +216,7 @@ def exists(self, bucket_name, object_name):
:type object_name: str
"""
client = self.get_conn()
bucket = client.get_bucket(bucket_name=bucket_name)
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(blob_name=object_name)
return blob.exists()

Expand Down Expand Up @@ -262,7 +262,7 @@ def delete(self, bucket_name, object_name):
:type object_name: str
"""
client = self.get_conn()
bucket = client.get_bucket(bucket_name=bucket_name)
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(blob_name=object_name)
blob.delete()

Expand All @@ -286,7 +286,7 @@ def list(self, bucket_name, versions=None, max_results=None, prefix=None, delimi
:return: a stream of object names matching the filtering criteria
"""
client = self.get_conn()
bucket = client.get_bucket(bucket_name=bucket_name)
bucket = client.get_bucket(bucket_name)

ids = []
pageToken = None
Expand Down Expand Up @@ -330,7 +330,7 @@ def get_size(self, bucket_name, object_name):
object_name,
bucket_name)
client = self.get_conn()
bucket = client.get_bucket(bucket_name=bucket_name)
bucket = client.get_bucket(bucket_name)
blob = bucket.get_blob(blob_name=object_name)
blob.reload()
blob_size = blob.size
Expand All @@ -350,7 +350,7 @@ def get_crc32c(self, bucket_name, object_name):
self.log.info('Retrieving the crc32c checksum of '
'object_name: %s in bucket_name: %s', object_name, bucket_name)
client = self.get_conn()
bucket = client.get_bucket(bucket_name=bucket_name)
bucket = client.get_bucket(bucket_name)
blob = bucket.get_blob(blob_name=object_name)
blob.reload()
blob_crc32c = blob.crc32c
Expand All @@ -370,7 +370,7 @@ def get_md5hash(self, bucket_name, object_name):
self.log.info('Retrieving the MD5 hash of '
'object: %s in bucket: %s', object_name, bucket_name)
client = self.get_conn()
bucket = client.get_bucket(bucket_name=bucket_name)
bucket = client.get_bucket(bucket_name)
blob = bucket.get_blob(blob_name=object_name)
blob.reload()
blob_md5hash = blob.md5_hash
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def write_version(filename: str = os.path.join(*["airflow", "git_version"])):
'google-cloud-container>=0.1.1',
'google-cloud-language>=1.1.1',
'google-cloud-spanner>=1.7.1',
'google-cloud-storage~=1.14',
'google-cloud-storage~=1.16',
'google-cloud-translate>=1.3.3',
'google-cloud-videointelligence>=1.7.0',
'google-cloud-vision>=0.35.2',
Expand Down
2 changes: 1 addition & 1 deletion tests/contrib/hooks/test_gcs_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def test_exists(self, mock_service):

# Then
self.assertTrue(response)
get_bucket_mock.assert_called_once_with(bucket_name=test_bucket)
get_bucket_mock.assert_called_once_with(test_bucket)
blob_object.assert_called_once_with(blob_name=test_object)
exists_method.assert_called_once_with()

Expand Down

0 comments on commit 201e671

Please sign in to comment.