Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,8 @@ tests/.hypothesis

zarr/version.py
zarr.egg-info/

# Local agent / planning notes (not versioned)
.claude/
CLAUDE.md
docs/superpowers/
1 change: 1 addition & 0 deletions changes/3907.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add protocols for stores that support byte-range writes. This is necessary to support in-place writes of sharded arrays.
1 change: 1 addition & 0 deletions changes/3908.misc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reuse a constant `ArraySpec` during indexing when possible.
18 changes: 18 additions & 0 deletions src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"Store",
"SupportsDeleteSync",
"SupportsGetSync",
"SupportsSetRange",
"SupportsSetSync",
"SupportsSyncStore",
"set_or_delete",
Expand Down Expand Up @@ -709,6 +710,23 @@ async def delete(self) -> None: ...
async def set_if_not_exists(self, default: Buffer) -> None: ...


@runtime_checkable
class SupportsSetRange(Protocol):
    """Protocol for stores that can overwrite a byte range inside an existing value.

    An implementation writes ``len(value)`` bytes at byte offset ``start`` into
    the value already stored under ``key``. Callers are responsible for ensuring
    that the key exists and that the write fits entirely within the stored
    value, i.e. ``start + len(value) <= len(existing)``.

    Writes that would extend past the end of the existing value have
    implementation-specific behavior and must not be relied upon.
    """

    async def set_range(self, key: str, value: Buffer, start: int) -> None: ...

    def set_range_sync(self, key: str, value: Buffer, start: int) -> None: ...


@runtime_checkable
class SupportsGetSync(Protocol):
def get_sync(
Expand Down
37 changes: 20 additions & 17 deletions src/zarr/codecs/_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ class V2Codec(ArrayBytesCodec):

is_fixed_size = False

async def _decode_single(
def _decode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> NDBuffer:
cdata = chunk_bytes.as_array_like()
# decompress
if self.compressor:
chunk = await asyncio.to_thread(self.compressor.decode, cdata)
chunk = self.compressor.decode(cdata)
else:
chunk = cdata

# apply filters
if self.filters:
for f in reversed(self.filters):
chunk = await asyncio.to_thread(f.decode, chunk)
chunk = f.decode(chunk)

# view as numpy array with correct dtype
chunk = ensure_ndarray_like(chunk)
Expand All @@ -48,20 +48,9 @@ async def _decode_single(
try:
chunk = chunk.view(chunk_spec.dtype.to_native_dtype())
except TypeError:
# this will happen if the dtype of the chunk
# does not match the dtype of the array spec, e.g. if
# the dtype of the chunk_spec is a string dtype, but the chunk
# is an object array. In this case, we need to convert the object
# array to the correct dtype.

chunk = np.array(chunk).astype(chunk_spec.dtype.to_native_dtype())

elif chunk.dtype != object:
# If we end up here, someone must have hacked around with the filters.
# We cannot deal with object arrays unless there is an object
# codec in the filter chain, i.e., a filter that converts from object
# array to something else during encoding, and converts back to object
# array during decoding.
raise RuntimeError("cannot read object array without object codec")

# ensure correct chunk shape
Expand All @@ -70,7 +59,7 @@ async def _decode_single(

return get_ndbuffer_class().from_ndarray_like(chunk)

async def _encode_single(
def _encode_sync(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
Expand All @@ -83,18 +72,32 @@ async def _encode_single(
# apply filters
if self.filters:
for f in self.filters:
chunk = await asyncio.to_thread(f.encode, chunk)
chunk = f.encode(chunk)
# check object encoding
if ensure_ndarray_like(chunk).dtype == object:
raise RuntimeError("cannot write object array without object codec")

# compress
if self.compressor:
cdata = await asyncio.to_thread(self.compressor.encode, chunk)
cdata = self.compressor.encode(chunk)
else:
cdata = chunk
cdata = ensure_bytes(cdata)
return chunk_spec.prototype.buffer.from_bytes(cdata)

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> NDBuffer:
return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)

async def _encode_single(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
return await asyncio.to_thread(self._encode_sync, chunk_array, chunk_spec)

def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
    """Encoded-size computation is not implemented for this codec."""
    raise NotImplementedError
50 changes: 30 additions & 20 deletions src/zarr/codecs/numcodecs/_codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
if TYPE_CHECKING:
from zarr.abc.numcodec import Numcodec
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer, BufferPrototype, NDBuffer
from zarr.core.buffer import Buffer, NDBuffer

CODEC_PREFIX = "numcodecs."

Expand Down Expand Up @@ -132,53 +132,63 @@ class _NumcodecsBytesBytesCodec(_NumcodecsCodec, BytesBytesCodec):
def __init__(self, **codec_config: JSON) -> None:
super().__init__(**codec_config)

async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
return await asyncio.to_thread(
as_numpy_array_wrapper,
self._codec.decode,
chunk_data,
chunk_spec.prototype,
)
def _decode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
return as_numpy_array_wrapper(self._codec.decode, chunk_data, chunk_spec.prototype)

def _encode(self, chunk_data: Buffer, prototype: BufferPrototype) -> Buffer:
def _encode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
encoded = self._codec.encode(chunk_data.as_array_like())
if isinstance(encoded, np.ndarray): # Required for checksum codecs
return prototype.buffer.from_bytes(encoded.tobytes())
return prototype.buffer.from_bytes(encoded)
return chunk_spec.prototype.buffer.from_bytes(encoded.tobytes())
return chunk_spec.prototype.buffer.from_bytes(encoded)

async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
return await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)

async def _encode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
return await asyncio.to_thread(self._encode, chunk_data, chunk_spec.prototype)
return await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)


class _NumcodecsArrayArrayCodec(_NumcodecsCodec, ArrayArrayCodec):
def __init__(self, **codec_config: JSON) -> None:
super().__init__(**codec_config)

async def _decode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
def _decode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
chunk_ndarray = chunk_data.as_ndarray_like()
out = await asyncio.to_thread(self._codec.decode, chunk_ndarray)
out = self._codec.decode(chunk_ndarray)
return chunk_spec.prototype.nd_buffer.from_ndarray_like(out.reshape(chunk_spec.shape))

async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
def _encode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
chunk_ndarray = chunk_data.as_ndarray_like()
out = await asyncio.to_thread(self._codec.encode, chunk_ndarray)
out = self._codec.encode(chunk_ndarray)
return chunk_spec.prototype.nd_buffer.from_ndarray_like(out)

async def _decode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
return await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)

async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
return await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)


class _NumcodecsArrayBytesCodec(_NumcodecsCodec, ArrayBytesCodec):
def __init__(self, **codec_config: JSON) -> None:
super().__init__(**codec_config)

async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
def _decode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
chunk_bytes = chunk_data.to_bytes()
out = await asyncio.to_thread(self._codec.decode, chunk_bytes)
out = self._codec.decode(chunk_bytes)
return chunk_spec.prototype.nd_buffer.from_ndarray_like(out.reshape(chunk_spec.shape))

async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
def _encode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
chunk_ndarray = chunk_data.as_ndarray_like()
out = await asyncio.to_thread(self._codec.encode, chunk_ndarray)
out = self._codec.encode(chunk_ndarray)
return chunk_spec.prototype.buffer.from_bytes(out)

async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
return await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)

async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
return await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)


# bytes-to-bytes codecs
class Blosc(_NumcodecsBytesBytesCodec, codec_name="blosc"):
Expand Down
Loading
Loading