-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
_signing.py
233 lines (183 loc) · 8.13 KB
/
_signing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import datetime
import six
import google.auth.credentials
from google.cloud import _helpers
# Clock used by ``get_expiration_seconds`` when resolving a ``timedelta``.
# The function object itself is stored (it is not called here), so the
# current time is only read when ``NOW()`` is invoked, and tests can swap in
# a different callable.
NOW = datetime.datetime.utcnow # To be replaced by tests.
def ensure_signed_credentials(credentials):
    """Raise AttributeError if the credentials are unsigned.

    :type credentials: :class:`google.auth.credentials.Signing`
    :param credentials: The credentials used to create a private key
                        for signing text.

    :raises: :exc:`AttributeError` if credentials is not an instance
             of :class:`google.auth.credentials.Signing`.
    """
    # Credentials that implement the signing interface need no further checks.
    if isinstance(credentials, google.auth.credentials.Signing):
        return

    auth_uri = (
        "https://google-cloud-python.readthedocs.io/en/latest/"
        "core/auth.html?highlight=authentication#setting-up-"
        "a-service-account"
    )
    # Adjacent literals are concatenated verbatim, so each fragment must carry
    # its own trailing space to keep the sentences separated.
    raise AttributeError(
        "you need a private key to sign credentials. "
        "the credentials you are currently using %s "
        "just contains a token. see %s for more "
        "details." % (type(credentials), auth_uri)
    )
def get_signed_query_params(credentials, expiration, string_to_sign):
    """Build the query parameters that authenticate a signed URL.

    :type credentials: :class:`google.auth.credentials.Signing`
    :param credentials: Credentials whose ``sign_bytes`` method produces the
                        signature and whose ``signer_email`` identifies the
                        signer.

    :type expiration: int or long
    :param expiration: When the signed URL should expire.

    :type string_to_sign: str
    :param string_to_sign: The payload handed to ``credentials.sign_bytes``.

    :raises: :exc:`AttributeError` if credentials is not an instance
             of :class:`google.auth.credentials.Signing`.

    :rtype: dict
    :returns: Mapping with ``GoogleAccessId`` (the signer email), ``Expires``
              (``expiration`` rendered with ``str``) and ``Signature`` (the
              base64-encoded signature, as returned by
              :func:`base64.b64encode`).
    """
    # Fail early when the credentials cannot sign anything.
    ensure_signed_credentials(credentials)

    encoded_signature = base64.b64encode(credentials.sign_bytes(string_to_sign))

    params = {}
    params["GoogleAccessId"] = credentials.signer_email
    params["Expires"] = str(expiration)
    params["Signature"] = encoded_signature
    return params
def get_expiration_seconds(expiration):
    """Normalize ``expiration`` to an integer number of seconds.

    A ``timedelta`` is treated as an offset from the current time, a
    ``datetime`` is converted to a timestamp, and an integer is passed
    through unchanged.

    :type expiration: int, long, datetime.datetime, datetime.timedelta
    :param expiration: When the signed URL should expire.

    :raises: :exc:`TypeError` when expiration is not a valid type.

    :rtype: int
    :returns: a timestamp as an absolute number of seconds.
    """
    deadline = expiration

    # A relative offset becomes an absolute datetime: "now", tagged as UTC,
    # plus the offset. It then flows through the datetime branch below.
    if isinstance(deadline, datetime.timedelta):
        deadline = NOW().replace(tzinfo=_helpers.UTC) + deadline

    # Datetimes are reduced to whole seconds by floor-dividing the
    # microsecond count reported by the helper.
    if isinstance(deadline, datetime.datetime):
        deadline = _helpers._microseconds_from_datetime(deadline) // 10 ** 6

    if isinstance(deadline, six.integer_types):
        return deadline

    raise TypeError(
        "Expected an integer timestamp, datetime, or "
        "timedelta. Got %s" % type(deadline)
    )
def generate_signed_url(
    credentials,
    resource,
    expiration,
    api_access_endpoint="",
    method="GET",
    content_md5=None,
    content_type=None,
    response_type=None,
    response_disposition=None,
    generation=None,
):
    """Generate signed URL to provide query-string auth'n to a resource.

    .. note::

        Assumes ``credentials`` implements the
        :class:`google.auth.credentials.Signing` interface. The signing
        step reads ``credentials.signer_email`` to identify the
        credentials and calls ``credentials.sign_bytes`` to sign.

    .. note::

        If you are on Google Compute Engine, you can't generate a signed URL.
        Follow `Issue 922`_ for updates on this. If you'd like to be able to
        generate a signed URL from GCE, you can use a standard service account
        from a JSON file rather than a GCE service account.

    See headers `reference`_ for more details on optional arguments.

    .. _Issue 922: https://github.com/GoogleCloudPlatform/google-cloud-python/issues/922
    .. _reference: https://cloud.google.com/storage/docs/reference-headers

    :type credentials: :class:`google.auth.credentials.Signing`
    :param credentials: Credentials object with an associated private key to
                        sign text.

    :type resource: str
    :param resource: A pointer to a specific resource
                     (typically, ``/bucket-name/path/to/blob.txt``).

    :type expiration: :class:`int`, :class:`long`, :class:`datetime.datetime`,
                      :class:`datetime.timedelta`
    :param expiration: When the signed URL should expire.

    :type api_access_endpoint: str
    :param api_access_endpoint: Optional URI base. Defaults to empty string.

    :type method: str
    :param method: The HTTP verb that will be used when requesting the URL.
                   Defaults to ``'GET'``. If method is ``'RESUMABLE'`` then the
                   signature will additionally contain the `x-goog-resumable`
                   header, and the method changed to POST. See the signed URL
                   docs regarding this flow:
                   https://cloud.google.com/storage/docs/access-control/signed-urls

    :type content_md5: str
    :param content_md5: (Optional) The MD5 hash of the object referenced by
                        ``resource``.

    :type content_type: str
    :param content_type: (Optional) The content type of the object referenced
                         by ``resource``.

    :type response_type: str
    :param response_type: (Optional) Content type of responses to requests for
                          the signed URL. Used to over-ride the content type of
                          the underlying resource.

    :type response_disposition: str
    :param response_disposition: (Optional) Content disposition of responses to
                                 requests for the signed URL.

    :type generation: str
    :param generation: (Optional) A value that indicates which generation of
                       the resource to fetch.

    :raises: :exc:`TypeError` when expiration is not a valid type.
    :raises: :exc:`AttributeError` if credentials is not an instance
             of :class:`google.auth.credentials.Signing`.

    :rtype: str
    :returns: A signed URL you can use to access the resource
              until expiration.
    """
    expiration = get_expiration_seconds(expiration)

    # "RESUMABLE" is a pseudo-method: it is signed as a POST whose
    # canonicalized resource is prefixed with the x-goog-resumable header.
    resumable = method == "RESUMABLE"
    if resumable:
        method = "POST"
    template = "x-goog-resumable:start\n{0}" if resumable else "{0}"
    canonicalized_resource = template.format(resource)

    # Fields of the string to sign, newline-separated; absent optional
    # values are represented by empty strings.
    signed_fields = (
        method,
        content_md5 or "",
        content_type or "",
        str(expiration),
        canonicalized_resource,
    )
    string_to_sign = "\n".join(signed_fields)

    query_params = get_signed_query_params(credentials, expiration, string_to_sign)

    # Optional parameters are only added to the query when explicitly given.
    optional_params = (
        ("response-content-type", response_type),
        ("response-content-disposition", response_disposition),
        ("generation", generation),
    )
    for key, value in optional_params:
        if value is not None:
            query_params[key] = value

    querystring = six.moves.urllib.parse.urlencode(query_params)
    return "{endpoint}{resource}?{querystring}".format(
        endpoint=api_access_endpoint,
        resource=resource,
        querystring=querystring,
    )