-
-
Notifications
You must be signed in to change notification settings - Fork 398
perf/chunktransform #3722
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf/chunktransform #3722
Changes from 15 commits
2b64daa
cd4efb0
41b7a6a
5a2a884
4e262b1
71a780b
23344e0
55821b8
f64de89
4b22c46
d22b6f0
df7be19
31f9b24
2c93b93
6cf8957
2cbdfc8
9d01432
5fd5b6e
a306e72
64f25f3
92754be
5fda260
28fc503
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from dataclasses import dataclass | ||
| from dataclasses import dataclass, field | ||
| from itertools import islice, pairwise | ||
| from typing import TYPE_CHECKING, Any, TypeVar | ||
| from warnings import warn | ||
|
|
@@ -14,6 +14,7 @@ | |
| Codec, | ||
| CodecPipeline, | ||
| GetResult, | ||
| SupportsSyncCodec, | ||
| ) | ||
| from zarr.core.common import concurrent_map | ||
| from zarr.core.config import config | ||
|
|
@@ -69,6 +70,115 @@ def fill_value_or_default(chunk_spec: ArraySpec) -> Any: | |
| return fill_value | ||
|
|
||
|
|
||
| @dataclass(slots=True, kw_only=True) | ||
| class ChunkTransform: | ||
| """A synchronous codec chain bound to an ArraySpec. | ||
|
|
||
| Provides ``encode_chunk`` and ``decode_chunk`` for pure-compute | ||
| codec operations (no IO, no threading, no batching). | ||
|
|
||
| ``shape`` and ``dtype`` reflect the representation **after** all | ||
| ArrayArrayCodec transforms — i.e. the spec that feeds the | ||
| ArrayBytesCodec. | ||
|
|
||
| All codecs must implement ``SupportsSyncCodec``. Construction will | ||
| raise ``TypeError`` if any codec does not. | ||
| """ | ||
|
|
||
| codecs: tuple[Codec, ...] | ||
| array_spec: ArraySpec | ||
|
|
||
| # (ArrayArrayCodec, input_spec) pairs in pipeline order. | ||
| _aa_codecs: tuple[tuple[ArrayArrayCodec, ArraySpec], ...] = field( | ||
| init=False, repr=False, compare=False | ||
| ) | ||
| _ab_codec: ArrayBytesCodec = field(init=False, repr=False, compare=False) | ||
| _ab_spec: ArraySpec = field(init=False, repr=False, compare=False) | ||
| _bb_codecs: tuple[BytesBytesCodec, ...] = field(init=False, repr=False, compare=False) | ||
|
|
||
| def __post_init__(self) -> None: | ||
| non_sync = [c for c in self.codecs if not isinstance(c, SupportsSyncCodec)] | ||
| if non_sync: | ||
| names = ", ".join(type(c).__name__ for c in non_sync) | ||
| raise TypeError( | ||
| f"All codecs must implement SupportsSyncCodec. The following do not: {names}" | ||
| ) | ||
|
|
||
| aa, ab, bb = codecs_from_list(list(self.codecs)) | ||
|
|
||
| aa_codecs: tuple[tuple[ArrayArrayCodec, ArraySpec], ...] = () | ||
| spec = self.array_spec | ||
| for aa_codec in aa: | ||
| aa_codecs = (*aa_codecs, (aa_codec, spec)) | ||
| spec = aa_codec.resolve_metadata(spec) | ||
|
|
||
| self._aa_codecs = aa_codecs | ||
| self._ab_codec = ab | ||
| self._ab_spec = spec | ||
| self._bb_codecs = bb | ||
|
|
||
| @property | ||
| def shape(self) -> tuple[int, ...]: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How are
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm on the fence about these attributes actually. I wanted to model the fact that the output of a
Contributor
Author
There was a problem hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These attributes are gone; we can add them later if they turn out to be actually valuable. |
||
| """Shape after all ArrayArrayCodec transforms (input to the ArrayBytesCodec).""" | ||
| return self._ab_spec.shape | ||
|
|
||
| @property | ||
| def dtype(self) -> ZDType[TBaseDType, TBaseScalar]: | ||
| """Dtype after all ArrayArrayCodec transforms (input to the ArrayBytesCodec).""" | ||
| return self._ab_spec.dtype | ||
|
|
||
| def decode( | ||
| self, | ||
| chunk_bytes: Buffer, | ||
| ) -> NDBuffer: | ||
| """Decode a single chunk through the full codec chain, synchronously. | ||
|
|
||
| Pure compute -- no IO. | ||
| """ | ||
| bb_out: Any = chunk_bytes | ||
| for bb_codec in reversed(self._bb_codecs): | ||
| bb_out = bb_codec._decode_sync(bb_out, self._ab_spec) # type: ignore[attr-defined] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a comment about why the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the type: ignore is gone thanks to making the protocol generic. |
||
|
|
||
| ab_out: Any = self._ab_codec._decode_sync(bb_out, self._ab_spec) # type: ignore[attr-defined] | ||
|
|
||
| for aa_codec, spec in reversed(self._aa_codecs): | ||
| ab_out = aa_codec._decode_sync(ab_out, spec) # type: ignore[attr-defined] | ||
|
|
||
| return ab_out # type: ignore[no-any-return] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this was because the protocol for synchronous encoding / decoding was not generic over input and output types. I fixed that, so the type: ignore statement is gone |
||
|
|
||
| def encode( | ||
| self, | ||
| chunk_array: NDBuffer, | ||
| ) -> Buffer | None: | ||
| """Encode a single chunk through the full codec chain, synchronously. | ||
|
|
||
| Pure compute -- no IO. | ||
| """ | ||
| aa_out: Any = chunk_array | ||
|
|
||
| for aa_codec, spec in self._aa_codecs: | ||
| if aa_out is None: | ||
| return None | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could be moved out of the loop? Can
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
unfortunately yes, in keeping with methods already defined on the |
||
| aa_out = aa_codec._encode_sync(aa_out, spec) # type: ignore[attr-defined] | ||
|
|
||
| if aa_out is None: | ||
| return None | ||
| bb_out: Any = self._ab_codec._encode_sync(aa_out, self._ab_spec) # type: ignore[attr-defined] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is quite hard to read/review: "is the output of
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what do you suggest
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK, so it's an issue with the variable names; I will see if I can make them clearer.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the properties are an issue too but I don't have a suggestion for that.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [points accusingly at the v3 spec] making 3 different kinds of functions all "codecs" was maybe not the best choice. We could use the "filters, serializer, compressors" trinity used in
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm going to punt on this for now. I legit think the naming issues here are a basic flaw in the v3 spec. |
||
|
|
||
| for bb_codec in self._bb_codecs: | ||
| if bb_out is None: | ||
| return None | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could be hoisted out of the loop
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. or maybe not? I'm confused regardless.
Contributor
Author
There was a problem hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The whole pattern of encoding potentially returning ``None`` is problematic and needs to go. Unfortunately it comes from the codec ABC, which I am not touching right now :( |
||
| bb_out = bb_codec._encode_sync(bb_out, self._ab_spec) # type: ignore[attr-defined] | ||
|
|
||
| return bb_out # type: ignore[no-any-return] | ||
|
|
||
| def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: | ||
| for codec in self.codecs: | ||
| byte_length = codec.compute_encoded_size(byte_length, array_spec) | ||
| array_spec = codec.resolve_metadata(array_spec) | ||
| return byte_length | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class BatchedCodecPipeline(CodecPipeline): | ||
| """Default codec pipeline. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,222 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from typing import Any | ||
|
|
||
| import numpy as np | ||
| import pytest | ||
|
|
||
| from zarr.abc.codec import ArrayBytesCodec | ||
| from zarr.codecs.bytes import BytesCodec | ||
| from zarr.codecs.crc32c_ import Crc32cCodec | ||
| from zarr.codecs.gzip import GzipCodec | ||
| from zarr.codecs.transpose import TransposeCodec | ||
| from zarr.codecs.zstd import ZstdCodec | ||
| from zarr.core.array_spec import ArrayConfig, ArraySpec | ||
| from zarr.core.buffer import Buffer, NDBuffer, default_buffer_prototype | ||
| from zarr.core.codec_pipeline import ChunkTransform | ||
| from zarr.core.dtype import get_data_type_from_native_dtype | ||
|
|
||
|
|
||
| def _make_array_spec(shape: tuple[int, ...], dtype: np.dtype[np.generic]) -> ArraySpec: | ||
| zdtype = get_data_type_from_native_dtype(dtype) | ||
| return ArraySpec( | ||
| shape=shape, | ||
| dtype=zdtype, | ||
| fill_value=zdtype.cast_scalar(0), | ||
| config=ArrayConfig(order="C", write_empty_chunks=True), | ||
| prototype=default_buffer_prototype(), | ||
| ) | ||
|
|
||
|
|
||
| def _make_nd_buffer(arr: np.ndarray[Any, np.dtype[Any]]) -> NDBuffer: | ||
| return default_buffer_prototype().nd_buffer.from_numpy_array(arr) | ||
|
|
||
|
|
||
| class TestChunkTransform: | ||
| def test_construction_bytes_only(self) -> None: | ||
| # Construction succeeds when all codecs implement SupportsSyncCodec. | ||
| spec = _make_array_spec((100,), np.dtype("float64")) | ||
| ChunkTransform(codecs=(BytesCodec(),), array_spec=spec) | ||
|
|
||
| def test_construction_with_compression(self) -> None: | ||
| # AB + BB codec chain where both implement SupportsSyncCodec. | ||
| spec = _make_array_spec((100,), np.dtype("float64")) | ||
| ChunkTransform(codecs=(BytesCodec(), GzipCodec()), array_spec=spec) | ||
|
|
||
| def test_construction_full_chain(self) -> None: | ||
| # All three codec types (AA + AB + BB), all implementing SupportsSyncCodec. | ||
| spec = _make_array_spec((3, 4), np.dtype("float64")) | ||
| ChunkTransform( | ||
| codecs=(TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec()), array_spec=spec | ||
| ) | ||
|
|
||
| def test_encode_decode_roundtrip_bytes_only(self) -> None: | ||
| # Minimal round-trip: BytesCodec serializes the array to bytes and back. | ||
| # No compression, no AA transform. | ||
| arr = np.arange(100, dtype="float64") | ||
| spec = _make_array_spec(arr.shape, arr.dtype) | ||
| chain = ChunkTransform(codecs=(BytesCodec(),), array_spec=spec) | ||
| nd_buf = _make_nd_buffer(arr) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW, I'd be fine with consolidating these tests with the construction tests. I do like having focused construction tests when the constructors are complicated, but these seem simple enough that just seeing the traceback pointing to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good idea. I kept the encoding / decoding separate from the constructor tests, but I also made the constructor tests much more compact. |
||
|
|
||
| encoded = chain.encode(nd_buf) | ||
| assert encoded is not None | ||
| decoded = chain.decode(encoded) | ||
| np.testing.assert_array_equal(arr, decoded.as_numpy_array()) | ||
|
Contributor
There was a problem hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Might be worth refactoring this to a test helper. Also, not worth worrying about currently. |
||
|
|
||
| def test_shape_dtype_no_aa_codecs(self) -> None: | ||
| # Without AA codecs, shape and dtype should match the input ArraySpec | ||
| # (no transforms applied before the AB codec). | ||
| spec = _make_array_spec((100,), np.dtype("float64")) | ||
| chunk = ChunkTransform(codecs=(BytesCodec(),), array_spec=spec) | ||
| assert chunk.shape == (100,) | ||
| assert chunk.dtype == spec.dtype | ||
|
|
||
| def test_shape_dtype_with_transpose(self) -> None: | ||
| # TransposeCodec(order=(1,0)) on a (3, 4) array produces (4, 3). | ||
| # shape/dtype reflect what the AB codec sees after all AA transforms. | ||
| spec = _make_array_spec((3, 4), np.dtype("float64")) | ||
| chunk = ChunkTransform(codecs=(TransposeCodec(order=(1, 0)), BytesCodec()), array_spec=spec) | ||
| assert chunk.shape == (4, 3) | ||
| assert chunk.dtype == spec.dtype | ||
|
|
||
| def test_encode_decode_roundtrip_with_compression(self) -> None: | ||
| # Round-trip with a BB codec (GzipCodec) to verify that bytes-bytes | ||
| # compression/decompression is wired correctly. | ||
| arr = np.arange(100, dtype="float64") | ||
| spec = _make_array_spec(arr.shape, arr.dtype) | ||
| chain = ChunkTransform(codecs=(BytesCodec(), GzipCodec(level=1)), array_spec=spec) | ||
| nd_buf = _make_nd_buffer(arr) | ||
|
|
||
| encoded = chain.encode(nd_buf) | ||
| assert encoded is not None | ||
| decoded = chain.decode(encoded) | ||
| np.testing.assert_array_equal(arr, decoded.as_numpy_array()) | ||
|
|
||
| def test_encode_decode_roundtrip_with_transpose(self) -> None: | ||
| # Full AA + AB + BB chain round-trip. Transpose permutes axes on encode, | ||
| # then BytesCodec serializes, then ZstdCodec compresses. Decode reverses | ||
| # all three stages. Verifies the full pipeline works end to end. | ||
| arr = np.arange(12, dtype="float64").reshape(3, 4) | ||
| spec = _make_array_spec(arr.shape, arr.dtype) | ||
| chain = ChunkTransform( | ||
| codecs=(TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec(level=1)), | ||
| array_spec=spec, | ||
| ) | ||
| nd_buf = _make_nd_buffer(arr) | ||
|
|
||
| encoded = chain.encode(nd_buf) | ||
| assert encoded is not None | ||
| decoded = chain.decode(encoded) | ||
| np.testing.assert_array_equal(arr, decoded.as_numpy_array()) | ||
|
|
||
| def test_rejects_non_sync_codec(self) -> None: | ||
| # Construction must raise TypeError when a codec lacks SupportsSyncCodec. | ||
|
|
||
| class AsyncOnlyCodec(ArrayBytesCodec): | ||
| is_fixed_size = True | ||
|
|
||
| async def _decode_single(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> NDBuffer: | ||
| raise NotImplementedError # pragma: no cover | ||
|
|
||
| async def _encode_single( | ||
| self, chunk_array: NDBuffer, chunk_spec: ArraySpec | ||
| ) -> Buffer | None: | ||
| raise NotImplementedError # pragma: no cover | ||
|
|
||
| def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: | ||
| return input_byte_length # pragma: no cover | ||
|
d-v-b marked this conversation as resolved.
Outdated
|
||
|
|
||
| spec = _make_array_spec((100,), np.dtype("float64")) | ||
| with pytest.raises(TypeError, match="AsyncOnlyCodec"): | ||
| ChunkTransform(codecs=(AsyncOnlyCodec(),), array_spec=spec) | ||
|
|
||
| def test_rejects_mixed_sync_and_non_sync(self) -> None: | ||
| # Even if some codecs support sync, a single non-sync codec should | ||
| # cause construction to fail. | ||
|
|
||
| class AsyncOnlyCodec(ArrayBytesCodec): | ||
| is_fixed_size = True | ||
|
|
||
| async def _decode_single(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> NDBuffer: | ||
| raise NotImplementedError # pragma: no cover | ||
|
|
||
| async def _encode_single( | ||
| self, chunk_array: NDBuffer, chunk_spec: ArraySpec | ||
| ) -> Buffer | None: | ||
| raise NotImplementedError # pragma: no cover | ||
|
|
||
| def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: | ||
| return input_byte_length # pragma: no cover | ||
|
|
||
| spec = _make_array_spec((3, 4), np.dtype("float64")) | ||
| with pytest.raises(TypeError, match="AsyncOnlyCodec"): | ||
| ChunkTransform( | ||
| codecs=(TransposeCodec(order=(1, 0)), AsyncOnlyCodec()), | ||
| array_spec=spec, | ||
| ) | ||
|
|
||
| def test_compute_encoded_size_bytes_only(self) -> None: | ||
| # BytesCodec is size-preserving: encoded size == input size. | ||
| spec = _make_array_spec((100,), np.dtype("float64")) | ||
| chain = ChunkTransform(codecs=(BytesCodec(),), array_spec=spec) | ||
| assert chain.compute_encoded_size(800, spec) == 800 | ||
|
|
||
| def test_compute_encoded_size_with_crc32c(self) -> None: | ||
| # Crc32cCodec appends a 4-byte checksum, so encoded size = input + 4. | ||
| spec = _make_array_spec((100,), np.dtype("float64")) | ||
| chain = ChunkTransform(codecs=(BytesCodec(), Crc32cCodec()), array_spec=spec) | ||
| assert chain.compute_encoded_size(800, spec) == 804 | ||
|
|
||
| def test_compute_encoded_size_with_transpose(self) -> None: | ||
| # TransposeCodec reorders axes but doesn't change the byte count. | ||
| # Verifies that compute_encoded_size walks through AA codecs correctly. | ||
| spec = _make_array_spec((3, 4), np.dtype("float64")) | ||
| chain = ChunkTransform(codecs=(TransposeCodec(order=(1, 0)), BytesCodec()), array_spec=spec) | ||
| assert chain.compute_encoded_size(96, spec) == 96 | ||
|
|
||
| def test_encode_returns_none_propagation(self) -> None: | ||
| # When an AA codec returns None (signaling "this chunk is the fill value, | ||
| # don't store it"), encode must short-circuit and return None | ||
| # instead of passing None into the next codec. | ||
|
|
||
| class NoneReturningAACodec(TransposeCodec): | ||
| """An ArrayArrayCodec that always returns None from encode.""" | ||
|
|
||
| def _encode_sync(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer | None: | ||
| return None | ||
|
|
||
| spec = _make_array_spec((3, 4), np.dtype("float64")) | ||
| chain = ChunkTransform( | ||
| codecs=(NoneReturningAACodec(order=(1, 0)), BytesCodec()), | ||
| array_spec=spec, | ||
| ) | ||
| arr = np.arange(12, dtype="float64").reshape(3, 4) | ||
| nd_buf = _make_nd_buffer(arr) | ||
| assert chain.encode(nd_buf) is None | ||
|
|
||
| def test_encode_decode_roundtrip_with_crc32c(self) -> None: | ||
| # Round-trip through BytesCodec + Crc32cCodec. Crc32c appends a checksum | ||
| # on encode and verifies it on decode, so this tests that the BB codec | ||
| # pipeline runs correctly in both directions. | ||
| arr = np.arange(100, dtype="float64") | ||
| spec = _make_array_spec(arr.shape, arr.dtype) | ||
| chain = ChunkTransform(codecs=(BytesCodec(), Crc32cCodec()), array_spec=spec) | ||
| nd_buf = _make_nd_buffer(arr) | ||
|
|
||
| encoded = chain.encode(nd_buf) | ||
| assert encoded is not None | ||
| decoded = chain.decode(encoded) | ||
| np.testing.assert_array_equal(arr, decoded.as_numpy_array()) | ||
|
|
||
| def test_encode_decode_roundtrip_int32(self) -> None: | ||
| # Round-trip with int32 data to verify that the codec chain is not | ||
| # float-specific. Exercises a different dtype path through BytesCodec. | ||
| arr = np.arange(50, dtype="int32") | ||
| spec = _make_array_spec(arr.shape, arr.dtype) | ||
| chain = ChunkTransform(codecs=(BytesCodec(), ZstdCodec(level=1)), array_spec=spec) | ||
| nd_buf = _make_nd_buffer(arr) | ||
|
|
||
| encoded = chain.encode(nd_buf) | ||
| assert encoded is not None | ||
| decoded = chain.decode(encoded) | ||
| np.testing.assert_array_equal(arr, decoded.as_numpy_array()) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Idle thought without having yet looked at the implementation: make
Codecgeneric overSupportsSyncCodec(via a protocol) so that this can be caught before runtime?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to avoid touching the codec ABC, so maybe we defer this for a later effort