-
Notifications
You must be signed in to change notification settings - Fork 2.2k
fix(core): fix request/response missmatch while prefetching #2843
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
4495257
fix(tests): storm client test case
numb3r3 22199d3
fix(tests): add prefetch parameter
numb3r3 46a370e
fix(tests): add integration test for concurrent clients
numb3r3 7750a93
fix(tests): update thread queueu
numb3r3 bf0b3fe
fix(tests): use reraise to capture sub-thread assert
numb3r3 066880a
fix(core): match responses correctly while prefetching
jacobowitz 9240735
fix(core): add prefetch test
jacobowitz c16bfce
fix(core): create loop if not existing
jacobowitz 1395dc1
fix(core): fix prefetch close
jacobowitz 8ae4598
fix(core): address comments
jacobowitz e55a2f7
fix(core): remove is running flag
jacobowitz e7d9b60
fix(core): add test client context manager
jacobowitz 0719981
fix(core): use composition for grpc prefetch
jacobowitz 3199498
Merge branch 'fix-test-storm-clients' into fix-concurrent-client-requ…
jacobowitz 7be47e7
fix(core): delete manual storm test app
jacobowitz 39f6ccf
fix(core): address comments
jacobowitz 6c116eb
fix(core): dont suppress cancel
jacobowitz c12ab7c
fix(core): dont wait for canceled task
jacobowitz 6d90e34
fix(core): wait for prefetcher to be closed
jacobowitz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
52 changes: 52 additions & 0 deletions
52
tests/integration/concurrent_clients/test_concurrent_clients.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import pytest | ||
|
||
from jina import Flow, Executor, Client, requests, DocumentArray, Document | ||
from jina.types.request import Response | ||
import threading | ||
import random | ||
import time | ||
from functools import partial | ||
|
||
|
||
class MyExecutor(Executor): | ||
@requests(on='/ping') | ||
def ping(self, docs: DocumentArray, **kwargs): | ||
time.sleep(0.1 * random.random()) | ||
|
||
|
||
@pytest.mark.parametrize('protocal', ['http', 'grpc']) | ||
@pytest.mark.parametrize('parallel', [10]) | ||
@pytest.mark.parametrize('polling', ['ANY', 'ALL']) | ||
@pytest.mark.parametrize('prefetch', [1, 10]) | ||
@pytest.mark.parametrize('concurrent', [15]) | ||
def test_concurrent_clients(concurrent, protocal, parallel, polling, prefetch, reraise): | ||
def pong(peer_hash, resp: Response): | ||
with reraise: | ||
for d in resp.docs: | ||
assert d.text == peer_hash | ||
|
||
def peer_client(port, protocal, peer_hash): | ||
c = Client(protocol=protocal, port_expose=port) | ||
for _ in range(5): | ||
c.post( | ||
'/ping', Document(text=peer_hash), on_done=lambda r: pong(peer_hash, r) | ||
) | ||
|
||
f = Flow(protocol=protocal, prefetch=prefetch).add( | ||
uses=MyExecutor, parallel=parallel, polling=polling | ||
) | ||
|
||
with f: | ||
port_expose = f.port_expose | ||
|
||
thread_pool = [] | ||
for peer_id in range(concurrent): | ||
t = threading.Thread( | ||
target=partial(peer_client, port_expose, protocal, str(peer_id)), | ||
daemon=True, | ||
) | ||
t.start() | ||
thread_pool.append(t) | ||
|
||
for t in thread_pool: | ||
t.join() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,19 @@ | ||
import pytest | ||
from fastapi.testclient import TestClient | ||
|
||
from jina.logging.logger import JinaLogger | ||
from jina.peapods.runtimes.gateway.http import get_fastapi_app | ||
from jina.parsers import set_gateway_parser | ||
import pytest | ||
from jina.peapods.runtimes.gateway.http import get_fastapi_app | ||
|
||
|
||
@pytest.mark.parametrize('p', [['--default-swagger-ui'], []]) | ||
def test_custom_swagger(p): | ||
args = set_gateway_parser().parse_args(p) | ||
logger = JinaLogger('') | ||
app = get_fastapi_app(args, logger) | ||
assert any('/docs' in r.path for r in app.routes) | ||
assert any('/openapi.json' in r.path for r in app.routes) | ||
# The TestClient is needed here as a context manager to generate the shutdown event correctly | ||
# otherwise the app can hang as it is not cleaned up correctly | ||
# see https://fastapi.tiangolo.com/advanced/testing-events/ | ||
with TestClient(app) as client: | ||
assert any('/docs' in r.path for r in app.routes) | ||
assert any('/openapi.json' in r.path for r in app.routes) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import asyncio | ||
from asyncio import Future | ||
|
||
import pytest | ||
|
||
from jina.helper import ArgNamespace, random_identity | ||
from jina.parsers import set_gateway_parser | ||
from jina.peapods.runtimes.gateway.prefetch import PrefetchCaller | ||
from jina.proto import jina_pb2 | ||
|
||
|
||
def _generate_request(): | ||
req = jina_pb2.RequestProto() | ||
req.request_id = random_identity() | ||
req.data.docs.add() | ||
return req | ||
|
||
|
||
class ZmqletMock: | ||
def __init__(self): | ||
self.sent_future = Future() | ||
self.received_event = asyncio.Event() | ||
|
||
async def recv_message(self, **kwargs): | ||
msg = await self.sent_future | ||
self.sent_future = Future() | ||
self.received_event.set() | ||
return msg | ||
|
||
async def send_message(self, message): | ||
self.sent_future.set_result(_generate_request()) | ||
await self.received_event.wait() | ||
self.sent_future.set_result(message.response) | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_concurrent_requests(): | ||
args = ArgNamespace.kwargs2namespace({}, set_gateway_parser()) | ||
mock_zmqlet = ZmqletMock() | ||
servicer = PrefetchCaller(args, mock_zmqlet) | ||
|
||
request = _generate_request() | ||
|
||
response = servicer.send(iter([request])) | ||
|
||
async for r in response: | ||
assert r == request | ||
jacobowitz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
await servicer.close() |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.