-
-
Notifications
You must be signed in to change notification settings - Fork 718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Concatenate small input chunks before P2P rechunking #8832
Changes from all commits
6fd92b8
5b85a65
dac50a9
d8a9c8d
c23fd8f
81760fa
6f7a107
2ae27fd
1b7941d
d269e03
24bafbe
1db2284
100fa43
8035a4a
2c79a6e
73dfd99
3a119a5
54f234e
e4fdfdb
34f6ee1
d86ae35
5487c23
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -850,7 +850,8 @@ async def test_rechunk_avoid_needless_chunking(c, s, *ws): | |
x = da.ones(16, chunks=2) | ||
y = x.rechunk(8, method="p2p") | ||
dsk = y.__dask_graph__() | ||
assert len(dsk) <= 8 + 2 | ||
# 8 inputs, 2 concatenations of small inputs, 2 outputs | ||
assert len(dsk) <= 8 + 2 + 2 | ||
|
||
|
||
@pytest.mark.parametrize( | ||
|
@@ -1340,7 +1341,7 @@ async def test_partial_rechunk_taskgroups(c, s): | |
), | ||
timeout=5, | ||
) | ||
assert len(s.task_groups) < 6 | ||
assert len(s.task_groups) < 7 | ||
|
||
|
||
@pytest.mark.parametrize( | ||
|
@@ -1354,25 +1355,107 @@ async def test_partial_rechunk_taskgroups(c, s): | |
], | ||
) | ||
def test_calculate_prechunking_1d(old, new, expected): | ||
actual = _calculate_prechunking(old, new) | ||
actual = _calculate_prechunking(old, new, np.dtype, None) | ||
assert actual == expected | ||
|
||
|
||
@pytest.mark.parametrize( | ||
["old", "new", "expected"], | ||
[ | ||
[((2, 2), (3, 3)), ((2, 2), (3, 3)), ((2, 2), (3, 3))], | ||
[((2, 2), (3, 3)), ((4,), (3, 3)), ((2, 2), (3, 3))], | ||
[((2, 2), (3, 3)), ((4,), (3, 3)), ((4,), (3, 3))], | ||
[((2, 2), (3, 3)), ((1, 1, 1, 1), (3, 3)), ((2, 2), (3, 3))], | ||
[ | ||
((2, 2, 2), (3, 3, 3)), | ||
((1, 2, 2, 1), (2, 3, 4)), | ||
((1, 1, 1, 1, 1, 1), (2, 1, 2, 1, 3)), | ||
((1, 2, 2, 1), (2, 3, 4)), | ||
], | ||
[((1, np.nan), (3, 3)), ((1, np.nan), (2, 2, 2)), ((1, np.nan), (2, 1, 1, 2))], | ||
[((4,), (1, 1, 1)), ((1, 1, 1, 1), (3,)), ((4,), (1, 1, 1))], | ||
[((4,), (1, 1, 1)), ((1, 1, 1, 1), (3,)), ((4,), (3,))], | ||
], | ||
) | ||
def test_calculate_prechunking_2d(old, new, expected): | ||
actual = _calculate_prechunking(old, new) | ||
actual = _calculate_prechunking(old, new, np.dtype(np.int16), None) | ||
assert actual == expected | ||
|
||
|
||
@pytest.mark.parametrize( | ||
["old", "new", "expected"], | ||
[ | ||
( | ||
((2, 2), (1, 1, 1, 1), (1, 1, 1, 1)), | ||
((1, 1, 1, 1), (4,), (2, 2)), | ||
((2, 2), (4,), (1, 1, 1, 1)), | ||
), | ||
( | ||
((2, 2), (1, 1, 1, 1), (1, 1, 1, 1)), | ||
((1, 1, 1, 1), (2, 2), (2, 2)), | ||
((2, 2), (2, 2), (2, 2)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one worries me a little bit. the max input chunk is 2, max output chunk is 4 but the algorithm concatenates in a way that we end with 8, which is not great There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is the block size limit the upper bound here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, https://github.com/dask/distributed/pull/8832/files#diff-0b80e83452ff3472b265026d4516846014500b991e12f3de4a41b39a990afbc6R494 is the limit here, so in this case it's 8 because |
||
), | ||
( | ||
((2, 2), (1, 1, 1, 1), (1, 1, 1, 1)), | ||
((1, 1, 1, 1), (2, 2), (4,)), | ||
((2, 2), (2, 2), (2, 2)), | ||
), | ||
( | ||
((1, 1, 1, 1), (1, 1, 1, 1), (2, 2)), | ||
((2, 2), (4,), (1, 1, 1, 1)), | ||
((2, 2), (2, 2), (2, 2)), | ||
), | ||
], | ||
) | ||
def test_calculate_prechunking_3d(old, new, expected): | ||
with dask.config.set({"array.chunk-size": "16 B"}): | ||
actual = _calculate_prechunking(old, new, np.dtype(np.int16), None) | ||
assert actual == expected | ||
|
||
|
||
@pytest.mark.parametrize( | ||
["chunk_size", "expected"], | ||
[ | ||
("1 B", ((10,), (1,) * 10)), | ||
("20 B", ((10,), (1,) * 10)), | ||
("40 B", ((10,), (2, 2, 1, 2, 2, 1))), | ||
("100 B", ((10,), (5, 5))), | ||
], | ||
) | ||
def test_calculate_prechunking_concatenation(chunk_size, expected): | ||
old = ((10,), (1,) * 10) | ||
new = ((2,) * 5, (5, 5)) | ||
with dask.config.set({"array.chunk-size": chunk_size}): | ||
actual = _calculate_prechunking(old, new, np.dtype(np.int16), None) | ||
assert actual == expected | ||
|
||
|
||
def test_calculate_prechunking_does_not_concatenate_object_type(): | ||
old = ((10,), (1,) * 10) | ||
new = ((2,) * 5, (5, 5)) | ||
|
||
# Ensure that int dtypes get concatenated | ||
new = ((2,) * 5, (5, 5)) | ||
with dask.config.set({"array.chunk-size": "100 B"}): | ||
actual = _calculate_prechunking(old, new, np.dtype(np.int16), None) | ||
assert actual == ((10,), (5, 5)) | ||
|
||
# Ensure object dtype chunks do not get concatenated | ||
with dask.config.set({"array.chunk-size": "100 B"}): | ||
actual = _calculate_prechunking(old, new, np.dtype(object), None) | ||
assert actual == old | ||
|
||
|
||
@pytest.mark.parametrize( | ||
["old", "new", "expected"], | ||
[ | ||
[((2, 2), (3, 3)), ((4,), (3, 3)), ((2, 2), (3, 3))], | ||
[ | ||
((2, 2, 2), (3, 3, 3)), | ||
((1, 2, 2, 1), (2, 3, 4)), | ||
((1, 1, 1, 1, 1, 1), (2, 1, 2, 1, 3)), | ||
], | ||
[((4,), (1, 1, 1)), ((1, 1, 1, 1), (3,)), ((4,), (1, 1, 1))], | ||
], | ||
) | ||
def test_calculate_prechunking_splitting(old, new, expected): | ||
# _calculate_prechunking does not concatenate on object | ||
actual = _calculate_prechunking(old, new, np.dtype(object), None) | ||
assert actual == expected |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a sentences what these 2 variables represent when you define them above? Took me a bit to figure this out
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done