Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
a034228
refactor: move streaming serialization into separate method
Jackmin801 Feb 27, 2023
104d540
refactor: add binary io like protocol definition
Jackmin801 Feb 27, 2023
4c532fa
feat: ported push pull to JAC
Jackmin801 Feb 27, 2023
a3844c6
fix: protocol is not in 3.7 typing
Jackmin801 Feb 27, 2023
e37ba7c
fix: make mypy happy
Jackmin801 Feb 27, 2023
f7a1686
fix: patch missing waterfall
Jackmin801 Feb 27, 2023
5ae831e
refactor: jit import backends
Jackmin801 Feb 27, 2023
9703d6b
feat: implement cache in jinaai pull
Jackmin801 Feb 27, 2023
b304421
fix: add hubble dependency to jina group
Jackmin801 Feb 27, 2023
6fd41b4
refactor: better division of concerns
Jackmin801 Feb 27, 2023
7b2d0c4
feat: add concept of namespace
Jackmin801 Feb 27, 2023
920f993
fix: ignore missing hubble stubs
Jackmin801 Feb 28, 2023
e1811be
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Feb 28, 2023
1a1fe6a
feat: streaming protocol stubs
Jackmin801 Feb 28, 2023
ee57e33
refactor: make more general buffered caching reader
Jackmin801 Feb 28, 2023
6f42d40
test: add tests for hubble pushpull
Jackmin801 Feb 28, 2023
3fa8d2c
test: add tests for file backend
Jackmin801 Mar 2, 2023
7cb5e5c
fix: remove hubble dependency from jina group
Jackmin801 Mar 2, 2023
8c0c92b
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 2, 2023
1fa54d4
feat: implement push pull for local filesystem
Jackmin801 Mar 2, 2023
4ff4dd7
test: test concurrent pushes and pulls in file protocol
Jackmin801 Mar 2, 2023
38ede73
fix: resolve concurrent pushes and pulls correctly
Jackmin801 Mar 2, 2023
7ccf0e5
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 7, 2023
18b3b9a
fix: rename text to textdoc
Jackmin801 Mar 7, 2023
1df2a21
feat: added some logging
Jackmin801 Mar 7, 2023
a66d801
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 7, 2023
46a7397
test: s3 tests
Jackmin801 Mar 8, 2023
67a89c7
feat: s3 pushpull
Jackmin801 Mar 8, 2023
eb0e52b
fix: add smart open dependency
Jackmin801 Mar 10, 2023
cf78c6c
fix: add smart opens silly python bound
Jackmin801 Mar 10, 2023
42079ad
test: update hubble tests (failing)
Jackmin801 Mar 10, 2023
d91dde0
fix: fix delete return in hubble pushpull
Jackmin801 Mar 10, 2023
f66ee30
Revert "fix: add smart open dependency"
Jackmin801 Mar 10, 2023
1d1d2ee
fix: add hubble and smart open dependencies
Jackmin801 Mar 10, 2023
9687400
fix: mypy fixes
Jackmin801 Mar 10, 2023
b25f4e7
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 10, 2023
62493bf
ci: allow tests to see jina auth token
Jackmin801 Mar 10, 2023
670d439
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 13, 2023
8122ca2
feat: add progress bars for streaming
Jackmin801 Mar 13, 2023
83e65b9
style: blacken
Jackmin801 Mar 13, 2023
cb4569f
feat: buffer writes to s3
Jackmin801 Mar 13, 2023
449415a
fix: mypy no like sequence
Jackmin801 Mar 13, 2023
4fdc226
fix: make progress bar quieter when disabled
Jackmin801 Mar 13, 2023
b8d9ca6
test: skip failing tests
Jackmin801 Mar 13, 2023
1817902
feat: add tables when listing
Jackmin801 Mar 13, 2023
3b0a708
test: add jina auth token to uncaped test
Jackmin801 Mar 13, 2023
83e5046
test: mock s3 tests with minio container
Jackmin801 Mar 13, 2023
c9d3df7
fix: silly error that cost me 2 hours of life
Jackmin801 Mar 13, 2023
358b96f
test: use tolerance ratio in file tests
Jackmin801 Mar 13, 2023
f53dc86
feat: add caching to s3 pull
Jackmin801 Mar 13, 2023
751c4b7
feat: add log messages for unused parameters
Jackmin801 Mar 13, 2023
d2e9b7a
refactor: take out unneeded buffering
Jackmin801 Mar 14, 2023
e473ca8
feat: pick fastest protocol compression configuration for s3
Jackmin801 Mar 14, 2023
c8c8cb6
test: bump tolerance ratio for s3 test
Jackmin801 Mar 14, 2023
15731f7
refactor: reduce code duplication
Jackmin801 Mar 14, 2023
d343801
refactor: put reader chunk size constant at top of file
Jackmin801 Mar 14, 2023
0cd46fd
test: reduce reader chunk size for memory tests
Jackmin801 Mar 14, 2023
a9cb282
fix: multipart uploads get stuck frequently
Jackmin801 Mar 14, 2023
ba99182
docs: add docstrings to mixin and file backend
Jackmin801 Mar 14, 2023
ac3a0b8
docs: add docstring for s3 and hubble backends
Jackmin801 Mar 14, 2023
3db9fbd
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 15, 2023
290bf61
test: remove unused test
Jackmin801 Mar 15, 2023
46ed106
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 17, 2023
0a1e19c
refactor: use literal in protocol
Jackmin801 Mar 17, 2023
e012274
refactor: protocols dont need to be inherited
Jackmin801 Mar 17, 2023
5469adb
fix: add make mypy happy with the literals
Jackmin801 Mar 17, 2023
a305019
fix: literals not in 3.7
Jackmin801 Mar 20, 2023
e360ccc
refactor: move mixin out of init file
Jackmin801 Mar 20, 2023
91b38f0
refactor: move cache path resolution to utils
Jackmin801 Mar 20, 2023
1efc33a
feat: cache path is only evaluated once
Jackmin801 Mar 20, 2023
b6c439e
refactor: loading backends makes more sense as debug log
Jackmin801 Mar 20, 2023
57f2484
tests: add slow and internet marks
Jackmin801 Mar 20, 2023
8827c07
refactor: pin image tag
Jackmin801 Mar 20, 2023
44cb4ac
refactor: use abc instead of protocol for typing backends
Jackmin801 Mar 20, 2023
cfb7a1e
fix: revert - add hubble and smart open dependencies
Jackmin801 Mar 20, 2023
06d5aa4
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 20, 2023
4997803
fix: add hubble and aws dependencies
Jackmin801 Mar 21, 2023
a63525a
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 21, 2023
a12160f
refactor: change all push pull mixin methods to class methods
Jackmin801 Mar 21, 2023
0b261ad
fix: misstyped class method self reference
Jackmin801 Mar 21, 2023
3e615f5
refactor: rename pushpull to docstore and use more classmethods
Jackmin801 Mar 22, 2023
bcef4e7
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 22, 2023
45b2ccc
refactor: separate remote backend implementations from mixin
Jackmin801 Mar 22, 2023
4639cfc
fix: missed import refactor
Jackmin801 Mar 22, 2023
92648d1
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 23, 2023
aa4c3c4
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 23, 2023
529d5d0
refactor: change submodule name to store
Jackmin801 Mar 24, 2023
1cde6a4
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 24, 2023
a05fe79
refactor: remove list and delete from mixin
Jackmin801 Mar 24, 2023
b225e85
Merge branch 'feat-rewrite-v2' into feat-1134-pushpull
Jackmin801 Mar 24, 2023
5c57fda
tests: clear all the garbage in ci account
Jackmin801 Mar 24, 2023
82e3132
tests: skip test that is broken on ci
Jackmin801 Mar 24, 2023
7cd875b
refactor: standardize naming to jac
Jackmin801 Mar 24, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ jobs:
run: |
poetry run pytest -m "not (tensorflow or benchmark or index)" ${{ matrix.test-path }}
timeout-minutes: 30
# env:
# JINA_AUTH_TOKEN: "${{ secrets.JINA_AUTH_TOKEN }}"
env:
JINA_AUTH_TOKEN: "${{ secrets.JINA_AUTH_TOKEN }}"
# - name: Check codecov file
# id: check_files
# uses: andstor/file-existence-action@v1
Expand Down Expand Up @@ -164,6 +164,8 @@ jobs:
run: |
poetry run pytest -m "not (tensorflow or benchmark)" ${{ matrix.test-path }}
timeout-minutes: 30
env:
JINA_AUTH_TOKEN: "${{ secrets.JINA_AUTH_TOKEN }}"


