Commits (31, all by d-v-b):

- f427898 (Feb 18, 2026): sketch out sync codecs + threadpool
- dbdc3d4 (Feb 18, 2026): Merge branch 'main' into perf/faster-codecs
- 65d1230 (Feb 19, 2026): fix perf regressions
- e24fe7e (Feb 19, 2026): Merge branch 'perf/faster-codecs' of https://github.com/d-v-b/zarr-py…
- f979eaa (Feb 19, 2026): add partial encode / decode
- a934899 (Feb 19, 2026): add sync hotpath
- b53ac3e (Feb 19, 2026): add comments and documentation
- 73ac845 (Feb 19, 2026): refactor sharding to allow sync
- aeecda8 (Feb 19, 2026): fix array spec propagation
- 69172fb (Feb 19, 2026): fix countingdict tests
- 28d0def (Feb 19, 2026): update design doc
- f8e39e6 (Feb 19, 2026): dynamic pool allocation
- b388911 (Feb 19, 2026): default to 1 itemsize for data types that don't declare it
- 7e29ef3 (Feb 19, 2026): Merge branch 'main' into perf/faster-codecs
- 00dde0b (Feb 19, 2026): Merge branch 'main' into perf/faster-codecs
- 9d77ca5 (Feb 19, 2026): remove extra codec pipeline
- 88a4875 (Feb 19, 2026): remove garbage
- 284e5e2 (Feb 19, 2026): lint
- b1b876a (Feb 20, 2026): use protocols for new sync behavior
- 6996284 (Feb 20, 2026): remove batch size parameter; add changelog entry
- 204dda1 (Feb 20, 2026): prune dead code, make protocols useful
- e9db616 (Feb 20, 2026): restore batch size but it's only there for warnings
- 01e1f73 (Feb 20, 2026): fix type hints, prevent thread pool leakage, make codec pipeline intr…
- fbde3af (Feb 20, 2026): Merge branch 'main' of https://github.com/zarr-developers/zarr-python…
- 11534b0 (Feb 20, 2026): restore old comments / docstrings
- b40d53a (Feb 20, 2026): simplify threadpool management
- 83c1dc1 (Feb 21, 2026): use isinstance instead of explicit list of codec names
- e8a0cc6 (Feb 21, 2026): consolidate thread pool configuration
- 9a1d5eb (Feb 21, 2026): Merge branch 'main' of https://github.com/zarr-developers/zarr-python…
- bf7073f (Feb 24, 2026): Merge branch 'main' into perf/faster-codecs
- 64ab320 (Feb 25, 2026): Merge branch 'main' into perf/faster-codecs
228 changes: 228 additions & 0 deletions docs/design/sync-bypass.md
@@ -0,0 +1,228 @@
# Design: Fully Synchronous Read/Write Bypass
@rabernat @dcherian, have a look: this is Claude's summary of the perf blockers addressed in this PR.


## Problem

Zarr-python's read/write path is inherently async: every `Array.__getitem__`
or `Array.__setitem__` call passes through several layers of async machinery
before any actual work happens. For workloads where both the codec chain and
the store are fundamentally synchronous (e.g. gzip + MemoryStore, or
zstd + LocalStore), this async overhead dominates latency.

The call chain looks like this:

```
Array.__getitem__
└─ sync() # (1) thread hop: submits coroutine to background event loop
└─ AsyncArray._get_selection # runs on the event loop thread
└─ CodecPipeline.read # async pipeline
├─ concurrent_map # (2) launches tasks on event loop
│ └─ ByteGetter.get(prototype) # (3) async store IO
│ └─ MemoryStore.get() # just a dict lookup!
└─ codec.decode()
└─ asyncio.to_thread(...) # (4) thread hop for CPU work
└─ gzip.decompress(...) # actual compute
```

There are four sources of overhead, marked (1)-(4):

1. **`sync()` bridge**: Every synchronous `Array` method calls `sync()`, which
uses `asyncio.run_coroutine_threadsafe()` to submit work to a background
event loop thread. Even when the coroutine does zero awaiting, this costs
~30-50us for the round-trip through the event loop.

2. **`concurrent_map` batching**: The pipeline groups chunks into batches and
dispatches them via `concurrent_map`, which creates asyncio tasks. For
single-chunk reads (the common case), this is pure overhead.

3. **Async store IO**: `StorePath.get()` / `StorePath.set()` are `async def`.
For `MemoryStore` (a dict lookup) and `LocalStore` (a file read), the
underlying operation is synchronous — wrapping it in `async def` forces an
unnecessary context switch through the event loop.

4. **`asyncio.to_thread` for codec compute**: `BatchedCodecPipeline` runs each
codec's encode/decode in `asyncio.to_thread()`, adding another thread hop.
`SyncCodecPipeline` (the foundation this work builds on) already eliminates
this by calling `_decode_sync` / `_encode_sync` inline.

The net effect: a MemoryStore read of a single small chunk spends more time
in async machinery than in actual decompression.
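The `sync()` bridge in (1) can be reduced to a few lines to see where the cost comes from. This is an illustrative sketch, not zarr-python's actual implementation: even a coroutine that never awaits pays a full round-trip through the background event loop.

```python
import asyncio
import threading

# Background event loop running on its own thread, as in the sync() bridge.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def trivial() -> int:
    return 42  # does zero awaiting, but still crosses threads twice

def sync(coro):
    # Submit the coroutine to the background loop, then block on the result.
    return asyncio.run_coroutine_threadsafe(coro, loop).result()

result = sync(trivial())
```

Timing `sync(trivial())` against a plain call of a synchronous function makes the fixed per-call overhead of the bridge directly visible.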


## Solution

When the codec pipeline and store both support synchronous operation, bypass
the event loop entirely: run IO, codec compute, and buffer scatter all on the
calling thread, with zero async overhead.

The solution has three layers:

### Layer 1: Sync Store IO

Add `supports_sync`, `get_sync()`, `set_sync()`, and `delete_sync()` to the
store abstraction. These are opt-in: the `Store` ABC provides default
implementations that raise `NotImplementedError`, and only stores with native
sync capabilities override them.

```
Store ABC (defaults: supports_sync=False, methods raise NotImplementedError)
├── MemoryStore (supports_sync=True, direct dict access)
├── LocalStore (supports_sync=True, direct file IO via _get/_put)
└── FsspecStore (unchanged, remains async-only)

StorePath delegates to its underlying Store:
get_sync() → self.store.get_sync(self.path, ...)
set_sync() → self.store.set_sync(self.path, ...)
```

**Key decision**: `StorePath` is what gets passed to the codec pipeline as a
`ByteGetter` / `ByteSetter`. By adding sync methods to `StorePath`, the
pipeline can call them directly without knowing the concrete store type.

**Protocol gap**: The `ByteGetter` / `ByteSetter` protocols only define async
methods (`get`, `set`, `delete`). Rather than modifying these widely-used
protocols, the sync pipeline methods use `Any` type annotations for the
byte_getter/byte_setter parameters and call `.get_sync()` etc. at runtime.
This is a pragmatic tradeoff: the sync path is an optimization that only
activates when `supports_sync` is True, so the runtime type is always a
`StorePath` that has these methods.

### Layer 2: Sync Codec Pipeline IO

Add `supports_sync_io`, `read_sync()`, and `write_sync()` to the
`CodecPipeline` ABC (non-abstract, default raises `NotImplementedError`).

`SyncCodecPipeline` implements these with a simple sequential loop:

```python
# read_sync: for each chunk
for byte_getter, chunk_spec, chunk_selection, out_selection, _ in batch_info:
    chunk_bytes = byte_getter.get_sync(prototype=chunk_spec.prototype)  # sync IO
    chunk_array = self._decode_one(chunk_bytes, ...)                    # sync compute
    out[out_selection] = chunk_array[chunk_selection]                   # scatter
```

No batching, no `concurrent_map`, no event loop — just a Python for-loop.
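The same loop can be exercised end-to-end in a toy form. Here the codec chain is collapsed into a single gzip decompress and `batch_info` is simplified to `(raw_bytes, out_selection)` pairs, so this is a sketch of the shape of `read_sync`, not zarr's real tuple layout:

```python
import gzip

import numpy as np

def read_sync_sketch(batch_info, out):
    """Sequential sync read: fetch -> decode -> scatter, all on this thread."""
    for raw, out_selection in batch_info:
        # decode (stand-in for the codec chain)
        chunk = np.frombuffer(gzip.decompress(raw), dtype=out.dtype)
        # scatter into the output buffer
        out[out_selection] = chunk

out = np.zeros(4, dtype="u1")
raw = gzip.compress(np.array([7, 8], dtype="u1").tobytes())
read_sync_sketch([(raw, slice(0, 2)), (raw, slice(2, 4))], out)
```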

**Sharding fallback**: When `supports_partial_decode` is True (i.e. the codec
pipeline uses sharding), `supports_sync_io` returns False and the Array falls
back to the standard `sync()` path. This is because `ShardingCodec`'s
`decode_partial` is async (it reads sub-ranges from the store) and does not
have a sync equivalent.

### Layer 3: Array Bypass

Each of the 10 sync `Array` selection methods (5 getters, 5 setters) gains a
fast path:

```python
def get_basic_selection(self, selection, *, out=None, prototype=None, fields=None):
    indexer = BasicIndexer(selection, self.shape, self.metadata.chunk_grid)
    if self._can_use_sync_path():
        return _get_selection_sync(
            self.async_array.store_path, self.async_array.metadata,
            self.async_array.codec_pipeline, self.async_array.config,
            indexer, out=out, fields=fields, prototype=prototype,
        )
    return sync(self.async_array._get_selection(indexer, ...))
```

`_can_use_sync_path()` checks three conditions:
1. The codec pipeline supports sync IO (`supports_sync_io`)
2. No partial decode is active (rules out sharding)
3. The store supports sync (`supports_sync`)

When all three hold, `_get_selection_sync` / `_set_selection_sync` run the
entire operation on the calling thread. These functions mirror the async
`_get_selection` / `_set_selection` exactly, but call `codec_pipeline.read_sync()`
/ `write_sync()` instead of `await codec_pipeline.read()` / `write()`.
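The three-condition gate can be sketched as a standalone predicate. The attribute names mirror the properties introduced in Layers 1 and 2, but the pipeline and store objects here are simplified stand-ins, not zarr-python's real classes:

```python
from types import SimpleNamespace

def can_use_sync_path(codec_pipeline, store) -> bool:
    return (
        codec_pipeline.supports_sync_io                  # Layer 2: pipeline opt-in
        and not codec_pipeline.supports_partial_decode   # rules out sharding
        and store.supports_sync                          # Layer 1: store opt-in
    )

plain_pipeline = SimpleNamespace(supports_sync_io=True, supports_partial_decode=False)
sharded_pipeline = SimpleNamespace(supports_sync_io=True, supports_partial_decode=True)
memory_store = SimpleNamespace(supports_sync=True)
fsspec_store = SimpleNamespace(supports_sync=False)
```

Any single `False` falls back to the `sync()` path, so the bypass is strictly opt-in: a sharded array or a remote store disables it even when everything else qualifies.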


## Resulting Call Chain

With the sync bypass active, the call chain becomes:

```
Array.__getitem__
└─ _get_selection_sync # runs on calling thread
└─ SyncCodecPipeline.read_sync
├─ StorePath.get_sync # direct dict/file access, no event loop
├─ _decode_one # inline codec chain, no to_thread
└─ out[sel] = array # scatter into output
```

No `sync()`, no event loop, no `asyncio.to_thread`, no `concurrent_map`.


## Additional Optimization: Codec Instance Caching

`GzipCodec` was creating a new `GZip(level)` instance on every encode/decode
call. `ZstdCodec` and `BloscCodec` already cache their codec instances via
`@cached_property`. We apply the same pattern to `GzipCodec`:

```python
@cached_property
def _gzip_codec(self) -> GZip:
    return GZip(self.level)
```

This is safe because `GzipCodec` is a frozen dataclass — `level` never
changes after construction, so the cached instance is always valid.
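The pattern can be demonstrated in isolation. `GZipSketch` stands in for numcodecs' `GZip` so the example is self-contained; the point is that `cached_property` writes straight into the instance `__dict__`, which a frozen dataclass permits (freezing only blocks `__setattr__`), so the first access constructs the codec and every later access returns the same object:

```python
from dataclasses import dataclass
from functools import cached_property

class GZipSketch:
    """Stand-in for the real GZip codec object."""
    def __init__(self, level: int) -> None:
        self.level = level

@dataclass(frozen=True)
class GzipCodecSketch:
    level: int = 5

    @cached_property
    def _gzip_codec(self) -> GZipSketch:
        # Constructed once per instance; level is immutable, so this is safe.
        return GZipSketch(self.level)

codec = GzipCodecSketch(level=3)
first = codec._gzip_codec
```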


## What Stays Unchanged

- **`BatchedCodecPipeline`**: Unmodified. It inherits the default
`supports_sync_io=False` from the ABC.
- **Remote stores** (`FsspecStore`): `supports_sync` stays `False`. All
remote IO remains async.
- **Sharded arrays**: Fall back to the `sync()` path because
`supports_partial_decode` is True.
- **All async APIs**: `AsyncArray`, `async def read/write`, etc. are
completely untouched. The sync bypass is an optimization of the
synchronous `Array` class only.


## Files Modified

| File | Layer | Change |
|------|-------|--------|
| `src/zarr/abc/store.py` | 1 | `supports_sync`, `get_sync`, `set_sync`, `delete_sync` on `Store` ABC |
| `src/zarr/storage/_memory.py` | 1 | Sync store methods (direct dict access) |
| `src/zarr/storage/_local.py` | 1 | Sync store methods (direct `_get`/`_put` calls) |
| `src/zarr/storage/_common.py` | 1 | Sync methods on `StorePath` (delegates to store) |
| `src/zarr/abc/codec.py` | 2 | `supports_sync_io`, `read_sync`, `write_sync` on `CodecPipeline` ABC |
| `src/zarr/experimental/sync_codecs.py` | 2 | `read_sync`, `write_sync` implementation |
| `src/zarr/core/array.py` | 3 | `_can_use_sync_path`, `_get_selection_sync`, `_set_selection_sync`, 10 method modifications |
| `src/zarr/codecs/gzip.py` | — | `@cached_property` for GZip instance |


## Design Tradeoffs

**Duplication of `_get_selection` / `_set_selection`**: The sync versions
(`_get_selection_sync`, `_set_selection_sync`) duplicate the setup logic
(dtype resolution, buffer creation, value coercion) from the async originals.
This is intentional: extracting shared helpers would add complexity and
indirection to the hot path for no functional benefit. The two versions
should be kept in sync manually.

**Sequential chunk processing**: `read_sync` and `write_sync` process chunks
sequentially in a for-loop, with no parallelism. For the target use case
(MemoryStore, LocalStore), this is optimal: MemoryStore is a dict lookup
(~1us), LocalStore is a file read that benefits from OS page cache, and
Python's GIL prevents true parallelism for CPU-bound codec work anyway. The
async path with `concurrent_map` is better for remote stores where IO latency
can be overlapped.

**`Any` type annotations**: The `read_sync` and `write_sync` methods on
`SyncCodecPipeline` use `Any` for the byte_getter/byte_setter type in the
`batch_info` tuples. This avoids modifying the `ByteGetter`/`ByteSetter`
protocols, which are public API. The runtime type is always `StorePath`, which
has the sync methods; the type system just can't express this constraint
through the existing protocol hierarchy.

**No sync partial decode/encode**: Sharding's `decode_partial` /
`encode_partial` methods are inherently async (they issue byte-range reads to
the store). Rather than adding sync variants to the sharding codec (which
would require significant refactoring), we simply fall back to the `sync()`
path for sharded arrays. This is the right tradeoff because sharded arrays
typically involve remote stores where async IO is beneficial anyway.
1 change: 1 addition & 0 deletions src/zarr/__init__.py
@@ -37,6 +37,7 @@
from zarr.core.array import Array, AsyncArray
from zarr.core.config import config
from zarr.core.group import AsyncGroup, Group
from zarr.experimental.sync_codecs import SyncCodecPipeline # noqa: F401 (registers pipeline)

# in case setuptools scm screw up and find version to be 0.0.0
assert not __version__.startswith("0.0.0")
70 changes: 70 additions & 0 deletions src/zarr/abc/codec.py
@@ -137,6 +137,21 @@ def validate(
The array chunk grid
"""

    def _decode_sync(self, chunk_data: CodecOutput, chunk_spec: ArraySpec) -> CodecInput:
        """Synchronously decode a single chunk. Override in subclasses to enable
        SyncCodecPipeline support."""
        raise NotImplementedError  # pragma: no cover

    def _encode_sync(self, chunk_data: CodecInput, chunk_spec: ArraySpec) -> CodecOutput | None:
        """Synchronously encode a single chunk. Override in subclasses to enable
        SyncCodecPipeline support."""
        raise NotImplementedError  # pragma: no cover

    @property
    def supports_sync(self) -> bool:
        """Whether this codec has synchronous encode/decode implementations."""
        return type(self)._decode_sync is not BaseCodec._decode_sync

    async def _decode_single(self, chunk_data: CodecOutput, chunk_spec: ArraySpec) -> CodecInput:
        raise NotImplementedError  # pragma: no cover

@@ -459,6 +474,61 @@ async def write(
"""
...

    # -------------------------------------------------------------------
    # Fully synchronous read/write (opt-in)
    #
    # When a CodecPipeline subclass can run the entire read/write path
    # (store IO + codec compute + buffer scatter) without touching the
    # event loop, it overrides these methods and sets supports_sync_io
    # to True. This lets Array selection methods bypass sync() entirely.
    #
    # The default implementations raise NotImplementedError, so
    # BatchedCodecPipeline (the standard pipeline) is unaffected.
    #
    # See docs/design/sync-bypass.md for the full design rationale.
    # -------------------------------------------------------------------

    @property
    def supports_sync_io(self) -> bool:
        """Whether this pipeline can run read/write entirely on the calling thread.

        True when:
        - All codecs support synchronous encode/decode (_decode_sync/_encode_sync)
        - The pipeline's read_sync/write_sync methods are implemented

        Checked by ``Array._can_use_sync_path()`` to decide whether to bypass
        the ``sync()`` event-loop bridge.
        """
        return False

    def read_sync(
        self,
        batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
        out: NDBuffer,
        drop_axes: tuple[int, ...] = (),
    ) -> None:
        """Synchronous read: fetch bytes from store, decode, scatter into out.

        Runs entirely on the calling thread. Only available when
        ``supports_sync_io`` is True. Called by ``_get_selection_sync`` in
        ``array.py`` when the sync bypass is active.
        """
        raise NotImplementedError

    def write_sync(
        self,
        batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
        value: NDBuffer,
        drop_axes: tuple[int, ...] = (),
    ) -> None:
        """Synchronous write: gather from value, encode, persist to store.

        Runs entirely on the calling thread. Only available when
        ``supports_sync_io`` is True. Called by ``_set_selection_sync`` in
        ``array.py`` when the sync bypass is active.
        """
        raise NotImplementedError


async def _batching_helper(
func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
56 changes: 56 additions & 0 deletions src/zarr/abc/store.py
@@ -524,6 +524,62 @@ def supports_partial_writes(self) -> Literal[False]:
"""
return False

    # -----------------------------------------------------------------------
    # Synchronous IO interface (opt-in)
    #
    # These methods enable the SyncCodecPipeline to bypass the event loop
    # entirely for store IO. The default implementations raise
    # NotImplementedError; stores that wrap fundamentally synchronous
    # operations (MemoryStore, LocalStore) override them with direct
    # implementations. Remote/cloud stores (FsspecStore) leave them as-is
    # and remain async-only.
    #
    # See docs/design/sync-bypass.md for the full design rationale.
    # -----------------------------------------------------------------------

    @property
    def supports_sync(self) -> bool:
        """Whether this store has native synchronous get/set/delete methods.

        When True, ``SyncCodecPipeline.read_sync`` / ``write_sync`` will call
        ``get_sync`` / ``set_sync`` / ``delete_sync`` directly on the calling
        thread, avoiding the event loop overhead of the async equivalents.

        Subclasses that override the sync methods below should also override
        this property to return True.
        """
        return False

    def get_sync(
        self,
        key: str,
        prototype: BufferPrototype,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None:
        """Synchronous version of ``get()``.

        Called by ``SyncCodecPipeline.read_sync`` to fetch chunk bytes without
        going through the event loop. Only called when ``supports_sync`` is
        True, so the default ``NotImplementedError`` is never hit in practice.
        """
        raise NotImplementedError

    def set_sync(self, key: str, value: Buffer) -> None:
        """Synchronous version of ``set()``.

        Called by ``SyncCodecPipeline.write_sync`` to persist encoded chunk
        bytes without going through the event loop.
        """
        raise NotImplementedError

    def delete_sync(self, key: str) -> None:
        """Synchronous version of ``delete()``.

        Called by ``SyncCodecPipeline.write_sync`` when a chunk should be
        removed (e.g. an empty chunk with ``write_empty_chunks=False``).
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def supports_listing(self) -> bool: