Skip to content

Commit

Permalink
Drop Blosc
Browse files Browse the repository at this point in the history
Blosc support provides some value, but not much in practice.

It more often gets in the way and causes pain due to version mismatches.
  • Loading branch information
mrocklin committed Mar 30, 2022
1 parent cced80d commit 827c7f0
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 85 deletions.
1 change: 0 additions & 1 deletion continuous_integration/environment-3.9.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ dependencies:
- pytest-repeat
- pytest-rerunfailures
- pytest-timeout
- python-blosc # Only tested here
- python-snappy # Only tested here
- pytorch # Only tested here
- requests
Expand Down
2 changes: 1 addition & 1 deletion distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ properties:
description: |
The compression algorithm to use
This could be one of lz4, snappy, zstd, or blosc
This could be one of lz4, snappy, or zstd
offload:
type:
Expand Down
28 changes: 1 addition & 27 deletions distributed/protocol/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,12 @@
import random
from collections.abc import Callable
from contextlib import suppress
from functools import partial
from typing import Literal

from tlz import identity

import dask

try:
import blosc

n = blosc.set_nthreads(2)
if hasattr("blosc", "releasegil"):
blosc.set_releasegil(True)
except ImportError:
blosc = False

from distributed.utils import ensure_bytes

compressions: dict[
Expand Down Expand Up @@ -122,15 +112,6 @@ def zstd_decompress(data):
compressions["zstd"] = {"compress": zstd_compress, "decompress": zstd_decompress}


with suppress(ImportError):
import blosc

compressions["blosc"] = {
"compress": partial(blosc.compress, clevel=5, cname="lz4"),
"decompress": blosc.decompress,
}


def get_default_compression():
default = dask.config.get("distributed.comm.compression")
if default != "auto":
Expand Down Expand Up @@ -212,14 +193,7 @@ def maybe_compress(
else:
nbytes = len(payload)

if default_compression and blosc and type(payload) is memoryview:
# Blosc does itemsize-aware shuffling, resulting in better compression
compressed = blosc.compress(
payload, typesize=payload.itemsize, cname="lz4", clevel=5
)
compression = "blosc"
else:
compressed = compress(ensure_bytes(payload))
compressed = compress(ensure_bytes(payload))

if len(compressed) > 0.9 * nbytes: # full data not very compressible
return None, payload
Expand Down
45 changes: 1 addition & 44 deletions distributed/protocol/tests/test_numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,6 @@ def test_compress_numpy():
assert sum(map(nbytes, frames)) < x.nbytes

header = msgpack.loads(frames[1], raw=False, use_list=False, strict_map_key=False)
try:
import blosc # noqa: F401
except ImportError:
pass
else:
assert all(c == "blosc" for c in header["compression"])


def test_compress_memoryview():
Expand All @@ -240,49 +234,12 @@ def test_compress_memoryview():
assert len(compressed) < len(mv)


@pytest.mark.skip
def test_dont_compress_uncompressable_data():
blosc = pytest.importorskip("blosc")
x = np.random.randint(0, 255, size=100000).astype("uint8")
header, [data] = serialize(x)
assert "compression" not in header
assert data == x.data

x = np.ones(1000000)
header, [data] = serialize(x)
assert header["compression"] == ["blosc"]
assert data != x.data

x = np.ones(100)
header, [data] = serialize(x)
assert "compression" not in header
if isinstance(data, memoryview):
assert data.obj.ctypes.data == x.ctypes.data


@gen_cluster(client=True, timeout=60)
async def test_dumps_large_blosc(c, s, a, b):
async def test_dumps_large(c, s, a, b):
x = c.submit(np.ones, BIG_BYTES_SHARD_SIZE * 2, dtype="u1")
await x


def test_compression_takes_advantage_of_itemsize():
pytest.importorskip("lz4")
blosc = pytest.importorskip("blosc")
x = np.arange(1000000, dtype="i8")

assert len(blosc.compress(x.data, typesize=8)) < len(
blosc.compress(x.data, typesize=1)
)

_, a = serialize(x)
aa = [maybe_compress(frame)[1] for frame in a]
_, b = serialize(x.view("u1"))
bb = [maybe_compress(frame)[1] for frame in b]

assert sum(map(nbytes, aa)) < sum(map(nbytes, bb))


@pytest.mark.parametrize(
"x",
[
Expand Down
13 changes: 3 additions & 10 deletions distributed/protocol/tests/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ def test_maybe_compress(lib, compression):

payload = b"0" * 10000
rc, rd = maybe_compress(f(payload), compression=compression)
# For some reason compressing memoryviews can force blosc...
assert rc in (compression, "blosc")
assert rc == compression
assert compressions[rc]["decompress"](rd) == payload


Expand Down Expand Up @@ -218,14 +217,8 @@ def test_maybe_compress_memoryviews():
pytest.importorskip("lz4")
x = np.arange(1000000, dtype="int64")
compression, payload = maybe_compress(x.data)
try:
import blosc # noqa: F401
except ImportError:
assert compression == "lz4"
assert len(payload) < x.nbytes * 0.75
else:
assert compression == "blosc"
assert len(payload) < x.nbytes / 10
assert compression == "lz4"
assert len(payload) < x.nbytes * 0.75


@pytest.mark.parametrize("serializers", [("dask",), ("cuda",)])
Expand Down
3 changes: 1 addition & 2 deletions distributed/versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
("numpy", lambda p: p.__version__),
("pandas", lambda p: p.__version__),
("lz4", lambda p: p.__version__),
("blosc", lambda p: p.__version__),
]


# only these scheduler packages will be checked for version mismatch
scheduler_relevant_packages = {pkg for pkg, _ in required_packages} | {"lz4", "blosc"}
scheduler_relevant_packages = {pkg for pkg, _ in required_packages} | {"lz4"}


# notes to be displayed for mismatch packages
Expand Down

0 comments on commit 827c7f0

Please sign in to comment.