docarray-test-proto3:
Expand Down
3 changes: 2 additions & 1 deletion docarray/array/array/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from docarray.array.abstract_array import AnyDocumentArray
from docarray.array.array.io import IOMixinArray
from docarray.array.array.pushpull import PushPullMixin
from docarray.array.array.sequence_indexing_mixin import (
IndexingSequenceMixin,
IndexIterType,
Expand Down Expand Up @@ -57,7 +58,7 @@ def _delegate_meth(self, *args, **kwargs):


class DocumentArray(
IndexingSequenceMixin[T_doc], IOMixinArray, AnyDocumentArray[T_doc]
IndexingSequenceMixin[T_doc], PushPullMixin, IOMixinArray, AnyDocumentArray[T_doc]
):
Comment on lines +61 to 62
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about making Da directly a PushPullMixin. I think this quite polute the namespace of DocumentArray :thinking_face: At the same time I don't have an alternative in mind

"""
DocumentArray is a container of Documents.
Expand Down
128 changes: 82 additions & 46 deletions docarray/array/array/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
Dict,
Generator,
Iterable,
Iterator,
List,
Optional,
Tuple,
Expand Down Expand Up @@ -177,37 +178,60 @@ def _write_bytes(
elif protocol == 'pickle-array':
f.write(pickle.dumps(self))
elif protocol in SINGLE_PROTOCOLS:
from rich import filesize

from docarray.utils.progress_bar import _get_progressbar

pbar, t = _get_progressbar(
'Serializing', disable=not show_progress, total=len(self)
)

f.write(self._stream_header)

with pbar:
_total_size = 0
pbar.start_task(t)
for doc in self:
doc_bytes = doc.to_bytes(protocol=protocol, compress=compress)
len_doc_as_bytes = len(doc_bytes).to_bytes(
4, 'big', signed=False
)
all_bytes = len_doc_as_bytes + doc_bytes
f.write(all_bytes)
_total_size += len(all_bytes)
pbar.update(
t,
advance=1,
total_size=str(filesize.decimal(_total_size)),
f.write(
b''.join(
self.to_binary_stream(
protocol=protocol,
compress=compress,
show_progress=show_progress,
)
)
)
else:
raise ValueError(
f'protocol={protocol} is not supported. Can be only {ALLOWED_PROTOCOLS}.'
)

def to_binary_stream(
self,
protocol: str = 'protobuf',
compress: Optional[str] = None,
show_progress: bool = False,
) -> Iterator[bytes]:
from rich import filesize

if show_progress:
from docarray.utils.progress_bar import _get_progressbar

pbar, t = _get_progressbar(
'Serializing', disable=not show_progress, total=len(self)
)
else:
from contextlib import nullcontext

pbar = nullcontext()

yield self._stream_header

with pbar:
if show_progress:
_total_size = 0
pbar.start_task(t)
for doc in self:
doc_bytes = doc.to_bytes(protocol=protocol, compress=compress)
len_doc_as_bytes = len(doc_bytes).to_bytes(4, 'big', signed=False)
all_bytes = len_doc_as_bytes + doc_bytes

yield all_bytes

if show_progress:
_total_size += len(all_bytes)
pbar.update(
t,
advance=1,
total_size=str(filesize.decimal(_total_size)),
)

def to_bytes(
self,
protocol: str = 'protobuf-array',
Expand Down Expand Up @@ -584,7 +608,7 @@ def _load_binary_all(
def _load_binary_stream(
cls: Type[T],
file_ctx: ContextManager[io.BufferedReader],
protocol: Optional[str] = None,
protocol: str = 'protobuf',
compress: Optional[str] = None,
show_progress: bool = False,
) -> Generator['BaseDocument', None, None]:
Expand All @@ -598,37 +622,43 @@ def _load_binary_stream(

from rich import filesize

from docarray import BaseDocument
from docarray.utils.progress_bar import _get_progressbar

with file_ctx as f:
version_numdocs_lendoc0 = f.read(9)
# 1 byte (uint8)
# 8 bytes (uint64)
num_docs = int.from_bytes(version_numdocs_lendoc0[1:9], 'big', signed=False)

pbar, t = _get_progressbar(
'Deserializing', disable=not show_progress, total=num_docs
)
if show_progress:
from docarray.utils.progress_bar import _get_progressbar

pbar, t = _get_progressbar(
'Deserializing', disable=not show_progress, total=num_docs
)
else:
from contextlib import nullcontext

pbar = nullcontext()

with pbar:
_total_size = 0
pbar.start_task(t)
if show_progress:
_total_size = 0
pbar.start_task(t)
for _ in range(num_docs):
# 4 bytes (uint32)
len_current_doc_in_bytes = int.from_bytes(
f.read(4), 'big', signed=False
)
_total_size += len_current_doc_in_bytes
load_protocol: str = protocol or 'protobuf'
yield BaseDocument.from_bytes(
load_protocol: str = protocol
yield cls.document_type.from_bytes(
f.read(len_current_doc_in_bytes),
protocol=load_protocol,
compress=compress,
)
pbar.update(
t, advance=1, total_size=str(filesize.decimal(_total_size))
)
if show_progress:
_total_size += len_current_doc_in_bytes
pbar.update(
t, advance=1, total_size=str(filesize.decimal(_total_size))
)

@classmethod
def load_binary(
Expand Down Expand Up @@ -670,12 +700,18 @@ def load_binary(
else:
raise FileNotFoundError(f'cannot find file {file}')
if streaming:
return cls._load_binary_stream(
file_ctx,
protocol=load_protocol,
compress=load_compress,
show_progress=show_progress,
)
if load_protocol not in SINGLE_PROTOCOLS:
raise ValueError(
f'`streaming` is only available when using {" or ".join(map(lambda x: f"`{x}`", SINGLE_PROTOCOLS))} as protocol, '
f'got {load_protocol}'
)
else:
return cls._load_binary_stream(
file_ctx,
protocol=load_protocol,
compress=load_compress,
show_progress=show_progress,
)
else:
return cls._load_binary_all(
file_ctx, load_protocol, load_compress, show_progress
Expand Down
Loading