From fbd10a4a089ada38c320795b9aea2283410e7e67 Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Wed, 2 Mar 2022 11:51:23 +0100 Subject: [PATCH 1/3] feat(array): improve pull cache add pbar to map --- docarray/array/mixins/io/binary.py | 1 - docarray/array/mixins/io/pushpull.py | 37 +++++++++++++++++-- docarray/array/mixins/parallel.py | 34 ++++++++++++++++- docarray/array/mixins/plot.py | 11 ++++++ .../documentarray/serialization.md | 5 ++- tests/unit/array/mixins/test_parallel.py | 31 +++++++++++++--- 6 files changed, 106 insertions(+), 13 deletions(-) diff --git a/docarray/array/mixins/io/binary.py b/docarray/array/mixins/io/binary.py index 14d539e84d4..8edcb05ef64 100644 --- a/docarray/array/mixins/io/binary.py +++ b/docarray/array/mixins/io/binary.py @@ -7,7 +7,6 @@ from typing import Union, BinaryIO, TYPE_CHECKING, Type, Optional, Generator from ....helper import ( - __windows__, get_compress_ctx, decompress_bytes, protocol_and_compress_from_file_path, diff --git a/docarray/array/mixins/io/pushpull.py b/docarray/array/mixins/io/pushpull.py index c6b17460df8..098fbe43c59 100644 --- a/docarray/array/mixins/io/pushpull.py +++ b/docarray/array/mixins/io/pushpull.py @@ -104,14 +104,21 @@ def read(self, n=-1): @classmethod def pull( - cls: Type['T'], token: str, show_progress: bool = False, *args, **kwargs + cls: Type['T'], + token: str, + show_progress: bool = False, + local_cache: bool = False, + *args, + **kwargs, ) -> 'T': """Pulling a :class:`DocumentArray` from Jina Cloud Service to local. 
:param token: the upload token set during :meth:`.push` :param show_progress: if to show a progress bar on pulling + :param local_cache: store the downloaded DocumentArray to local folder :return: a :class:`DocumentArray` object """ + import requests url = f'{_get_cloud_api()}/v2/rpc/da.pull?token={token}' @@ -127,9 +134,27 @@ def pull( headers=get_request_header(), ) as r, progress: r.raise_for_status() + + _da_len = int(r.headers['Content-length']) + + if local_cache and os.path.exists(f'.cache/{token}'): + _cache_len = os.path.getsize(f'.cache/{token}') + if _cache_len == _da_len: + if show_progress: + progress.stop() + + return cls.load_binary( + f'.cache/{token}', + protocol='protobuf', + compress='gzip', + _show_progress=show_progress, + *args, + **kwargs, + ) + if show_progress: task_id = progress.add_task('download', start=False) - progress.update(task_id, total=int(r.headers['Content-length'])) + progress.update(task_id, total=int(_da_len)) with io.BytesIO() as f: chunk_size = 8192 if show_progress: @@ -139,8 +164,14 @@ def pull( if show_progress: progress.update(task_id, advance=len(chunk)) + if local_cache: + os.makedirs('.cache', exist_ok=True) + with open(f'.cache/{token}', 'wb') as fp: + fp.write(f.getbuffer()) + if show_progress: progress.stop() + return cls.from_bytes( f.getvalue(), protocol='protobuf', @@ -180,7 +211,7 @@ def _get_progressbar(show_progress): ) return Progress( - BarColumn(bar_width=None), + BarColumn(), "[progress.percentage]{task.percentage:>3.1f}%", "•", DownloadColumn(), diff --git a/docarray/array/mixins/parallel.py b/docarray/array/mixins/parallel.py index eaaf5d66815..44fb6121e0a 100644 --- a/docarray/array/mixins/parallel.py +++ b/docarray/array/mixins/parallel.py @@ -1,4 +1,5 @@ import sys +from math import ceil from types import LambdaType from typing import Callable, TYPE_CHECKING, Generator, Optional, overload, TypeVar @@ -18,6 +19,7 @@ def apply( func: Callable[['Document'], 'Document'], backend: str = 'thread', 
num_worker: Optional[int] = None, + show_progress: bool = False, ) -> 'T': """Apply each element in itself with ``func``, return itself after modified. @@ -33,6 +35,7 @@ def apply( and the original object do **not** share the same memory. :param num_worker: the number of parallel workers. If not given, then the number of CPUs in the system will be used. + :param show_progress: show a progress bar """ ... @@ -56,6 +59,7 @@ def map( func: Callable[['Document'], 'T'], backend: str = 'thread', num_worker: Optional[int] = None, + show_progress: bool = False, ) -> Generator['T', None, None]: """Return an iterator that applies function to every **element** of iterable in parallel, yielding the results. @@ -76,12 +80,22 @@ def map( and the original object do **not** share the same memory. :param num_worker: the number of parallel workers. If not given, then the number of CPUs in the system will be used. + :param show_progress: show a progress bar + :yield: anything return from ``func`` """ if _is_lambda_or_local_function(func) and backend == 'process': func = _globalize_lambda_function(func) + + if show_progress: + from rich.progress import track as _track + + track = lambda x: _track(x, total=len(self)) + else: + track = lambda x: x + with _get_pool(backend, num_worker) as p: - for x in p.imap(func, self): + for x in track(p.imap(func, self)): yield x @overload @@ -92,6 +106,7 @@ def apply_batch( backend: str = 'thread', num_worker: Optional[int] = None, shuffle: bool = False, + show_progress: bool = False, ) -> 'T': """Apply each element in itself with ``func``, return itself after modified. @@ -109,6 +124,8 @@ def apply_batch( :param num_worker: the number of parallel workers. If not given, then the number of CPUs in the system will be used. :param batch_size: Size of each generated batch (except the last one, which might be smaller, default: 32) :param shuffle: If set, shuffle the Documents before dividing into minibatches. 
+ :param show_progress: show a progress bar + """ ... @@ -135,6 +152,7 @@ def map_batch( backend: str = 'thread', num_worker: Optional[int] = None, shuffle: bool = False, + show_progress: bool = False, ) -> Generator['T', None, None]: """Return an iterator that applies function to every **minibatch** of iterable in parallel, yielding the results. Each element in the returned iterator is :class:`DocumentArray`. @@ -158,13 +176,25 @@ def map_batch( and the original object do **not** share the same memory. :param num_worker: the number of parallel workers. If not given, then the number of CPUs in the system will be used. + :param show_progress: show a progress bar + :yield: anything return from ``func`` """ if _is_lambda_or_local_function(func) and backend == 'process': func = _globalize_lambda_function(func) + + if show_progress: + from rich.progress import track as _track + + track = lambda x: _track(x, total=ceil(len(self) / batch_size)) + else: + track = lambda x: x + with _get_pool(backend, num_worker) as p: - for x in p.imap(func, self.batch(batch_size=batch_size, shuffle=shuffle)): + for x in track( + p.imap(func, self.batch(batch_size=batch_size, shuffle=shuffle)) + ): yield x diff --git a/docarray/array/mixins/plot.py b/docarray/array/mixins/plot.py index ec778ad05c8..de96d49edfa 100644 --- a/docarray/array/mixins/plot.py +++ b/docarray/array/mixins/plot.py @@ -303,6 +303,7 @@ def plot_image_sprites( min_size: int = 16, channel_axis: int = -1, image_source: str = 'tensor', + skip_empty: bool = False, ) -> None: """Generate a sprite image for all image tensors in this DocumentArray-like object. @@ -314,6 +315,7 @@ def plot_image_sprites( :param min_size: the minimum size of the image :param channel_axis: the axis id of the color channel, ``-1`` indicates the color channel info at the last axis :param image_source: specify where the image comes from, can be ``uri`` or ``tensor``. 
empty tensor will fallback to uri + :param skip_empty: skip Documents that have no .uri or .tensor. """ if not self: raise ValueError(f'{self!r} is empty') @@ -335,6 +337,15 @@ def plot_image_sprites( img_id = 0 try: for d in self: + + if not d.uri and d.tensor is None: + if skip_empty: + continue + else: + raise ValueError( + f'Document has neither `uri` nor `tensor`, can not be plotted' + ) + _d = copy.deepcopy(d) if image_source == 'uri' or ( diff --git a/docs/fundamentals/documentarray/serialization.md b/docs/fundamentals/documentarray/serialization.md index a5fc2d75b5f..9f74adb172b 100644 --- a/docs/fundamentals/documentarray/serialization.md +++ b/docs/fundamentals/documentarray/serialization.md @@ -366,7 +366,7 @@ Considering you are working on a GPU machine via Google Colab/Jupyter. After pre from docarray import DocumentArray da = DocumentArray(...) # heavylifting, processing, GPU task, ... -da.push(token='myda123') +da.push(token='myda123', show_progress=True) ``` ```{figure} images/da-push.png @@ -377,11 +377,12 @@ Then on your local laptop, simply pull it: ```python from docarray import DocumentArray -da = DocumentArray.pull(token='myda123') +da = DocumentArray.pull(token='myda123', show_progress=True) ``` Now you can continue the work at local, analyzing `da` or visualizing it. Your friends & colleagues who know the token `myda123` can also pull that DocumentArray. It's useful when you want to quickly share the results with your colleagues & friends. The maximum size of an upload is 4GB under the `protocol='protobuf'` and `compress='gzip'` setting. The lifetime of an upload is one week after its creation. +To avoid unnecessary download when upstream DocumentArray is unchanged, you can use `DocumentArray.pull(..., local_cache=True)`. 
diff --git a/tests/unit/array/mixins/test_parallel.py b/tests/unit/array/mixins/test_parallel.py index c3c1ad4d4aa..559a546de91 100644 --- a/tests/unit/array/mixins/test_parallel.py +++ b/tests/unit/array/mixins/test_parallel.py @@ -36,7 +36,10 @@ def foo_batch(da: DocumentArray): ) @pytest.mark.parametrize('backend', ['process', 'thread']) @pytest.mark.parametrize('num_worker', [1, 2, None]) -def test_parallel_map(pytestconfig, da_cls, config, backend, num_worker, start_storage): +@pytest.mark.parametrize('show_progress', [True, False]) +def test_parallel_map( + pytestconfig, da_cls, config, backend, num_worker, start_storage, show_progress +): if __name__ == '__main__': if config: @@ -47,7 +50,9 @@ def test_parallel_map(pytestconfig, da_cls, config, backend, num_worker, start_s da = da_cls.from_files(f'{pytestconfig.rootdir}/**/*.jpeg')[:10] # use a generator - for d in da.map(foo, backend, num_worker=num_worker): + for d in da.map( + foo, backend, num_worker=num_worker, show_progress=show_progress + ): assert d.tensor.shape == (3, 222, 222) if config: @@ -87,8 +92,16 @@ def test_parallel_map(pytestconfig, da_cls, config, backend, num_worker, start_s @pytest.mark.parametrize('backend', ['thread']) @pytest.mark.parametrize('num_worker', [1, 2, None]) @pytest.mark.parametrize('b_size', [1, 2, 256]) +@pytest.mark.parametrize('show_progress', [True, False]) def test_parallel_map_batch( - pytestconfig, da_cls, config, backend, num_worker, b_size, start_storage + pytestconfig, + da_cls, + config, + backend, + num_worker, + b_size, + start_storage, + show_progress, ): if __name__ == '__main__': @@ -101,7 +114,11 @@ def test_parallel_map_batch( # use a generator for _da in da.map_batch( - foo_batch, batch_size=b_size, backend=backend, num_worker=num_worker + foo_batch, + batch_size=b_size, + backend=backend, + num_worker=num_worker, + show_progress=True, ): for d in _da: assert d.tensor.shape == (3, 222, 222) @@ -116,7 +133,11 @@ def test_parallel_map_batch( # use as 
list, here the caveat is when using process backend you can not modify thing in-place list( da.map_batch( - foo_batch, batch_size=b_size, backend=backend, num_worker=num_worker + foo_batch, + batch_size=b_size, + backend=backend, + num_worker=num_worker, + show_progress=True, ) ) if backend == 'thread': From b7fcc775b6e59dc000d3d3954ddafc7cfe40d234 Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Wed, 2 Mar 2022 12:09:27 +0100 Subject: [PATCH 2/3] feat(array): improve pull cache add pbar to map --- tests/unit/array/mixins/test_pushpull.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/array/mixins/test_pushpull.py b/tests/unit/array/mixins/test_pushpull.py index 7bf2042fa3a..0a432d01506 100644 --- a/tests/unit/array/mixins/test_pushpull.py +++ b/tests/unit/array/mixins/test_pushpull.py @@ -27,6 +27,7 @@ def json(self): class DownloadMockResponse: def __init__(self, status_code: int = 200): self.status_code = status_code + self.headers = {'Content-length': 1} def raise_for_status(self): pass From e5864116612f42ec6d471e8e77396196aeea2c7b Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Wed, 2 Mar 2022 12:09:54 +0100 Subject: [PATCH 3/3] feat(array): improve pull cache add pbar to map --- tests/unit/array/mixins/test_pushpull.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/array/mixins/test_pushpull.py b/tests/unit/array/mixins/test_pushpull.py index 0a432d01506..ffcf541557f 100644 --- a/tests/unit/array/mixins/test_pushpull.py +++ b/tests/unit/array/mixins/test_pushpull.py @@ -8,6 +8,7 @@ class PushMockResponse: def __init__(self, status_code: int = 200): self.status_code = status_code + self.headers = {'Content-length': 1} def json(self): return {'code': self.status_code} @@ -16,6 +17,7 @@ def json(self): class PullMockResponse: def __init__(self, status_code: int = 200): self.status_code = status_code + self.headers = {'Content-length': 1} def json(self): return {