Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions docarray/array/mixins/io/binary.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import base64
import io
import os.path
import os
import pickle
from contextlib import nullcontext
from typing import Union, BinaryIO, TYPE_CHECKING, Type, Optional, Generator

from ....helper import __windows__, get_compress_ctx, decompress_bytes
from ....helper import (
__windows__,
get_compress_ctx,
decompress_bytes,
protocol_and_compress_from_file_path,
)

if TYPE_CHECKING:
from ....types import T
Expand Down Expand Up @@ -36,12 +42,23 @@ def load_binary(
:param streaming: if `True` returns a generator over `Document` objects.
In case protocol is pickle the `Documents` are streamed from disk to save memory usage
:return: a DocumentArray object

.. note::
If `file` is `str` it can specify `protocol` and `compress` as file extensions.
This functionality assumes `file=file_name.$protocol.$compress` where `$protocol` and `$compress` refer to a
string interpolation of the respective `protocol` and `compress` methods.
For example if `file=my_docarray.protobuf.lz4` then the binary data will be loaded assuming `protocol=protobuf`
and `compress=lz4`.
"""

if isinstance(file, io.BufferedReader):
file_ctx = nullcontext(file)
elif isinstance(file, bytes):
file_ctx = nullcontext(file)
elif os.path.exists(file):
protocol, compress = protocol_and_compress_from_file_path(
file, protocol, compress
)
file_ctx = open(file, 'rb')
else:
raise ValueError(f'unsupported input {file!r}')
Expand Down Expand Up @@ -191,18 +208,33 @@ def save_binary(
) -> None:
"""Save array elements into a binary file.

:param file: File or filename to which the data is saved.
:param protocol: protocol to use
:param compress: compress algorithm to use

.. note::
If `file` is `str` it can specify `protocol` and `compress` as file extensions.
This functionality assumes `file=file_name.$protocol.$compress` where `$protocol` and `$compress` refer to a
string interpolation of the respective `protocol` and `compress` methods.
For example if `file=my_docarray.protobuf.lz4` then the binary data will be created using `protocol=protobuf`
and `compress=lz4`.

Comparing to :meth:`save_json`, it is faster and the file is smaller, but not human-readable.

.. note::
To get a binary presentation in memory, use ``bytes(...)``.

:param protocol: protocol to use
:param compress: compress algorithm to use
:param file: File or filename to which the data is saved.
"""
if isinstance(file, io.BufferedWriter):
file_ctx = nullcontext(file)
else:
_protocol, _compress = protocol_and_compress_from_file_path(file)

if _protocol is not None:
protocol = _protocol
if _compress is not None:
compress = _compress

file_ctx = open(file, 'wb')

self.to_bytes(protocol=protocol, compress=compress, _file_ctx=file_ctx)
Expand Down
69 changes: 68 additions & 1 deletion docarray/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
import random
import sys
import uuid
import pathlib
import warnings
from typing import Any, Dict, Optional, Sequence
from typing import Any, Dict, Optional, Sequence, Tuple

ALLOWED_PROTOCOLS = {'pickle', 'protobuf', 'protobuf-array', 'pickle-array'}
ALLOWED_COMPRESSIONS = {'lz4', 'bz2', 'lzma', 'zlib', 'gzip'}

__windows__ = sys.platform == 'win32'

Expand Down Expand Up @@ -321,3 +325,66 @@ def dataclass_from_dict(klass, dikt):
if isinstance(dikt, (tuple, list)):
return [dataclass_from_dict(klass.__args__[0], f) for f in dikt]
return dikt


def protocol_and_compress_from_file_path(
    file_path: str,
    default_protocol: Optional[str] = None,
    default_compress: Optional[str] = None,
) -> Tuple[Optional[str], Optional[str]]:
    """Extract protocol and compression algorithm from a file path, use defaults if not found.

    Every file extension is matched against the allowed protocol and compression
    names; when an extension matches, it overrides the corresponding default.

    :param file_path: path of a file.
    :param default_protocol: default serialization protocol used in case none is found.
    :param default_compress: default compression method used in case none is found.
    :return: a ``(protocol, compress)`` tuple, each entry being the detected value,
        the supplied default, or ``None``.

    Examples:

    >>> protocol_and_compress_from_file_path('./docarray_fashion_mnist.protobuf.gzip')
    ('protobuf', 'gzip')

    >>> protocol_and_compress_from_file_path('/Documents/docarray_fashion_mnist.protobuf')
    ('protobuf', None)

    >>> protocol_and_compress_from_file_path('/Documents/docarray_fashion_mnist.gzip')
    (None, 'gzip')
    """

    protocol = default_protocol
    compress = default_compress

    # strip the leading dot from each suffix, e.g. '.protobuf' -> 'protobuf'
    file_extensions = [e.replace('.', '') for e in pathlib.Path(file_path).suffixes]
    for extension in file_extensions:
        if extension in ALLOWED_PROTOCOLS:
            protocol = extension
        elif extension in ALLOWED_COMPRESSIONS:
            compress = extension

    return protocol, compress


def add_protocol_and_compress_to_file_path(
    file_path: str, protocol: Optional[str] = None, compress: Optional[str] = None
) -> str:
    """Return a file path extended with the protocol and compression methods as extensions.

    Extensions are appended in the order ``.protocol.compress``; a falsy value
    (``None`` or empty string) contributes no extension.

    :param file_path: path of a file.
    :param protocol: chosen protocol.
    :param compress: compression algorithm.
    :return: the (possibly) extended file path.

    Examples:

    >>> add_protocol_and_compress_to_file_path('docarray_fashion_mnist.bin')
    'docarray_fashion_mnist.bin'

    >>> add_protocol_and_compress_to_file_path('docarray_fashion_mnist', 'protobuf', 'gzip')
    'docarray_fashion_mnist.protobuf.gzip'
    """

    # collect only the extensions that were actually provided
    extra_suffixes = [part for part in (protocol, compress) if part]
    return '.'.join([file_path, *extra_suffixes])
46 changes: 44 additions & 2 deletions docs/fundamentals/documentarray/serialization.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
(docarray-serialization)=
# Serialization

DocArray is designed to be "ready-to-wire" at anytime. Serialization is important. DocumentArray provides multiple serialization methods that allows one transfer DocumentArray object over network and across different microservices.
DocArray is designed to be "ready-to-wire" at anytime. Serialization is important.
DocumentArray provides multiple serialization methods that allow one to transfer DocumentArray objects over the network and across different microservices.
Moreover, there is the ability to store/load `DocumentArray` objects to/from disk.

- JSON string: `.from_json()`/`.to_json()`
- Pydantic model: `.from_pydantic_model()`/`.to_pydantic_model()`
- Bytes (compressed): `.from_bytes()`/`.to_bytes()`
- Disk serialization: `.save_binary()`/`.load_binary()`
- Base64 (compressed): `.from_base64()`/`.to_base64()`
- Protobuf Message: `.from_protobuf()`/`.to_protobuf()`
- Python List: `.from_list()`/`.to_list()`
Expand All @@ -15,7 +18,6 @@ DocArray is designed to be "ready-to-wire" at anytime. Serialization is importan




## From/to JSON


Expand Down Expand Up @@ -161,6 +163,46 @@ Afterwards, `doc1_bytes` describes how many bytes are used to serialize `doc1`,
The pattern `dock_bytes` and `dock.to_bytes` is repeated `len(docs)` times.


### From/to Disk

If you want to store a `DocumentArray` to disk you can use `.save_binary(filename, protocol, compress)` where `protocol` and `compress` refer to the protocol and compression methods used to serialize the data.
If you want to load a `DocumentArray` from disk you can use `.load_binary(filename, protocol, compress)`.

For example, the following snippet shows how to save/load a `DocumentArray` in `my_docarray.bin`.

```python
from docarray import DocumentArray, Document

da = DocumentArray([Document(text='hello'), Document(text='world')])

da.save_binary('my_docarray.bin', protocol='protobuf', compress='lz4')
da_rec = DocumentArray.load_binary('my_docarray.bin', protocol='protobuf', compress='lz4')
da_rec == da
```

Note that in the previous code snippet the user needs to remember the protocol and compression methods used to store the data in order to load it back correctly. `DocArray` allows you to specify `protocol` and `compress` as file extensions.
By doing so you can forget later on which protocol and compression methods were used to serialize the data to disk.
This functionality assumes `.save_binary` and `.load_binary` are called with `filename` following the form `file_name.$protocol.$compress`, where `$protocol` and `$compress` refer to a string interpolation of the respective `protocol` and `compress` methods.

For example if `file=my_docarray.protobuf.lz4` then the binary data will be created using `protocol=protobuf` and `compress=lz4`.

The previous code snippet can be simplified to

```python
from docarray import DocumentArray, Document

da = DocumentArray([Document(text='hello'), Document(text='world')])

da.save_binary('my_docarray.protobuf.lz4')
da_rec = DocumentArray.load_binary('my_docarray.protobuf.lz4')
da_rec == da
```

```{tip}
If you don't want to specify and remember `protocol` and `compress` to store/load to/from disk, save your `DocumentArray` `da` using
`da.save_binary('file_name.$protocol.$compress')` so that it can be loaded back with `DocumentArray.load_binary('file_name.$protocol.$compress')`
```

### Stream large binary serialization from disk

In particular, if a serialization uses `protocol='pickle'` or `protocol='protobuf'`, then you can load it via streaming with a constant memory consumption by setting `streaming=True`:
Expand Down
19 changes: 18 additions & 1 deletion tests/unit/array/test_from_to_bytes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
from tests import random_docs


from docarray.helper import add_protocol_and_compress_to_file_path


def get_ndarrays_for_ravel():
a = np.random.random([100, 3])
a[a > 0.5] = 0
Expand Down Expand Up @@ -60,17 +63,30 @@ def test_to_from_bytes(target_da, protocol, compress, ndarray_val, is_sparse):
)
@pytest.mark.parametrize('compress', ['lz4', 'bz2', 'lzma', 'zlib', 'gzip', None])
def test_save_bytes(target_da, protocol, compress, tmpfile):

# tests .save_binary(file, protocol=protocol, compress=compress)
target_da.save_binary(tmpfile, protocol=protocol, compress=compress)
target_da.save_binary(str(tmpfile), protocol=protocol, compress=compress)

with open(tmpfile, 'wb') as fp:
target_da.save_binary(fp, protocol=protocol, compress=compress)

DocumentArray.load_binary(tmpfile, protocol=protocol, compress=compress)
da_from_protocol_compress = DocumentArray.load_binary(
tmpfile, protocol=protocol, compress=compress
)
DocumentArray.load_binary(str(tmpfile), protocol=protocol, compress=compress)
with open(tmpfile, 'rb') as fp:
DocumentArray.load_binary(fp, protocol=protocol, compress=compress)

# tests .save_binary(file.protocol.compress) without arguments `compression` and `protocol`
file_path_extended = add_protocol_and_compress_to_file_path(
str(tmpfile), protocol, compress
)

target_da.save_binary(file_path_extended)
da_from_file_extension = DocumentArray.load_binary(file_path_extended)
assert da_from_protocol_compress == da_from_file_extension


@pytest.mark.parametrize('target_da', [DocumentArray.empty(100), random_docs(100)])
def test_from_to_protobuf(target_da):
Expand Down Expand Up @@ -106,6 +122,7 @@ def test_save_bytes_stream(tmpfile, protocol, compress):
[Document(text='aaa'), Document(text='bbb'), Document(text='ccc')]
)
da.save_binary(tmpfile, protocol=protocol, compress=compress)

da_reconstructed = DocumentArray.load_binary(
tmpfile, protocol=protocol, compress=compress, streaming=True
)
Expand Down
50 changes: 50 additions & 0 deletions tests/unit/test_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import pytest
import pathlib

from docarray.helper import (
protocol_and_compress_from_file_path,
add_protocol_and_compress_to_file_path,
)


@pytest.mark.parametrize(
    'file_path', ['doc_array', '../docarray', './a_folder/docarray']
)
@pytest.mark.parametrize(
    'protocol', ['protobuf', 'protobuf-array', 'pickle', 'pickle-array']
)
@pytest.mark.parametrize('compress', ['lz4', 'bz2', 'lzma', 'zlib', 'gzip', None])
def test_protocol_and_compress_from_file_path(file_path, protocol, compress):
    """Round-trip: extensions appended to a path are parsed back unchanged."""
    # build `file_path.$protocol[.$compress]` the same way callers would on disk
    extensions = [ext for ext in (protocol, compress) if ext]
    extended_path = '.'.join([file_path, *extensions])

    found_protocol, found_compress = protocol_and_compress_from_file_path(extended_path)

    # parsed values must come from the allowed sets (or be absent)
    assert found_protocol in {
        'protobuf',
        'protobuf-array',
        'pickle',
        'pickle-array',
        None,
    }
    assert found_compress in {'lz4', 'bz2', 'lzma', 'zlib', 'gzip', None}

    # what was appended is exactly what gets parsed back
    assert protocol == found_protocol
    assert compress == found_compress


@pytest.mark.parametrize('file_path', ['doc_array', './some_folder/doc_array'])
@pytest.mark.parametrize(
    'protocol', ['protobuf', 'protobuf-array', 'pickle', 'pickle-array']
)
@pytest.mark.parametrize('compress', ['lz4', 'bz2', 'lzma', 'zlib', 'gzip'])
def test_add_protocol_and_compress_to_file_path(file_path, compress, protocol):
    """The extended path carries `.protocol.compress`, in that order."""
    # NOTE: the helper signature is (file_path, protocol, compress); the
    # original call passed (compress, protocol) swapped, which went unnoticed
    # because only unordered suffix membership was asserted below.
    file_path_extended = add_protocol_and_compress_to_file_path(
        file_path, protocol, compress
    )
    file_path_suffixes = [
        e.replace('.', '') for e in pathlib.Path(file_path_extended).suffixes
    ]

    if compress:
        assert compress in file_path_suffixes
    if protocol:
        assert protocol in file_path_suffixes
    # stronger check: extensions are appended in protocol-then-compress order
    assert file_path_extended == f'{file_path}.{protocol}.{compress}'