Skip to content

Commit

Permalink
Add 'Bucket.generate_signed_url' method.
Browse files Browse the repository at this point in the history
  • Loading branch information
tseaver committed Apr 1, 2019
1 parent 734a9b3 commit 68742be
Show file tree
Hide file tree
Showing 3 changed files with 289 additions and 2 deletions.
2 changes: 1 addition & 1 deletion storage/google/cloud/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ def generate_signed_url(
two must be passed.
:type api_access_endpoint: str
:param api_access_endpoint: Optional URI base. Defaults to empty string.
:param api_access_endpoint: Optional URI base.
:type method: str
:param method: The HTTP verb that will be used when requesting the URL.
Expand Down
118 changes: 118 additions & 0 deletions storage/google/cloud/storage/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
from google.cloud.storage._helpers import _PropertyMixin
from google.cloud.storage._helpers import _scalar_property
from google.cloud.storage._helpers import _validate_name
from google.cloud.storage._signing import generate_signed_url_v2
from google.cloud.storage._signing import generate_signed_url_v4
from google.cloud.storage.acl import BucketACL
from google.cloud.storage.acl import DefaultObjectACL
from google.cloud.storage.blob import Blob
Expand All @@ -45,6 +47,7 @@
"valid before the bucket is created. Instead, pass the location "
"to `Bucket.create`."
)
_API_ACCESS_ENDPOINT = "https://storage.googleapis.com"


def _blobs_page_start(iterator, page, response):
Expand Down Expand Up @@ -1969,3 +1972,118 @@ def lock_retention_policy(self, client=None):
method="POST", path=path, query_params=query_params, _target_object=self
)
self._set_properties(api_response)

def generate_signed_url(
self,
expiration=None,
max_age=None,
api_access_endpoint=_API_ACCESS_ENDPOINT,
method="GET",
headers=None,
query_parameters=None,
client=None,
credentials=None,
version="v2",
):
"""Generates a signed URL for this bucket.
.. note::
If you are on Google Compute Engine, you can't generate a signed
URL using GCE service account. Follow `Issue 50`_ for updates on
this. If you'd like to be able to generate a signed URL from GCE,
you can use a standard service account from a JSON file rather
than a GCE service account.
.. _Issue 50: https://github.com/GoogleCloudPlatform/\
google-auth-library-python/issues/50
If you have a bucket that you want to allow access to for a set
amount of time, you can use this method to generate a URL that
is only valid within a certain time period.
This is particularly useful if you don't want publicly
accessible buckets, but don't want to require users to explicitly
log in.
:type expiration: Union[Integer, datetime.datetime, datetime.timedelta]
:param expiration: Point in time when the signed URL should expire.
Exclusive with :arg:`max_age`: exactly one of the
two must be passed.
:type max_age: Integer
:param max_age: Max number of seconds until the signature expires.
Exclusive with :arg:`expiration`: exactly one of the
two must be passed.
:type api_access_endpoint: str
:param api_access_endpoint: Optional URI base.
:type method: str
:param method: The HTTP verb that will be used when requesting the URL.
:type headers: dict
:param headers:
(Optional) Additional HTTP headers to be included as part of the
signed URLs. See:
https://cloud.google.com/storage/docs/xml-api/reference-headers
Requests using the signed URL *must* pass the specified header
(name and value) with each request for the URL.
:type query_parameters: dict
:param query_parameters:
(Optional) Additional query paramtersto be included as part of the
signed URLs. See:
https://cloud.google.com/storage/docs/xml-api/reference-headers#query
:type client: :class:`~google.cloud.storage.client.Client` or
``NoneType``
:param client: (Optional) The client to use. If not passed, falls back
to the ``client`` stored on the blob's bucket.
:type credentials: :class:`oauth2client.client.OAuth2Credentials` or
:class:`NoneType`
:param credentials: (Optional) The OAuth2 credentials to use to sign
the URL. Defaults to the credentials stored on the
client used.
:type version: str
:param version: (Optional) The version of signed credential to create.
Must be one of 'v2' | 'v4'.
:raises: :exc:`ValueError` when version is invalid.
:raises: :exc:`ValueError` when both :arg:`expiration` and
:arg:`max_age` are passed, or when neither is passed.
:raises: :exc:`TypeError` when expiration is not a valid type.
:raises: :exc:`AttributeError` if credentials is not an instance
of :class:`google.auth.credentials.Signing`.
:rtype: str
:returns: A signed URL you can use to access the resource
until expiration.
"""
if version not in ("v2", "v4"):
raise ValueError("'version' must be either 'v2' or 'v4'")

resource = "/{bucket_name}".format(bucket_name=self.name)

if credentials is None:
client = self._require_client(client)
credentials = client._credentials

if version == "v2":
helper = generate_signed_url_v2
else:
helper = generate_signed_url_v4

return helper(
credentials,
resource=resource,
expiration=expiration,
max_age=max_age,
api_access_endpoint=api_access_endpoint,
method=method.upper(),
headers=headers,
query_parameters=query_parameters,
)
171 changes: 170 additions & 1 deletion storage/tests/unit/test_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -2573,6 +2573,168 @@ def test_lock_retention_policy_w_user_project(self):
{"ifMetagenerationMatch": 1234, "userProject": user_project},
)

def test_generate_signed_url_w_invalid_version(self):
expiration = "2014-10-16T20:34:37.000Z"
connection = _Connection()
client = _Client(connection)
bucket = self._make_one(name="bucket_name", client=client)
with self.assertRaises(ValueError):
bucket.generate_signed_url(expiration, version="nonesuch")

