Skip to content

Commit

Permalink
Drop Blosc (#6027)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin committed Apr 8, 2022
1 parent b9cc650 commit a40f205
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 85 deletions.
1 change: 0 additions & 1 deletion continuous_integration/environment-3.9.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ dependencies:
- pytest-repeat
- pytest-rerunfailures
- pytest-timeout
- python-blosc # Only tested here
- python-snappy # Only tested here
- pytorch # Only tested here
- requests
Expand Down
2 changes: 1 addition & 1 deletion distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ properties:
description: |
The compression algorithm to use
This could be one of lz4, snappy, zstd, or blosc
This could be one of lz4, snappy, zstd
offload:
type:
Expand Down
28 changes: 1 addition & 27 deletions distributed/protocol/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,12 @@
import random
from collections.abc import Callable
from contextlib import suppress
from functools import partial
from typing import Literal

from tlz import identity

import dask

try:
import blosc

n = blosc.set_nthreads(2)
if hasattr("blosc", "releasegil"):
blosc.set_releasegil(True)
except ImportError:
blosc = False

from distributed.utils import ensure_bytes

compressions: dict[
Expand Down Expand Up @@ -122,15 +112,6 @@ def zstd_decompress(data):
compressions["zstd"] = {"compress": zstd_compress, "decompress": zstd_decompress}


with suppress(ImportError):
import blosc

compressions["blosc"] = {
"compress": partial(blosc.compress, clevel=5, cname="lz4"),
"decompress": blosc.decompress,
}


def get_default_compression():
default = dask.config.get("distributed.comm.compression")
if default != "auto":
Expand Down Expand Up @@ -212,14 +193,7 @@ def maybe_compress(
else:
nbytes = len(payload)

if default_compression and blosc and type(payload) is memoryview:
# Blosc does itemsize-aware shuffling, resulting in better compression
compressed = blosc.compress(
payload, typesize=payload.itemsize, cname="lz4", clevel=5
)
compression = "blosc"
else:
compressed = compress(ensure_bytes(payload))
compressed = compress(ensure_bytes(payload))

if len(compressed) > 0.9 * nbytes: # full data not very compressible
return None, payload
Expand Down
45 changes: 1 addition & 44 deletions distributed/protocol/tests/test_numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,6 @@ def test_compress_numpy():
assert sum(map(nbytes, frames)) < x.nbytes

header = msgpack.loads(frames[1], raw=False, use_list=False, strict_map_key=False)
try:
import blosc # noqa: F401
except ImportError:
pass
else:
assert all(c == "blosc" for c in header["compression"])


def test_compress_memoryview():
Expand All @@ -240,49 +234,12 @@ def test_compress_memoryview():
assert len(compressed) < len(mv)


@pytest.mark.skip
def test_dont_compress_uncompressable_data():
blosc = pytest.importorskip("blosc")
x = np.random.randint(0, 255, size=100000).astype("uint8")
header, [data] = serialize(x)
assert "compression" not in header
assert data == x.data

x = np.ones(1000000)
header, [data] = serialize(x)
assert header["compression"] == ["blosc"]
assert data != x.data

x = np.ones(100)
header, [data] = serialize(x)
assert "compression" not in header
if isinstance(data, memoryview):
assert data.obj.ctypes.data == x.ctypes.data


@gen_cluster(client=True, timeout=60)
async def test_dumps_large_blosc(c, s, a, b):
async def test_dumps_large(c, s, a, b):
x = c.submit(np.ones, BIG_BYTES_SHARD_SIZE * 2, dtype="u1")
await x


def test_compression_takes_advantage_of_itemsize():
pytest.importorskip("lz4")
blosc = pytest.importorskip("blosc")
x = np.arange(1000000, dtype="i8")

assert len(blosc.compress(x.data, typesize=8)) < len(
blosc.compress(x.data, typesize=1)
)

_, a = serialize(x)
aa = [maybe_compress(frame)[1] for frame in a]
_, b = serialize(x.view("u1"))
bb = [maybe_compress(frame)[1] for frame in b]

assert sum(map(nbytes, aa)) < sum(map(nbytes, bb))


@pytest.mark.parametrize(
"x",
[
Expand Down
13 changes: 3 additions & 10 deletions distributed/protocol/tests/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ def test_maybe_compress(lib, compression):

payload = b"0" * 10000
rc, rd = maybe_compress(f(payload), compression=compression)
# For some reason compressing memoryviews can force blosc...
assert rc in (compression, "blosc")
assert rc == compression
assert compressions[rc]["decompress"](rd) == payload


Expand Down Expand Up @@ -218,14 +217,8 @@ def test_maybe_compress_memoryviews():
pytest.importorskip("lz4")
x = np.arange(1000000, dtype="int64")
compression, payload = maybe_compress(x.data)
try:
import blosc # noqa: F401
except ImportError:
assert compression == "lz4"
assert len(payload) < x.nbytes * 0.75
else:
assert compression == "blosc"
assert len(payload) < x.nbytes / 10
assert compression == "lz4"
assert len(payload) < x.nbytes * 0.75


@pytest.mark.parametrize("serializers", [("dask",), ("cuda",)])
Expand Down
3 changes: 1 addition & 2 deletions distributed/versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
("numpy", lambda p: p.__version__),
("pandas", lambda p: p.__version__),
("lz4", lambda p: p.__version__),
("blosc", lambda p: p.__version__),
]


# only these scheduler packages will be checked for version mismatch
scheduler_relevant_packages = {pkg for pkg, _ in required_packages} | {"lz4", "blosc"}
scheduler_relevant_packages = {pkg for pkg, _ in required_packages} | {"lz4"}


# notes to be displayed for mismatch packages
Expand Down

0 comments on commit a40f205

Please sign in to comment.