Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3c1fbc7
feat: initial draft
florian-hoenicke Jan 13, 2022
5403b03
feat: constant delimiter len
florian-hoenicke Jan 13, 2022
b6a6491
fix: offset error
florian-hoenicke Jan 13, 2022
c30138f
test: load stream
florian-hoenicke Jan 14, 2022
715f510
tests: add protocol test to stream load
davidbp Jan 17, 2022
281786d
test: add protocol test to stream load
davidbp Jan 17, 2022
e4d72df
Merge branch 'feat-read-stream' of https://github.com/jina-ai/docarra…
davidbp Jan 17, 2022
1827ff0
docs: v1 serialization format
davidbp Jan 17, 2022
1400933
refactor: to bytes v1 serialization
davidbp Jan 17, 2022
c8c1024
refactor: load v1 serialization protocol
davidbp Jan 17, 2022
0cac456
refactor: load binary stream
davidbp Jan 17, 2022
b7f12de
refactor: load binary fix
davidbp Jan 18, 2022
f066282
update: test with new serialization
davidbp Jan 18, 2022
a6689d1
test: fix stream test
davidbp Jan 18, 2022
d182852
test: fix stream test
davidbp Jan 18, 2022
927e5cb
test: fix stream test
davidbp Jan 18, 2022
189f3b5
test: add complete test with all compression
davidbp Jan 18, 2022
4a91d93
Merge branch 'feat-read-stream' of https://github.com/jina-ai/docarra…
davidbp Jan 18, 2022
6e6ec5a
docs: v1 serialization format
davidbp Jan 17, 2022
9a799a8
refactor: to bytes v1 serialization
davidbp Jan 17, 2022
57f769b
refactor: load v1 serialization protocol
davidbp Jan 17, 2022
f7fe6e7
refactor: load binary stream
davidbp Jan 17, 2022
e7207aa
refactor: load binary fix
davidbp Jan 18, 2022
1c73390
update: test with new serialization
davidbp Jan 18, 2022
f60d13a
test: fix stream test
davidbp Jan 18, 2022
780b32c
test: fix stream test
davidbp Jan 18, 2022
30266be
test: add complete test with all compression
davidbp Jan 18, 2022
fc21e74
Merge branch 'feat-read-stream' of https://github.com/jina-ai/docarra…
davidbp Jan 18, 2022
f593b73
refactor: remove prints
davidbp Jan 18, 2022
6ca89d6
refactor: fix black
davidbp Jan 18, 2022
fec2726
refactor: remove batches
davidbp Jan 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 109 additions & 24 deletions docarray/array/mixins/io/binary.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import base64
import io
import os.path
import base64
import pickle
from contextlib import nullcontext
from typing import Union, BinaryIO, TYPE_CHECKING, Type, Optional
Expand All @@ -22,17 +22,18 @@ def load_binary(
protocol: str = 'pickle-array',
compress: Optional[str] = None,
_show_progress: bool = False,
return_iterator: bool = False,
) -> 'T':
"""Load array elements from a LZ4-compressed binary file.

:param file: File or filename or serialized bytes where the data is stored.
:param protocol: protocol to use
:param compress: compress algorithm to use
:param _show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`

:param return_iterator: returns an iterator over the DocumentArray.
In case protocol is pickle the `Documents` are streamed from disk to save memory usage
:return: a DocumentArray object
"""

