From 02a0dc59100a27f2897083b07522591c1901ec1d Mon Sep 17 00:00:00 2001
From: Han Xiao
Date: Fri, 3 Dec 2021 10:35:04 +0100
Subject: [PATCH 1/3] feat(types): support lambda function in parallel mixin

---
 .../document/documentarray-api.md    | 10 ++------
 jina/types/arrays/mixins/parallel.py | 24 ++++++++++++++++++-
 2 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/docs/fundamentals/document/documentarray-api.md b/docs/fundamentals/document/documentarray-api.md
index 05346859f0ee6..847677ef38911 100644
--- a/docs/fundamentals/document/documentarray-api.md
+++ b/docs/fundamentals/document/documentarray-api.md
@@ -860,7 +860,7 @@ map-thread ...	map-thread takes 10 seconds (10.28s)
 foo-loop ...	foo-loop takes 18 seconds (18.52s)
 ```
 
-One can see big improvement with `.map()`.
+One can see a significant speedup with `.map()`.
 
 ```{admonition} When to choose process or thread backend?
 :class: important
@@ -875,18 +875,12 @@ If you only modify elements in-place, and do not need return values, you can wri
 
 ```python
 da = DocumentArray(...)
 
-list(da.map(func))
+da.apply(func)
 ```
-
-This follows the same convention as you using Python built-in `map()`.
-
-You can also use `.apply()` which always returns a `DocumentArray`.
-
 ````
-
 
 ## Sampling
 
 `DocumentArray` provides a `.sample` function that samples `k` elements without replacement. It accepts two parameters, `k`
diff --git a/jina/types/arrays/mixins/parallel.py b/jina/types/arrays/mixins/parallel.py
index 55100352cd962..7bc4de19718d0 100644
--- a/jina/types/arrays/mixins/parallel.py
+++ b/jina/types/arrays/mixins/parallel.py
@@ -1,7 +1,9 @@
+import sys
+from types import LambdaType
 from typing import Callable, TYPE_CHECKING, Generator, Optional, overload, TypeVar
 
 if TYPE_CHECKING:
-    from ....helper import T
+    from ....helper import T, random_identity
     from ...document import Document
     from .... import DocumentArray
@@ -76,6 +78,8 @@ def map(
         :param num_worker: the number of parallel workers. If not given, then the number of CPUs in the system will be used.
         :yield: anything return from ``func``
         """
+        if _is_lambda_function(func) and backend == 'process':
+            func = _globalize_lambda_function(func)
         with _get_pool(backend, num_worker) as p:
             for x in p.imap(func, self):
                 yield x
@@ -154,6 +158,9 @@ def map_batch(
         :param num_worker: the number of parallel workers. If not given, then the number of CPUs in the system will be used.
         :yield: anything return from ``func``
         """
+
+        if _is_lambda_function(func) and backend == 'process':
+            func = _globalize_lambda_function(func)
         with _get_pool(backend, num_worker) as p:
             for x in p.imap(func, self.batch(batch_size=batch_size, shuffle=shuffle)):
                 yield x
@@ -172,3 +179,18 @@ def _get_pool(backend, num_worker):
     raise ValueError(
         f'`backend` must be either `process` or `thread`, receiving {backend}'
     )
+
+
+def _is_lambda_function(func):
+    return isinstance(func, LambdaType) and func.__name__ == '<lambda>'
+
+
+def _globalize_lambda_function(func):
+    def result(*args, **kwargs):
+        return func(*args, **kwargs)
+
+    from ....helper import random_identity
+
+    result.__name__ = result.__qualname__ = random_identity()
+    setattr(sys.modules[result.__module__], result.__name__, result)
+    return result
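The `_globalize_lambda_function` trick works because a process pool pickles a function by qualified name, not by value: a lambda has no importable name, so the mixin registers a module-level alias for it before handing it to the pool. Below is a minimal, self-contained sketch of the same idea; the name `globalize` and the uuid-based identifier are illustrative stand-ins (the patch uses `random_identity` from Jina's helper module), and it assumes the `fork` start method so that workers inherit the freshly registered global:

```python
import multiprocessing
import sys
import uuid


def globalize(func):
    # pickle serializes a function as a module + qualname reference, so give
    # the wrapper a fresh module-level name that worker processes can resolve
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    wrapper.__name__ = wrapper.__qualname__ = 'f_' + uuid.uuid4().hex
    setattr(sys.modules[wrapper.__module__], wrapper.__name__, wrapper)
    return wrapper


if __name__ == '__main__':
    square = globalize(lambda x: x * x)  # passing the raw lambda would raise PicklingError
    with multiprocessing.Pool(2) as p:
        print(p.map(square, range(5)))  # [0, 1, 4, 9, 16]
```

With the `spawn` start method the alias would not exist in the re-imported child module, so this is a fork-friendly workaround rather than a fully general one.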
""" if limit is not None: @@ -136,7 +141,7 @@ def match( if batch_size: dist, idx = lhv._match_online( - rhv, cdist, _limit, normalization, metric_name, batch_size + rhv, cdist, _limit, normalization, metric_name, batch_size, num_worker ) else: dist, idx = lhv._match(rhv, cdist, _limit, normalization, metric_name) @@ -204,7 +209,14 @@ def _match(self, darray, cdist, limit, normalization, metric_name): return dist, idx def _match_online( - self, darray, cdist, limit, normalization, metric_name, batch_size + self, + darray, + cdist, + limit, + normalization, + metric_name, + batch_size, + num_worker, ): """ Computes the matches between self and `darray` loading `darray` into main memory in chunks of size `batch_size`. @@ -218,6 +230,7 @@ def _match_online( all values will be rescaled into range `[a, b]`. :param batch_size: length of the chunks loaded into memory from darray. :param metric_name: if provided, then match result will be marked with this string. + :param num_worker: the number of parallel workers. If not given, then the number of CPUs in the system will be used. :return: distances and indices """ @@ -227,8 +240,9 @@ def _match_online( idx = 0 top_dists = np.inf * np.ones((n_x, limit)) top_inds = np.zeros((n_x, limit), dtype=int) - for ld in darray.batch(batch_size=batch_size): - y_batch = ld.embeddings + + def _get_dist(da: 'DocumentArray'): + y_batch = da.embeddings distances = cdist(x_mat, y_batch, metric_name) dists, inds = top_k(distances, limit, descending=False) @@ -236,8 +250,24 @@ def _match_online( if isinstance(normalization, (tuple, list)) and normalization is not None: dists = minmax_normalize(dists, normalization) - inds = idx + inds - idx += y_batch.shape[0] + return dists, inds, y_batch.shape[0] + + if num_worker is None or num_worker > 1: + # notice that all most all computations (regardless the framework) are conducted in C + # hence there is no worry on Python GIL and the backend can be safely put to `thread` to + # save unnecessary data passing. This in fact gives a huge boost on the performance. + _gen = darray.map_batch( + _get_dist, + batch_size=batch_size, + backend='thread', + num_worker=num_worker, + ) + else: + _gen = (_get_dist(b) for b in darray.batch(batch_size=batch_size)) + + for (dists, inds, _bs) in _gen: + inds += idx + idx += _bs top_dists, top_inds = update_rows_x_mat_best( top_dists, top_inds, dists, inds, limit ) diff --git a/jina/types/arrays/mixins/parallel.py b/jina/types/arrays/mixins/parallel.py index 7bc4de19718d0..760beb75508be 100644 --- a/jina/types/arrays/mixins/parallel.py +++ b/jina/types/arrays/mixins/parallel.py @@ -78,7 +78,7 @@ def map( :param num_worker: the number of parallel workers. If not given, then the number of CPUs in the system will be used. 
:yield: anything return from ``func`` """ - if _is_lambda_function(func) and backend == 'process': + if _is_lambda_or_local_function(func) and backend == 'process': func = _globalize_lambda_function(func) with _get_pool(backend, num_worker) as p: for x in p.imap(func, self): @@ -159,7 +159,7 @@ def map_batch( :yield: anything return from ``func`` """ - if _is_lambda_function(func) and backend == 'process': + if _is_lambda_or_local_function(func) and backend == 'process': func = _globalize_lambda_function(func) with _get_pool(backend, num_worker) as p: for x in p.imap(func, self.batch(batch_size=batch_size, shuffle=shuffle)): @@ -181,8 +181,10 @@ def _get_pool(backend, num_worker): ) -def _is_lambda_function(func): - return isinstance(func, LambdaType) and func.__name__ == '' +def _is_lambda_or_local_function(func): + return (isinstance(func, LambdaType) and func.__name__ == '') or ( + '' in func.__qualname__ + ) def _globalize_lambda_function(func): diff --git a/jina/types/document/mixins/sugar.py b/jina/types/document/mixins/sugar.py index f5a14780dec00..4037d705d7b2c 100644 --- a/jina/types/document/mixins/sugar.py +++ b/jina/types/document/mixins/sugar.py @@ -28,6 +28,7 @@ def match( exclude_self: bool = False, only_id: bool = False, use_scipy: bool = False, + num_worker: Optional[int] = None, ) -> 'T': """Matching the current Document against a set of Documents. @@ -45,14 +46,17 @@ def match( the min distance will be rescaled to `a`, the max distance will be rescaled to `b` all values will be rescaled into range `[a, b]`. :param metric_name: if provided, then match result will be marked with this string. - :param batch_size: if provided, then `darray` is loaded in chunks of, at most, batch_size elements. This option - will be slower but more memory efficient. Specialy indicated if `darray` is a big - DocumentArrayMemmap. + :param batch_size: if provided, then ``darray`` is loaded in batches, where each of them is at most ``batch_size`` + elements. When `darray` is big, this can significantly speedup the computation. :param exclude_self: if set, Documents in ``darray`` with same ``id`` as the left-hand values will not be considered as matches. :param only_id: if set, then returning matches will only contain ``id`` :param use_scipy: if set, use ``scipy`` as the computation backend. Note, ``scipy`` does not support distance on sparse matrix. + :param num_worker: the number of parallel workers. If not given, then the number of CPUs in the system will be used. + + .. note:: + This argument is only effective when ``batch_size`` is set. """ ... 
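Taken together, this patch lets chunked matching fan the per-batch distance computation out over a thread pool. A rough usage sketch under the API in this diff (the sizes and the random 128-dim embeddings are arbitrary placeholders):

```python
import numpy as np
from jina import Document, DocumentArray

queries = DocumentArray(Document(embedding=np.random.random(128)) for _ in range(100))
index = DocumentArray(Document(embedding=np.random.random(128)) for _ in range(10_000))

# batch_size enables the chunked `_match_online` path; num_worker now
# parallelizes the per-batch distance/top-k work over threads, which pays
# off because the heavy lifting happens in C outside the GIL
queries.match(index, limit=5, batch_size=1_000, num_worker=4)

print(len(queries[0].matches))  # 5
```

Per the note added to the docstrings, `num_worker` only takes effect when `batch_size` is set: without batching there is a single monolithic computation and nothing to distribute.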
From b920e77db49ce97ce6ecb4cc25492114fb443633 Mon Sep 17 00:00:00 2001
From: Jina Dev Bot
Date: Fri, 3 Dec 2021 11:38:45 +0000
Subject: [PATCH 3/3] style: fix overload and cli autocomplete

---
 jina/resources/extra-requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/jina/resources/extra-requirements.txt b/jina/resources/extra-requirements.txt
index e25d1099a1acb..998278e442f91 100644
--- a/jina/resources/extra-requirements.txt
+++ b/jina/resources/extra-requirements.txt
@@ -78,6 +78,6 @@ pytest-lazy-fixture: test
 datasets: cicd
 av: cicd
 trimesh: cicd
-paddlepaddle: cicd
+paddlepaddle==2.2.0: cicd
 onnx: cicd
 onnxruntime: cicd
\ No newline at end of file
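End to end, the behavior the new test locks in can be sketched as follows: lambdas now survive `backend='process'` because the mixin swaps in a picklable module-level alias first. A minimal sketch (again assuming the `fork` start method; `d.text` stands in for any per-Document work):

```python
from jina import Document, DocumentArray

da = DocumentArray(Document(text=f'doc {i}') for i in range(4))

# previously backend='process' raised a PicklingError for lambdas; the
# mixin now detects the lambda and registers a global alias before pickling
for text in da.map(lambda d: d.text, backend='process'):
    print(text)
```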