From 25bd60b730f60d7226615c43a50adfa48fcb9b1e Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 16:26:43 +0100 Subject: [PATCH 01/11] feat: add v1 streaming protocol --- docarray/array/mixins/io/binary.py | 133 +++++++++++++++++++++++------ 1 file changed, 109 insertions(+), 24 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index f4451808801..544d0b5266c 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -1,6 +1,6 @@ +import base64 import io import os.path -import base64 import pickle from contextlib import nullcontext from typing import Union, BinaryIO, TYPE_CHECKING, Type, Optional @@ -22,6 +22,7 @@ def load_binary( protocol: str = 'pickle-array', compress: Optional[str] = None, _show_progress: bool = False, + return_iterator: bool = False, ) -> 'T': """Load array elements from a LZ4-compressed binary file. @@ -29,10 +30,10 @@ def load_binary( :param protocol: protocol to use :param compress: compress algorithm to use :param _show_progress: show progress bar, only works when protocol is `pickle` or `protobuf` - + :param return_iterator: returns an iterator over the DocumentArray. + In case protocol is pickle the `Documents` are streamed from disk to save memory usage :return: a DocumentArray object """ - if isinstance(file, io.BufferedReader): file_ctx = nullcontext(file) elif isinstance(file, bytes): @@ -41,37 +42,100 @@ def load_binary( file_ctx = open(file, 'rb') else: raise ValueError(f'unsupported input {file!r}') + if return_iterator: + return cls._load_binary_stream( + file_ctx, protocol=protocol, compress=compress + ) + else: + return cls._load_binary_all(file_ctx, protocol, compress, _show_progress) + + @classmethod + def _load_binary_stream( + cls: Type['T'], file_ctx: str, protocol=None, compress=None, show_progress=False + ) -> 'T': from .... import Document + if show_progress: + from rich.progress import track as _track + + track = lambda x: _track(x, description='Deserializing') + else: + track = lambda x: x + + with file_ctx as f: + version_numdocs_lendoc0 = f.read(9) + # 1 byte (uint8) + version = int.from_bytes(version_numdocs_lendoc0[0:1], 'big', signed=False) + # 8 bytes (uint64) + num_docs = int.from_bytes(version_numdocs_lendoc0[1:9], 'big', signed=False) + + for _ in track(range(num_docs)): + # 4 bytes (uint32) + len_current_doc_in_bytes = int.from_bytes( + f.read(4), 'big', signed=False + ) + yield Document.from_bytes( + f.read(len_current_doc_in_bytes), + protocol=protocol, + compress=compress, + ) + + @classmethod + def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): + from .... import Document + with file_ctx as fp: d = fp.read() if hasattr(fp, 'read') else fp + + if protocol == 'pickle-array' or protocol == 'protobuf-array': if get_compress_ctx(algorithm=compress) is not None: d = decompress_bytes(d, algorithm=compress) compress = None - if protocol == 'protobuf-array': - from ....proto.docarray_pb2 import DocumentArrayProto + if protocol == 'protobuf-array': + from ....proto.docarray_pb2 import DocumentArrayProto + + dap = DocumentArrayProto() + dap.ParseFromString(d) - dap = DocumentArrayProto() - dap.ParseFromString(d) + return cls.from_protobuf(dap) + elif protocol == 'pickle-array': + return pickle.loads(d) - return cls.from_protobuf(dap) - elif protocol == 'pickle-array': - return pickle.loads(d) + # Binary format for streaming case + else: + # 1 byte (uint8) + version = int.from_bytes(d[0:1], 'big', signed=False) + # 8 bytes (uint64) + num_docs = int.from_bytes(d[1:9], 'big', signed=False) + if show_progress: + from rich.progress import track as _track + + track = lambda x: _track(x, description='Deserializing') else: - _len = len(random_uuid().bytes) - _binary_delimiter = d[:_len] # first get delimiter - if _show_progress: - from rich.progress import track as _track + track = lambda x: x - track = lambda x: _track(x, description='Deserializing') - else: - track = lambda x: x - return cls( - Document.from_bytes(od, protocol=protocol, compress=compress) - for od in track(d[_len:].split(_binary_delimiter)) + # this 9 is version + num_docs bytes used + start_pos = 9 + docs = [] + + for _ in track(range(num_docs)): + # 4 bytes (uint32) + len_current_doc_in_bytes = int.from_bytes( + d[start_pos : start_pos + 4], 'big', signed=False + ) + start_doc_pos = start_pos + 4 + end_doc_pos = start_doc_pos + len_current_doc_in_bytes + start_pos = end_doc_pos + + # variable length bytes doc + doc = Document.from_bytes( + d[start_doc_pos:end_doc_pos], protocol=protocol, compress=compress ) + docs.append(doc) + + return cls(docs) @classmethod def from_bytes( @@ -130,8 +194,11 @@ def to_bytes( :return: the binary serialization in bytes """ - _binary_delimiter = random_uuid().bytes - compress_ctx = get_compress_ctx(compress, mode='wb') + if protocol == 'protobuf-array' or protocol == 'pickle-array': + compress_ctx = get_compress_ctx(compress, mode='wb') + else: + compress_ctx = None + with (_file_ctx or io.BytesIO()) as bf: if compress_ctx is None: # if compress do not support streaming then postpone the compress @@ -141,12 +208,14 @@ def to_bytes( f = compress_ctx(bf) fc = f compress = None + with fc: if protocol == 'protobuf-array': f.write(self.to_protobuf().SerializePartialToString()) elif protocol == 'pickle-array': f.write(pickle.dumps(self)) else: + # Binary format for streaming case if _show_progress: from rich.progress import track as _track @@ -154,9 +223,25 @@ def to_bytes( else: track = lambda x: x + # V1 DocArray streaming serialization format + # | 1 byte | 8 bytes | 4 bytes | variable | 4 bytes | variable ... + + # 1 byte (uint8) + version_byte = b'\x01' + # 8 bytes (uint64) + num_docs_as_bytes = len(self).to_bytes(8, 'big', signed=False) + f.write(version_byte + num_docs_as_bytes) + for d in track(self): - f.write(_binary_delimiter) - f.write(d.to_bytes(protocol=protocol, compress=compress)) + # 4 bytes (uint32) + doc_as_bytes = d.to_bytes(protocol=protocol, compress=compress) + + # variable size bytes + len_doc_as_bytes = len(doc_as_bytes).to_bytes( + 4, 'big', signed=False + ) + f.write(len_doc_as_bytes + doc_as_bytes) + if not _file_ctx: return bf.getvalue() From 9c3f24a1f1f091132b004fde25f1eb6ff3561b2f Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 16:27:17 +0100 Subject: [PATCH 02/11] test: add tests stream --- tests/unit/array/test_from_to_bytes.py | 37 ++++++++++++++++++-------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index ffb0f5f3550..042573dafe0 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ b/tests/unit/array/test_from_to_bytes.py @@ -1,10 +1,12 @@ +import types + import numpy as np import pytest import tensorflow as tf import torch from scipy.sparse import csr_matrix, coo_matrix, bsr_matrix, csc_matrix -from docarray import DocumentArray +from docarray import DocumentArray, Document from docarray.math.ndarray import to_numpy_array from tests import random_docs @@ -38,7 +40,7 @@ def test_to_from_bytes(target_da, protocol, compress, ndarray_val, is_sparse): assert len(da2) == len(target_da) target_da.embeddings = ndarray_val - target_da.tensors = ndarray_val + target_da.blobs = ndarray_val bstr = target_da.to_bytes(protocol=protocol, compress=compress) print(protocol, compress, len(bstr)) da2 = DocumentArray.from_bytes(bstr, protocol=protocol, compress=compress) @@ -48,7 +50,7 @@ def test_to_from_bytes(target_da, protocol, compress, ndarray_val, is_sparse): to_numpy_array(target_da.embeddings), to_numpy_array(da2.embeddings) ) np.testing.assert_almost_equal( - to_numpy_array(target_da.tensors), to_numpy_array(da2.tensors) + to_numpy_array(target_da.blobs), to_numpy_array(da2.blobs) ) @@ -70,19 +72,32 @@ def test_save_bytes(target_da, protocol, compress, tmpfile): DocumentArray.load_binary(fp, protocol=protocol, compress=compress) +# Note protocol = ['protobuf-array', 'pickle-array'] not supported with Document.from_bytes +@pytest.mark.parametrize('protocol', ['protobuf', 'pickle']) +@pytest.mark.parametrize( + 'compress', ['lz4', 'bz2', 'lzma', 'gzip', 'zlib', 'gzib', None] +) +def test_save_bytes_stream(tmpfile, protocol, compress): + da = DocumentArray( + [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] + ) + da.save_binary(tmpfile, protocol=protocol, compress=compress) + da_reconstructed = DocumentArray.load_binary( + tmpfile, protocol=protocol, compress=compress, return_iterator=True + ) + assert isinstance(da_reconstructed, types.GeneratorType) + for d, d_rec in zip(da, da_reconstructed): + assert d == d_rec + + @pytest.mark.parametrize('target_da', [DocumentArray.empty(100), random_docs(100)]) def test_from_to_protobuf(target_da): DocumentArray.from_protobuf(target_da.to_protobuf()) -@pytest.mark.parametrize('target', [DocumentArray.empty(10), random_docs(10)]) -@pytest.mark.parametrize('protocol', ['jsonschema', 'protobuf']) -@pytest.mark.parametrize('to_fn', ['dict', 'json']) -def test_from_to_safe_list(target, protocol, to_fn): - da_r = getattr(DocumentArray, f'from_{to_fn}')( - getattr(target, f'to_{to_fn}')(protocol=protocol), protocol=protocol - ) - assert da_r == target +@pytest.mark.parametrize('target_da', [DocumentArray.empty(100), random_docs(100)]) +def test_from_to_safe_list(target_da): + DocumentArray.from_list(target_da.to_list()) @pytest.mark.parametrize('protocol', ['protobuf', 'pickle']) From 0d83dbde20f862c7341e1541fbc025cfac16c383 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 16:28:31 +0100 Subject: [PATCH 03/11] docs: add v1 serialization definition --- .../documentarray/serialization.md | 55 ++++++------------- 1 file changed, 17 insertions(+), 38 deletions(-) diff --git a/docs/fundamentals/documentarray/serialization.md b/docs/fundamentals/documentarray/serialization.md index 87ceb1b10af..b04aa3dd179 100644 --- a/docs/fundamentals/documentarray/serialization.md +++ b/docs/fundamentals/documentarray/serialization.md @@ -3,8 +3,7 @@ DocArray is designed to be "ready-to-wire" at anytime. Serialization is important. DocumentArray provides multiple serialization methods that allows one transfer DocumentArray object over network and across different microservices. -- JSON string: `.from_json()`/`.to_json()` - - Pydantic model: `.from_pydantic_model()`/`.to_pydantic_model()` +- JSON string: `.from_json()`/`.to_json()` - Bytes (compressed): `.from_bytes()`/`.to_bytes()` - Base64 (compressed): `.from_base64()`/`.to_base64()` - Protobuf Message: `.from_protobuf()`/`.to_protobuf()` @@ -12,23 +11,12 @@ DocArray is designed to be "ready-to-wire" at anytime. Serialization is importan - Pandas Dataframe: `.from_dataframe()`/`.to_dataframe()` - Cloud: `.push()`/`.pull()` - - - - ## From/to JSON - -```{tip} -If you are building a webservice and want to use JSON for passing DocArray objects, then data validation and field-filtering can be crucial. In this case, it is highly recommended to check out {ref}`fastapi-support` and follow the methods there. -``` - ```{important} -Depending on which protocol you use, this feature requires `pydantic` or `protobuf` dependency. You can do `pip install "docarray[full]"` to install it. +This feature requires `protobuf` dependency. You can do `pip install "docarray[full]"` to install it. ``` - - ```python from docarray import DocumentArray, Document @@ -37,7 +25,7 @@ da.to_json() ``` ```text -[{"id": "a677577877b611eca3811e008a366d49", "parent_id": null, "granularity": null, "adjacency": null, "blob": null, "tensor": null, "mime_type": "text/plain", "text": "hello", "weight": null, "uri": null, "tags": null, "offset": null, "location": null, "embedding": null, "modality": null, "evaluations": null, "scores": null, "chunks": null, "matches": null}, {"id": "a67758f477b611eca3811e008a366d49", "parent_id": null, "granularity": null, "adjacency": null, "blob": null, "tensor": null, "mime_type": "text/plain", "text": "world", "weight": null, "uri": null, "tags": null, "offset": null, "location": null, "embedding": null, "modality": null, "evaluations": null, "scores": null, "chunks": null, "matches": null}] +[{"id": "72db9a7e6e3211ec97f51e008a366d49", "text": "hello", "mime_type": "text/plain"}, {"id": "72db9cb86e3211ec97f51e008a366d49", "text": "world", "mime_type": "text/plain"}] ``` @@ -65,11 +53,6 @@ da_r.summary() ``` -```{seealso} -More parameters and usages can be found in the Document-level {ref}`doc-json`. -``` - - ## From/to bytes ```{important} @@ -143,21 +126,23 @@ Depending on how you want to interpret the results, the figures above can be an ### Wire format of `pickle` and `protobuf` -When set `protocol=pickle` or `protobuf`, the result binary string looks like the following: +When set `protocol=pickle` or `protobuf`, the resulting bytes look like the following: ```text ------------------------------------------------------------------------------------ -| Delimiter | doc1.to_bytes() | Delimiter | doc2.to_bytes() | Delimiter | ... ------------------------------------------------------------------------------------ - | | - | | - | | - Fixed-length | - | - Variable-length +-------------------------------------------------------------------------------------------------------- +| version | len(docs) | doc1_bytes | doc1.to_bytes() | doc2_bytes | doc2.to_bytes() ... +--------------------------------------------------------------------------------------------------------- +| Fixed-length | Fixed-length | Fixed-length | Variable-length | Fixed-length | Variable-length ... +-------------------------------------------------------------------------------------------------------- + | | | | | | + uint8 uint64 uint32 Variable-length ... ... + ``` -Here `Delimiter` is a 16-bytes separator such as `b'g\x81\xcc\x1c\x0f\x93L\xed\xa2\xb0s)\x9c\xf9\xf6\xf2'` used for setting the boundary of each Document's serialization. Given a `to_bytes(protocol='pickle/protobuf')` binary string, once we know the first 16 bytes, the boundary is clear. Consequently, one can leverage this format to stream Documents, drop, skip, or early-stop, etc. +Here `version` is a `uint8` that specifies the serialization version of the `DocumentArray` serialization format, followed by `len(docs)` which is a `uint64` that specifies the amount of serialized documents. +Afterwards, `doc1_bytes` describes how many bytes are used to serialize `doc1`, followed by `doc1.to_bytes()` which is the bytes data of the document itself. +The pattern `dock_bytes` and `dock.to_bytes` is repeated `len(docs)` times. + ## From/to base64 @@ -226,10 +211,6 @@ docs { ## From/to list -```{important} -This feature requires `protobuf` or `pydantic` dependency. You can do `pip install "docarray[full]"` to install it. -``` - Serializing to/from Python list is less frequently used for the same reason as `Document.to_dict()`: it is often an intermediate step of serializing to JSON. You can do: ```python @@ -243,9 +224,7 @@ da.to_list() [{'id': 'ae55782a6e4d11ec803c1e008a366d49', 'text': 'hello', 'mime_type': 'text/plain'}, {'id': 'ae557a146e4d11ec803c1e008a366d49', 'text': 'world', 'mime_type': 'text/plain'}] ``` -```{seealso} -More parameters and usages can be found in the Document-level {ref}`doc-dict`. -``` +There is an argument `strict` shares {ref}`the same semantic` as in `Document.to_dict()`. ## From/to dataframe From 2eaf9490f4eb225db8c6f19b88044521230bfc38 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 17:05:09 +0100 Subject: [PATCH 04/11] test: add missing import --- tests/unit/array/test_from_to_bytes.py | 50 ++++++++++++++------------ 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index 042573dafe0..e014feb5426 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ b/tests/unit/array/test_from_to_bytes.py @@ -1,5 +1,4 @@ import types - import numpy as np import pytest import tensorflow as tf @@ -40,7 +39,7 @@ def test_to_from_bytes(target_da, protocol, compress, ndarray_val, is_sparse): assert len(da2) == len(target_da) target_da.embeddings = ndarray_val - target_da.blobs = ndarray_val + target_da.tensors = ndarray_val bstr = target_da.to_bytes(protocol=protocol, compress=compress) print(protocol, compress, len(bstr)) da2 = DocumentArray.from_bytes(bstr, protocol=protocol, compress=compress) @@ -50,7 +49,7 @@ def test_to_from_bytes(target_da, protocol, compress, ndarray_val, is_sparse): to_numpy_array(target_da.embeddings), to_numpy_array(da2.embeddings) ) np.testing.assert_almost_equal( - to_numpy_array(target_da.blobs), to_numpy_array(da2.blobs) + to_numpy_array(target_da.tensors), to_numpy_array(da2.tensors) ) @@ -72,6 +71,30 @@ def test_save_bytes(target_da, protocol, compress, tmpfile): DocumentArray.load_binary(fp, protocol=protocol, compress=compress) +@pytest.mark.parametrize('target_da', [DocumentArray.empty(100), random_docs(100)]) +def test_from_to_protobuf(target_da): + DocumentArray.from_protobuf(target_da.to_protobuf()) + + +@pytest.mark.parametrize('target', [DocumentArray.empty(10), random_docs(10)]) +@pytest.mark.parametrize('protocol', ['jsonschema', 'protobuf']) +@pytest.mark.parametrize('to_fn', ['dict', 'json']) +def test_from_to_safe_list(target, protocol, to_fn): + da_r = getattr(DocumentArray, f'from_{to_fn}')( + getattr(target, f'to_{to_fn}')(protocol=protocol), protocol=protocol + ) + assert da_r == target + + +@pytest.mark.parametrize('protocol', ['protobuf', 'pickle']) +@pytest.mark.parametrize('show_progress', [True, False]) +def test_push_pull_show_progress(show_progress, protocol): + da = DocumentArray.empty(1000) + r = da.to_bytes(_show_progress=show_progress, protocol=protocol) + da_r = DocumentArray.from_bytes(r, _show_progress=show_progress, protocol=protocol) + assert da == da_r + + # Note protocol = ['protobuf-array', 'pickle-array'] not supported with Document.from_bytes @pytest.mark.parametrize('protocol', ['protobuf', 'pickle']) @pytest.mark.parametrize( @@ -79,7 +102,7 @@ def test_save_bytes(target_da, protocol, compress, tmpfile): ) def test_save_bytes_stream(tmpfile, protocol, compress): da = DocumentArray( - [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] + [Document(text='aaa'), Document(text='bbb'), Document(text='dasdw')] ) da.save_binary(tmpfile, protocol=protocol, compress=compress) da_reconstructed = DocumentArray.load_binary( @@ -88,22 +111,3 @@ def test_save_bytes_stream(tmpfile, protocol, compress): assert isinstance(da_reconstructed, types.GeneratorType) for d, d_rec in zip(da, da_reconstructed): assert d == d_rec - - -@pytest.mark.parametrize('target_da', [DocumentArray.empty(100), random_docs(100)]) -def test_from_to_protobuf(target_da): - DocumentArray.from_protobuf(target_da.to_protobuf()) - - -@pytest.mark.parametrize('target_da', [DocumentArray.empty(100), random_docs(100)]) -def test_from_to_safe_list(target_da): - DocumentArray.from_list(target_da.to_list()) - - -@pytest.mark.parametrize('protocol', ['protobuf', 'pickle']) -@pytest.mark.parametrize('show_progress', [True, False]) -def test_push_pull_show_progress(show_progress, protocol): - da = DocumentArray.empty(1000) - r = da.to_bytes(_show_progress=show_progress, protocol=protocol) - da_r = DocumentArray.from_bytes(r, _show_progress=show_progress, protocol=protocol) - assert da == da_r From 6746e2e213ef25c6a3229fe09dbb52471dbbdfa3 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 17:34:50 +0100 Subject: [PATCH 05/11] refactor: add explicit elif branch --- docarray/array/mixins/io/binary.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 544d0b5266c..cfeb1278cd1 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -214,7 +214,7 @@ def to_bytes( f.write(self.to_protobuf().SerializePartialToString()) elif protocol == 'pickle-array': f.write(pickle.dumps(self)) - else: + elif protocol in ('pickle', 'protobuf'): # Binary format for streaming case if _show_progress: from rich.progress import track as _track @@ -241,6 +241,10 @@ def to_bytes( 4, 'big', signed=False ) f.write(len_doc_as_bytes + doc_as_bytes) + else: + raise ValueError( + f'protocol={protocol} is not supported. Can be only `protobuf`,`pickle`,`protobuf-array`,`pickle-array`.' + ) if not _file_ctx: return bf.getvalue() From 5f28792228885c34026cd5acf1c704a86bdaad6d Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Wed, 19 Jan 2022 11:31:55 +0100 Subject: [PATCH 06/11] docs: update documentation --- docarray/array/mixins/io/binary.py | 32 ++++++++++---- .../documentarray/serialization.md | 42 ++++++++++++++++--- 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index cfeb1278cd1..fc740ae458a 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -3,7 +3,7 @@ import os.path import pickle from contextlib import nullcontext -from typing import Union, BinaryIO, TYPE_CHECKING, Type, Optional +from typing import Union, BinaryIO, TYPE_CHECKING, Type, Optional, Generator from ....helper import random_uuid, __windows__, get_compress_ctx, decompress_bytes @@ -22,15 +22,15 @@ def load_binary( protocol: str = 'pickle-array', compress: Optional[str] = None, _show_progress: bool = False, - return_iterator: bool = False, - ) -> 'T': - """Load array elements from a LZ4-compressed binary file. + streaming: bool = False, + ) -> Union['DocumentArray', Generator['Document']]: + """Load array elements from a compressed binary file. :param file: File or filename or serialized bytes where the data is stored. :param protocol: protocol to use :param compress: compress algorithm to use :param _show_progress: show progress bar, only works when protocol is `pickle` or `protobuf` - :param return_iterator: returns an iterator over the DocumentArray. + :param streaming: if `True` returns a generator over `Document` objects. In case protocol is pickle the `Documents` are streamed from disk to save memory usage :return: a DocumentArray object """ @@ -42,9 +42,9 @@ def load_binary( file_ctx = open(file, 'rb') else: raise ValueError(f'unsupported input {file!r}') - if return_iterator: - return cls._load_binary_stream( - file_ctx, protocol=protocol, compress=compress + if streaming: + yield from cls._load_binary_stream( + file_ctx, protocol=protocol, compress=compress, _show_progress ) else: return cls._load_binary_all(file_ctx, protocol, compress, _show_progress) @@ -52,7 +52,14 @@ def load_binary( @classmethod def _load_binary_stream( cls: Type['T'], file_ctx: str, protocol=None, compress=None, show_progress=False - ) -> 'T': + ) -> Generator['Document']: + """Yield `Document` objects from a binary file + + :param protocol: protocol to use + :param compress: compress algorithm to use + :param _show_progress: show progress bar, only works when protocol is `pickle` or `protobuf` + :return: a generator of `Document` objects + """ from .... import Document @@ -83,6 +90,13 @@ def _load_binary_stream( @classmethod def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): + """Read a `DocumentArray` object from a binary file + + :param protocol: protocol to use + :param compress: compress algorithm to use + :param _show_progress: show progress bar, only works when protocol is `pickle` or `protobuf` + :return: a `DocumentArray` + """ from .... import Document with file_ctx as fp: diff --git a/docs/fundamentals/documentarray/serialization.md b/docs/fundamentals/documentarray/serialization.md index b04aa3dd179..96c50a5cec1 100644 --- a/docs/fundamentals/documentarray/serialization.md +++ b/docs/fundamentals/documentarray/serialization.md @@ -3,7 +3,8 @@ DocArray is designed to be "ready-to-wire" at anytime. Serialization is important. DocumentArray provides multiple serialization methods that allows one transfer DocumentArray object over network and across different microservices. -- JSON string: `.from_json()`/`.to_json()` +- JSON string: `.from_json()`/`.to_json()` + - Pydantic model: `.from_pydantic_model()`/`.to_pydantic_model()` - Bytes (compressed): `.from_bytes()`/`.to_bytes()` - Base64 (compressed): `.from_base64()`/`.to_base64()` - Protobuf Message: `.from_protobuf()`/`.to_protobuf()` @@ -11,12 +12,23 @@ DocArray is designed to be "ready-to-wire" at anytime. Serialization is importan - Pandas Dataframe: `.from_dataframe()`/`.to_dataframe()` - Cloud: `.push()`/`.pull()` + + + + ## From/to JSON + +```{tip} +If you are building a webservice and want to use JSON for passing DocArray objects, then data validation and field-filtering can be crucial. In this case, it is highly recommended to check out {ref}`fastapi-support` and follow the methods there. +``` + ```{important} -This feature requires `protobuf` dependency. You can do `pip install "docarray[full]"` to install it. +Depending on which protocol you use, this feature requires `pydantic` or `protobuf` dependency. You can do `pip install "docarray[full]"` to install it. ``` + + ```python from docarray import DocumentArray, Document @@ -25,7 +37,7 @@ da.to_json() ``` ```text -[{"id": "72db9a7e6e3211ec97f51e008a366d49", "text": "hello", "mime_type": "text/plain"}, {"id": "72db9cb86e3211ec97f51e008a366d49", "text": "world", "mime_type": "text/plain"}] +[{"id": "a677577877b611eca3811e008a366d49", "parent_id": null, "granularity": null, "adjacency": null, "blob": null, "tensor": null, "mime_type": "text/plain", "text": "hello", "weight": null, "uri": null, "tags": null, "offset": null, "location": null, "embedding": null, "modality": null, "evaluations": null, "scores": null, "chunks": null, "matches": null}, {"id": "a67758f477b611eca3811e008a366d49", "parent_id": null, "granularity": null, "adjacency": null, "blob": null, "tensor": null, "mime_type": "text/plain", "text": "world", "weight": null, "uri": null, "tags": null, "offset": null, "location": null, "embedding": null, "modality": null, "evaluations": null, "scores": null, "chunks": null, "matches": null}] ``` @@ -53,6 +65,11 @@ da_r.summary() ``` +```{seealso} +More parameters and usages can be found in the Document-level {ref}`doc-json`. +``` + + ## From/to bytes ```{important} @@ -144,6 +161,13 @@ Afterwards, `doc1_bytes` describes how many bytes are used to serialize `doc1`, The pattern `dock_bytes` and `dock.to_bytes` is repeated `len(docs)` times. +### Streaming + +A `DocumentArray` can be streammed from a serialized file as shown in the following example + + + + ## From/to base64 ```{important} @@ -211,6 +235,10 @@ docs { ## From/to list +```{important} +This feature requires `protobuf` or `pydantic` dependency. You can do `pip install "docarray[full]"` to install it. +``` + Serializing to/from Python list is less frequently used for the same reason as `Document.to_dict()`: it is often an intermediate step of serializing to JSON. You can do: ```python @@ -224,7 +252,9 @@ da.to_list() [{'id': 'ae55782a6e4d11ec803c1e008a366d49', 'text': 'hello', 'mime_type': 'text/plain'}, {'id': 'ae557a146e4d11ec803c1e008a366d49', 'text': 'world', 'mime_type': 'text/plain'}] ``` -There is an argument `strict` shares {ref}`the same semantic` as in `Document.to_dict()`. +```{seealso} +More parameters and usages can be found in the Document-level {ref}`doc-dict`. +``` ## From/to dataframe @@ -284,4 +314,6 @@ da = DocumentArray.pull(token='myda123') Now you can continue the work at local, analyzing `da` or visualizing it. Your friends & colleagues who know the token `myda123` can also pull that DocumentArray. It's useful when you want to quickly share the results with your colleagues & friends. -The maximum size of an upload is 4GB under the `protocol='protobuf'` and `compress='gzip'` setting. The lifetime of an upload is one week after its creation. \ No newline at end of file +The maximum size of an upload is 4GB under the `protocol='protobuf'` and `compress='gzip'` setting. The lifetime of an upload is one week after its creation. + + From 763c3a669c981b91e7bdb17ab34f81c43632ea2f Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Wed, 19 Jan 2022 11:46:47 +0100 Subject: [PATCH 07/11] style: update type hints --- docarray/array/mixins/io/binary.py | 12 ++++++------ tests/unit/array/test_from_to_bytes.py | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index fc740ae458a..11bbb91093f 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -23,7 +23,7 @@ def load_binary( compress: Optional[str] = None, _show_progress: bool = False, streaming: bool = False, - ) -> Union['DocumentArray', Generator['Document']]: + ) -> Union['DocumentArray', Generator['Document', None, None]]: """Load array elements from a compressed binary file. :param file: File or filename or serialized bytes where the data is stored. @@ -43,16 +43,16 @@ def load_binary( else: raise ValueError(f'unsupported input {file!r}') if streaming: - yield from cls._load_binary_stream( - file_ctx, protocol=protocol, compress=compress, _show_progress + return cls._load_binary_stream( + file_ctx, protocol=protocol, compress=compress, _show_progress=_show_progress ) else: return cls._load_binary_all(file_ctx, protocol, compress, _show_progress) @classmethod def _load_binary_stream( - cls: Type['T'], file_ctx: str, protocol=None, compress=None, show_progress=False - ) -> Generator['Document']: + cls: Type['T'], file_ctx: str, protocol=None, compress=None, _show_progress=False + ) -> Generator['Document', None, None]: """Yield `Document` objects from a binary file :param protocol: protocol to use @@ -63,7 +63,7 @@ def _load_binary_stream( from .... import Document - if show_progress: + if _show_progress: from rich.progress import track as _track track = lambda x: _track(x, description='Deserializing') diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index e014feb5426..c2f5a8a0665 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ b/tests/unit/array/test_from_to_bytes.py @@ -102,11 +102,11 @@ def test_push_pull_show_progress(show_progress, protocol): ) def test_save_bytes_stream(tmpfile, protocol, compress): da = DocumentArray( - [Document(text='aaa'), Document(text='bbb'), Document(text='dasdw')] + [Document(text='aaa'), Document(text='bbb'), Document(text='ccc')] ) da.save_binary(tmpfile, protocol=protocol, compress=compress) da_reconstructed = DocumentArray.load_binary( - tmpfile, protocol=protocol, compress=compress, return_iterator=True + tmpfile, protocol=protocol, compress=compress, streaming=True ) assert isinstance(da_reconstructed, types.GeneratorType) for d, d_rec in zip(da, da_reconstructed): From dfa40e52b7d074dcda3f7291cb152fa76a95293d Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Wed, 19 Jan 2022 11:47:29 +0100 Subject: [PATCH 08/11] style: follow black --- docarray/array/mixins/io/binary.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 11bbb91093f..2cb9731e87d 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -44,14 +44,21 @@ def load_binary( raise ValueError(f'unsupported input {file!r}') if streaming: return cls._load_binary_stream( - file_ctx, protocol=protocol, compress=compress, _show_progress=_show_progress + file_ctx, + protocol=protocol, + compress=compress, + _show_progress=_show_progress, ) else: return cls._load_binary_all(file_ctx, protocol, compress, _show_progress) @classmethod def _load_binary_stream( - cls: Type['T'], file_ctx: str, protocol=None, compress=None, _show_progress=False + cls: Type['T'], + file_ctx: str, + protocol=None, + compress=None, + _show_progress=False, ) -> Generator['Document', None, None]: """Yield `Document` objects from a binary file From 891e7868b06443d54efec779d341f214587a9920 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Wed, 19 Jan 2022 11:49:28 +0100 Subject: [PATCH 09/11] style: add type checking info for document --- docarray/array/mixins/io/binary.py | 1 + 1 file changed, 1 insertion(+) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 2cb9731e87d..3b7b772c1c6 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -10,6 +10,7 @@ if TYPE_CHECKING: from ....types import T from ....proto.docarray_pb2 import DocumentArrayProto + from .... import Document, DocumentArray class BinaryIOMixin: From 2c385c5635a97162c80692989d906a93d0d32eeb Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Wed, 19 Jan 2022 12:18:44 +0100 Subject: [PATCH 10/11] docs: add code snippet --- docs/fundamentals/documentarray/serialization.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/fundamentals/documentarray/serialization.md b/docs/fundamentals/documentarray/serialization.md index 96c50a5cec1..e276e43b8ac 100644 --- a/docs/fundamentals/documentarray/serialization.md +++ b/docs/fundamentals/documentarray/serialization.md @@ -163,8 +163,14 @@ The pattern `dock_bytes` and `dock.to_bytes` is repeated `len(docs)` times. ### Streaming -A `DocumentArray` can be streammed from a serialized file as shown in the following example +A `DocumentArray` can be streamed from a serialized file as shown in the following example +```python +da_generator = DocumentArray.load_binary('documentarray.bin', protocol='pickle', compress='gzip', streaming=True) +for d in da_reconstructed: + # work here with `d` as a Document object + print(d.text) +``` From 0c17da6f944d5253c3875c750d9bc4b26d74b328 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Wed, 19 Jan 2022 12:32:35 +0100 Subject: [PATCH 11/11] docs: update python snippet --- docs/fundamentals/documentarray/serialization.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/fundamentals/documentarray/serialization.md b/docs/fundamentals/documentarray/serialization.md index e276e43b8ac..be95fd452a6 100644 --- a/docs/fundamentals/documentarray/serialization.md +++ b/docs/fundamentals/documentarray/serialization.md @@ -167,7 +167,7 @@ A `DocumentArray` can be streamed from a serialized file as shown in the followi ```python da_generator = DocumentArray.load_binary('documentarray.bin', protocol='pickle', compress='gzip', streaming=True) -for d in da_reconstructed: +for d in da_generator: # work here with `d` as a Document object print(d.text) ```