if isinstance(file, io.BufferedReader):
file_ctx = nullcontext(file)
elif isinstance(file, bytes):
Expand All @@ -41,37 +42,100 @@ def load_binary(
file_ctx = open(file, 'rb')
else:
raise ValueError(f'unsupported input {file!r}')
if return_iterator:
return cls._load_binary_stream(
file_ctx, protocol=protocol, compress=compress
)
else:
return cls._load_binary_all(file_ctx, protocol, compress, _show_progress)

@classmethod
def _load_binary_stream(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think _load_binary_stream should leverage _load_binary_all by just giving it a subset of the bytes. That way we can leverage optimizations from both sides.

cls: Type['T'], file_ctx: str, protocol=None, compress=None, show_progress=False
) -> 'T':

from .... import Document

if show_progress:
from rich.progress import track as _track

track = lambda x: _track(x, description='Deserializing')
else:
track = lambda x: x

with file_ctx as f:
version_numdocs_lendoc0 = f.read(9)
# 1 byte (uint8)
version = int.from_bytes(version_numdocs_lendoc0[0:1], 'big', signed=False)
# 8 bytes (uint64)
num_docs = int.from_bytes(version_numdocs_lendoc0[1:9], 'big', signed=False)

for _ in track(range(num_docs)):
# 4 bytes (uint32)
len_current_doc_in_bytes = int.from_bytes(
f.read(4), 'big', signed=False
)
yield Document.from_bytes(
f.read(len_current_doc_in_bytes),
protocol=protocol,
compress=compress,
)

@classmethod
def _load_binary_all(cls, file_ctx, protocol, compress, show_progress):
from .... import Document

with file_ctx as fp:
d = fp.read() if hasattr(fp, 'read') else fp

if protocol == 'pickle-array' or protocol == 'protobuf-array':
if get_compress_ctx(algorithm=compress) is not None:
d = decompress_bytes(d, algorithm=compress)
compress = None

if protocol == 'protobuf-array':
from ....proto.docarray_pb2 import DocumentArrayProto
if protocol == 'protobuf-array':
from ....proto.docarray_pb2 import DocumentArrayProto

dap = DocumentArrayProto()
dap.ParseFromString(d)

dap = DocumentArrayProto()
dap.ParseFromString(d)
return cls.from_protobuf(dap)
elif protocol == 'pickle-array':
return pickle.loads(d)

return cls.from_protobuf(dap)
elif protocol == 'pickle-array':
return pickle.loads(d)
# Binary format for streaming case
else:
# 1 byte (uint8)
version = int.from_bytes(d[0:1], 'big', signed=False)
# 8 bytes (uint64)
num_docs = int.from_bytes(d[1:9], 'big', signed=False)
if show_progress:
from rich.progress import track as _track

track = lambda x: _track(x, description='Deserializing')
else:
_len = len(random_uuid().bytes)
_binary_delimiter = d[:_len] # first get delimiter
if _show_progress:
from rich.progress import track as _track
track = lambda x: x

track = lambda x: _track(x, description='Deserializing')
else:
track = lambda x: x
return cls(
Document.from_bytes(od, protocol=protocol, compress=compress)
for od in track(d[_len:].split(_binary_delimiter))
# this 9 is version + num_docs bytes used
start_pos = 9
docs = []

for _ in track(range(num_docs)):
# 4 bytes (uint32)
len_current_doc_in_bytes = int.from_bytes(
d[start_pos : start_pos + 4], 'big', signed=False
)
start_doc_pos = start_pos + 4
end_doc_pos = start_doc_pos + len_current_doc_in_bytes
start_pos = end_doc_pos

# variable length bytes doc
doc = Document.from_bytes(
d[start_doc_pos:end_doc_pos], protocol=protocol, compress=compress
)
docs.append(doc)

return cls(docs)

@classmethod
def from_bytes(
Expand Down Expand Up @@ -130,8 +194,11 @@ def to_bytes(
:return: the binary serialization in bytes
"""

_binary_delimiter = random_uuid().bytes
compress_ctx = get_compress_ctx(compress, mode='wb')
if protocol == 'protobuf-array' or protocol == 'pickle-array':
compress_ctx = get_compress_ctx(compress, mode='wb')
else:
compress_ctx = None

with (_file_ctx or io.BytesIO()) as bf:
if compress_ctx is None:
# if compress do not support streaming then postpone the compress
Expand All @@ -141,22 +208,40 @@ def to_bytes(
f = compress_ctx(bf)
fc = f
compress = None

with fc:
if protocol == 'protobuf-array':
f.write(self.to_protobuf().SerializePartialToString())
elif protocol == 'pickle-array':
f.write(pickle.dumps(self))
else:
# Binary format for streaming case
if _show_progress:
from rich.progress import track as _track

track = lambda x: _track(x, description='Serializing')
else:
track = lambda x: x

# V1 DocArray streaming serialization format
# | 1 byte | 8 bytes | 4 bytes | variable | 4 bytes | variable ...

# 1 byte (uint8)
version_byte = b'\x01'
# 8 bytes (uint64)
num_docs_as_bytes = len(self).to_bytes(8, 'big', signed=False)
f.write(version_byte + num_docs_as_bytes)

for d in track(self):
f.write(_binary_delimiter)
f.write(d.to_bytes(protocol=protocol, compress=compress))
# 4 bytes (uint32)
doc_as_bytes = d.to_bytes(protocol=protocol, compress=compress)

# variable size bytes
len_doc_as_bytes = len(doc_as_bytes).to_bytes(
4, 'big', signed=False
)
f.write(len_doc_as_bytes + doc_as_bytes)

if not _file_ctx:
return bf.getvalue()

Expand Down
1 change: 1 addition & 0 deletions docarray/document/mixins/porting.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def to_bytes(
raise ValueError(
f'protocol={protocol} is not supported. Can be only `protobuf` or pickle protocols 0-5.'
)

return compress_bytes(bstr, algorithm=compress)

@classmethod
Expand Down
24 changes: 13 additions & 11 deletions docs/fundamentals/documentarray/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,23 @@ Depending on how you want to interpret the results, the figures above can be an

### Wire format of `pickle` and `protobuf`

When set `protocol=pickle` or `protobuf`, the result binary string looks like the following:
When `protocol` is set to `pickle` or `protobuf`, the resulting bytes look like the following:

```text
-----------------------------------------------------------------------------------
| Delimiter | doc1.to_bytes() | Delimiter | doc2.to_bytes() | Delimiter | ...
-----------------------------------------------------------------------------------
| |
| |
| |
Fixed-length |
|
Variable-length
--------------------------------------------------------------------------------------------------------
| version | len(docs) | doc1_bytes | doc1.to_bytes() | doc2_bytes | doc2.to_bytes() ...
---------------------------------------------------------------------------------------------------------
| Fixed-length | Fixed-length | Fixed-length | Variable-length | Fixed-length | Variable-length ...
--------------------------------------------------------------------------------------------------------
| | | | | |
uint8 uint64 uint32 Variable-length ... ...

```

Here `Delimiter` is a 16-bytes separator such as `b'g\x81\xcc\x1c\x0f\x93L\xed\xa2\xb0s)\x9c\xf9\xf6\xf2'` used for setting the boundary of each Document's serialization. Given a `to_bytes(protocol='pickle/protobuf')` binary string, once we know the first 16 bytes, the boundary is clear. Consequently, one can leverage this format to stream Documents, drop, skip, or early-stop, etc.
Here `version` is a `uint8` that specifies the version of the `DocumentArray` serialization format, followed by `len(docs)`, a `uint64` that specifies the number of serialized documents.
Afterwards, `doc1_bytes` describes how many bytes are used to serialize `doc1`, followed by `doc1.to_bytes()` which is the bytes data of the document itself.
The pattern of a `doc_k_bytes` length prefix followed by `doc_k.to_bytes()` is repeated `len(docs)` times.


## From/to base64

Expand Down
22 changes: 21 additions & 1 deletion tests/unit/array/test_from_to_bytes.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import types

import numpy as np
import pytest
import tensorflow as tf
import torch
from scipy.sparse import csr_matrix, coo_matrix, bsr_matrix, csc_matrix

from docarray import DocumentArray
from docarray import DocumentArray, Document
from docarray.math.ndarray import to_numpy_array
from tests import random_docs

Expand Down Expand Up @@ -70,6 +72,24 @@ def test_save_bytes(target_da, protocol, compress, tmpfile):
DocumentArray.load_binary(fp, protocol=protocol, compress=compress)


# Note protocol = ['protobuf-array', 'pickle-array'] not supported with Document.from_bytes
@pytest.mark.parametrize('protocol', ['protobuf', 'pickle'])
@pytest.mark.parametrize(
'compress', ['lz4', 'bz2', 'lzma', 'gzip', 'zlib', 'gzib', None]
)
def test_save_bytes_stream(tmpfile, protocol, compress):
da = DocumentArray(
[Document(text='aaa'), Document(buffer=b'buffer'), Document(tags={'a': 'b'})]
)
da.save_binary(tmpfile, protocol=protocol, compress=compress)
da_reconstructed = DocumentArray.load_binary(
tmpfile, protocol=protocol, compress=compress, return_iterator=True
)
assert isinstance(da_reconstructed, types.GeneratorType)
for d, d_rec in zip(da, da_reconstructed):
assert d == d_rec


@pytest.mark.parametrize('target_da', [DocumentArray.empty(100), random_docs(100)])
def test_from_to_protobuf(target_da):
DocumentArray.from_protobuf(target_da.to_protobuf())
Expand Down