From 3c1fbc7c2d6c1829ba76a1e3675fe28383df184f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20Ho=CC=88nicke?= Date: Thu, 13 Jan 2022 19:21:41 +0100 Subject: [PATCH 01/28] feat: initial draft --- docarray/array/mixins/io/binary.py | 39 ++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index f4451808801..89d8ba96bb2 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -73,6 +73,45 @@ def load_binary( for od in track(d[_len:].split(_binary_delimiter)) ) + + @classmethod + def load_binary_stream( + cls: Type['T'], + file: Union[str, BinaryIO, bytes], + ) -> 'T': + + if os.path.exists(file): + file_ctx = open(file, 'rb') + else: + raise ValueError(f'unsupported input {file!r}') + + from .... import Document + + delimiter = None + current_bytes = b'' + with file_ctx as fp: + while True: + b = current_bytes + fp.read(500) + if delimiter is None: + _len = len(random_uuid().bytes) + _binary_delimiter = b[:_len] + split = b.split(_binary_delimiter) + for d, _ in zip(split, range(len(split) - 1)): + if len(d) > 0: + d = Document.from_bytes(d) + print(d) + yield d + current_bytes = split[-1] + if b == b'': + break + + + + @classmethod + def _get_batches(cls, gen, batch_size): + for i in range(0, len(gen), batch_size): + yield gen[i:i + batch_size] + @classmethod def from_bytes( cls: Type['T'], From 5403b03d7eb51ceceb963785574161a134d24a27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20Ho=CC=88nicke?= Date: Thu, 13 Jan 2022 20:12:44 +0100 Subject: [PATCH 02/28] feat: constant delimiter len --- docarray/array/mixins/io/binary.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 89d8ba96bb2..aad39bcaebc 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -93,8 +93,7 @@ def load_binary_stream( while True: 
b = current_bytes + fp.read(500) if delimiter is None: - _len = len(random_uuid().bytes) - _binary_delimiter = b[:_len] + _binary_delimiter = b[:16] # 16 is the length of the delimiter split = b.split(_binary_delimiter) for d, _ in zip(split, range(len(split) - 1)): if len(d) > 0: From b6a64919eecde6254b22bacfcdef807303ea6760 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20Ho=CC=88nicke?= Date: Thu, 13 Jan 2022 20:27:39 +0100 Subject: [PATCH 03/28] fix: offset error --- docarray/array/mixins/io/binary.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index aad39bcaebc..6325a63743a 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -91,17 +91,20 @@ def load_binary_stream( current_bytes = b'' with file_ctx as fp: while True: - b = current_bytes + fp.read(500) + new_bytes = fp.read(500) + if new_bytes == b'': + break + b = current_bytes + new_bytes if delimiter is None: - _binary_delimiter = b[:16] # 16 is the length of the delimiter - split = b.split(_binary_delimiter) + delimiter = b[:16] # 16 is the length of the delimiter + split = b.split(delimiter) for d, _ in zip(split, range(len(split) - 1)): if len(d) > 0: d = Document.from_bytes(d) print(d) yield d - current_bytes = split[-1] - if b == b'': + current_bytes = split[-1] + if new_bytes == b'': break From c30138fd4f0f9e5a26c2922ee6eeb123edd11f7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20Ho=CC=88nicke?= Date: Fri, 14 Jan 2022 09:35:00 +0100 Subject: [PATCH 04/28] test: load stream --- docarray/array/mixins/io/binary.py | 152 ++++++++++++------------- tests/unit/array/test_from_to_bytes.py | 12 +- 2 files changed, 85 insertions(+), 79 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 6325a63743a..8238c82f130 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -1,6 
+1,6 @@ +import base64 import io import os.path -import base64 import pickle from contextlib import nullcontext from typing import Union, BinaryIO, TYPE_CHECKING, Type, Optional @@ -17,11 +17,12 @@ class BinaryIOMixin: @classmethod def load_binary( - cls: Type['T'], - file: Union[str, BinaryIO, bytes], - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _show_progress: bool = False, + cls: Type['T'], + file: Union[str, BinaryIO, bytes], + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _show_progress: bool = False, + return_iterator: bool = False, ) -> 'T': """Load array elements from a LZ4-compressed binary file. @@ -29,7 +30,8 @@ def load_binary( :param protocol: protocol to use :param compress: compress algorithm to use :param _show_progress: show progress bar, only works when protocol is `pickle` or `protobuf` - + :param return_iterator: returns an iterator over the DocumentArray. + In case protocol is pickle the `Documents` are streamed from disk to save memory usage :return: a DocumentArray object """ @@ -41,73 +43,67 @@ def load_binary( file_ctx = open(file, 'rb') else: raise ValueError(f'unsupported input {file!r}') - - from .... 
import Document - - with file_ctx as fp: - d = fp.read() if hasattr(fp, 'read') else fp - if get_compress_ctx(algorithm=compress) is not None: - d = decompress_bytes(d, algorithm=compress) - compress = None - - if protocol == 'protobuf-array': - from ....proto.docarray_pb2 import DocumentArrayProto - - dap = DocumentArrayProto() - dap.ParseFromString(d) - - return cls.from_protobuf(dap) - elif protocol == 'pickle-array': - return pickle.loads(d) - else: - _len = len(random_uuid().bytes) - _binary_delimiter = d[:_len] # first get delimiter - if _show_progress: - from rich.progress import track as _track - - track = lambda x: _track(x, description='Deserializing') - else: - track = lambda x: x - return cls( - Document.from_bytes(od, protocol=protocol, compress=compress) - for od in track(d[_len:].split(_binary_delimiter)) - ) - + if return_iterator: + return cls._load_binary_stream(file_ctx) + else: + return cls._load_binary_all(file_ctx, protocol, compress, _show_progress) @classmethod - def load_binary_stream( - cls: Type['T'], - file: Union[str, BinaryIO, bytes], + def _load_binary_stream( + cls: Type['T'], + file_ctx: str, + block_size: int = 500, ) -> 'T': - - if os.path.exists(file): - file_ctx = open(file, 'rb') - else: - raise ValueError(f'unsupported input {file!r}') - from .... import Document - delimiter = None current_bytes = b'' with file_ctx as fp: while True: - new_bytes = fp.read(500) - if new_bytes == b'': - break + new_bytes = fp.read(block_size) b = current_bytes + new_bytes if delimiter is None: delimiter = b[:16] # 16 is the length of the delimiter split = b.split(delimiter) for d, _ in zip(split, range(len(split) - 1)): if len(d) > 0: - d = Document.from_bytes(d) - print(d) - yield d + yield Document.from_bytes(d) current_bytes = split[-1] if new_bytes == b'': + if current_bytes != b'': + yield Document.from_bytes(current_bytes) break + @classmethod + def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): + from .... 
import Document + with file_ctx as fp: + d = fp.read() if hasattr(fp, 'read') else fp + if get_compress_ctx(algorithm=compress) is not None: + d = decompress_bytes(d, algorithm=compress) + compress = None + + if protocol == 'protobuf-array': + from ....proto.docarray_pb2 import DocumentArrayProto + dap = DocumentArrayProto() + dap.ParseFromString(d) + + return cls.from_protobuf(dap) + elif protocol == 'pickle-array': + return pickle.loads(d) + else: + _len = len(random_uuid().bytes) + _binary_delimiter = d[:_len] # first get delimiter + if show_progress: + from rich.progress import track as _track + + track = lambda x: _track(x, description='Deserializing') + else: + track = lambda x: x + return cls( + Document.from_bytes(od, protocol=protocol, compress=compress) + for od in track(d[_len:].split(_binary_delimiter)) + ) @classmethod def _get_batches(cls, gen, batch_size): @@ -116,21 +112,21 @@ def _get_batches(cls, gen, batch_size): @classmethod def from_bytes( - cls: Type['T'], - data: bytes, - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _show_progress: bool = False, + cls: Type['T'], + data: bytes, + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _show_progress: bool = False, ) -> 'T': return cls.load_binary( data, protocol=protocol, compress=compress, _show_progress=_show_progress ) def save_binary( - self, - file: Union[str, BinaryIO], - protocol: str = 'pickle-array', - compress: Optional[str] = None, + self, + file: Union[str, BinaryIO], + protocol: str = 'pickle-array', + compress: Optional[str] = None, ) -> None: """Save array elements into a binary file. 
@@ -154,11 +150,11 @@ def save_binary( self.to_bytes(protocol=protocol, compress=compress, _file_ctx=file_ctx) def to_bytes( - self, - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _file_ctx: Optional[BinaryIO] = None, - _show_progress: bool = False, + self, + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _file_ctx: Optional[BinaryIO] = None, + _show_progress: bool = False, ) -> bytes: """Serialize itself into bytes. @@ -220,11 +216,11 @@ def __bytes__(self): @classmethod def from_base64( - cls: Type['T'], - data: str, - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _show_progress: bool = False, + cls: Type['T'], + data: str, + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _show_progress: bool = False, ) -> 'T': return cls.load_binary( base64.b64decode(data), @@ -234,9 +230,9 @@ def from_base64( ) def to_base64( - self, - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _show_progress: bool = False, + self, + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _show_progress: bool = False, ) -> str: return base64.b64encode(self.to_bytes(protocol, compress)).decode('utf-8') diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index 2ac85ca9364..37941279bb3 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ b/tests/unit/array/test_from_to_bytes.py @@ -1,10 +1,12 @@ +import types + import numpy as np import pytest import tensorflow as tf import torch from scipy.sparse import csr_matrix, coo_matrix, bsr_matrix, csc_matrix -from docarray import DocumentArray +from docarray import DocumentArray, Document from docarray.math.ndarray import to_numpy_array from tests import random_docs @@ -70,6 +72,14 @@ def test_save_bytes(target_da, protocol, compress, tmpfile): DocumentArray.load_binary(fp, protocol=protocol, compress=compress) +def test_save_bytes_stream(tmpfile): + da = 
DocumentArray([Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})]) + da.save_binary(tmpfile, protocol='pickle') + da_reconstructed = DocumentArray.load_binary(tmpfile, protocol='pickle', return_iterator=True) + assert isinstance(da_reconstructed, types.GeneratorType) + assert da == DocumentArray(da_reconstructed) + + @pytest.mark.parametrize('target_da', [DocumentArray.empty(100), random_docs(100)]) def test_from_to_protobuf(target_da): DocumentArray.from_protobuf(target_da.to_protobuf()) From 715f510cb7a7acb9f94de2668bda8913e5447ca7 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Mon, 17 Jan 2022 08:16:10 +0100 Subject: [PATCH 05/28] tests: add protocol test to stream load --- docarray/array/mixins/io/binary.py | 82 +++++++++++++++----------- tests/unit/array/test_from_to_bytes.py | 15 +++-- 2 files changed, 57 insertions(+), 40 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 8238c82f130..c3ee66d2648 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -17,12 +17,12 @@ class BinaryIOMixin: @classmethod def load_binary( - cls: Type['T'], - file: Union[str, BinaryIO, bytes], - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _show_progress: bool = False, - return_iterator: bool = False, + cls: Type['T'], + file: Union[str, BinaryIO, bytes], + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _show_progress: bool = False, + return_iterator: bool = False, ) -> 'T': """Load array elements from a LZ4-compressed binary file. 
@@ -44,17 +44,22 @@ def load_binary( else: raise ValueError(f'unsupported input {file!r}') if return_iterator: - return cls._load_binary_stream(file_ctx) + return cls._load_binary_stream( + file_ctx, protocol=protocol, compress=compress + ) else: return cls._load_binary_all(file_ctx, protocol, compress, _show_progress) @classmethod def _load_binary_stream( - cls: Type['T'], - file_ctx: str, - block_size: int = 500, + cls: Type['T'], + file_ctx: str, + block_size: int = 500, + protocol=None, + compress=None, ) -> 'T': from .... import Document + delimiter = None current_bytes = b'' with file_ctx as fp: @@ -66,16 +71,21 @@ def _load_binary_stream( split = b.split(delimiter) for d, _ in zip(split, range(len(split) - 1)): if len(d) > 0: - yield Document.from_bytes(d) + yield Document.from_bytes( + d, protocol=protocol, compress=compress + ) current_bytes = split[-1] if new_bytes == b'': if current_bytes != b'': - yield Document.from_bytes(current_bytes) + yield Document.from_bytes( + current_bytes, protocol=protocol, compress=compress + ) break @classmethod def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): from .... 
import Document + with file_ctx as fp: d = fp.read() if hasattr(fp, 'read') else fp if get_compress_ctx(algorithm=compress) is not None: @@ -108,25 +118,25 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): @classmethod def _get_batches(cls, gen, batch_size): for i in range(0, len(gen), batch_size): - yield gen[i:i + batch_size] + yield gen[i : i + batch_size] @classmethod def from_bytes( - cls: Type['T'], - data: bytes, - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _show_progress: bool = False, + cls: Type['T'], + data: bytes, + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _show_progress: bool = False, ) -> 'T': return cls.load_binary( data, protocol=protocol, compress=compress, _show_progress=_show_progress ) def save_binary( - self, - file: Union[str, BinaryIO], - protocol: str = 'pickle-array', - compress: Optional[str] = None, + self, + file: Union[str, BinaryIO], + protocol: str = 'pickle-array', + compress: Optional[str] = None, ) -> None: """Save array elements into a binary file. @@ -150,11 +160,11 @@ def save_binary( self.to_bytes(protocol=protocol, compress=compress, _file_ctx=file_ctx) def to_bytes( - self, - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _file_ctx: Optional[BinaryIO] = None, - _show_progress: bool = False, + self, + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _file_ctx: Optional[BinaryIO] = None, + _show_progress: bool = False, ) -> bytes: """Serialize itself into bytes. 
@@ -216,11 +226,11 @@ def __bytes__(self): @classmethod def from_base64( - cls: Type['T'], - data: str, - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _show_progress: bool = False, + cls: Type['T'], + data: str, + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _show_progress: bool = False, ) -> 'T': return cls.load_binary( base64.b64decode(data), @@ -230,9 +240,9 @@ def from_base64( ) def to_base64( - self, - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _show_progress: bool = False, + self, + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _show_progress: bool = False, ) -> str: return base64.b64encode(self.to_bytes(protocol, compress)).decode('utf-8') diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index 37941279bb3..727ab54a277 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ b/tests/unit/array/test_from_to_bytes.py @@ -72,10 +72,17 @@ def test_save_bytes(target_da, protocol, compress, tmpfile): DocumentArray.load_binary(fp, protocol=protocol, compress=compress) -def test_save_bytes_stream(tmpfile): - da = DocumentArray([Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})]) - da.save_binary(tmpfile, protocol='pickle') - da_reconstructed = DocumentArray.load_binary(tmpfile, protocol='pickle', return_iterator=True) +# Note protocol = ['protobuf-array', 'pickle-array'] not supported with Document.from_bytes +@pytest.mark.parametrize('protocol', ['pickle', 'protobuf']) +@pytest.mark.parametrize('compress', ['zlib', None]) +def test_save_bytes_stream_stream(tmpfile, protocol, compress): + da = DocumentArray( + [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] + ) + da.save_binary(tmpfile, protocol=protocol, compress=compress) + da_reconstructed = DocumentArray.load_binary( + tmpfile, protocol=protocol, compress=compress, return_iterator=True + ) assert isinstance(da_reconstructed, 
types.GeneratorType) assert da == DocumentArray(da_reconstructed) From 281786d6d85d03de6b4ac67b5bb9d84d56a2ce68 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Mon, 17 Jan 2022 08:16:10 +0100 Subject: [PATCH 06/28] test: add protocol test to stream load --- docarray/array/mixins/io/binary.py | 82 +++++++++++++++----------- tests/unit/array/test_from_to_bytes.py | 15 +++-- 2 files changed, 57 insertions(+), 40 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 8238c82f130..c3ee66d2648 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -17,12 +17,12 @@ class BinaryIOMixin: @classmethod def load_binary( - cls: Type['T'], - file: Union[str, BinaryIO, bytes], - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _show_progress: bool = False, - return_iterator: bool = False, + cls: Type['T'], + file: Union[str, BinaryIO, bytes], + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _show_progress: bool = False, + return_iterator: bool = False, ) -> 'T': """Load array elements from a LZ4-compressed binary file. @@ -44,17 +44,22 @@ def load_binary( else: raise ValueError(f'unsupported input {file!r}') if return_iterator: - return cls._load_binary_stream(file_ctx) + return cls._load_binary_stream( + file_ctx, protocol=protocol, compress=compress + ) else: return cls._load_binary_all(file_ctx, protocol, compress, _show_progress) @classmethod def _load_binary_stream( - cls: Type['T'], - file_ctx: str, - block_size: int = 500, + cls: Type['T'], + file_ctx: str, + block_size: int = 500, + protocol=None, + compress=None, ) -> 'T': from .... 
import Document + delimiter = None current_bytes = b'' with file_ctx as fp: @@ -66,16 +71,21 @@ def _load_binary_stream( split = b.split(delimiter) for d, _ in zip(split, range(len(split) - 1)): if len(d) > 0: - yield Document.from_bytes(d) + yield Document.from_bytes( + d, protocol=protocol, compress=compress + ) current_bytes = split[-1] if new_bytes == b'': if current_bytes != b'': - yield Document.from_bytes(current_bytes) + yield Document.from_bytes( + current_bytes, protocol=protocol, compress=compress + ) break @classmethod def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): from .... import Document + with file_ctx as fp: d = fp.read() if hasattr(fp, 'read') else fp if get_compress_ctx(algorithm=compress) is not None: @@ -108,25 +118,25 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): @classmethod def _get_batches(cls, gen, batch_size): for i in range(0, len(gen), batch_size): - yield gen[i:i + batch_size] + yield gen[i : i + batch_size] @classmethod def from_bytes( - cls: Type['T'], - data: bytes, - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _show_progress: bool = False, + cls: Type['T'], + data: bytes, + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _show_progress: bool = False, ) -> 'T': return cls.load_binary( data, protocol=protocol, compress=compress, _show_progress=_show_progress ) def save_binary( - self, - file: Union[str, BinaryIO], - protocol: str = 'pickle-array', - compress: Optional[str] = None, + self, + file: Union[str, BinaryIO], + protocol: str = 'pickle-array', + compress: Optional[str] = None, ) -> None: """Save array elements into a binary file. 
@@ -150,11 +160,11 @@ def save_binary( self.to_bytes(protocol=protocol, compress=compress, _file_ctx=file_ctx) def to_bytes( - self, - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _file_ctx: Optional[BinaryIO] = None, - _show_progress: bool = False, + self, + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _file_ctx: Optional[BinaryIO] = None, + _show_progress: bool = False, ) -> bytes: """Serialize itself into bytes. @@ -216,11 +226,11 @@ def __bytes__(self): @classmethod def from_base64( - cls: Type['T'], - data: str, - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _show_progress: bool = False, + cls: Type['T'], + data: str, + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _show_progress: bool = False, ) -> 'T': return cls.load_binary( base64.b64decode(data), @@ -230,9 +240,9 @@ def from_base64( ) def to_base64( - self, - protocol: str = 'pickle-array', - compress: Optional[str] = None, - _show_progress: bool = False, + self, + protocol: str = 'pickle-array', + compress: Optional[str] = None, + _show_progress: bool = False, ) -> str: return base64.b64encode(self.to_bytes(protocol, compress)).decode('utf-8') diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index 37941279bb3..727ab54a277 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ b/tests/unit/array/test_from_to_bytes.py @@ -72,10 +72,17 @@ def test_save_bytes(target_da, protocol, compress, tmpfile): DocumentArray.load_binary(fp, protocol=protocol, compress=compress) -def test_save_bytes_stream(tmpfile): - da = DocumentArray([Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})]) - da.save_binary(tmpfile, protocol='pickle') - da_reconstructed = DocumentArray.load_binary(tmpfile, protocol='pickle', return_iterator=True) +# Note protocol = ['protobuf-array', 'pickle-array'] not supported with Document.from_bytes +@pytest.mark.parametrize('protocol', 
['pickle', 'protobuf']) +@pytest.mark.parametrize('compress', ['zlib', None]) +def test_save_bytes_stream_stream(tmpfile, protocol, compress): + da = DocumentArray( + [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] + ) + da.save_binary(tmpfile, protocol=protocol, compress=compress) + da_reconstructed = DocumentArray.load_binary( + tmpfile, protocol=protocol, compress=compress, return_iterator=True + ) assert isinstance(da_reconstructed, types.GeneratorType) assert da == DocumentArray(da_reconstructed) From 1827ff06e6c2d96131e35cfbef87639e1ee00035 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Mon, 17 Jan 2022 19:12:01 +0100 Subject: [PATCH 07/28] docs: v1 serialization format --- .../documentarray/serialization.md | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/docs/fundamentals/documentarray/serialization.md b/docs/fundamentals/documentarray/serialization.md index fc29e6de89b..b04aa3dd179 100644 --- a/docs/fundamentals/documentarray/serialization.md +++ b/docs/fundamentals/documentarray/serialization.md @@ -126,21 +126,23 @@ Depending on how you want to interpret the results, the figures above can be an ### Wire format of `pickle` and `protobuf` -When set `protocol=pickle` or `protobuf`, the result binary string looks like the following: +When set `protocol=pickle` or `protobuf`, the resulting bytes look like the following: ```text ------------------------------------------------------------------------------------ -| Delimiter | doc1.to_bytes() | Delimiter | doc2.to_bytes() | Delimiter | ... ------------------------------------------------------------------------------------ - | | - | | - | | - Fixed-length | - | - Variable-length +-------------------------------------------------------------------------------------------------------- +| version | len(docs) | doc1_bytes | doc1.to_bytes() | doc2_bytes | doc2.to_bytes() ... 
+--------------------------------------------------------------------------------------------------------- +| Fixed-length | Fixed-length | Fixed-length | Variable-length | Fixed-length | Variable-length ... +-------------------------------------------------------------------------------------------------------- + | | | | | | + uint8 uint64 uint32 Variable-length ... ... + ``` -Here `Delimiter` is a 16-bytes separator such as `b'g\x81\xcc\x1c\x0f\x93L\xed\xa2\xb0s)\x9c\xf9\xf6\xf2'` used for setting the boundary of each Document's serialization. Given a `to_bytes(protocol='pickle/protobuf')` binary string, once we know the first 16 bytes, the boundary is clear. Consequently, one can leverage this format to stream Documents, drop, skip, or early-stop, etc. +Here `version` is a `uint8` that specifies the serialization version of the `DocumentArray` serialization format, followed by `len(docs)` which is a `uint64` that specifies the amount of serialized documents. +Afterwards, `doc1_bytes` describes how many bytes are used to serialize `doc1`, followed by `doc1.to_bytes()` which is the bytes data of the document itself. +The pattern `dock_bytes` and `dock.to_bytes` is repeated `len(docs)` times. 
+ ## From/to base64 From 14009334cbbb71a21602195b971c8ade33d59e91 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Mon, 17 Jan 2022 19:19:59 +0100 Subject: [PATCH 08/28] refactor: to bytes v1 serialization --- docarray/array/mixins/io/binary.py | 48 ++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index c3ee66d2648..bbcb3e4e3f1 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -54,7 +54,7 @@ def load_binary( def _load_binary_stream( cls: Type['T'], file_ctx: str, - block_size: int = 500, + block_size: int = 10000, protocol=None, compress=None, ) -> 'T': @@ -63,20 +63,37 @@ def _load_binary_stream( delimiter = None current_bytes = b'' with file_ctx as fp: + delimiter = fp.read(16) while True: new_bytes = fp.read(block_size) + b = current_bytes + new_bytes - if delimiter is None: - delimiter = b[:16] # 16 is the length of the delimiter split = b.split(delimiter) - for d, _ in zip(split, range(len(split) - 1)): + + # range(2-1) = range(1) == [0] + # d= whatever, _ = 0 + #for d , _ in zip(split, range(len(split)-1)): + breakpoint() + for d in split[:-1]: if len(d) > 0: + print('\n\n_load_binary_stream') + print(d) + print('\n\n') + yield Document.from_bytes( d, protocol=protocol, compress=compress ) current_bytes = split[-1] + + # |-------------| + # __XX__AOSDHAIUWDHAIUSHD__XX__ASJDAIODJ + # ['','AOSDHAIUW'] + + # reach_load_binary_stream directly if len(split)==2 and split[0]=='b' if new_bytes == b'': if current_bytes != b'': + print('\n\n_load_binary_stream LAST CASE') + print(Document.from_bytes( d, protocol=protocol, compress=compress)) yield Document.from_bytes( current_bytes, protocol=protocol, compress=compress ) @@ -101,6 +118,8 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): return cls.from_protobuf(dap) elif protocol == 'pickle-array': return pickle.loads(d) + + # 
Binary format for streaming case else: _len = len(random_uuid().bytes) _binary_delimiter = d[:_len] # first get delimiter @@ -194,6 +213,7 @@ def to_bytes( elif protocol == 'pickle-array': f.write(pickle.dumps(self)) else: + # Binary format for streaming case if _show_progress: from rich.progress import track as _track @@ -201,9 +221,25 @@ def to_bytes( else: track = lambda x: x + # V1 Docarray streaming serialization format + # | 1 byte | 8 bytes | 4 bytes | variable | 4 bytes | variable ... + + # 1 byte (uint8) + version_byte = b'\x01' + f.write(version_byte) + # 8 bytes (uint64) + num_docs_as_bytes = len(self).to_bytes(8, 'big', signed=False) + f.write(num_docs_as_bytes) + for d in track(self): - f.write(_binary_delimiter) - f.write(d.to_bytes(protocol=protocol, compress=compress)) + doc_as_bytes = d.to_bytes(protocol=protocol, compress=compress) + len_doc_as_bytes = len(doc_bytes).to_bytes(4, 'big', signed=False) + + # 8 bytes (uint32) + f.write(len_doc_as_bytes) + # variable size bytes + f.write(doc_as_bytes) + if not _file_ctx: return bf.getvalue() From c8c102478f00f1154fa9373e06889acef72015e2 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Mon, 17 Jan 2022 19:49:34 +0100 Subject: [PATCH 09/28] refactor: load v1 serialization protocol --- docarray/array/mixins/io/binary.py | 50 +++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index bbcb3e4e3f1..54193b7577a 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -72,7 +72,7 @@ def _load_binary_stream( # range(2-1) = range(1) == [0] # d= whatever, _ = 0 - #for d , _ in zip(split, range(len(split)-1)): + # for d , _ in zip(split, range(len(split)-1)): breakpoint() for d in split[:-1]: if len(d) > 0: @@ -93,7 +93,9 @@ def _load_binary_stream( if new_bytes == b'': if current_bytes != b'': print('\n\n_load_binary_stream LAST CASE') - 
print(Document.from_bytes( d, protocol=protocol, compress=compress)) + print( + Document.from_bytes(d, protocol=protocol, compress=compress) + ) yield Document.from_bytes( current_bytes, protocol=protocol, compress=compress ) @@ -121,18 +123,36 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): # Binary format for streaming case else: - _len = len(random_uuid().bytes) - _binary_delimiter = d[:_len] # first get delimiter + + # 1 byte (uint8) + version = int.from_bytes(d[0:1], 'big', signed=False) + # 8 bytes (uint64) + num_docs = int.from_bytes(d[1:9], 'big', signed=False) + if show_progress: from rich.progress import track as _track track = lambda x: _track(x, description='Deserializing') else: track = lambda x: x - return cls( - Document.from_bytes(od, protocol=protocol, compress=compress) - for od in track(d[_len:].split(_binary_delimiter)) - ) + + docs = [] + start_pos = 9 + for d in track(self): + # 4 bytes (uint32) + len_current_doc_in_bytes = int.from_bytes( + d[start_pos : start_pos + 1], 'big', signed=False + ) + start_doc_pos = start_pos + 1 + end_doc_pos = start_doc_pos + len_current_doc_in_bytes + start_pos = end_doc_pos + # variable length bytes doc + doc = Document.from_bytes( + d[start_doc_pos:end_doc_pos], protocol=protocol, compress=compress + ) + docs.append(doc) + + return cls(docs) @classmethod def _get_batches(cls, gen, batch_size): @@ -221,24 +241,24 @@ def to_bytes( else: track = lambda x: x - # V1 Docarray streaming serialization format + # V1 DocArray streaming serialization format # | 1 byte | 8 bytes | 4 bytes | variable | 4 bytes | variable ... 
# 1 byte (uint8) version_byte = b'\x01' - f.write(version_byte) # 8 bytes (uint64) num_docs_as_bytes = len(self).to_bytes(8, 'big', signed=False) - f.write(num_docs_as_bytes) + f.write(version_byte + num_docs_as_bytes) for d in track(self): + # 4 bytes (uint32) doc_as_bytes = d.to_bytes(protocol=protocol, compress=compress) - len_doc_as_bytes = len(doc_bytes).to_bytes(4, 'big', signed=False) - # 8 bytes (uint32) - f.write(len_doc_as_bytes) # variable size bytes - f.write(doc_as_bytes) + len_doc_as_bytes = len(doc_bytes).to_bytes( + 4, 'big', signed=False + ) + f.write(len_doc_as_bytes + doc_as_bytes) if not _file_ctx: return bf.getvalue() From 0cac456332d2313da41493aa2b595bd614a5dbf6 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Mon, 17 Jan 2022 19:56:25 +0100 Subject: [PATCH 10/28] refactor: load binary stream --- docarray/array/mixins/io/binary.py | 64 ++++++++++++------------------ 1 file changed, 25 insertions(+), 39 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 54193b7577a..19b6246e4df 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -54,52 +54,38 @@ def load_binary( def _load_binary_stream( cls: Type['T'], file_ctx: str, - block_size: int = 10000, protocol=None, compress=None, ) -> 'T': from .... 
import Document - delimiter = None current_bytes = b'' with file_ctx as fp: - delimiter = fp.read(16) - while True: - new_bytes = fp.read(block_size) - - b = current_bytes + new_bytes - split = b.split(delimiter) - - # range(2-1) = range(1) == [0] - # d= whatever, _ = 0 - # for d , _ in zip(split, range(len(split)-1)): - breakpoint() - for d in split[:-1]: - if len(d) > 0: - print('\n\n_load_binary_stream') - print(d) - print('\n\n') - - yield Document.from_bytes( - d, protocol=protocol, compress=compress - ) - current_bytes = split[-1] - - # |-------------| - # __XX__AOSDHAIUWDHAIUSHD__XX__ASJDAIODJ - # ['','AOSDHAIUW'] - - # reach_load_binary_stream directly if len(split)==2 and split[0]=='b' - if new_bytes == b'': - if current_bytes != b'': - print('\n\n_load_binary_stream LAST CASE') - print( - Document.from_bytes(d, protocol=protocol, compress=compress) - ) - yield Document.from_bytes( - current_bytes, protocol=protocol, compress=compress - ) - break + # 1 byte (uint8) + version = int.from_bytes(d[0:1], 'big', signed=False) + # 8 bytes (uint64) + num_docs = int.from_bytes(d[1:9], 'big', signed=False) + + if show_progress: + from rich.progress import track as _track + + track = lambda x: _track(x, description='Deserializing') + else: + track = lambda x: x + + start_pos = 9 + for d in track(self): + # 4 bytes (uint32) + len_current_doc_in_bytes = int.from_bytes( + d[start_pos : start_pos + 1], 'big', signed=False + ) + start_doc_pos = start_pos + 1 + end_doc_pos = start_doc_pos + len_current_doc_in_bytes + start_pos = end_doc_pos + # variable length bytes doc + yield Document.from_bytes( + d[start_doc_pos:end_doc_pos], protocol=protocol, compress=compress + ) @classmethod def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): From b7f12dec94aa8adbd49f0c8b17068336217688fa Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 09:05:01 +0100 Subject: [PATCH 11/28] refactor: load binary fix --- docarray/array/mixins/io/binary.py 
| 15 +++++++------- tests/unit/array/test_from_to_bytes.py | 27 ++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 19b6246e4df..344c6cbc65c 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -109,12 +109,10 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): # Binary format for streaming case else: - # 1 byte (uint8) version = int.from_bytes(d[0:1], 'big', signed=False) # 8 bytes (uint64) num_docs = int.from_bytes(d[1:9], 'big', signed=False) - if show_progress: from rich.progress import track as _track @@ -122,16 +120,19 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): else: track = lambda x: x - docs = [] + # this 9 is version + num_docs bytes used start_pos = 9 - for d in track(self): + docs = [] + + for _ in track(range(num_docs)): # 4 bytes (uint32) len_current_doc_in_bytes = int.from_bytes( - d[start_pos : start_pos + 1], 'big', signed=False + d[start_pos : start_pos + 4], 'big', signed=False ) - start_doc_pos = start_pos + 1 + start_doc_pos = start_pos + 4 end_doc_pos = start_doc_pos + len_current_doc_in_bytes start_pos = end_doc_pos + # variable length bytes doc doc = Document.from_bytes( d[start_doc_pos:end_doc_pos], protocol=protocol, compress=compress @@ -241,7 +242,7 @@ def to_bytes( doc_as_bytes = d.to_bytes(protocol=protocol, compress=compress) # variable size bytes - len_doc_as_bytes = len(doc_bytes).to_bytes( + len_doc_as_bytes = len(doc_as_bytes).to_bytes( 4, 'big', signed=False ) f.write(len_doc_as_bytes + doc_as_bytes) diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index 727ab54a277..94ca654b624 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ b/tests/unit/array/test_from_to_bytes.py @@ -73,11 +73,34 @@ def test_save_bytes(target_da, protocol, compress, tmpfile): # Note protocol = 
['protobuf-array', 'pickle-array'] not supported with Document.from_bytes +@pytest.mark.parametrize('protocol', ['pickle']) +@pytest.mark.parametrize('compress', ['lzma']) +def test_save_bytes_stream_stream_new(tmpfile, protocol, compress): +# tmpfile = '/Users/davidbuchaca1/Documents/jina_stuff/docarray/test_aladdine.bin' + da = DocumentArray( + [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] + ) + da.save_binary(tmpfile, protocol=protocol, compress=compress) + da_reconstructed = DocumentArray.load_binary( + tmpfile, protocol=protocol, compress=compress, return_iterator=True + ) + da_reconstructed_normal = DocumentArray.load_binary( + tmpfile, protocol=protocol, compress=compress, return_iterator=False + ) + + assert isinstance(da_reconstructed, types.GeneratorType) + for d, d_rec in zip(da, da_reconstructed): + print(f'\n\ndoc assets!!\n\n') + print(f'd={d.text}, d_rec={d_rec.text}') + print(f'd==d_rec: {d == d_rec}') + assert d == d_rec + + @pytest.mark.parametrize('protocol', ['pickle', 'protobuf']) -@pytest.mark.parametrize('compress', ['zlib', None]) +@pytest.mark.parametrize('compress', ['zlib', 'gzib', None]) def test_save_bytes_stream_stream(tmpfile, protocol, compress): da = DocumentArray( - [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] + [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] ) da.save_binary(tmpfile, protocol=protocol, compress=compress) da_reconstructed = DocumentArray.load_binary( From f066282c00cdf1e69d9ed2d090734a68ddb16a55 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 09:07:23 +0100 Subject: [PATCH 12/28] update: test with new serialization --- tests/unit/array/test_from_to_bytes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index 94ca654b624..f902e606feb 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ 
b/tests/unit/array/test_from_to_bytes.py @@ -76,9 +76,9 @@ def test_save_bytes(target_da, protocol, compress, tmpfile): @pytest.mark.parametrize('protocol', ['pickle']) @pytest.mark.parametrize('compress', ['lzma']) def test_save_bytes_stream_stream_new(tmpfile, protocol, compress): -# tmpfile = '/Users/davidbuchaca1/Documents/jina_stuff/docarray/test_aladdine.bin' + # tmpfile = '/Users/davidbuchaca1/Documents/jina_stuff/docarray/test_aladdine.bin' da = DocumentArray( - [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] + [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] ) da.save_binary(tmpfile, protocol=protocol, compress=compress) da_reconstructed = DocumentArray.load_binary( @@ -100,7 +100,7 @@ def test_save_bytes_stream_stream_new(tmpfile, protocol, compress): @pytest.mark.parametrize('compress', ['zlib', 'gzib', None]) def test_save_bytes_stream_stream(tmpfile, protocol, compress): da = DocumentArray( - [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] + [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] ) da.save_binary(tmpfile, protocol=protocol, compress=compress) da_reconstructed = DocumentArray.load_binary( From a6689d1c36d485682f2c066f7868673c25583a41 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 10:00:30 +0100 Subject: [PATCH 13/28] test: fix stream test --- docarray/array/mixins/io/binary.py | 38 +++++++++++--------------- tests/unit/array/test_from_to_bytes.py | 4 +-- 2 files changed, 18 insertions(+), 24 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 344c6cbc65c..168efac8e63 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -56,35 +56,29 @@ def _load_binary_stream( file_ctx: str, protocol=None, compress=None, + show_progress=False ) -> 'T': from .... 
import Document - current_bytes = b'' - with file_ctx as fp: - # 1 byte (uint8) - version = int.from_bytes(d[0:1], 'big', signed=False) - # 8 bytes (uint64) - num_docs = int.from_bytes(d[1:9], 'big', signed=False) + if show_progress: + from rich.progress import track as _track - if show_progress: - from rich.progress import track as _track + track = lambda x: _track(x, description='Deserializing') + else: + track = lambda x: x - track = lambda x: _track(x, description='Deserializing') - else: - track = lambda x: x + with file_ctx as f: + version_numdocs_lendoc0 = f.read(9) + # 1 byte (uint8) + version = int.from_bytes(version_numdocs_lendoc0[0:1], 'big', signed=False) + # 8 bytes (uint64) + num_docs = int.from_bytes(version_numdocs_lendoc0[1:9], 'big', signed=False) - start_pos = 9 - for d in track(self): + for _ in track(range(num_docs)): # 4 bytes (uint32) - len_current_doc_in_bytes = int.from_bytes( - d[start_pos : start_pos + 1], 'big', signed=False - ) - start_doc_pos = start_pos + 1 - end_doc_pos = start_doc_pos + len_current_doc_in_bytes - start_pos = end_doc_pos - # variable length bytes doc + len_current_doc_in_bytes = int.from_bytes(f.read(4), 'big', signed=False) yield Document.from_bytes( - d[start_doc_pos:end_doc_pos], protocol=protocol, compress=compress + f.read(len_current_doc_in_bytes), protocol=protocol, compress=compress ) @classmethod @@ -127,7 +121,7 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): for _ in track(range(num_docs)): # 4 bytes (uint32) len_current_doc_in_bytes = int.from_bytes( - d[start_pos : start_pos + 4], 'big', signed=False + d[start_pos:start_pos + 4], 'big', signed=False ) start_doc_pos = start_pos + 4 end_doc_pos = start_doc_pos + len_current_doc_in_bytes diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index f902e606feb..1f05b9d3393 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ b/tests/unit/array/test_from_to_bytes.py @@ -75,7 +75,7 @@ def 
test_save_bytes(target_da, protocol, compress, tmpfile): # Note protocol = ['protobuf-array', 'pickle-array'] not supported with Document.from_bytes @pytest.mark.parametrize('protocol', ['pickle']) @pytest.mark.parametrize('compress', ['lzma']) -def test_save_bytes_stream_stream_new(tmpfile, protocol, compress): +def test_save_bytes_stream_new(tmpfile, protocol, compress): # tmpfile = '/Users/davidbuchaca1/Documents/jina_stuff/docarray/test_aladdine.bin' da = DocumentArray( [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] @@ -98,7 +98,7 @@ def test_save_bytes_stream_stream_new(tmpfile, protocol, compress): @pytest.mark.parametrize('protocol', ['pickle', 'protobuf']) @pytest.mark.parametrize('compress', ['zlib', 'gzib', None]) -def test_save_bytes_stream_stream(tmpfile, protocol, compress): +def test_save_bytes_stream(tmpfile, protocol, compress): da = DocumentArray( [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] ) From d1828520c278277d179c8a516c390db025314b1d Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 10:00:51 +0100 Subject: [PATCH 14/28] test: fix stream test --- docarray/array/mixins/io/binary.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 168efac8e63..f56a5c1d709 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -52,11 +52,7 @@ def load_binary( @classmethod def _load_binary_stream( - cls: Type['T'], - file_ctx: str, - protocol=None, - compress=None, - show_progress=False + cls: Type['T'], file_ctx: str, protocol=None, compress=None, show_progress=False ) -> 'T': from .... 
import Document @@ -76,9 +72,13 @@ def _load_binary_stream( for _ in track(range(num_docs)): # 4 bytes (uint32) - len_current_doc_in_bytes = int.from_bytes(f.read(4), 'big', signed=False) + len_current_doc_in_bytes = int.from_bytes( + f.read(4), 'big', signed=False + ) yield Document.from_bytes( - f.read(len_current_doc_in_bytes), protocol=protocol, compress=compress + f.read(len_current_doc_in_bytes), + protocol=protocol, + compress=compress, ) @classmethod @@ -121,7 +121,7 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): for _ in track(range(num_docs)): # 4 bytes (uint32) len_current_doc_in_bytes = int.from_bytes( - d[start_pos:start_pos + 4], 'big', signed=False + d[start_pos : start_pos + 4], 'big', signed=False ) start_doc_pos = start_pos + 4 end_doc_pos = start_doc_pos + len_current_doc_in_bytes From 927e5cb760dd2dd31fd16be10cac2ab22a783a05 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 10:00:51 +0100 Subject: [PATCH 15/28] test: fix stream test --- docarray/array/mixins/io/binary.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 168efac8e63..f56a5c1d709 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -52,11 +52,7 @@ def load_binary( @classmethod def _load_binary_stream( - cls: Type['T'], - file_ctx: str, - protocol=None, - compress=None, - show_progress=False + cls: Type['T'], file_ctx: str, protocol=None, compress=None, show_progress=False ) -> 'T': from .... 
import Document @@ -76,9 +72,13 @@ def _load_binary_stream( for _ in track(range(num_docs)): # 4 bytes (uint32) - len_current_doc_in_bytes = int.from_bytes(f.read(4), 'big', signed=False) + len_current_doc_in_bytes = int.from_bytes( + f.read(4), 'big', signed=False + ) yield Document.from_bytes( - f.read(len_current_doc_in_bytes), protocol=protocol, compress=compress + f.read(len_current_doc_in_bytes), + protocol=protocol, + compress=compress, ) @classmethod @@ -121,7 +121,7 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): for _ in track(range(num_docs)): # 4 bytes (uint32) len_current_doc_in_bytes = int.from_bytes( - d[start_pos:start_pos + 4], 'big', signed=False + d[start_pos : start_pos + 4], 'big', signed=False ) start_doc_pos = start_pos + 4 end_doc_pos = start_doc_pos + len_current_doc_in_bytes From 189f3b5d399f239e9ee1d86e0e24092e766629a7 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 15:12:43 +0100 Subject: [PATCH 16/28] test: add complete test with all compression --- docarray/array/mixins/io/binary.py | 19 ++++++++++------ docarray/document/mixins/porting.py | 1 + tests/unit/array/test_from_to_bytes.py | 30 +++++--------------------- 3 files changed, 19 insertions(+), 31 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index f56a5c1d709..4098b60102b 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -34,7 +34,6 @@ def load_binary( In case protocol is pickle the `Documents` are streamed from disk to save memory usage :return: a DocumentArray object """ - if isinstance(file, io.BufferedReader): file_ctx = nullcontext(file) elif isinstance(file, bytes): @@ -63,6 +62,7 @@ def _load_binary_stream( else: track = lambda x: x + print(f'type(file_ctx)={type(file_ctx)}') with file_ctx as f: version_numdocs_lendoc0 = f.read(9) # 1 byte (uint8) @@ -87,9 +87,11 @@ def _load_binary_all(cls, file_ctx, protocol, compress, 
show_progress): with file_ctx as fp: d = fp.read() if hasattr(fp, 'read') else fp - if get_compress_ctx(algorithm=compress) is not None: - d = decompress_bytes(d, algorithm=compress) - compress = None + + if protocol == 'pickle-array' or protocol == 'protobuf-array': + if get_compress_ctx(algorithm=compress) is not None: + d = decompress_bytes(d, algorithm=compress) + compress = None if protocol == 'protobuf-array': from ....proto.docarray_pb2 import DocumentArrayProto @@ -197,8 +199,11 @@ def to_bytes( :return: the binary serialization in bytes """ - _binary_delimiter = random_uuid().bytes - compress_ctx = get_compress_ctx(compress, mode='wb') + if protocol == 'protobuf-array' or protocol == 'pickle-array': + compress_ctx = get_compress_ctx(compress, mode='wb') + else: + compress_ctx = None + with (_file_ctx or io.BytesIO()) as bf: if compress_ctx is None: # if compress do not support streaming then postpone the compress @@ -208,6 +213,7 @@ def to_bytes( f = compress_ctx(bf) fc = f compress = None + with fc: if protocol == 'protobuf-array': f.write(self.to_protobuf().SerializePartialToString()) @@ -233,6 +239,7 @@ def to_bytes( for d in track(self): # 4 bytes (uint32) + print(f'\nprotocol={protocol}, compress={compress}\n') doc_as_bytes = d.to_bytes(protocol=protocol, compress=compress) # variable size bytes diff --git a/docarray/document/mixins/porting.py b/docarray/document/mixins/porting.py index 6cbf7c1b357..f6077463c34 100644 --- a/docarray/document/mixins/porting.py +++ b/docarray/document/mixins/porting.py @@ -50,6 +50,7 @@ def to_bytes( raise ValueError( f'protocol={protocol} is not supported. Can be only `protobuf` or pickle protocols 0-5.' 
) + return compress_bytes(bstr, algorithm=compress) @classmethod diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index 1f05b9d3393..042573dafe0 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ b/tests/unit/array/test_from_to_bytes.py @@ -73,10 +73,11 @@ def test_save_bytes(target_da, protocol, compress, tmpfile): # Note protocol = ['protobuf-array', 'pickle-array'] not supported with Document.from_bytes -@pytest.mark.parametrize('protocol', ['pickle']) -@pytest.mark.parametrize('compress', ['lzma']) -def test_save_bytes_stream_new(tmpfile, protocol, compress): - # tmpfile = '/Users/davidbuchaca1/Documents/jina_stuff/docarray/test_aladdine.bin' +@pytest.mark.parametrize('protocol', ['protobuf', 'pickle']) +@pytest.mark.parametrize( + 'compress', ['lz4', 'bz2', 'lzma', 'gzip', 'zlib', 'gzib', None] +) +def test_save_bytes_stream(tmpfile, protocol, compress): da = DocumentArray( [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] ) @@ -84,32 +85,11 @@ def test_save_bytes_stream_new(tmpfile, protocol, compress): da_reconstructed = DocumentArray.load_binary( tmpfile, protocol=protocol, compress=compress, return_iterator=True ) - da_reconstructed_normal = DocumentArray.load_binary( - tmpfile, protocol=protocol, compress=compress, return_iterator=False - ) - assert isinstance(da_reconstructed, types.GeneratorType) for d, d_rec in zip(da, da_reconstructed): - print(f'\n\ndoc assets!!\n\n') - print(f'd={d.text}, d_rec={d_rec.text}') - print(f'd==d_rec: {d == d_rec}') assert d == d_rec -@pytest.mark.parametrize('protocol', ['pickle', 'protobuf']) -@pytest.mark.parametrize('compress', ['zlib', 'gzib', None]) -def test_save_bytes_stream(tmpfile, protocol, compress): - da = DocumentArray( - [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] - ) - da.save_binary(tmpfile, protocol=protocol, compress=compress) - da_reconstructed = DocumentArray.load_binary( - tmpfile, 
protocol=protocol, compress=compress, return_iterator=True - ) - assert isinstance(da_reconstructed, types.GeneratorType) - assert da == DocumentArray(da_reconstructed) - - @pytest.mark.parametrize('target_da', [DocumentArray.empty(100), random_docs(100)]) def test_from_to_protobuf(target_da): DocumentArray.from_protobuf(target_da.to_protobuf()) From 6e6ec5a234b132e59f40a7d8649b9c100d4bfdbf Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Mon, 17 Jan 2022 19:12:01 +0100 Subject: [PATCH 17/28] docs: v1 serialization format --- .../documentarray/serialization.md | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/docs/fundamentals/documentarray/serialization.md b/docs/fundamentals/documentarray/serialization.md index fc29e6de89b..b04aa3dd179 100644 --- a/docs/fundamentals/documentarray/serialization.md +++ b/docs/fundamentals/documentarray/serialization.md @@ -126,21 +126,23 @@ Depending on how you want to interpret the results, the figures above can be an ### Wire format of `pickle` and `protobuf` -When set `protocol=pickle` or `protobuf`, the result binary string looks like the following: +When set `protocol=pickle` or `protobuf`, the resulting bytes look like the following: ```text ------------------------------------------------------------------------------------ -| Delimiter | doc1.to_bytes() | Delimiter | doc2.to_bytes() | Delimiter | ... ------------------------------------------------------------------------------------ - | | - | | - | | - Fixed-length | - | - Variable-length +-------------------------------------------------------------------------------------------------------- +| version | len(docs) | doc1_bytes | doc1.to_bytes() | doc2_bytes | doc2.to_bytes() ... +--------------------------------------------------------------------------------------------------------- +| Fixed-length | Fixed-length | Fixed-length | Variable-length | Fixed-length | Variable-length ... 
+-------------------------------------------------------------------------------------------------------- + | | | | | | + uint8 uint64 uint32 Variable-length ... ... + ``` -Here `Delimiter` is a 16-bytes separator such as `b'g\x81\xcc\x1c\x0f\x93L\xed\xa2\xb0s)\x9c\xf9\xf6\xf2'` used for setting the boundary of each Document's serialization. Given a `to_bytes(protocol='pickle/protobuf')` binary string, once we know the first 16 bytes, the boundary is clear. Consequently, one can leverage this format to stream Documents, drop, skip, or early-stop, etc. +Here `version` is a `uint8` that specifies the serialization version of the `DocumentArray` serialization format, followed by `len(docs)` which is a `uint64` that specifies the number of serialized documents. +Afterwards, `doc1_bytes` describes how many bytes are used to serialize `doc1`, followed by `doc1.to_bytes()` which is the bytes data of the document itself. +The pattern `doc_bytes` and `doc.to_bytes()` is repeated `len(docs)` times. 
+ ## From/to base64 From 9a799a8718d8ddd0e4396b60780f95c6bc1a3081 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Mon, 17 Jan 2022 19:19:59 +0100 Subject: [PATCH 18/28] refactor: to bytes v1 serialization --- docarray/array/mixins/io/binary.py | 48 ++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index c3ee66d2648..bbcb3e4e3f1 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -54,7 +54,7 @@ def load_binary( def _load_binary_stream( cls: Type['T'], file_ctx: str, - block_size: int = 500, + block_size: int = 10000, protocol=None, compress=None, ) -> 'T': @@ -63,20 +63,37 @@ def _load_binary_stream( delimiter = None current_bytes = b'' with file_ctx as fp: + delimiter = fp.read(16) while True: new_bytes = fp.read(block_size) + b = current_bytes + new_bytes - if delimiter is None: - delimiter = b[:16] # 16 is the length of the delimiter split = b.split(delimiter) - for d, _ in zip(split, range(len(split) - 1)): + + # range(2-1) = range(1) == [0] + # d= whatever, _ = 0 + #for d , _ in zip(split, range(len(split)-1)): + breakpoint() + for d in split[:-1]: if len(d) > 0: + print('\n\n_load_binary_stream') + print(d) + print('\n\n') + yield Document.from_bytes( d, protocol=protocol, compress=compress ) current_bytes = split[-1] + + # |-------------| + # __XX__AOSDHAIUWDHAIUSHD__XX__ASJDAIODJ + # ['','AOSDHAIUW'] + + # reach_load_binary_stream directly if len(split)==2 and split[0]=='b' if new_bytes == b'': if current_bytes != b'': + print('\n\n_load_binary_stream LAST CASE') + print(Document.from_bytes( d, protocol=protocol, compress=compress)) yield Document.from_bytes( current_bytes, protocol=protocol, compress=compress ) @@ -101,6 +118,8 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): return cls.from_protobuf(dap) elif protocol == 'pickle-array': return pickle.loads(d) + + # 
Binary format for streaming case else: _len = len(random_uuid().bytes) _binary_delimiter = d[:_len] # first get delimiter @@ -194,6 +213,7 @@ def to_bytes( elif protocol == 'pickle-array': f.write(pickle.dumps(self)) else: + # Binary format for streaming case if _show_progress: from rich.progress import track as _track @@ -201,9 +221,25 @@ def to_bytes( else: track = lambda x: x + # V1 Docarray streaming serialization format + # | 1 byte | 8 bytes | 4 bytes | variable | 4 bytes | variable ... + + # 1 byte (uint8) + version_byte = b'\x01' + f.write(version_byte) + # 8 bytes (uint64) + num_docs_as_bytes = len(self).to_bytes(8, 'big', signed=False) + f.write(num_docs_as_bytes) + for d in track(self): - f.write(_binary_delimiter) - f.write(d.to_bytes(protocol=protocol, compress=compress)) + doc_as_bytes = d.to_bytes(protocol=protocol, compress=compress) + len_doc_as_bytes = len(doc_bytes).to_bytes(4, 'big', signed=False) + + # 8 bytes (uint32) + f.write(len_doc_as_bytes) + # variable size bytes + f.write(doc_as_bytes) + if not _file_ctx: return bf.getvalue() From 57f769bb16638a627bc30564098c357f8e1ea4e8 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Mon, 17 Jan 2022 19:49:34 +0100 Subject: [PATCH 19/28] refactor: load v1 serialization protocol --- docarray/array/mixins/io/binary.py | 50 +++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index bbcb3e4e3f1..54193b7577a 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -72,7 +72,7 @@ def _load_binary_stream( # range(2-1) = range(1) == [0] # d= whatever, _ = 0 - #for d , _ in zip(split, range(len(split)-1)): + # for d , _ in zip(split, range(len(split)-1)): breakpoint() for d in split[:-1]: if len(d) > 0: @@ -93,7 +93,9 @@ def _load_binary_stream( if new_bytes == b'': if current_bytes != b'': print('\n\n_load_binary_stream LAST CASE') - 
print(Document.from_bytes( d, protocol=protocol, compress=compress)) + print( + Document.from_bytes(d, protocol=protocol, compress=compress) + ) yield Document.from_bytes( current_bytes, protocol=protocol, compress=compress ) @@ -121,18 +123,36 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): # Binary format for streaming case else: - _len = len(random_uuid().bytes) - _binary_delimiter = d[:_len] # first get delimiter + + # 1 byte (uint8) + version = int.from_bytes(d[0:1], 'big', signed=False) + # 8 bytes (uint64) + num_docs = int.from_bytes(d[1:9], 'big', signed=False) + if show_progress: from rich.progress import track as _track track = lambda x: _track(x, description='Deserializing') else: track = lambda x: x - return cls( - Document.from_bytes(od, protocol=protocol, compress=compress) - for od in track(d[_len:].split(_binary_delimiter)) - ) + + docs = [] + start_pos = 9 + for d in track(self): + # 4 bytes (uint32) + len_current_doc_in_bytes = int.from_bytes( + d[start_pos : start_pos + 1], 'big', signed=False + ) + start_doc_pos = start_pos + 1 + end_doc_pos = start_doc_pos + len_current_doc_in_bytes + start_pos = end_doc_pos + # variable length bytes doc + doc = Document.from_bytes( + d[start_doc_pos:end_doc_pos], protocol=protocol, compress=compress + ) + docs.append(doc) + + return cls(docs) @classmethod def _get_batches(cls, gen, batch_size): @@ -221,24 +241,24 @@ def to_bytes( else: track = lambda x: x - # V1 Docarray streaming serialization format + # V1 DocArray streaming serialization format # | 1 byte | 8 bytes | 4 bytes | variable | 4 bytes | variable ... 
# 1 byte (uint8) version_byte = b'\x01' - f.write(version_byte) # 8 bytes (uint64) num_docs_as_bytes = len(self).to_bytes(8, 'big', signed=False) - f.write(num_docs_as_bytes) + f.write(version_byte + num_docs_as_bytes) for d in track(self): + # 4 bytes (uint32) doc_as_bytes = d.to_bytes(protocol=protocol, compress=compress) - len_doc_as_bytes = len(doc_bytes).to_bytes(4, 'big', signed=False) - # 8 bytes (uint32) - f.write(len_doc_as_bytes) # variable size bytes - f.write(doc_as_bytes) + len_doc_as_bytes = len(doc_bytes).to_bytes( + 4, 'big', signed=False + ) + f.write(len_doc_as_bytes + doc_as_bytes) if not _file_ctx: return bf.getvalue() From f7fe6e73b7ba196705e4712d061f6a87de8dcc80 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Mon, 17 Jan 2022 19:56:25 +0100 Subject: [PATCH 20/28] refactor: load binary stream --- docarray/array/mixins/io/binary.py | 64 ++++++++++++------------------ 1 file changed, 25 insertions(+), 39 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 54193b7577a..19b6246e4df 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -54,52 +54,38 @@ def load_binary( def _load_binary_stream( cls: Type['T'], file_ctx: str, - block_size: int = 10000, protocol=None, compress=None, ) -> 'T': from .... 
import Document - delimiter = None current_bytes = b'' with file_ctx as fp: - delimiter = fp.read(16) - while True: - new_bytes = fp.read(block_size) - - b = current_bytes + new_bytes - split = b.split(delimiter) - - # range(2-1) = range(1) == [0] - # d= whatever, _ = 0 - # for d , _ in zip(split, range(len(split)-1)): - breakpoint() - for d in split[:-1]: - if len(d) > 0: - print('\n\n_load_binary_stream') - print(d) - print('\n\n') - - yield Document.from_bytes( - d, protocol=protocol, compress=compress - ) - current_bytes = split[-1] - - # |-------------| - # __XX__AOSDHAIUWDHAIUSHD__XX__ASJDAIODJ - # ['','AOSDHAIUW'] - - # reach_load_binary_stream directly if len(split)==2 and split[0]=='b' - if new_bytes == b'': - if current_bytes != b'': - print('\n\n_load_binary_stream LAST CASE') - print( - Document.from_bytes(d, protocol=protocol, compress=compress) - ) - yield Document.from_bytes( - current_bytes, protocol=protocol, compress=compress - ) - break + # 1 byte (uint8) + version = int.from_bytes(d[0:1], 'big', signed=False) + # 8 bytes (uint64) + num_docs = int.from_bytes(d[1:9], 'big', signed=False) + + if show_progress: + from rich.progress import track as _track + + track = lambda x: _track(x, description='Deserializing') + else: + track = lambda x: x + + start_pos = 9 + for d in track(self): + # 4 bytes (uint32) + len_current_doc_in_bytes = int.from_bytes( + d[start_pos : start_pos + 1], 'big', signed=False + ) + start_doc_pos = start_pos + 1 + end_doc_pos = start_doc_pos + len_current_doc_in_bytes + start_pos = end_doc_pos + # variable length bytes doc + yield Document.from_bytes( + d[start_doc_pos:end_doc_pos], protocol=protocol, compress=compress + ) @classmethod def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): From e7207aaf05b6cec7ffcc220f5b08e0915a6f83f9 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 09:05:01 +0100 Subject: [PATCH 21/28] refactor: load binary fix --- docarray/array/mixins/io/binary.py 
| 15 +++++++------- tests/unit/array/test_from_to_bytes.py | 27 ++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 19b6246e4df..344c6cbc65c 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -109,12 +109,10 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): # Binary format for streaming case else: - # 1 byte (uint8) version = int.from_bytes(d[0:1], 'big', signed=False) # 8 bytes (uint64) num_docs = int.from_bytes(d[1:9], 'big', signed=False) - if show_progress: from rich.progress import track as _track @@ -122,16 +120,19 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): else: track = lambda x: x - docs = [] + # this 9 is version + num_docs bytes used start_pos = 9 - for d in track(self): + docs = [] + + for _ in track(range(num_docs)): # 4 bytes (uint32) len_current_doc_in_bytes = int.from_bytes( - d[start_pos : start_pos + 1], 'big', signed=False + d[start_pos : start_pos + 4], 'big', signed=False ) - start_doc_pos = start_pos + 1 + start_doc_pos = start_pos + 4 end_doc_pos = start_doc_pos + len_current_doc_in_bytes start_pos = end_doc_pos + # variable length bytes doc doc = Document.from_bytes( d[start_doc_pos:end_doc_pos], protocol=protocol, compress=compress @@ -241,7 +242,7 @@ def to_bytes( doc_as_bytes = d.to_bytes(protocol=protocol, compress=compress) # variable size bytes - len_doc_as_bytes = len(doc_bytes).to_bytes( + len_doc_as_bytes = len(doc_as_bytes).to_bytes( 4, 'big', signed=False ) f.write(len_doc_as_bytes + doc_as_bytes) diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index 727ab54a277..94ca654b624 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ b/tests/unit/array/test_from_to_bytes.py @@ -73,11 +73,34 @@ def test_save_bytes(target_da, protocol, compress, tmpfile): # Note protocol = 
['protobuf-array', 'pickle-array'] not supported with Document.from_bytes +@pytest.mark.parametrize('protocol', ['pickle']) +@pytest.mark.parametrize('compress', ['lzma']) +def test_save_bytes_stream_stream_new(tmpfile, protocol, compress): +# tmpfile = '/Users/davidbuchaca1/Documents/jina_stuff/docarray/test_aladdine.bin' + da = DocumentArray( + [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] + ) + da.save_binary(tmpfile, protocol=protocol, compress=compress) + da_reconstructed = DocumentArray.load_binary( + tmpfile, protocol=protocol, compress=compress, return_iterator=True + ) + da_reconstructed_normal = DocumentArray.load_binary( + tmpfile, protocol=protocol, compress=compress, return_iterator=False + ) + + assert isinstance(da_reconstructed, types.GeneratorType) + for d, d_rec in zip(da, da_reconstructed): + print(f'\n\ndoc assets!!\n\n') + print(f'd={d.text}, d_rec={d_rec.text}') + print(f'd==d_rec: {d == d_rec}') + assert d == d_rec + + @pytest.mark.parametrize('protocol', ['pickle', 'protobuf']) -@pytest.mark.parametrize('compress', ['zlib', None]) +@pytest.mark.parametrize('compress', ['zlib', 'gzib', None]) def test_save_bytes_stream_stream(tmpfile, protocol, compress): da = DocumentArray( - [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] + [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] ) da.save_binary(tmpfile, protocol=protocol, compress=compress) da_reconstructed = DocumentArray.load_binary( From 1c73390d91fcd545b32750ed92e936337814547b Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 09:07:23 +0100 Subject: [PATCH 22/28] update: test with new serialization --- tests/unit/array/test_from_to_bytes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index 94ca654b624..f902e606feb 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ 
b/tests/unit/array/test_from_to_bytes.py @@ -76,9 +76,9 @@ def test_save_bytes(target_da, protocol, compress, tmpfile): @pytest.mark.parametrize('protocol', ['pickle']) @pytest.mark.parametrize('compress', ['lzma']) def test_save_bytes_stream_stream_new(tmpfile, protocol, compress): -# tmpfile = '/Users/davidbuchaca1/Documents/jina_stuff/docarray/test_aladdine.bin' + # tmpfile = '/Users/davidbuchaca1/Documents/jina_stuff/docarray/test_aladdine.bin' da = DocumentArray( - [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] + [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] ) da.save_binary(tmpfile, protocol=protocol, compress=compress) da_reconstructed = DocumentArray.load_binary( @@ -100,7 +100,7 @@ def test_save_bytes_stream_stream_new(tmpfile, protocol, compress): @pytest.mark.parametrize('compress', ['zlib', 'gzib', None]) def test_save_bytes_stream_stream(tmpfile, protocol, compress): da = DocumentArray( - [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] + [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] ) da.save_binary(tmpfile, protocol=protocol, compress=compress) da_reconstructed = DocumentArray.load_binary( From f60d13ac779014de2f74a043d0235046562edfa1 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 10:00:30 +0100 Subject: [PATCH 23/28] test: fix stream test --- docarray/array/mixins/io/binary.py | 38 +++++++++++--------------- tests/unit/array/test_from_to_bytes.py | 4 +-- 2 files changed, 18 insertions(+), 24 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 344c6cbc65c..168efac8e63 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -56,35 +56,29 @@ def _load_binary_stream( file_ctx: str, protocol=None, compress=None, + show_progress=False ) -> 'T': from .... 
import Document - current_bytes = b'' - with file_ctx as fp: - # 1 byte (uint8) - version = int.from_bytes(d[0:1], 'big', signed=False) - # 8 bytes (uint64) - num_docs = int.from_bytes(d[1:9], 'big', signed=False) + if show_progress: + from rich.progress import track as _track - if show_progress: - from rich.progress import track as _track + track = lambda x: _track(x, description='Deserializing') + else: + track = lambda x: x - track = lambda x: _track(x, description='Deserializing') - else: - track = lambda x: x + with file_ctx as f: + version_numdocs_lendoc0 = f.read(9) + # 1 byte (uint8) + version = int.from_bytes(version_numdocs_lendoc0[0:1], 'big', signed=False) + # 8 bytes (uint64) + num_docs = int.from_bytes(version_numdocs_lendoc0[1:9], 'big', signed=False) - start_pos = 9 - for d in track(self): + for _ in track(range(num_docs)): # 4 bytes (uint32) - len_current_doc_in_bytes = int.from_bytes( - d[start_pos : start_pos + 1], 'big', signed=False - ) - start_doc_pos = start_pos + 1 - end_doc_pos = start_doc_pos + len_current_doc_in_bytes - start_pos = end_doc_pos - # variable length bytes doc + len_current_doc_in_bytes = int.from_bytes(f.read(4), 'big', signed=False) yield Document.from_bytes( - d[start_doc_pos:end_doc_pos], protocol=protocol, compress=compress + f.read(len_current_doc_in_bytes), protocol=protocol, compress=compress ) @classmethod @@ -127,7 +121,7 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): for _ in track(range(num_docs)): # 4 bytes (uint32) len_current_doc_in_bytes = int.from_bytes( - d[start_pos : start_pos + 4], 'big', signed=False + d[start_pos:start_pos + 4], 'big', signed=False ) start_doc_pos = start_pos + 4 end_doc_pos = start_doc_pos + len_current_doc_in_bytes diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index f902e606feb..1f05b9d3393 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ b/tests/unit/array/test_from_to_bytes.py @@ -75,7 +75,7 @@ def 
test_save_bytes(target_da, protocol, compress, tmpfile): # Note protocol = ['protobuf-array', 'pickle-array'] not supported with Document.from_bytes @pytest.mark.parametrize('protocol', ['pickle']) @pytest.mark.parametrize('compress', ['lzma']) -def test_save_bytes_stream_stream_new(tmpfile, protocol, compress): +def test_save_bytes_stream_new(tmpfile, protocol, compress): # tmpfile = '/Users/davidbuchaca1/Documents/jina_stuff/docarray/test_aladdine.bin' da = DocumentArray( [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] @@ -98,7 +98,7 @@ def test_save_bytes_stream_stream_new(tmpfile, protocol, compress): @pytest.mark.parametrize('protocol', ['pickle', 'protobuf']) @pytest.mark.parametrize('compress', ['zlib', 'gzib', None]) -def test_save_bytes_stream_stream(tmpfile, protocol, compress): +def test_save_bytes_stream(tmpfile, protocol, compress): da = DocumentArray( [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] ) From 780b32cacf03c57db94bd3be0936798b78468b19 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 10:00:51 +0100 Subject: [PATCH 24/28] test: fix stream test --- docarray/array/mixins/io/binary.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 168efac8e63..f56a5c1d709 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -52,11 +52,7 @@ def load_binary( @classmethod def _load_binary_stream( - cls: Type['T'], - file_ctx: str, - protocol=None, - compress=None, - show_progress=False + cls: Type['T'], file_ctx: str, protocol=None, compress=None, show_progress=False ) -> 'T': from .... 
import Document @@ -76,9 +72,13 @@ def _load_binary_stream( for _ in track(range(num_docs)): # 4 bytes (uint32) - len_current_doc_in_bytes = int.from_bytes(f.read(4), 'big', signed=False) + len_current_doc_in_bytes = int.from_bytes( + f.read(4), 'big', signed=False + ) yield Document.from_bytes( - f.read(len_current_doc_in_bytes), protocol=protocol, compress=compress + f.read(len_current_doc_in_bytes), + protocol=protocol, + compress=compress, ) @classmethod @@ -121,7 +121,7 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): for _ in track(range(num_docs)): # 4 bytes (uint32) len_current_doc_in_bytes = int.from_bytes( - d[start_pos:start_pos + 4], 'big', signed=False + d[start_pos : start_pos + 4], 'big', signed=False ) start_doc_pos = start_pos + 4 end_doc_pos = start_doc_pos + len_current_doc_in_bytes From 30266beb56124121a68778f246a9141e50249a5f Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 15:12:43 +0100 Subject: [PATCH 25/28] test: add complete test with all compression --- docarray/array/mixins/io/binary.py | 19 ++++++++++------ docarray/document/mixins/porting.py | 1 + tests/unit/array/test_from_to_bytes.py | 30 +++++--------------------- 3 files changed, 19 insertions(+), 31 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index f56a5c1d709..4098b60102b 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -34,7 +34,6 @@ def load_binary( In case protocol is pickle the `Documents` are streamed from disk to save memory usage :return: a DocumentArray object """ - if isinstance(file, io.BufferedReader): file_ctx = nullcontext(file) elif isinstance(file, bytes): @@ -63,6 +62,7 @@ def _load_binary_stream( else: track = lambda x: x + print(f'type(file_ctx)={type(file_ctx)}') with file_ctx as f: version_numdocs_lendoc0 = f.read(9) # 1 byte (uint8) @@ -87,9 +87,11 @@ def _load_binary_all(cls, file_ctx, protocol, compress, 
show_progress): with file_ctx as fp: d = fp.read() if hasattr(fp, 'read') else fp - if get_compress_ctx(algorithm=compress) is not None: - d = decompress_bytes(d, algorithm=compress) - compress = None + + if protocol == 'pickle-array' or protocol == 'protobuf-array': + if get_compress_ctx(algorithm=compress) is not None: + d = decompress_bytes(d, algorithm=compress) + compress = None if protocol == 'protobuf-array': from ....proto.docarray_pb2 import DocumentArrayProto @@ -197,8 +199,11 @@ def to_bytes( :return: the binary serialization in bytes """ - _binary_delimiter = random_uuid().bytes - compress_ctx = get_compress_ctx(compress, mode='wb') + if protocol == 'protobuf-array' or protocol == 'pickle-array': + compress_ctx = get_compress_ctx(compress, mode='wb') + else: + compress_ctx = None + with (_file_ctx or io.BytesIO()) as bf: if compress_ctx is None: # if compress do not support streaming then postpone the compress @@ -208,6 +213,7 @@ def to_bytes( f = compress_ctx(bf) fc = f compress = None + with fc: if protocol == 'protobuf-array': f.write(self.to_protobuf().SerializePartialToString()) @@ -233,6 +239,7 @@ def to_bytes( for d in track(self): # 4 bytes (uint32) + print(f'\nprotocol={protocol}, compress={compress}\n') doc_as_bytes = d.to_bytes(protocol=protocol, compress=compress) # variable size bytes diff --git a/docarray/document/mixins/porting.py b/docarray/document/mixins/porting.py index 6cbf7c1b357..f6077463c34 100644 --- a/docarray/document/mixins/porting.py +++ b/docarray/document/mixins/porting.py @@ -50,6 +50,7 @@ def to_bytes( raise ValueError( f'protocol={protocol} is not supported. Can be only `protobuf` or pickle protocols 0-5.' 
) + return compress_bytes(bstr, algorithm=compress) @classmethod diff --git a/tests/unit/array/test_from_to_bytes.py b/tests/unit/array/test_from_to_bytes.py index 1f05b9d3393..042573dafe0 100644 --- a/tests/unit/array/test_from_to_bytes.py +++ b/tests/unit/array/test_from_to_bytes.py @@ -73,10 +73,11 @@ def test_save_bytes(target_da, protocol, compress, tmpfile): # Note protocol = ['protobuf-array', 'pickle-array'] not supported with Document.from_bytes -@pytest.mark.parametrize('protocol', ['pickle']) -@pytest.mark.parametrize('compress', ['lzma']) -def test_save_bytes_stream_new(tmpfile, protocol, compress): - # tmpfile = '/Users/davidbuchaca1/Documents/jina_stuff/docarray/test_aladdine.bin' +@pytest.mark.parametrize('protocol', ['protobuf', 'pickle']) +@pytest.mark.parametrize( + 'compress', ['lz4', 'bz2', 'lzma', 'gzip', 'zlib', 'gzib', None] +) +def test_save_bytes_stream(tmpfile, protocol, compress): da = DocumentArray( [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] ) @@ -84,32 +85,11 @@ def test_save_bytes_stream_new(tmpfile, protocol, compress): da_reconstructed = DocumentArray.load_binary( tmpfile, protocol=protocol, compress=compress, return_iterator=True ) - da_reconstructed_normal = DocumentArray.load_binary( - tmpfile, protocol=protocol, compress=compress, return_iterator=False - ) - assert isinstance(da_reconstructed, types.GeneratorType) for d, d_rec in zip(da, da_reconstructed): - print(f'\n\ndoc assets!!\n\n') - print(f'd={d.text}, d_rec={d_rec.text}') - print(f'd==d_rec: {d == d_rec}') assert d == d_rec -@pytest.mark.parametrize('protocol', ['pickle', 'protobuf']) -@pytest.mark.parametrize('compress', ['zlib', 'gzib', None]) -def test_save_bytes_stream(tmpfile, protocol, compress): - da = DocumentArray( - [Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})] - ) - da.save_binary(tmpfile, protocol=protocol, compress=compress) - da_reconstructed = DocumentArray.load_binary( - tmpfile, 
protocol=protocol, compress=compress, return_iterator=True - ) - assert isinstance(da_reconstructed, types.GeneratorType) - assert da == DocumentArray(da_reconstructed) - - @pytest.mark.parametrize('target_da', [DocumentArray.empty(100), random_docs(100)]) def test_from_to_protobuf(target_da): DocumentArray.from_protobuf(target_da.to_protobuf()) From f593b7304c21fb2e7d0b6d5c75dba7d724941101 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 15:36:34 +0100 Subject: [PATCH 26/28] refactor: remove prints --- docarray/array/mixins/io/binary.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 4098b60102b..833ee2fa86a 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -62,7 +62,6 @@ def _load_binary_stream( else: track = lambda x: x - print(f'type(file_ctx)={type(file_ctx)}') with file_ctx as f: version_numdocs_lendoc0 = f.read(9) # 1 byte (uint8) @@ -123,7 +122,7 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): for _ in track(range(num_docs)): # 4 bytes (uint32) len_current_doc_in_bytes = int.from_bytes( - d[start_pos : start_pos + 4], 'big', signed=False + d[start_pos: start_pos + 4], 'big', signed=False ) start_doc_pos = start_pos + 4 end_doc_pos = start_doc_pos + len_current_doc_in_bytes @@ -239,7 +238,6 @@ def to_bytes( for d in track(self): # 4 bytes (uint32) - print(f'\nprotocol={protocol}, compress={compress}\n') doc_as_bytes = d.to_bytes(protocol=protocol, compress=compress) # variable size bytes From 6ca89d667a2bbb0f09d7534835c83bff3cf2e342 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 15:41:27 +0100 Subject: [PATCH 27/28] refactor: fix black --- docarray/array/mixins/io/binary.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 833ee2fa86a..90296deed7a 
100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -53,6 +53,7 @@ def load_binary( def _load_binary_stream( cls: Type['T'], file_ctx: str, protocol=None, compress=None, show_progress=False ) -> 'T': + from .... import Document if show_progress: @@ -122,7 +123,7 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): for _ in track(range(num_docs)): # 4 bytes (uint32) len_current_doc_in_bytes = int.from_bytes( - d[start_pos: start_pos + 4], 'big', signed=False + d[start_pos : start_pos + 4], 'big', signed=False ) start_doc_pos = start_pos + 4 end_doc_pos = start_doc_pos + len_current_doc_in_bytes From fec27263288399bf0e8b0ea4ba239467b278fdb9 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 18 Jan 2022 15:43:07 +0100 Subject: [PATCH 28/28] refactor: remove batches --- docarray/array/mixins/io/binary.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 90296deed7a..544d0b5266c 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -137,11 +137,6 @@ def _load_binary_all(cls, file_ctx, protocol, compress, show_progress): return cls(docs) - @classmethod - def _get_batches(cls, gen, batch_size): - for i in range(0, len(gen), batch_size): - yield gen[i : i + batch_size] - @classmethod def from_bytes( cls: Type['T'],