Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion src/zarr/abc/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from abc import abstractmethod
from collections.abc import Mapping
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal, Protocol, TypeGuard, runtime_checkable

from typing_extensions import ReadOnly, TypedDict
Expand All @@ -18,7 +19,7 @@
from zarr.abc.store import ByteGetter, ByteSetter, Store
from zarr.core.array_spec import ArraySpec
from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar, ZDType
from zarr.core.indexing import SelectorTuple
from zarr.core.indexing import ChunkProjection, SelectorTuple
from zarr.core.metadata import ArrayMetadata
from zarr.core.metadata.v3 import ChunkGridMetadata

Expand All @@ -33,6 +34,8 @@
"CodecOutput",
"CodecPipeline",
"GetResult",
"PreparedWrite",
"SupportsChunkCodec",
"SupportsSyncCodec",
]

Expand Down Expand Up @@ -82,6 +85,25 @@ def _decode_sync(self, chunk_data: CO, chunk_spec: ArraySpec) -> CI: ...
def _encode_sync(self, chunk_data: CI, chunk_spec: ArraySpec) -> CO | None: ...


class SupportsChunkCodec(Protocol):
    """Structural protocol for objects that can decode/encode whole chunks synchronously.

    `ChunkTransform` satisfies this protocol. The ``chunk_shape`` parameter
    allows decoding/encoding chunks of different shapes (e.g. rectilinear
    grids) without rebuilding the transform.
    """

    # Spec of the array the chunks belong to.
    # NOTE(review): presumably used by implementations for dtype/shape/prototype
    # information — confirm against `ChunkTransform`.
    array_spec: ArraySpec

    def decode_chunk(
        self, chunk_bytes: Buffer, chunk_shape: tuple[int, ...] | None = None
    ) -> NDBuffer:
        """Decode ``chunk_bytes`` into an in-memory array.

        ``chunk_shape``, when given, overrides the default chunk shape
        (e.g. for variably-sized chunks on a rectilinear grid).
        """
        ...

    def encode_chunk(
        self, chunk_array: NDBuffer, chunk_shape: tuple[int, ...] | None = None
    ) -> Buffer | None:
        """Encode ``chunk_array`` into its stored byte representation.

        NOTE(review): a ``None`` return presumably means "nothing to store"
        (mirrors ``_encode_sync`` returning ``Buffer | None``) — confirm.
        """
        ...


