diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py
index 38f327895..fec5965cf 100644
--- a/google/cloud/storage/transfer_manager.py
+++ b/google/cloud/storage/transfer_manager.py
@@ -22,6 +22,8 @@
 import warnings
 import pickle
 import copyreg
+import struct
+import base64
 import functools
 
 from google.api_core import exceptions
@@ -32,9 +34,11 @@
 from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry
 from google.cloud.storage.retry import DEFAULT_RETRY
 
+import google_crc32c
+
 from google.resumable_media.requests.upload import XMLMPUContainer
 from google.resumable_media.requests.upload import XMLMPUPart
-
+from google.resumable_media.common import DataCorruption
 
 warnings.warn(
     "The module `transfer_manager` is a preview feature. Functionality and API "
@@ -44,6 +48,7 @@
 TM_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024
 DEFAULT_MAX_WORKERS = 8
+MAX_CRC32C_ZERO_ARRAY_SIZE = 4 * 1024 * 1024
 
 METADATA_HEADER_TRANSLATION = {
     "cacheControl": "Cache-Control",
     "contentDisposition": "Content-Disposition",
@@ -57,6 +62,20 @@
 PROCESS = "process"
 THREAD = "thread"
 
+DOWNLOAD_CRC32C_MISMATCH_TEMPLATE = """\
+Checksum mismatch while downloading:
+
+  {}
+
+The object metadata indicated a crc32c checksum of:
+
+  {}
+
+but the actual crc32c checksum of the downloaded contents was:
+
+  {}
+"""
+
 
 _cached_clients = {}
 
@@ -732,6 +751,8 @@ def download_chunks_concurrently(
     deadline=None,
     worker_type=PROCESS,
     max_workers=DEFAULT_MAX_WORKERS,
+    *,
+    crc32c_checksum=True,
 ):
     """Download a single file in chunks, concurrently.
 
@@ -744,9 +765,6 @@ def download_chunks_concurrently(
     performance under normal circumstances due to Python interpreter threading
     behavior. The default is therefore to use processes instead of threads.
 
-    Checksumming (md5 or crc32c) is not supported for chunked operations. Any
-    `checksum` parameter passed in to download_kwargs will be ignored.
-
     :param bucket:
         The bucket which contains the blobs to be downloaded
 
@@ -768,10 +786,13 @@ def download_chunks_concurrently(
     :param download_kwargs:
         A dictionary of keyword arguments to pass to the download method. Refer
         to the documentation for blob.download_to_file() or
-        blob.download_to_filename() for more information. The dict is directly passed into the download methods and is not validated by this function.
+        blob.download_to_filename() for more information. The dict is directly
+        passed into the download methods and is not validated by this function.
 
         Keyword arguments "start" and "end" which are not supported and will
-        cause a ValueError if present.
+        cause a ValueError if present. The key "checksum" is also not supported
+        in download_kwargs, but see the argument "crc32c_checksum" (which does
+        not go in download_kwargs) below.
 
     :type deadline: int
     :param deadline:
@@ -811,8 +832,22 @@ def download_chunks_concurrently(
         and the default is a conservative number that should work okay in most
         cases without consuming excessive resources.
 
-    :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded.
+    :type crc32c_checksum: bool
+    :param crc32c_checksum:
+        Whether to compute a checksum for the resulting object, using the crc32c
+        algorithm. As the checksums for each chunk must be combined using a
+        feature of crc32c that is not available for md5, md5 is not supported.
+
+    :raises:
+        :exc:`concurrent.futures.TimeoutError`
+            if deadline is exceeded.
+        :exc:`google.resumable_media.common.DataCorruption`
+            if the download's checksum doesn't agree with the server-computed
+            checksum. The `google.resumable_media` exception is used here for
+            consistency with other download methods despite the exception
+            originating elsewhere.
     """
+    client = blob.client
 
     if download_kwargs is None:
         download_kwargs = {}
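A minimal usage sketch of the new keyword argument (the bucket and object names below are hypothetical, and `crc32c_checksum=True` is already the default; it is spelled out here only for emphasis):

    from google.cloud import storage
    from google.cloud.storage import transfer_manager
    from google.resumable_media.common import DataCorruption

    client = storage.Client()
    blob = client.bucket("my-bucket").blob("large-object")  # hypothetical names

    try:
        transfer_manager.download_chunks_concurrently(
            blob, "/tmp/large-object", crc32c_checksum=True
        )
    except DataCorruption:
        # The combined crc32c of the downloaded chunks did not match the
        # object metadata; treat the local file as suspect.
        raise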
@@ -820,6 +855,10 @@
         raise ValueError(
             "Download arguments 'start' and 'end' are not supported by download_chunks_concurrently."
         )
+    if "checksum" in download_kwargs:
+        raise ValueError(
+            "'checksum' is in download_kwargs, but is not supported because sliced downloads have a different checksum mechanism from regular downloads. Use the 'crc32c_checksum' argument on download_chunks_concurrently instead."
+        )
 
     download_kwargs["command"] = "tm.download_sharded"
 
@@ -851,6 +890,7 @@
                 start=start,
                 end=cursor - 1,
                 download_kwargs=download_kwargs,
+                crc32c_checksum=crc32c_checksum,
             )
         )
 
@@ -858,9 +898,34 @@
         futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED
     )
 
-    # Raise any exceptions. Successful results can be ignored.
+    # Raise any exceptions; combine checksums.
+    results = []
     for future in futures:
-        future.result()
+        results.append(future.result())
+
+    if crc32c_checksum and results:
+        crc_digest = _digest_ordered_checksum_and_size_pairs(results)
+        actual_checksum = base64.b64encode(crc_digest).decode("utf-8")
+        expected_checksum = blob.crc32c
+        if actual_checksum != expected_checksum:
+            # For consistency with other download methods we will use
+            # "google.resumable_media.common.DataCorruption" despite the error
+            # not originating inside google.resumable_media.
+            download_url = blob._get_download_url(
+                client,
+                if_generation_match=download_kwargs.get("if_generation_match"),
+                if_generation_not_match=download_kwargs.get("if_generation_not_match"),
+                if_metageneration_match=download_kwargs.get("if_metageneration_match"),
+                if_metageneration_not_match=download_kwargs.get(
+                    "if_metageneration_not_match"
+                ),
+            )
+            raise DataCorruption(
+                None,
+                DOWNLOAD_CRC32C_MISMATCH_TEMPLATE.format(
+                    download_url, expected_checksum, actual_checksum
+                ),
+            )
 
     return None
 
@@ -1118,23 +1183,58 @@ def _headers_from_metadata(metadata):
 
 
 def _download_and_write_chunk_in_place(
-    maybe_pickled_blob, filename, start, end, download_kwargs
+    maybe_pickled_blob, filename, start, end, download_kwargs, crc32c_checksum
 ):
     """Helper function that runs inside a thread or subprocess.
 
     `maybe_pickled_blob` is either a Blob (for threads) or a specially pickled
     Blob (for processes) because the default pickling mangles Client objects
-    which are attached to Blobs."""
+    which are attached to Blobs.
+
+    Returns a (crc, size written) pair; the crc is None if checksumming is not
+    enabled.
+    """
+
     if isinstance(maybe_pickled_blob, Blob):
         blob = maybe_pickled_blob
     else:
        blob = pickle.loads(maybe_pickled_blob)
-    with open(
-        filename, "rb+"
-    ) as f:  # Open in mixed read/write mode to avoid truncating or appending
-        f.seek(start)
-        return blob._prep_and_do_download(f, start=start, end=end, **download_kwargs)
+
+    with _ChecksummingSparseFileWrapper(filename, start, crc32c_checksum) as f:
+        blob._prep_and_do_download(f, start=start, end=end, **download_kwargs)
+        return (f.crc, (end - start) + 1)
+
+
+class _ChecksummingSparseFileWrapper:
+    """A file wrapper that writes to a sparse file and optionally checksums.
+
+    This wrapper only implements write() and does not inherit from `io` module
+    base classes.
+    """
+
+    def __init__(self, filename, start_position, crc32c_enabled):
+        # Open in mixed read/write mode to avoid truncating or appending
+        self.f = open(filename, "rb+")
+        self.f.seek(start_position)
+        self._crc = None
+        self._crc32c_enabled = crc32c_enabled
+
+    def write(self, chunk):
+        if self._crc32c_enabled:
+            if self._crc is None:
+                self._crc = google_crc32c.value(chunk)
+            else:
+                self._crc = google_crc32c.extend(self._crc, chunk)
+        self.f.write(chunk)
+
+    @property
+    def crc(self):
+        return self._crc
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, tb):
+        self.f.close()
 
 
 def _call_method_on_maybe_pickled_blob(
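The wrapper above relies on google_crc32c's incremental API: extending a running checksum chunk by chunk yields the same value as checksumming the whole payload at once. A quick sketch of that property, using only the public value() and extend() functions:

    import google_crc32c

    chunks = [b"abcdefgh", b"ijklmnop"]

    crc = None
    for chunk in chunks:
        # Mirror _ChecksummingSparseFileWrapper.write(): seed with value(),
        # then continue with extend().
        crc = google_crc32c.value(chunk) if crc is None else google_crc32c.extend(crc, chunk)

    assert crc == google_crc32c.value(b"".join(chunks))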
@@ -1208,6 +1308,32 @@ def _get_pool_class_and_requirements(worker_type):
     )
 
 
+def _digest_ordered_checksum_and_size_pairs(checksum_and_size_pairs):
+    base_crc = None
+    zeroes = bytes(MAX_CRC32C_ZERO_ARRAY_SIZE)
+    for part_crc, size in checksum_and_size_pairs:
+        if base_crc is None:
+            base_crc = part_crc
+        else:
+            base_crc ^= 0xFFFFFFFF  # precondition
+
+            # Zero-pad base_crc. To conserve memory, do so with only
+            # MAX_CRC32C_ZERO_ARRAY_SIZE at a time. Reuse the zeroes array
+            # where possible.
+            padded = 0
+            while padded < size:
+                desired_zeroes_size = min((size - padded), MAX_CRC32C_ZERO_ARRAY_SIZE)
+                base_crc = google_crc32c.extend(base_crc, zeroes[:desired_zeroes_size])
+                padded += desired_zeroes_size
+
+            base_crc ^= 0xFFFFFFFF  # postcondition
+            base_crc ^= part_crc
+
+    crc_digest = struct.pack(
+        ">L", base_crc
+    )  # https://cloud.google.com/storage/docs/json_api/v1/objects#crc32c
+    return crc_digest
+
+
 class _LazyClient:
     """An object that will transform into either a cached or a new Client"""
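The combination step can be sanity-checked end to end. A sketch (not part of the change, assuming google_crc32c is installed) that combines independently computed per-chunk checksums the same way _digest_ordered_checksum_and_size_pairs does, then compares against the checksum of the concatenated bytes:

    import struct

    import google_crc32c

    chunks = [b"hello ", b"chunked ", b"world"]
    pairs = [(google_crc32c.value(c), len(c)) for c in chunks]

    base_crc = None
    for part_crc, size in pairs:
        if base_crc is None:
            base_crc = part_crc
        else:
            # Invert, zero-extend the running crc by the next chunk's length,
            # invert back, then fold in the next chunk's crc.
            base_crc ^= 0xFFFFFFFF
            base_crc = google_crc32c.extend(base_crc, bytes(size))
            base_crc ^= 0xFFFFFFFF
            base_crc ^= part_crc

    assert base_crc == google_crc32c.value(b"".join(chunks))
    digest = struct.pack(">L", base_crc)  # big-endian, as in the JSON API docs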
diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py
index 8014411e8..7a5f8c960 100644
--- a/samples/snippets/snippets_test.py
+++ b/samples/snippets/snippets_test.py
@@ -213,6 +213,7 @@ def test_list_blobs_with_prefix(test_blob, capsys):
 def test_upload_blob(test_bucket):
     with tempfile.NamedTemporaryFile() as source_file:
         source_file.write(b"test")
+        source_file.flush()
 
         storage_upload_file.upload_blob(
             test_bucket.name, source_file.name, "test_upload_blob"
@@ -243,6 +244,7 @@ def test_upload_blob_with_kms(test_bucket):
     blob_name = f"test_upload_with_kms_{uuid.uuid4().hex}"
     with tempfile.NamedTemporaryFile() as source_file:
         source_file.write(b"test")
+        source_file.flush()
         storage_upload_with_kms_key.upload_blob_with_kms(
             test_bucket.name,
             source_file.name,
@@ -779,6 +781,7 @@ def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys):
 
     with tempfile.NamedTemporaryFile() as file:
         file.write(b"test")
+        file.flush()
 
         storage_upload_file.upload_blob(test_bucket.name, file.name, BLOB_NAME)
 
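The flush() calls above address write buffering rather than anything storage-specific: bytes written to a NamedTemporaryFile may sit in Python's file buffer, so a second reader opening the file by name (as upload_blob does) can see an empty file. A minimal illustration:

    import os
    import tempfile

    with tempfile.NamedTemporaryFile() as f:
        f.write(b"test")
        # Another open() on f.name may still observe 0 bytes at this point.
        f.flush()
        assert os.path.getsize(f.name) == 4  # the data is now visible on disk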
diff --git a/setup.py b/setup.py
index 11ee0a190..88d2f581b 100644
--- a/setup.py
+++ b/setup.py
@@ -33,6 +33,7 @@
     "google-cloud-core >= 2.3.0, < 3.0dev",
     "google-resumable-media >= 2.6.0",
     "requests >= 2.18.0, < 3.0.0dev",
+    "google-crc32c >= 1.0, < 2.0dev",
 ]
 extras = {"protobuf": ["protobuf<5.0.0dev"]}
 
diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py
index fc7bc2d51..b8f209b63 100644
--- a/tests/system/test_transfer_manager.py
+++ b/tests/system/test_transfer_manager.py
@@ -172,8 +172,19 @@ def test_download_chunks_concurrently(shared_bucket, file_data):
     with open(trailing_chunk_filename, "rb") as file_obj:
         assert _base64_md5hash(file_obj) == source_file["hash"]
 
+    # And for a case where there is only one chunk.
+    trailing_chunk_filename = os.path.join(tempdir, "chunky_file_3")
+    transfer_manager.download_chunks_concurrently(
+        download_blob,
+        trailing_chunk_filename,
+        chunk_size=size,
+        deadline=DEADLINE,
+    )
+    with open(trailing_chunk_filename, "rb") as file_obj:
+        assert _base64_md5hash(file_obj) == source_file["hash"]
+
     # Also test threaded mode.
-    threaded_filename = os.path.join(tempdir, "chunky_file_3")
+    threaded_filename = os.path.join(tempdir, "chunky_file_4")
     transfer_manager.download_chunks_concurrently(
         download_blob,
         threaded_filename,
diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py
index 9c371d2ca..503b8fd2e 100644
--- a/tests/unit/test_transfer_manager.py
+++ b/tests/unit/test_transfer_manager.py
@@ -22,6 +22,8 @@
 
 from google.api_core import exceptions
 
+from google.resumable_media.common import DataCorruption
+
 import os
 import tempfile
 import mock
@@ -546,8 +548,6 @@ def test_download_chunks_concurrently():
     expected_download_kwargs = EXPECTED_DOWNLOAD_KWARGS.copy()
     expected_download_kwargs["command"] = "tm.download_sharded"
 
-    blob_mock._handle_filename_and_download.return_value = FAKE_RESULT
-
     with mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()):
         result = transfer_manager.download_chunks_concurrently(
             blob_mock,
@@ -555,6 +555,7 @@ def test_download_chunks_concurrently():
             FILENAME,
             chunk_size=CHUNK_SIZE,
             download_kwargs=DOWNLOAD_KWARGS,
             worker_type=transfer_manager.THREAD,
+            crc32c_checksum=False,
         )
     for x in range(MULTIPLE):
         blob_mock._prep_and_do_download.assert_any_call(
@@ -567,7 +568,64 @@ def test_download_chunks_concurrently():
     assert result is None
 
 
-def test_download_chunks_concurrently_raises_on_start_and_end():
+def test_download_chunks_concurrently_with_crc32c():
+    blob_mock = mock.Mock(spec=Blob)
+    FILENAME = "file_a.txt"
+    MULTIPLE = 4
+    BLOB_CHUNK = b"abcdefgh"
+    BLOB_CONTENTS = BLOB_CHUNK * MULTIPLE
+    blob_mock.size = len(BLOB_CONTENTS)
+    blob_mock.crc32c = "eOVVVw=="
+
+    expected_download_kwargs = EXPECTED_DOWNLOAD_KWARGS.copy()
+    expected_download_kwargs["command"] = "tm.download_sharded"
+
+    def write_to_file(f, *args, **kwargs):
+        f.write(BLOB_CHUNK)
+
+    blob_mock._prep_and_do_download.side_effect = write_to_file
+
+    with mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()):
+        transfer_manager.download_chunks_concurrently(
+            blob_mock,
+            FILENAME,
+            chunk_size=CHUNK_SIZE,
+            download_kwargs=DOWNLOAD_KWARGS,
+            worker_type=transfer_manager.THREAD,
+            crc32c_checksum=True,
+        )
+
+
+def test_download_chunks_concurrently_with_crc32c_failure():
+    blob_mock = mock.Mock(spec=Blob)
+    FILENAME = "file_a.txt"
+    MULTIPLE = 4
+    BLOB_CHUNK = b"abcdefgh"
+    BLOB_CONTENTS = BLOB_CHUNK * MULTIPLE
+    blob_mock.size = len(BLOB_CONTENTS)
+    blob_mock.crc32c = "invalid"
+
+    expected_download_kwargs = EXPECTED_DOWNLOAD_KWARGS.copy()
+    expected_download_kwargs["command"] = "tm.download_sharded"
+
+    def write_to_file(f, *args, **kwargs):
+        f.write(BLOB_CHUNK)
+
+    blob_mock._prep_and_do_download.side_effect = write_to_file
+
+    with mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()):
+        with pytest.raises(DataCorruption):
+            transfer_manager.download_chunks_concurrently(
+                blob_mock,
+                FILENAME,
+                chunk_size=CHUNK_SIZE,
+                download_kwargs=DOWNLOAD_KWARGS,
+                worker_type=transfer_manager.THREAD,
+                crc32c_checksum=True,
+            )
+
+
+def test_download_chunks_concurrently_raises_on_invalid_kwargs():
     blob_mock = mock.Mock(spec=Blob)
     FILENAME = "file_a.txt"
     MULTIPLE = 4
@@ -594,6 +652,16 @@ def test_download_chunks_concurrently_raises_on_start_and_end():
             "end": (CHUNK_SIZE * (MULTIPLE - 1)) - 1,
         },
     )
+    with pytest.raises(ValueError):
+        transfer_manager.download_chunks_concurrently(
+            blob_mock,
+            FILENAME,
+            chunk_size=CHUNK_SIZE,
+            worker_type=transfer_manager.THREAD,
+            download_kwargs={
+                "checksum": "crc32c",
+            },
+        )
 
 
 def test_download_chunks_concurrently_passes_concurrency_options():
@@ -616,6 +684,7 @@ def test_download_chunks_concurrently_passes_concurrency_options():
             deadline=DEADLINE,
             worker_type=transfer_manager.THREAD,
             max_workers=MAX_WORKERS,
+            crc32c_checksum=False,
         )
         pool_patch.assert_called_with(max_workers=MAX_WORKERS)
         wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY)
@@ -819,6 +888,7 @@ def __init__(
         self.generation = generation
         self._size_after_reload = size_after_reload
         self._generation_after_reload = generation_after_reload
+        self.client = _PickleableMockClient()
 
     def reload(self):
         self.size = self._size_after_reload
@@ -876,6 +946,7 @@ def test_download_chunks_concurrently_with_processes():
             chunk_size=CHUNK_SIZE,
             download_kwargs=DOWNLOAD_KWARGS,
             worker_type=transfer_manager.PROCESS,
+            crc32c_checksum=False,
         )
     assert result is None
 
@@ -907,9 +978,9 @@ def test__download_and_write_chunk_in_place():
     FILENAME = "file_a.txt"
     with mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()):
         result = transfer_manager._download_and_write_chunk_in_place(
-            pickled_mock, FILENAME, 0, 8, {}
+            pickled_mock, FILENAME, 0, 8, {}, False
         )
-    assert result == "SUCCESS"
+    assert result is not None
 
 
 def test__upload_part():
@@ -973,3 +1044,31 @@ def test__call_method_on_maybe_pickled_blob():
         pickled_blob, "_prep_and_do_download"
     )
     assert result == "SUCCESS"
+
+
+def test__ChecksummingSparseFileWrapper():
+    import google_crc32c
+    FILENAME = "file_a.txt"
+
+    with mock.patch(
+        "google.cloud.storage.transfer_manager.open", mock.mock_open()
+    ) as open_mock:
+        # test no checksumming
+        wrapper = transfer_manager._ChecksummingSparseFileWrapper(FILENAME, 0, False)
+        wrapper.write(b"abcdefgh")
+        handle = open_mock()
+        handle.write.assert_called_with(b"abcdefgh")
+        wrapper.write(b"ijklmnop")
+        assert wrapper.crc is None
+        handle.write.assert_called_with(b"ijklmnop")
+
+    with mock.patch(
+        "google.cloud.storage.transfer_manager.open", mock.mock_open()
+    ) as open_mock:
+        wrapper = transfer_manager._ChecksummingSparseFileWrapper(FILENAME, 0, True)
+        wrapper.write(b"abcdefgh")
+        handle = open_mock()
+        handle.write.assert_called_with(b"abcdefgh")
+        wrapper.write(b"ijklmnop")
+        assert wrapper.crc == google_crc32c.value(b"abcdefghijklmnop")
+        handle.write.assert_called_with(b"ijklmnop")
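For maintainers updating the crc32c fixtures above: per the tests, the "eOVVVw==" literal is the base64 encoding of the big-endian crc32c digest of b"abcdefgh" * 4, and can be regenerated with the same packing the library uses. A sketch, assuming google-crc32c is installed:

    import base64
    import struct

    import google_crc32c

    contents = b"abcdefgh" * 4
    digest = struct.pack(">L", google_crc32c.value(contents))
    print(base64.b64encode(digest).decode("utf-8"))  # expected: eOVVVw==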