diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index d3a336bf51f..14d539e84d4 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -1,11 +1,17 @@ import base64 import io import os.path +import os import pickle from contextlib import nullcontext from typing import Union, BinaryIO, TYPE_CHECKING, Type, Optional, Generator -from ....helper import __windows__, get_compress_ctx, decompress_bytes +from ....helper import ( + __windows__, + get_compress_ctx, + decompress_bytes, + protocol_and_compress_from_file_path, +) if TYPE_CHECKING: from ....types import T @@ -36,12 +42,23 @@ def load_binary( :param streaming: if `True` returns a generator over `Document` objects. In case protocol is pickle the `Documents` are streamed from disk to save memory usage :return: a DocumentArray object + + .. note:: + If `file` is `str` it can specify `protocol` and `compress` as file extensions. + This functionality assumes `file=file_name.$protocol.$compress` where `$protocol` and `$compress` refer to a + string interpolation of the respective `protocol` and `compress` methods. + For example if `file=my_docarray.protobuf.lz4` then the binary data will be loaded assuming `protocol=protobuf` + and `compress=lz4`. """ + if isinstance(file, io.BufferedReader): file_ctx = nullcontext(file) elif isinstance(file, bytes): file_ctx = nullcontext(file) elif os.path.exists(file): + protocol, compress = protocol_and_compress_from_file_path( + file, protocol, compress + ) file_ctx = open(file, 'rb') else: raise ValueError(f'unsupported input {file!r}') @@ -191,18 +208,33 @@ def save_binary( ) -> None: """Save array elements into a binary file. + :param file: File or filename to which the data is saved. + :param protocol: protocol to use + :param compress: compress algorithm to use + + .. note:: + If `file` is `str` it can specify `protocol` and `compress` as file extensions. + This functionality assumes `file=file_name.$protocol.$compress` where `$protocol` and `$compress` refer to a + string interpolation of the respective `protocol` and `compress` methods. + For example if `file=my_docarray.protobuf.lz4` then the binary data will be created using `protocol=protobuf` + and `compress=lz4`. + Comparing to :meth:`save_json`, it is faster and the file is smaller, but not human-readable. .. note:: To get a binary presentation in memory, use ``bytes(...)``. - :param protocol: protocol to use - :param compress: compress algorithm to use - :param file: File or filename to which the data is saved. """ if isinstance(file, io.BufferedWriter): file_ctx = nullcontext(file) else: + _protocol, _compress = protocol_and_compress_from_file_path(file) + + if _protocol is not None: + protocol = _protocol + if _compress is not None: + compress = _compress + file_ctx = open(file, 'wb') self.to_bytes(protocol=protocol, compress=compress, _file_ctx=file_ctx) diff --git a/docarray/helper.py b/docarray/helper.py index 811241e2da2..e1bb5f0d08e 100644 --- a/docarray/helper.py +++ b/docarray/helper.py @@ -2,8 +2,12 @@ import random import sys import uuid +import pathlib import warnings -from typing import Any, Dict, Optional, Sequence +from typing import Any, Dict, Optional, Sequence, Tuple + +ALLOWED_PROTOCOLS = {'pickle', 'protobuf', 'protobuf-array', 'pickle-array'} +ALLOWED_COMPRESSIONS = {'lz4', 'bz2', 'lzma', 'zlib', 'gzip'} __windows__ = sys.platform == 'win32' @@ -321,3 +325,66 @@ def dataclass_from_dict(klass, dikt): if isinstance(dikt, (tuple, list)): return [dataclass_from_dict(klass.__args__[0], f) for f in dikt] return dikt + + +def protocol_and_compress_from_file_path( + file_path: str, + default_protocol: Optional[str] = None, + default_compress: Optional[str] = None, +) -> Tuple[Optional[str], Optional[str]]: + """Extract protocol and compression algorithm from a string, use defaults if not found. + + :param file_path: path of a file. + :param default_protocol: default serialization protocol used in case not found. + :param default_compress: default compression method used in case not found. + + Examples: + + >>> protocol_and_compress_from_file_path('./docarray_fashion_mnist.protobuf.gzip') + ('protobuf', 'gzip') + + >>> protocol_and_compress_from_file_path('/Documents/docarray_fashion_mnist.protobuf') + ('protobuf', None) + + >>> protocol_and_compress_from_file_path('/Documents/docarray_fashion_mnist.gzip') + (None, gzip) + """ + + protocol = default_protocol + compress = default_compress + + file_extensions = [e.replace('.', '') for e in pathlib.Path(file_path).suffixes] + for extension in file_extensions: + if extension in ALLOWED_PROTOCOLS: + protocol = extension + elif extension in ALLOWED_COMPRESSIONS: + compress = extension + + return protocol, compress + + +def add_protocol_and_compress_to_file_path( + file_path: str, protocol: Optional[str] = None, compress: Optional[str] = None +) -> str: + """Creates a new file path with the protocol and compression methods as extensions. + + :param file_path: path of a file. + :param protocol: chosen protocol. + :param compress: compression algorithm. + + Examples: + + >>> add_protocol_and_compress_to_file_path('docarray_fashion_mnist.bin') + 'docarray_fashion_mnist.bin' + + >>> add_protocol_and_compress_to_file_path('docarray_fashion_mnist', 'protobuf', 'gzip') + 'docarray_fashion_mnist.protobuf.gzip' + """ + + file_path_extended = file_path + if protocol: + file_path_extended += '.' + protocol + if compress: + file_path_extended += '.' + compress + + return file_path_extended diff --git a/docs/fundamentals/documentarray/serialization.md b/docs/fundamentals/documentarray/serialization.md index ca3fe2a2728..50bdd01c18f 100644 --- a/docs/fundamentals/documentarray/serialization.md +++ b/docs/fundamentals/documentarray/serialization.md @@ -1,11 +1,14 @@ (docarray-serialization)= # Serialization -DocArray is designed to be "ready-to-wire" at anytime. Serialization is important. DocumentArray provides multiple serialization methods that allows one transfer DocumentArray object over network and across different microservices. +DocArray is designed to be "ready-to-wire" at anytime. Serialization is important. +DocumentArray provides multiple serialization methods that allows one transfer DocumentArray object over network and across different microservices. +Moreover, there is the ability to store/load `DocumentArray` objects to/from disk. - JSON string: `.from_json()`/`.to_json()` - Pydantic model: `.from_pydantic_model()`/`.to_pydantic_model()` - Bytes (compressed): `.from_bytes()`/`.to_bytes()` + - Disk serialization: `.save_binary()`/`.load_binary()` - Base64 (compressed): `.from_base64()`/`.to_base64()` - Protobuf Message: `.from_protobuf()`/`.to_protobuf()` - Python List: `.from_list()`/`.to_list()` @@ -15,7 +18,6 @@ DocArray is designed to be "ready-to-wire" at anytime. Serialization is importan - ## From/to JSON @@ -161,6 +163,46 @@ Afterwards, `doc1_bytes` describes how many bytes are used to serialize `doc1`, The pattern `dock_bytes` and `dock.to_bytes` is repeated `len(docs)` times. +### From/to Disk + +If you want to store a `DocumentArray` to disk you can use `.save_binary(filename, protocol, compress)` where `protocol` and `compress` refer to the protocol and compression methods used to serialize the data. +If you want to load a `DocumentArray` from disk you can use `.load_binary(filename, protocol, compress)`. + +For example, the following snippet shows how to save/load a `DocumentArray` in `my_docarray.bin`. + +```python +from docarray import DocumentArray, Document + +da = DocumentArray([Document(text='hello'), Document(text='world')]) + +da.save_binary('my_docarray.bin', protocol='protobuf', compress='lz4') +da_rec = DocumentArray.load_binary('my_docarray.bin', protocol='protobuf', compress='lz4') +da_rec == da +``` + +Note that in the previous code snippet the user needs to remember the protol and compression methods used to store the data in order to load it back correctly. `DocArray` allows you to specify `protocol` and `compress` as file extensions. +By doing so you can forget later on which protocol and compression methods were used to serialize the data to disk. +This functionality assumes `.save_binary` and `.load_binary` are called with `filename` following the form `file_name.$protocol.$compress`, where `$protocol` and `$compress` refer to a string interpolation of the respective `protocol` and `compress` methods. + +For example if `file=my_docarray.protobuf.lz4` then the binary data will be created using `protocol=protobuf` and `compress=lz4`. + +The previous code snippet can be simplified to + +```python +from docarray import DocumentArray, Document + +da = DocumentArray([Document(text='hello'), Document(text='world')]) + +da.save_binary('my_docarray.protobuf.lz4') +da_rec = DocumentArray.load_binary('my_docarray.protobuf.lz4') +da_rec == da +``` + +```{tip} +If you don't want to specify and remember `protocol` and `compress` to store/load to/from disk, save your `DocumentArray` `da` using +`da.save_binary('file_name.$protocol.$compress')` so that it can be loaded back with `DocumentArray.load_binary('file_name.$protocol.$compress')` +``` + ### Stream large binary serialization from disk In particular, if a serialization uses `protocol='pickle'` or `protocol='protobuf'`, then you can load it via streaming with a constant memory consumption by setting `streaming=True`: diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index 8ecc5083e0e..50c94dbf5b5 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ b/tests/unit/array/test_from_to_bytes.py @@ -11,6 +11,9 @@ from tests import random_docs +from docarray.helper import add_protocol_and_compress_to_file_path + + def get_ndarrays_for_ravel(): a = np.random.random([100, 3]) a[a > 0.5] = 0 @@ -60,17 +63,30 @@ def test_to_from_bytes(target_da, protocol, compress, ndarray_val, is_sparse): ) @pytest.mark.parametrize('compress', ['lz4', 'bz2', 'lzma', 'zlib', 'gzip', None]) def test_save_bytes(target_da, protocol, compress, tmpfile): + + # tests .save_binary(file, protocol=protocol, compress=compress) target_da.save_binary(tmpfile, protocol=protocol, compress=compress) target_da.save_binary(str(tmpfile), protocol=protocol, compress=compress) with open(tmpfile, 'wb') as fp: target_da.save_binary(fp, protocol=protocol, compress=compress) - DocumentArray.load_binary(tmpfile, protocol=protocol, compress=compress) + da_from_protocol_compress = DocumentArray.load_binary( + tmpfile, protocol=protocol, compress=compress + ) DocumentArray.load_binary(str(tmpfile), protocol=protocol, compress=compress) with open(tmpfile, 'rb') as fp: DocumentArray.load_binary(fp, protocol=protocol, compress=compress) + # tests .save_binary(file.protocol.compress) without arguments `compression` and `protocol` + file_path_extended = add_protocol_and_compress_to_file_path( + str(tmpfile), protocol, compress + ) + + target_da.save_binary(file_path_extended) + da_from_file_extension = DocumentArray.load_binary(file_path_extended) + assert da_from_protocol_compress == da_from_file_extension + @pytest.mark.parametrize('target_da', [DocumentArray.empty(100), random_docs(100)]) def test_from_to_protobuf(target_da): @@ -106,6 +122,7 @@ def test_save_bytes_stream(tmpfile, protocol, compress): [Document(text='aaa'), Document(text='bbb'), Document(text='ccc')] ) da.save_binary(tmpfile, protocol=protocol, compress=compress) + da_reconstructed = DocumentArray.load_binary( tmpfile, protocol=protocol, compress=compress, streaming=True ) diff --git a/tests/unit/test_helper.py b/tests/unit/test_helper.py new file mode 100644 index 00000000000..fb2057c6df0 --- /dev/null +++ b/tests/unit/test_helper.py @@ -0,0 +1,50 @@ +import pytest +import pathlib + +from docarray.helper import ( + protocol_and_compress_from_file_path, + add_protocol_and_compress_to_file_path, +) + + +@pytest.mark.parametrize( + 'file_path', ['doc_array', '../docarray', './a_folder/docarray'] +) +@pytest.mark.parametrize( + 'protocol', ['protobuf', 'protobuf-array', 'pickle', 'pickle-array'] +) +@pytest.mark.parametrize('compress', ['lz4', 'bz2', 'lzma', 'zlib', 'gzip', None]) +def test_protocol_and_compress_from_file_path(file_path, protocol, compress): + + file_path_extended = file_path + if protocol: + file_path_extended += '.' + protocol + if compress: + file_path_extended += '.' + compress + + _protocol, _compress = protocol_and_compress_from_file_path(file_path_extended) + + assert _protocol in {'protobuf', 'protobuf-array', 'pickle', 'pickle-array', None} + assert _compress in {'lz4', 'bz2', 'lzma', 'zlib', 'gzip', None} + + assert protocol == _protocol + assert compress == _compress + + +@pytest.mark.parametrize('file_path', ['doc_array', './some_folder/doc_array']) +@pytest.mark.parametrize( + 'protocol', ['protobuf', 'protobuf-array', 'pickle', 'pickle-array'] +) +@pytest.mark.parametrize('compress', ['lz4', 'bz2', 'lzma', 'zlib', 'gzip']) +def test_add_protocol_and_compress_to_file_path(file_path, compress, protocol): + file_path_extended = add_protocol_and_compress_to_file_path( + file_path, compress, protocol + ) + file_path_suffixes = [ + e.replace('.', '') for e in pathlib.Path(file_path_extended).suffixes + ] + + if compress: + assert compress in file_path_suffixes + if protocol: + assert protocol in file_path_suffixes