class BaseCodec[CI: CodecInput, CO: CodecOutput](Metadata):
"""Generic base class for codecs.

Expand Down Expand Up @@ -207,6 +229,37 @@ class ArrayArrayCodec(BaseCodec[NDBuffer, NDBuffer]):
"""Base class for array-to-array codecs."""


@dataclass
class PreparedWrite:
    """Intermediate state between reading existing data and writing new data.

    Created by `prepare_write_sync` / `prepare_write`, consumed by
    `finalize_write_sync` / `finalize_write`. The compute phase sits
    in between: iterate over `indexer`, decode the corresponding entry
    in `chunk_dict`, merge new data, re-encode, and store the result
    back into `chunk_dict`.

    Attributes
    ----------
    chunk_dict : dict[tuple[int, ...], Buffer | None]
        Per-inner-chunk encoded bytes, keyed by chunk coordinates.
        For a regular array this is `{(0,): <bytes>}`. For a sharded
        array it contains one entry per inner chunk in the shard,
        including chunks not being modified (they pass through
        unchanged). `None` means the chunk did not exist on disk.
    indexer : list[ChunkProjection]
        The inner chunks to modify. Each entry's `chunk_coords`
        corresponds to a key in `chunk_dict`. `chunk_selection`
        identifies the region within that inner chunk, and
        `out_selection` identifies the corresponding region in the
        source value array. The set of `chunk_coords` listed here is a
        subset of `chunk_dict`'s keys — untouched chunks are not listed.
    """

    # Encoded bytes per inner chunk; None = chunk absent in the store.
    chunk_dict: dict[tuple[int, ...], Buffer | None]
    # Projections describing which inner-chunk regions are rewritten.
    indexer: list[ChunkProjection]


class ArrayBytesCodec(BaseCodec[NDBuffer, Buffer]):
"""Base class for array-to-bytes codecs."""

Expand Down
37 changes: 20 additions & 17 deletions src/zarr/codecs/_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ class V2Codec(ArrayBytesCodec):

is_fixed_size = False

async def _decode_single(
def _decode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> NDBuffer:
cdata = chunk_bytes.as_array_like()
# decompress
if self.compressor:
chunk = await asyncio.to_thread(self.compressor.decode, cdata)
chunk = self.compressor.decode(cdata)
else:
chunk = cdata

# apply filters
if self.filters:
for f in reversed(self.filters):
chunk = await asyncio.to_thread(f.decode, chunk)
chunk = f.decode(chunk)

# view as numpy array with correct dtype
chunk = ensure_ndarray_like(chunk)
Expand All @@ -48,20 +48,9 @@ async def _decode_single(
try:
chunk = chunk.view(chunk_spec.dtype.to_native_dtype())
except TypeError:
# this will happen if the dtype of the chunk
# does not match the dtype of the array spec i.g. if
# the dtype of the chunk_spec is a string dtype, but the chunk
# is an object array. In this case, we need to convert the object
# array to the correct dtype.

chunk = np.array(chunk).astype(chunk_spec.dtype.to_native_dtype())

elif chunk.dtype != object:
# If we end up here, someone must have hacked around with the filters.
# We cannot deal with object arrays unless there is an object
# codec in the filter chain, i.e., a filter that converts from object
# array to something else during encoding, and converts back to object
# array during decoding.
raise RuntimeError("cannot read object array without object codec")

# ensure correct chunk shape
Expand All @@ -70,7 +59,7 @@ async def _decode_single(

return get_ndbuffer_class().from_ndarray_like(chunk)

async def _encode_single(
def _encode_sync(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
Expand All @@ -83,18 +72,32 @@ async def _encode_single(
# apply filters
if self.filters:
for f in self.filters:
chunk = await asyncio.to_thread(f.encode, chunk)
chunk = f.encode(chunk)
# check object encoding
if ensure_ndarray_like(chunk).dtype == object:
raise RuntimeError("cannot write object array without object codec")

# compress
if self.compressor:
cdata = await asyncio.to_thread(self.compressor.encode, chunk)
cdata = self.compressor.encode(chunk)
else:
cdata = chunk
cdata = ensure_bytes(cdata)
return chunk_spec.prototype.buffer.from_bytes(cdata)

    async def _decode_single(
        self,
        chunk_bytes: Buffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer:
        """Async wrapper: run the synchronous decode in a worker thread.

        Delegates to `_decode_sync` via `asyncio.to_thread` so the event
        loop is not blocked by decompression/filter work.
        """
        return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)

    async def _encode_single(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> Buffer | None:
        """Async wrapper: run the synchronous encode in a worker thread.

        Delegates to `_encode_sync` via `asyncio.to_thread` so the event
        loop is not blocked by filter/compression work.
        """
        return await asyncio.to_thread(self._encode_sync, chunk_array, chunk_spec)

    def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        """Not supported for this codec.

        The class declares ``is_fixed_size = False``: the encoded size of a
        chunk depends on the configured filters/compressor and cannot be
        derived from the input byte length alone.
        """
        raise NotImplementedError
50 changes: 30 additions & 20 deletions src/zarr/codecs/numcodecs/_codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
if TYPE_CHECKING:
from zarr.abc.numcodec import Numcodec
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer, BufferPrototype, NDBuffer
from zarr.core.buffer import Buffer, NDBuffer

CODEC_PREFIX = "numcodecs."

Expand Down Expand Up @@ -132,53 +132,63 @@ class _NumcodecsBytesBytesCodec(_NumcodecsCodec, BytesBytesCodec):
def __init__(self, **codec_config: JSON) -> None:
super().__init__(**codec_config)

async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
return await asyncio.to_thread(
as_numpy_array_wrapper,
self._codec.decode,
chunk_data,
chunk_spec.prototype,
)
    def _decode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
        """Synchronously decode ``chunk_data`` with the wrapped numcodecs codec.

        `as_numpy_array_wrapper` handles the Buffer <-> ndarray conversion
        around ``self._codec.decode`` using the spec's buffer prototype.
        """
        return as_numpy_array_wrapper(self._codec.decode, chunk_data, chunk_spec.prototype)

def _encode(self, chunk_data: Buffer, prototype: BufferPrototype) -> Buffer:
def _encode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
encoded = self._codec.encode(chunk_data.as_array_like())
if isinstance(encoded, np.ndarray): # Required for checksum codecs
return prototype.buffer.from_bytes(encoded.tobytes())
return prototype.buffer.from_bytes(encoded)
return chunk_spec.prototype.buffer.from_bytes(encoded.tobytes())
return chunk_spec.prototype.buffer.from_bytes(encoded)

    async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
        """Async wrapper: offload `_decode_sync` to a worker thread."""
        return await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)

async def _encode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
return await asyncio.to_thread(self._encode, chunk_data, chunk_spec.prototype)
return await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)


class _NumcodecsArrayArrayCodec(_NumcodecsCodec, ArrayArrayCodec):
def __init__(self, **codec_config: JSON) -> None:
super().__init__(**codec_config)

async def _decode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
def _decode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
chunk_ndarray = chunk_data.as_ndarray_like()
out = await asyncio.to_thread(self._codec.decode, chunk_ndarray)
out = self._codec.decode(chunk_ndarray)
return chunk_spec.prototype.nd_buffer.from_ndarray_like(out.reshape(chunk_spec.shape))

async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
def _encode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
chunk_ndarray = chunk_data.as_ndarray_like()
out = await asyncio.to_thread(self._codec.encode, chunk_ndarray)
out = self._codec.encode(chunk_ndarray)
return chunk_spec.prototype.nd_buffer.from_ndarray_like(out)

    async def _decode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
        """Async wrapper: offload `_decode_sync` to a worker thread."""
        return await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)

    async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
        """Async wrapper: offload `_encode_sync` to a worker thread."""
        return await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)


class _NumcodecsArrayBytesCodec(_NumcodecsCodec, ArrayBytesCodec):
def __init__(self, **codec_config: JSON) -> None:
super().__init__(**codec_config)

async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
def _decode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
chunk_bytes = chunk_data.to_bytes()
out = await asyncio.to_thread(self._codec.decode, chunk_bytes)
out = self._codec.decode(chunk_bytes)
return chunk_spec.prototype.nd_buffer.from_ndarray_like(out.reshape(chunk_spec.shape))

async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
def _encode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
chunk_ndarray = chunk_data.as_ndarray_like()
out = await asyncio.to_thread(self._codec.encode, chunk_ndarray)
out = self._codec.encode(chunk_ndarray)
return chunk_spec.prototype.buffer.from_bytes(out)

    async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
        """Async wrapper: offload `_decode_sync` to a worker thread."""
        return await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)

    async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
        """Async wrapper: offload `_encode_sync` to a worker thread."""
        return await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)


# bytes-to-bytes codecs
class Blosc(_NumcodecsBytesBytesCodec, codec_name="blosc"):
Expand Down
Loading
Loading