def _generate_signed_url_helper(
self,
version,
bucket_name="bucket-name",
api_access_endpoint=None,
method="GET",
content_md5=None,
content_type=None,
response_type=None,
response_disposition=None,
generation=None,
headers=None,
query_parameters=None,
credentials=None,
expiration=None,
max_age=None,
):
from six.moves.urllib import parse
from google.cloud._helpers import UTC
from google.cloud.storage.blob import _API_ACCESS_ENDPOINT

api_access_endpoint = api_access_endpoint or _API_ACCESS_ENDPOINT

delta = datetime.timedelta(hours=1)

if expiration is None and max_age is None:
expiration = datetime.datetime.utcnow().replace(tzinfo=UTC) + delta

connection = _Connection()
client = _Client(connection)
bucket = self._make_one(name=bucket_name, client=client)
to_patch = "google.cloud.storage.bucket.generate_signed_url_{}".format(version)

with mock.patch(to_patch) as signer:
signed_uri = bucket.generate_signed_url(
expiration=expiration,
max_age=max_age,
api_access_endpoint=api_access_endpoint,
method=method,
credentials=credentials,
headers=headers,
query_parameters=query_parameters,
version=version,
)

self.assertEqual(signed_uri, signer.return_value)

if credentials is None:
expected_creds = client._credentials
else:
expected_creds = credentials

encoded_name = bucket_name.encode("utf-8")
expected_resource = "/{}".format(parse.quote(encoded_name))
expected_kwargs = {
"resource": expected_resource,
"expiration": expiration,
"max_age": max_age,
"api_access_endpoint": api_access_endpoint,
"method": method.upper(),
"headers": headers,
"query_parameters": query_parameters,
}
signer.assert_called_once_with(expected_creds, **expected_kwargs)

def _generate_signed_url_v2_helper(self, **kw):
version = "v2"
self._generate_signed_url_helper(version, **kw)

def test_generate_signed_url_v2_w_defaults(self):
self._generate_signed_url_v2_helper()

def test_generate_signed_url_v2_w_expiration(self):
from google.cloud._helpers import UTC

expiration = datetime.datetime.utcnow().replace(tzinfo=UTC)
self._generate_signed_url_v2_helper(expiration=expiration)

def test_generate_signed_url_v2_w_max_age(self):
self._generate_signed_url_v2_helper(max_age=3600)

def test_generate_signed_url_v2_w_endpoint(self):
self._generate_signed_url_v2_helper(
api_access_endpoint="https://api.example.com/v1"
)

def test_generate_signed_url_v2_w_method(self):
self._generate_signed_url_v2_helper(method="POST")

def test_generate_signed_url_v2_w_lowercase_method(self):
self._generate_signed_url_v2_helper(method="get")

def test_generate_signed_url_v2_w_content_md5(self):
self._generate_signed_url_v2_helper(content_md5="FACEDACE")

def test_generate_signed_url_v2_w_content_type(self):
self._generate_signed_url_v2_helper(content_type="text.html")

def test_generate_signed_url_v2_w_response_type(self):
self._generate_signed_url_v2_helper(response_type="text.html")

def test_generate_signed_url_v2_w_response_disposition(self):
self._generate_signed_url_v2_helper(response_disposition="inline")

def test_generate_signed_url_v2_w_generation(self):
self._generate_signed_url_v2_helper(generation=12345)

def test_generate_signed_url_v2_w_headers(self):
self._generate_signed_url_v2_helper(headers={"x-goog-foo": "bar"})

def test_generate_signed_url_v2_w_credentials(self):
credentials = object()
self._generate_signed_url_v2_helper(credentials=credentials)

def _generate_signed_url_v4_helper(self, **kw):
version = "v4"
self._generate_signed_url_helper(version, **kw)

def test_generate_signed_url_v4_w_defaults(self):
self._generate_signed_url_v2_helper()

def test_generate_signed_url_v4_w_endpoint(self):
self._generate_signed_url_v4_helper(
api_access_endpoint="https://api.example.com/v1"
)

def test_generate_signed_url_v4_w_method(self):
self._generate_signed_url_v4_helper(method="POST")

def test_generate_signed_url_v4_w_lowercase_method(self):
self._generate_signed_url_v4_helper(method="get")

def test_generate_signed_url_v4_w_content_md5(self):
self._generate_signed_url_v4_helper(content_md5="FACEDACE")

def test_generate_signed_url_v4_w_content_type(self):
self._generate_signed_url_v4_helper(content_type="text.html")

def test_generate_signed_url_v4_w_response_type(self):
self._generate_signed_url_v4_helper(response_type="text.html")

def test_generate_signed_url_v4_w_response_disposition(self):
self._generate_signed_url_v4_helper(response_disposition="inline")

def test_generate_signed_url_v4_w_generation(self):
self._generate_signed_url_v4_helper(generation=12345)

def test_generate_signed_url_v4_w_headers(self):
self._generate_signed_url_v4_helper(headers={"x-goog-foo": "bar"})

def test_generate_signed_url_v4_w_credentials(self):
credentials = object()
self._generate_signed_url_v4_helper(credentials=credentials)


class _Connection(object):
_delete_bucket = False
Expand Down Expand Up @@ -2612,6 +2774,13 @@ def api_request(self, **kw):

class _Client(object):
def __init__(self, connection, project=None):
self._connection = connection
self._base_connection = connection
self.project = project

@property
def _connection(self):
return self._base_connection

@property
def _credentials(self):
return self._base_connection.credentials

0 comments on commit 68742be

Please sign in to comment.