test: added a test for docmatrix in the tail pea by maximilianwerk · Pull Request #2537 · jina-ai/serve · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

test: added a test for docmatrix in the tail pea #2537

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 2, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
94 changes: 94 additions & 0 deletions tests/integration/v2_api/test_docs_matrix_tail_pea.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import pytest
from collections import OrderedDict
from jina import Document, DocumentArray, Executor, Flow, requests
from jina.types.document.multimodal import MultimodalDocument
from jina.types.arrays.chunk import ChunkArray


class DummyExecutor(Executor):
    """Test executor that narrows every document's chunks to one modality.

    :param mode: modality to keep; stored as a string. When omitted (or
        falsy) no modality is selected and ``_mode`` is ``None``.
    """

    def __init__(self, mode=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Always define the attribute so ``do_something`` never fails with an
        # AttributeError when the executor is created without a mode.
        self._mode = str(mode) if mode else None

    @requests
    def do_something(self, docs, **kwargs):
        """Replace each document's chunks with the single chunk of the mode.

        :param docs: documents of the incoming request
        :param kwargs: remaining request arguments, unused here
        :raises AssertionError: if a document does not hold exactly one chunk
            of the configured modality, or that chunk's content differs from
            the modality name
        """
        for doc in docs:
            chunks = ChunkArray(
                [chunk for chunk in doc.chunks if chunk.modality == self._mode],
                doc,
            )
            # Check the size first so an empty selection fails with a clear
            # assertion instead of an IndexError on ``chunks[0]``.
            assert len(chunks) == 1
            assert chunks[0].content == self._mode
            doc.chunks = chunks


class MatchMerger(Executor):
    """Merge several document collections, pooling matches by document id."""

    @requests
    def merge(self, docs_matrix, **kwargs):
        """Collapse ``docs_matrix`` into one array with one entry per id.

        The first document seen for an id is kept; the matches of every later
        document with the same id are appended to it. Output order follows
        the first appearance of each id.

        :param docs_matrix: iterable of document collections to merge
        :param kwargs: remaining request arguments, unused here
        :return: a ``DocumentArray`` of the merged documents
        """
        merged = OrderedDict()
        for doc_group in docs_matrix:
            for candidate in doc_group:
                if candidate.id not in merged:
                    merged[candidate.id] = candidate
                    continue
                merged[candidate.id].matches.extend(candidate.matches)
        return DocumentArray(merged.values())


class ChunkMerger(Executor):
    """Merge several document collections, pooling chunks by document id."""

    @requests
    def merge(self, docs_matrix, **kwargs):
        """Collapse ``docs_matrix`` into one array with one entry per id.

        The first document seen for an id is kept; the chunks of every later
        document with the same id are appended to it. Output order follows
        the first appearance of each id.

        :param docs_matrix: iterable of document collections to merge
        :param kwargs: remaining request arguments, unused here
        :return: a ``DocumentArray`` of the merged documents
        """
        merged = OrderedDict()
        for doc_group in docs_matrix:
            for candidate in doc_group:
                if candidate.id not in merged:
                    merged[candidate.id] = candidate
                    continue
                merged[candidate.id].chunks.extend(candidate.chunks)
        return DocumentArray(merged.values())


@pytest.mark.timeout(5)
@pytest.mark.parametrize('num_replicas, num_shards', [(1, 1), (2, 2)])
def test_sharding_tail_pea(num_replicas, num_shards):
    """Check that ``MatchMerger`` as ``uses_after`` yields one match per shard.

    A single document carrying one match is sent to a pod configured with
    ``num_shards`` shards; the first returned document is expected to hold
    ``num_shards`` matches.

    TODO(Maximilian): Make (1, 2) and (2, 1) also workable
    (an issue was opened for this: #2547).
    """
    f = Flow().add(
        uses=DummyExecutor,
        replicas=num_replicas,
        shards=num_shards,
        uses_after=MatchMerger,
    )
    with f:
        results = f.post(
            on='/search',
            inputs=Document(matches=[Document()]),
            return_results=True,
        )
    assert len(results[0].docs[0].matches) == num_shards


def test_merging_head_pea():
    """Check that ``ChunkMerger`` as ``uses_before`` rejoins two branches.

    Two pods, each a ``DummyExecutor`` configured with a different mode, both
    feed ``pod3``; ``ChunkMerger`` runs before it. The first response is
    expected to contain five documents, the first of which has two chunks.
    """

    def multimodal_generator():
        """Yield five documents, each built from modalities '1' and '2'."""
        for _ in range(5):
            yield MultimodalDocument(modality_content_map={'1': '1', '2': '2'})

    flow = Flow().add(
        uses={'jtype': 'DummyExecutor', 'with': {'mode': '1'}},
        name='pod1',
    )
    # pod2 branches off the gateway so that it runs alongside pod1.
    flow = flow.add(
        uses={'jtype': 'DummyExecutor', 'with': {'mode': '2'}},
        name='pod2',
        needs='gateway',
    )
    flow = flow.add(uses_before=ChunkMerger, name='pod3', needs=['pod1', 'pod2'])

    with flow:
        results = flow.post(
            on='/search',
            inputs=multimodal_generator(),
            return_results=True,
        )

    first_response = results[0]
    assert len(first_response.docs[0].chunks) == 2
    assert len(first_response.docs) == 5
0