From a9bdf859618b7df9eb32b2de42f6eb0969860cfb Mon Sep 17 00:00:00 2001 From: Joan Fontanals Martinez Date: Tue, 15 Feb 2022 13:35:10 +0100 Subject: [PATCH 1/4] fix: return responses --- jina/clients/mixin.py | 2 +- jina/parsers/client.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/jina/clients/mixin.py b/jina/clients/mixin.py index 6ded6a0456449..a12f90ea5803b 100644 --- a/jina/clients/mixin.py +++ b/jina/clients/mixin.py @@ -77,7 +77,7 @@ async def _get_results(*args, **kwargs): result.append(resp) if return_results: - if c.args.results_as_docarray: + if not c.args.return_responses: docs = [r.data.docs for r in result] if len(docs) < 1: return docs diff --git a/jina/parsers/client.py b/jina/parsers/client.py index 9d1894f680b43..fd50a08583b9e 100644 --- a/jina/parsers/client.py +++ b/jina/parsers/client.py @@ -32,8 +32,8 @@ def mixin_client_features_parser(parser): ) parser.add_argument( - '--results-as-docarray', + '--return-responses', action='store_true', default=False, - help="If set, return results as DocArray instead of Request.", + help="If set, return results as List of Requests instead of a reduced DocArray.", ) From 58fb0e13855200ffbe32c01191c9ae5c80355d51 Mon Sep 17 00:00:00 2001 From: Jina Dev Bot Date: Tue, 15 Feb 2022 12:38:50 +0000 Subject: [PATCH 2/4] style: fix overload and cli autocomplete --- cli/autocomplete.py | 2 +- jina/clients/__init__.py | 4 ++-- jina/orchestrate/flow/base.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cli/autocomplete.py b/cli/autocomplete.py index afbfbba607528..3b5d9ab91b67a 100644 --- a/cli/autocomplete.py +++ b/cli/autocomplete.py @@ -315,7 +315,7 @@ '--port', '--https', '--asyncio', - '--results-as-docarray', + '--return-responses', '--protocol', ], 'export-api': ['--help', '--yaml-path', '--json-path', '--schema-path'], diff --git a/jina/clients/__init__.py b/jina/clients/__init__.py index 4021634b1e95c..9f084a93fa296 100644 --- a/jina/clients/__init__.py +++ b/jina/clients/__init__.py @@ -22,7 +22,7 @@ def Client( port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, - results_as_docarray: Optional[bool] = False, + return_responses: Optional[bool] = False, **kwargs ) -> Union[ 'AsyncWebSocketClient', @@ -40,7 +40,7 @@ def Client( :param port: The port of the Gateway, which the client should connect to. :param protocol: Communication protocol between server and client. :param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy - :param results_as_docarray: If set, return results as DocArray instead of Request. + :param return_responses: If set, return results as List of Requests instead of a reduced DocArray. :return: the new Client object .. # noqa: DAR202 diff --git a/jina/orchestrate/flow/base.py b/jina/orchestrate/flow/base.py index 35a0bff2590ac..4f55f02883275 100644 --- a/jina/orchestrate/flow/base.py +++ b/jina/orchestrate/flow/base.py @@ -97,7 +97,7 @@ def __init__( port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, - results_as_docarray: Optional[bool] = False, + return_responses: Optional[bool] = False, **kwargs, ): """Create a Flow. Flow is how Jina streamlines and scales Executors. This overloaded method provides arguments from `jina client` CLI. @@ -108,7 +108,7 @@ def __init__( :param port: The port of the Gateway, which the client should connect to. 
:param protocol: Communication protocol between server and client. :param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy - :param results_as_docarray: If set, return results as DocArray instead of Request. + :param return_responses: If set, return results as List of Requests instead of a reduced DocArray. .. # noqa: DAR202 .. # noqa: DAR101 From 6dac0f8d097e70319f7bb835531cda2fb0669d30 Mon Sep 17 00:00:00 2001 From: Joan Fontanals Martinez Date: Tue, 15 Feb 2022 14:36:35 +0100 Subject: [PATCH 3/4] test: add return_responses to True in Client in test --- .../unit/api/endpoints/partial/test_flow.py | 2 +- .../test_remote_executors.py | 4 +-- .../test_dir_structures/test_remote_flows.py | 2 +- .../test_integration.py | 4 +-- .../test_integration.py | 2 +- .../test_remote_flows/test_remote_flow.py | 26 ++++++++++++++++--- .../test_remote_flows/test_remote_flow_env.py | 6 +++-- .../test_remote_flow_scale.py | 16 +++++++++--- .../test_remote_peas/test_remote_peas.py | 10 ++++--- .../test_rolling_update.py | 18 +++++++------ .../test_remote_executors.py | 8 +++--- .../test_topologies/test_topologies.py | 1 + .../test_topologies_docker.py | 1 + .../test_workspaces/test_cache_validate.py | 13 ++++++---- .../test_workspaces/test_custom_dockerfile.py | 2 +- .../test_workspaces/test_remote_workspaces.py | 12 ++++----- tests/docker_compose/test_docker_compose.py | 1 + .../test_concurrent_clients.py | 2 +- .../test_container_get_args.py | 2 +- tests/integration/crud/test_crud.py | 12 ++++----- .../deployments/test_deployment.py | 14 +++++----- .../gateway_clients/test_clients_gateways.py | 2 +- .../gateway_clients/test_streaming.py | 8 +++--- .../high_order_matches/test_document.py | 4 +-- .../test_inspect_deployments_flow.py | 8 +++--- .../test_hanging_termination.py | 2 +- .../container/test_override_config_params.py | 4 +-- .../worker/test_override_config_params.py | 26 +++++++++++++------ .../test_specific_params.py | 2 +- tests/integration/pods/container/test_pod.py | 4 ++- tests/integration/pods/test_pod.py | 16 +++++++----- tests/integration/reduce/test_reduce.py | 26 ++++++++++++++----- .../rolling_update/test_rolling_update.py | 14 ++++++---- tests/integration/runtimes/test_runtimes.py | 14 +++++----- tests/integration/scale/test_scale.py | 8 +++--- .../streaming/test_clients_streaming.py | 4 +-- .../v2_api/test_docs_matrix_tail_pea.py | 4 +-- tests/integration/v2_api/test_func_routing.py | 26 +++++++++---------- .../v2_api/test_override_requests.py | 4 +-- tests/k8s/test_graceful_request_handling.py | 1 + tests/k8s/test_k8s.py | 1 + tests/unit/clients/python/test_client.py | 21 +++++++++++---- .../test_flow_docarray_return.py | 5 ++-- tests/unit/test_gateway.py | 4 ++- 44 files changed, 225 insertions(+), 141 deletions(-) diff --git a/tests/daemon/unit/api/endpoints/partial/test_flow.py b/tests/daemon/unit/api/endpoints/partial/test_flow.py index 04e3272a16a7d..8fcb3cd8ea734 100644 --- a/tests/daemon/unit/api/endpoints/partial/test_flow.py +++ b/tests/daemon/unit/api/endpoints/partial/test_flow.py @@ -21,7 +21,7 @@ def test_flow_api(monkeypatch, partial_flow_client): get_response = partial_flow_client.get(api) - endpoint_responses = Client(port=56789).post( + endpoint_responses = Client(port=56789, return_responses=True).post( on='/any_endpoint', inputs=Document(), return_results=True ) diff --git a/tests/distributed/test_dir_structures/test_remote_executors.py 
b/tests/distributed/test_dir_structures/test_remote_executors.py index 8590aed2855c2..09ccbb171898b 100644 --- a/tests/distributed/test_dir_structures/test_remote_executors.py +++ b/tests/distributed/test_dir_structures/test_remote_executors.py @@ -93,7 +93,7 @@ def test_remote_executor_via_config_yaml(upload_files, config_yml): host=CLOUD_HOST, uses=config_yml, upload_files=upload_files ) with f: - resp = Client(port=exposed_port).post( + resp = Client(port=exposed_port, return_responses=True).post( on='/', inputs=Document(text=config_yml), return_results=True, @@ -126,7 +126,7 @@ def test_remote_executor_via_pymodules(upload_files, uses, py_modules): upload_files=upload_files, ) with f: - resp = Client(port=exposed_port).post( + resp = Client(port=exposed_port, return_responses=True).post( on='/', inputs=Document(text=py_modules), return_results=True, diff --git a/tests/distributed/test_dir_structures/test_remote_flows.py b/tests/distributed/test_dir_structures/test_remote_flows.py index cb7a170acc731..31eee7545f740 100644 --- a/tests/distributed/test_dir_structures/test_remote_flows.py +++ b/tests/distributed/test_dir_structures/test_remote_flows.py @@ -39,7 +39,7 @@ def test_remote_flow_with_directory(directory, filename, mul): filename=filename, envs={'PORT_EXPOSE': 12345}, ): - resp = Client(port=12345).post( + resp = Client(port=12345, return_responses=True).post( on='/', inputs=Document(text=directory), return_results=True, diff --git a/tests/distributed/test_join_local_from_remote/test_integration.py b/tests/distributed/test_join_local_from_remote/test_integration.py index 36cd55bf028f4..dd4f04a12f44f 100644 --- a/tests/distributed/test_join_local_from_remote/test_integration.py +++ b/tests/distributed/test_join_local_from_remote/test_integration.py @@ -30,13 +30,13 @@ def doc_to_index(): @pytest.fixture def client(): - return Client(host='localhost', port=45678) + return Client(host='localhost', port=45678, return_responses=True) @pytest.fixture def grpc_client(): args = set_client_cli_parser().parse_args( - ['--host', 'localhost', '--port', '45678'] + ['--host', 'localhost', '--port', '45678', '--return-responses'] ) return GRPCClient(args) diff --git a/tests/distributed/test_local_flow_use_remote_executor/test_integration.py b/tests/distributed/test_local_flow_use_remote_executor/test_integration.py index e48d14b6358f7..76f1926767975 100644 --- a/tests/distributed/test_local_flow_use_remote_executor/test_integration.py +++ b/tests/distributed/test_local_flow_use_remote_executor/test_integration.py @@ -44,7 +44,7 @@ def test_local_flow_use_external_executor( local_flow, documents_to_index, docker_compose ): with local_flow as f: - responses = Client(port=exposed_port).index( + responses = Client(port=exposed_port, return_responses=True).index( inputs=documents_to_index, return_results=True, request_size=100 ) assert len(responses) == 2 diff --git a/tests/distributed/test_remote_flows/test_remote_flow.py b/tests/distributed/test_remote_flows/test_remote_flow.py index 425a5815c9f5d..9eb6d5b8604e7 100644 --- a/tests/distributed/test_remote_flows/test_remote_flow.py +++ b/tests/distributed/test_remote_flows/test_remote_flow.py @@ -79,7 +79,9 @@ def test_remote_jinad_flow(jinad_client, flow_envs): remote_flow_args = remote_flow_args['arguments']['object']['arguments'] assert remote_flow_args['port_expose'] == MINI_FLOW1_PORT assert remote_flow_args['protocol'] == PROTOCOL - resp = Client(host=HOST, port=MINI_FLOW1_PORT, protocol='http').post( + resp = Client( + host=HOST, 
port=MINI_FLOW1_PORT, protocol='http', return_responses=True + ).post( on='/', inputs=[Document(id=str(idx)) for idx in range(NUM_DOCS)], return_results=True, @@ -109,7 +111,13 @@ async def test_remote_jinad_flow_async(async_jinad_client, flow_envs): remote_flow_args = remote_flow_args['arguments']['object']['arguments'] assert remote_flow_args['port_expose'] == MINI_FLOW1_PORT assert remote_flow_args['protocol'] == PROTOCOL - resp = Client(host=HOST, port=MINI_FLOW1_PORT, protocol='http', asyncio=True).post( + resp = Client( + host=HOST, + port=MINI_FLOW1_PORT, + protocol='http', + asyncio=True, + return_responses=True, + ).post( on='/', inputs=[Document(id=str(idx)) for idx in range(NUM_DOCS)], return_results=True, @@ -143,7 +151,13 @@ async def test_remote_jinad_flow_get_delete_all(async_jinad_client): # get all flows remote_flow_args = await async_jinad_client.flows.list() assert len(remote_flow_args.keys()) == 2 - resp = Client(host=HOST, port=MINI_FLOW2_PORT, protocol='http', asyncio=True).post( + resp = Client( + host=HOST, + port=MINI_FLOW2_PORT, + protocol='http', + asyncio=True, + return_responses=True, + ).post( on='/', inputs=[Document(id=str(idx)) for idx in range(NUM_DOCS)], return_results=True, @@ -176,7 +190,11 @@ async def test_jinad_flow_container_runtime(async_jinad_client, executor_image): remote_flow_args = await async_jinad_client.flows.get(DaemonID(flow_id)) assert remote_flow_args resp = Client( - host=HOST, port=CONTAINER_FLOW_PORT, protocol='http', asyncio=True + host=HOST, + port=CONTAINER_FLOW_PORT, + protocol='http', + asyncio=True, + return_responses=True, ).post( on='/', inputs=[Document(id=str(idx)) for idx in range(NUM_DOCS)], diff --git a/tests/distributed/test_remote_flows/test_remote_flow_env.py b/tests/distributed/test_remote_flows/test_remote_flow_env.py index 66b90d4c65ea6..b3568e95506d6 100644 --- a/tests/distributed/test_remote_flows/test_remote_flow_env.py +++ b/tests/distributed/test_remote_flows/test_remote_flow_env.py @@ -47,7 +47,7 @@ def test_remote_flow_local_executors(replicas, jinad_client): }, jinad_client=jinad_client, ): - resp = Client(host=HOST, port=FLOW_PORT).post( + resp = Client(host=HOST, port=FLOW_PORT, return_responses=True).post( on='/', inputs=[Document(id=str(idx)) for idx in range(NUM_DOCS)], return_results=True, @@ -69,7 +69,9 @@ def test_port_expose_env_var(port_expose, func, jinad_client): envs={'PORT_EXPOSE': port_expose, 'FUNC': func}, ) - r = Client(host=HOST, port=port_expose, protocol='http').post( + r = Client( + host=HOST, port=port_expose, protocol='http', return_responses=True + ).post( on='/blah', inputs=(Document(text=f'text {i}') for i in range(2)), return_results=True, diff --git a/tests/distributed/test_remote_flows/test_remote_flow_scale.py b/tests/distributed/test_remote_flows/test_remote_flow_scale.py index 003742b2b8bdd..0e19c2967f568 100644 --- a/tests/distributed/test_remote_flows/test_remote_flow_scale.py +++ b/tests/distributed/test_remote_flows/test_remote_flow_scale.py @@ -64,7 +64,9 @@ def test_scale_remote_flow(docker_image_built, jinad_client, deployment_params): ) assert flow_id - ret1 = Client(host=HOST, port=FLOW_PORT, protocol='http', asyncio=False).index( + ret1 = Client( + host=HOST, port=FLOW_PORT, protocol='http', asyncio=False, return_responses=True + ).index( inputs=DocumentArray([Document() for _ in range(200)]), return_results=True, request_size=10, @@ -81,7 +83,9 @@ def test_scale_remote_flow(docker_image_built, jinad_client, deployment_params): id=flow_id, 
deployment_name=SCALE_EXECUTOR, replicas=scale_to ) - ret2 = Client(host=HOST, port=FLOW_PORT, protocol='http', asyncio=False).index( + ret2 = Client( + host=HOST, port=FLOW_PORT, protocol='http', asyncio=False, return_responses=True + ).index( inputs=DocumentArray([Document() for _ in range(200)]), return_results=True, request_size=10, @@ -117,7 +121,9 @@ async def test_scale_remote_flow_async( ) assert flow_id - ret1 = Client(host=HOST, port=FLOW_PORT, protocol='http', asyncio=True).index( + ret1 = Client( + host=HOST, port=FLOW_PORT, protocol='http', asyncio=True, return_responses=True + ).index( inputs=DocumentArray([Document() for _ in range(1000)]), return_results=True, request_size=10, @@ -134,7 +140,9 @@ async def test_scale_remote_flow_async( id=flow_id, deployment_name=SCALE_EXECUTOR, replicas=scale_to ) - ret2 = Client(host=HOST, port=FLOW_PORT, protocol='http', asyncio=True).index( + ret2 = Client( + host=HOST, port=FLOW_PORT, protocol='http', asyncio=True, return_responses=True + ).index( inputs=DocumentArray([Document() for _ in range(1000)]), return_results=True, request_size=10, diff --git a/tests/distributed/test_remote_peas/test_remote_peas.py b/tests/distributed/test_remote_peas/test_remote_peas.py index 8e4e9efb9bb01..7064bb5ff31a1 100644 --- a/tests/distributed/test_remote_peas/test_remote_peas.py +++ b/tests/distributed/test_remote_peas/test_remote_peas.py @@ -230,7 +230,9 @@ async def test_pseudo_remote_pods_topologies(gateway, head, worker): ) # send requests to the gateway - c = Client(host='127.0.0.1', port=port_expose, asyncio=True) + c = Client( + host='127.0.0.1', port=port_expose, asyncio=True, return_responses=True + ) responses = c.post( '/', inputs=async_inputs, request_size=1, return_results=True ) @@ -295,7 +297,7 @@ async def test_pseudo_remote_pods_shards(gateway, head, worker, polling): await asyncio.sleep(1.0) - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -363,7 +365,7 @@ async def test_pseudo_remote_pods_replicas(gateway, head, worker): await asyncio.sleep(1.0) - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -483,7 +485,7 @@ async def test_pseudo_remote_pods_executor( await asyncio.sleep(1.0) - c = Client(host=HOST, port=port_expose, asyncio=True) + c = Client(host=HOST, port=port_expose, asyncio=True, return_responses=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: diff --git a/tests/distributed/test_rolling_update_container_runtime/test_rolling_update.py b/tests/distributed/test_rolling_update_container_runtime/test_rolling_update.py index 7408581f67959..c2584fe696b11 100644 --- a/tests/distributed/test_rolling_update_container_runtime/test_rolling_update.py +++ b/tests/distributed/test_rolling_update_container_runtime/test_rolling_update.py @@ -67,18 +67,18 @@ def test_dump_dbms_remote(executor_images, docker_compose): dbms_flow_id, query_flow_id, workspace_id = _create_flows() # check that there are no matches in Query Flow - r = Client(host=HOST, port=REST_PORT_QUERY, 
protocol='http').search( - inputs=[doc for doc in docs[:nr_search]], return_results=True - ) + r = Client( + host=HOST, port=REST_PORT_QUERY, protocol='http', return_responses=True + ).search(inputs=[doc for doc in docs[:nr_search]], return_results=True) assert r[0].data.docs[0].matches is None or len(r[0].data.docs[0].matches) == 0 # index on DBMS flow - Client(host=HOST, port=REST_PORT_DBMS, protocol='http').index( - inputs=docs, return_results=True - ) + Client( + host=HOST, port=REST_PORT_DBMS, protocol='http', return_responses=True + ).index(inputs=docs, return_results=True) # dump data for DBMS flow - Client(host=HOST, port=REST_PORT_DBMS, protocol='http').post( + Client(host=HOST, port=REST_PORT_DBMS, protocol='http', return_responses=True).post( on='/dump', parameters={'shards': SHARDS, 'dump_path': DUMP_PATH}, target_executor='indexer_dbms', @@ -97,7 +97,9 @@ def test_dump_dbms_remote(executor_images, docker_compose): ) # validate that there are matches now - r = Client(host=HOST, port=REST_PORT_QUERY, protocol='http').search( + r = Client( + host=HOST, port=REST_PORT_QUERY, protocol='http', return_responses=True + ).search( inputs=[doc for doc in docs[:nr_search]], return_results=True, parameters={'top_k': 10}, diff --git a/tests/distributed/test_scale_remote_executors/test_remote_executors.py b/tests/distributed/test_scale_remote_executors/test_remote_executors.py index c98bc9b40a17d..4922449914fd6 100644 --- a/tests/distributed/test_scale_remote_executors/test_remote_executors.py +++ b/tests/distributed/test_scale_remote_executors/test_remote_executors.py @@ -69,13 +69,13 @@ def remote_flow_with_runtime(request): def test_scale_success(remote_flow_with_runtime: Flow, deployment_params): num_replicas, scale_to, shards = deployment_params with remote_flow_with_runtime as f: - ret1 = Client(port=exposed_port).index( + ret1 = Client(port=exposed_port, return_responses=True).index( inputs=DocumentArray([Document() for _ in range(200)]), return_results=True, request_size=10, ) f.scale(deployment_name='executor', replicas=scale_to) - ret2 = Client(port=exposed_port).index( + ret2 = Client(port=exposed_port, return_responses=True).index( inputs=DocumentArray([Document() for _ in range(200)]), return_results=True, request_size=10, @@ -114,7 +114,7 @@ def test_scale_with_concurrent_client( remote_flow_with_runtime: Flow, deployment_params, protocol ): def peer_client(port, protocol, peer_hash, queue): - rv = Client(protocol=protocol, port=port).index( + rv = Client(protocol=protocol, port=port, return_responses=True).index( [Document(text=peer_hash) for _ in range(NUM_DOCS_SENT_BY_CLIENTS)], request_size=5, return_results=True, @@ -145,7 +145,7 @@ def peer_client(port, protocol, peer_hash, queue): for t in thread_pool: t.join() - c = Client(protocol=protocol, port=port_expose) + c = Client(protocol=protocol, port=port_expose, return_responses=True) rv = c.index( [Document() for _ in range(5)], request_size=1, return_results=True ) diff --git a/tests/distributed/test_topologies/test_topologies.py b/tests/distributed/test_topologies/test_topologies.py index 3050de3ee33b6..210a9a3b750ee 100644 --- a/tests/distributed/test_topologies/test_topologies.py +++ b/tests/distributed/test_topologies/test_topologies.py @@ -176,6 +176,7 @@ def test_remote_flow_local_executors(mocker, replicas): host=__default_host__, port=args['port_expose'], protocol=args['protocol'], + return_responses=True, ).post( on='/', inputs=( diff --git a/tests/distributed/test_topologies_docker/test_topologies_docker.py 
b/tests/distributed/test_topologies_docker/test_topologies_docker.py index 0527594a4ac75..6ee7aa9852b4a 100644 --- a/tests/distributed/test_topologies_docker/test_topologies_docker.py +++ b/tests/distributed/test_topologies_docker/test_topologies_docker.py @@ -150,6 +150,7 @@ def test_remote_flow_containerized_executors(docker_image, mocker): host=__default_host__, port=args['port_expose'], protocol=args['protocol'], + return_responses=True, ).post( on='/', inputs=( diff --git a/tests/distributed/test_workspaces/test_cache_validate.py b/tests/distributed/test_workspaces/test_cache_validate.py index d94fdde6c5f85..302d2eb0a312e 100644 --- a/tests/distributed/test_workspaces/test_cache_validate.py +++ b/tests/distributed/test_workspaces/test_cache_validate.py @@ -17,9 +17,12 @@ def RemoteFlow(workspace_id): workspace_id=workspace_id, filename='flow_cache_validator.yml' ) args = client.flows.get(flow_id)['arguments']['object']['arguments'] - yield Client(host=HOST, port=args['port_expose'], protocol=args['protocol']).post( - on='/', inputs=[Document()], show_progress=True, return_results=True - ) + yield Client( + host=HOST, + port=args['port_expose'], + protocol=args['protocol'], + return_responses=True, + ).post(on='/', inputs=[Document()], show_progress=True, return_results=True) assert client.flows.delete(flow_id) @@ -67,7 +70,7 @@ def test_cache_validate_remote_executor(): workspace_id=workspace_id, ) with f: - response = Client(port=exposed_port).post( + response = Client(port=exposed_port, return_responses=True).post( on='/', inputs=[Document()], show_progress=True, return_results=True ) assert not response[0].data.docs[0].tags['exists'] @@ -81,7 +84,7 @@ def test_cache_validate_remote_executor(): workspace_id=workspace_id, ) with f: - response = Client(port=exposed_port).post( + response = Client(port=exposed_port, return_responses=True).post( on='/', inputs=[Document()], show_progress=True, return_results=True ) assert response[0].data.docs[0].tags['exists'] diff --git a/tests/distributed/test_workspaces/test_custom_dockerfile.py b/tests/distributed/test_workspaces/test_custom_dockerfile.py index 22166fea302dd..462d829386b91 100644 --- a/tests/distributed/test_workspaces/test_custom_dockerfile.py +++ b/tests/distributed/test_workspaces/test_custom_dockerfile.py @@ -17,7 +17,7 @@ def test_custom_dockerfile(): host='localhost:8000', ) with f: - c = Client(port=exposed_port) + c = Client(port=exposed_port, return_responses=True) c.index( inputs=( Document(text=f'{i}', embedding=np.random.rand(2, 3)) for i in range(5) diff --git a/tests/distributed/test_workspaces/test_remote_workspaces.py b/tests/distributed/test_workspaces/test_remote_workspaces.py index 4b931170a91cc..33c5617fac91f 100644 --- a/tests/distributed/test_workspaces/test_remote_workspaces.py +++ b/tests/distributed/test_workspaces/test_remote_workspaces.py @@ -46,7 +46,7 @@ def test_upload_via_pymodule(replicas): .add() ) with f: - responses = Client(port=exposed_port).index( + responses = Client(port=exposed_port, return_responses=True).index( inputs=( Document(tensor=np.random.random([1, 100])) for _ in range(NUM_DOCS) ), @@ -72,7 +72,7 @@ def test_upload_via_yaml(replicas): .add() ) with f: - responses = Client(port=exposed_port).index( + responses = Client(port=exposed_port, return_responses=True).index( inputs=( Document(tensor=np.random.random([1, 100])) for _ in range(NUM_DOCS) ), @@ -107,7 +107,7 @@ def test_upload_multiple_workspaces(replicas): ) ) with f: - responses = Client(port=exposed_port).index( + responses 
= Client(port=exposed_port, return_responses=True).index( inputs=( Document(tensor=np.random.random([1, 100])) for _ in range(NUM_DOCS) ), @@ -186,12 +186,12 @@ async def gen_docs(): return async for resp in Client( - asyncio=True, host=HOST, port=42860, show_progress=True + asyncio=True, host=HOST, port=42860, show_progress=True, return_responses=True ).post(on='/index', inputs=gen_docs): pass async for resp in Client( - asyncio=True, host=HOST, port=42860, show_progress=True + asyncio=True, host=HOST, port=42860, show_progress=True, return_responses=True ).post( on='/search', inputs=Document(tags={'key': 'first', 'value': 's'}), @@ -231,7 +231,7 @@ def test_upload_simple_non_standard_rootworkspace(docker_compose): .add() ) with f: - responses = Client(port=exposed_port).index( + responses = Client(port=exposed_port, return_responses=True).index( inputs=( Document(tensor=np.random.random([1, 100])) for _ in range(NUM_DOCS) ), diff --git a/tests/docker_compose/test_docker_compose.py b/tests/docker_compose/test_docker_compose.py index 73d9b8383b995..e85f9ef6969dd 100644 --- a/tests/docker_compose/test_docker_compose.py +++ b/tests/docker_compose/test_docker_compose.py @@ -29,6 +29,7 @@ async def run_test(flow, endpoint, num_docs=10, request_size=10): client_kwargs = dict( host='localhost', port=flow.port_expose, + return_responses=True, asyncio=True, ) client_kwargs.update(flow._common_kwargs) diff --git a/tests/integration/concurrent_clients/test_concurrent_clients.py b/tests/integration/concurrent_clients/test_concurrent_clients.py index 7608218a5ff10..37e7781078765 100644 --- a/tests/integration/concurrent_clients/test_concurrent_clients.py +++ b/tests/integration/concurrent_clients/test_concurrent_clients.py @@ -28,7 +28,7 @@ def pong(peer_hash, queue, resp: Response): queue.put((peer_hash, d.text)) def peer_client(port, protocol, peer_hash, queue): - c = Client(protocol=protocol, port=port) + c = Client(protocol=protocol, port=port, return_responses=True) for _ in range(NUM_REQUESTS): c.post( '/ping', diff --git a/tests/integration/container_runtime_args/test_container_get_args.py b/tests/integration/container_runtime_args/test_container_get_args.py index 8fa3c84ee1c8b..531113ec43e8f 100644 --- a/tests/integration/container_runtime_args/test_container_get_args.py +++ b/tests/integration/container_runtime_args/test_container_get_args.py @@ -35,7 +35,7 @@ def test_containerruntime_args(docker_image_built, shards, replicas): polling='ANY', ) with f: - ret1 = Client(port=exposed_port).index( + ret1 = Client(return_responses=True, port=exposed_port).index( inputs=DocumentArray([Document() for _ in range(2000)]), return_results=True, request_size=10, diff --git a/tests/integration/crud/test_crud.py b/tests/integration/crud/test_crud.py index ad39f2332f363..3649a41efb2a3 100644 --- a/tests/integration/crud/test_crud.py +++ b/tests/integration/crud/test_crud.py @@ -36,7 +36,7 @@ def test_crud(tmpdir, rest): os.environ['WORKSPACE'] = str(tmpdir) with Flow.load_config('flow.yml') as f: - c = Client(port=f.port_expose) + c = Client(port=f.port_expose, return_responses=True) original_docs = list(random_docs(10, chunks_per_doc=0)) if rest: rest_post(f, 'index', original_docs) @@ -47,7 +47,7 @@ def test_crud(tmpdir, rest): ) with Flow.load_config('flow.yml') as f: - c = Client(port=f.port_expose) + c = Client(port=f.port_expose, return_responses=True) inputs = list(random_docs(1)) if rest: results = rest_post(f, 'search', inputs) @@ -65,7 +65,7 @@ def test_crud(tmpdir, rest): assert len(matches) == 
10 with Flow.load_config('flow.yml') as f: - c = Client(port=f.port_expose) + c = Client(port=f.port_expose, return_responses=True) inputs = list(random_docs(5, chunks_per_doc=0)) if rest: @@ -75,7 +75,7 @@ def test_crud(tmpdir, rest): c.post(on='/delete', inputs=inputs) with Flow.load_config('flow.yml') as f: - c = Client(port=f.port_expose) + c = Client(port=f.port_expose, return_responses=True) inputs = list(random_docs(1)) if rest: @@ -95,14 +95,14 @@ def test_crud(tmpdir, rest): ) with Flow.load_config('flow.yml') as f: - c = Client(port=f.port_expose) + c = Client(port=f.port_expose, return_responses=True) if rest: rest_post(f, 'update', updated_docs) else: c.post(on='/update', inputs=updated_docs) with Flow.load_config('flow.yml') as f: - c = Client(port=f.port_expose) + c = Client(port=f.port_expose, return_responses=True) inputs = list(random_docs(1)) if rest: results = rest_post(f, 'search', inputs) diff --git a/tests/integration/deployments/test_deployment.py b/tests/integration/deployments/test_deployment.py index f6ccfc3aabd58..43e9ddd2549ad 100644 --- a/tests/integration/deployments/test_deployment.py +++ b/tests/integration/deployments/test_deployment.py @@ -31,7 +31,9 @@ async def test_deployments_trivial_topology(port_generator): with gateway_deployment, worker_deployment: # send requests to the gateway - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client( + host='localhost', port=port_expose, asyncio=True, return_responses=True + ) responses = c.post( '/', inputs=async_inputs, request_size=1, return_results=True ) @@ -102,7 +104,7 @@ async def test_deployments_flow_topology( await asyncio.sleep(0.1) # send requests to the gateway - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -145,7 +147,7 @@ async def test_deployments_shards(polling, port_generator): await asyncio.sleep(1.0) - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -180,7 +182,7 @@ async def test_deployments_replicas(port_generator): await asyncio.sleep(1.0) - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -220,7 +222,7 @@ async def test_deployments_with_executor(port_generator): await asyncio.sleep(1.0) - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -259,7 +261,7 @@ async def test_deployments_with_replicas_advance_faster(port_generator): await asyncio.sleep(1.0) - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) input_docs = [Document(text='slow'), Document(text='fast')] responses = c.post('/', inputs=input_docs, request_size=1, return_results=True) 
response_list = [] diff --git a/tests/integration/gateway_clients/test_clients_gateways.py b/tests/integration/gateway_clients/test_clients_gateways.py index 799674904ce9b..4a5ec14994c49 100644 --- a/tests/integration/gateway_clients/test_clients_gateways.py +++ b/tests/integration/gateway_clients/test_clients_gateways.py @@ -170,7 +170,7 @@ def decompress(self): def client_send(client_id: int, port_in: int, protocol: str): from jina.clients import Client - c = Client(protocol=protocol, port=port_in) + c = Client(protocol=protocol, port=port_inm, return_responses=True) # send requests return c.post( diff --git a/tests/integration/gateway_clients/test_streaming.py b/tests/integration/gateway_clients/test_streaming.py index 80c325bc397b5..206650204899e 100644 --- a/tests/integration/gateway_clients/test_streaming.py +++ b/tests/integration/gateway_clients/test_streaming.py @@ -215,7 +215,7 @@ def test_disable_prefetch_slow_client_fast_executor( final_da = DocumentArray() - client = Client(protocol=protocol, port=port_in) + client = Client(protocol=protocol, port=port_in, return_responses=True) client.post( on='/', inputs=inputs, @@ -270,7 +270,7 @@ def test_disable_prefetch_fast_client_slow_executor( ) p.start() time.sleep(1.0) - client = Client(protocol=protocol, port=port_in) + client = Client(protocol=protocol, port=port_in, return_responses=True) client.post( on='/', inputs=inputs, @@ -325,7 +325,7 @@ async def malicious_client_gen(): yield get_document(i) def client(gen, port, protocol): - Client(protocol=protocol, port=port).post( + Client(protocol=protocol, port=port, return_responses=True).post( on='/index', inputs=gen, request_size=1 ) @@ -370,7 +370,7 @@ def client(gen, port, protocol): p.join() order_of_ids = list( - Client(protocol=protocol, port=port_in) + Client(protocol=protocol, port=port_in, return_responses=True) .post(on='/status', inputs=[Document()], return_results=True)[0] .docs[0] .tags['ids'] diff --git a/tests/integration/high_order_matches/test_document.py b/tests/integration/high_order_matches/test_document.py index da87fc5371298..4f81e4ba8d2d8 100644 --- a/tests/integration/high_order_matches/test_document.py +++ b/tests/integration/high_order_matches/test_document.py @@ -33,7 +33,7 @@ def test_single_executor(): ) with f: - results = Client(port=exposed_port).post( + results = Client(port=exposed_port, return_responses=True).post( on='index', inputs=Document(), return_results=True, @@ -50,7 +50,7 @@ def test_multi_executor(): ) with f: - results = Client(port=exposed_port).post( + results = Client(port=exposed_port, return_responses=True).post( on='index', inputs=Document(), return_results=True, diff --git a/tests/integration/inspect_deployments_flow/test_inspect_deployments_flow.py b/tests/integration/inspect_deployments_flow/test_inspect_deployments_flow.py index 8b9f92db449ee..2234ede37f49f 100644 --- a/tests/integration/inspect_deployments_flow/test_inspect_deployments_flow.py +++ b/tests/integration/inspect_deployments_flow/test_inspect_deployments_flow.py @@ -160,9 +160,9 @@ def validate_func(resp): ) with f: - response = Client(port=exposed_port, protocol=protocol).index( - inputs=docs, return_results=True - ) + response = Client( + port=exposed_port, protocol=protocol, return_responses=True + ).index(inputs=docs, return_results=True) validate_func(response[0]) @@ -183,7 +183,7 @@ def validate_func(resp): ) with f: - res = Client(protocol=protocol, port=exposed_port).index( + res = Client(protocol=protocol, port=exposed_port, return_responses=True).index( 
inputs=docs, return_results=True ) diff --git a/tests/integration/issues/hanging_termination/test_hanging_termination.py b/tests/integration/issues/hanging_termination/test_hanging_termination.py index 0b0e64966b7c2..a2d871532aa83 100644 --- a/tests/integration/issues/hanging_termination/test_hanging_termination.py +++ b/tests/integration/issues/hanging_termination/test_hanging_termination.py @@ -63,7 +63,7 @@ def merge(self, docs_matrix: DocumentArray, **kwargs): def get_client(port): args = set_client_cli_parser().parse_args( - ['--host', 'localhost', '--port', str(port)] + ['--host', 'localhost', '--port', str(port), '--return-responses'] ) return Client(args) diff --git a/tests/integration/override_config_params/container/test_override_config_params.py b/tests/integration/override_config_params/container/test_override_config_params.py index 4d613917547fd..b7aac8de7b4c3 100644 --- a/tests/integration/override_config_params/container/test_override_config_params.py +++ b/tests/integration/override_config_params/container/test_override_config_params.py @@ -40,7 +40,7 @@ def flow(request): @pytest.mark.parametrize('flow', ['yml', 'python'], indirect=['flow']) def test_override_config_params(docker_image, flow): with flow: - resps = Client(port=exposed_port).search( + resps = Client(port=exposed_port, return_responses=True).search( inputs=[Document()], return_results=True ) doc = resps[0].docs[0] @@ -59,7 +59,7 @@ def test_override_config_params_shards(docker_image): shards=2, ) with flow: - resps = Client(port=exposed_port).search( + resps = Client(port=exposed_port, return_responses=True).search( inputs=[Document()], return_results=True ) doc = resps[0].docs[0] diff --git a/tests/integration/override_config_params/worker/test_override_config_params.py b/tests/integration/override_config_params/worker/test_override_config_params.py index 868a8f12643c3..d7929a6c0e118 100644 --- a/tests/integration/override_config_params/worker/test_override_config_params.py +++ b/tests/integration/override_config_params/worker/test_override_config_params.py @@ -33,7 +33,7 @@ def flow(request): @pytest.mark.parametrize('flow', ['flow-yml', 'uses-yml', 'class'], indirect=['flow']) def test_override_config_params(flow): with flow: - resps = Client(port=exposed_port).search( + resps = Client(port=exposed_port, return_responses=True).search( inputs=[Document()], return_results=True ) doc = resps[0].docs[0] @@ -52,7 +52,7 @@ def test_override_config_params_shards(): shards=2, ) with flow: - resps = Client(port=exposed_port).search( + resps = Client(port=exposed_port, return_responses=True).search( inputs=[Document()], return_results=True ) doc = resps[0].docs[0] @@ -82,16 +82,22 @@ def foobar(self, docs, **kwargs): # original f = Flow(port_expose=exposed_port).add(uses=MyExec) with f: - req = Client(port=exposed_port).post('/index', Document(), return_results=True) + req = Client(port=exposed_port, return_responses=True).post( + '/index', Document(), return_results=True + ) assert req[0].docs[0].text == 'foo' # change bind to bar() f = Flow(port_expose=exposed_port).add(uses=MyExec, uses_requests={'/index': 'bar'}) with f: - req = Client(port=exposed_port).post('/index', Document(), return_results=True) + req = Client(port=exposed_port, return_responses=True).post( + '/index', Document(), return_results=True + ) assert req[0].docs[0].text == 'bar' - req = Client(port=exposed_port).post('/1', Document(), return_results=True) + req = Client(port=exposed_port, return_responses=True).post( + '/1', Document(), 
return_results=True + ) assert req[0].docs[0].text == 'foobar' # change bind to foobar() @@ -99,10 +105,12 @@ def foobar(self, docs, **kwargs): uses=MyExec, uses_requests={'/index': 'foobar'} ) with f: - req = Client(port=exposed_port).post('/index', Document(), return_results=True) + req = Client(port=exposed_port, return_responses=True).post( + '/index', Document(), return_results=True + ) assert req[0].docs[0].text == 'foobar' - req = Client(port=exposed_port).post( + req = Client(port=exposed_port, return_responses=True).post( '/index-blah', Document(), return_results=True ) assert req[0].docs[0].text == 'foo' @@ -112,5 +120,7 @@ def foobar(self, docs, **kwargs): uses=MyExec, uses_requests={'/default': 'bar'} ) with f: - req = Client(port=exposed_port).post('/index', Document(), return_results=True) + req = Client(port=exposed_port, return_responses=True).post( + '/index', Document(), return_results=True + ) assert req[0].docs[0].text == 'bar' diff --git a/tests/integration/override_executor_specific_params/test_specific_params.py b/tests/integration/override_executor_specific_params/test_specific_params.py index 728bf3eb40cf4..1294420bea646 100644 --- a/tests/integration/override_executor_specific_params/test_specific_params.py +++ b/tests/integration/override_executor_specific_params/test_specific_params.py @@ -45,7 +45,7 @@ def test_override_params(mocker): error_mock = mocker.Mock() with f: - resp = Client(port=exposed_port).index( + resp = Client(port=exposed_port, return_responses=True).index( inputs=DocumentArray([Document()]), parameters={'param1': 50, 'param2': 60, 'exec_name': {'param1': 'changed'}}, on_error=error_mock, diff --git a/tests/integration/pods/container/test_pod.py b/tests/integration/pods/container/test_pod.py index b217ee9383dd8..f69bd947e0daa 100644 --- a/tests/integration/pods/container/test_pod.py +++ b/tests/integration/pods/container/test_pod.py @@ -91,7 +91,9 @@ async def test_pods_trivial_topology( ) # send requests to the gateway - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client( + return_responses=True, host='localhost', port=port_expose, asyncio=True + ) responses = c.post( '/', inputs=async_inputs, request_size=1, return_results=True ) diff --git a/tests/integration/pods/test_pod.py b/tests/integration/pods/test_pod.py index 874c59076aa1a..c6e0c2df5e774 100644 --- a/tests/integration/pods/test_pod.py +++ b/tests/integration/pods/test_pod.py @@ -42,7 +42,9 @@ async def test_pods_trivial_topology(port_generator): # send requests to the gateway gateway_pod.wait_start_success() - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client( + return_responses=True, host='localhost', port=port_expose, asyncio=True + ) responses = c.post( '/', inputs=async_inputs, request_size=1, return_results=True ) @@ -140,7 +142,7 @@ async def test_pods_flow_topology( await asyncio.sleep(0.1) # send requests to the gateway - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(return_responses=True, host='localhost', port=port_expose, asyncio=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -197,7 +199,7 @@ async def test_pods_shards(polling, port_generator): await asyncio.sleep(1.0) gateway_pod.wait_start_success() - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(return_responses=True, host='localhost', port=port_expose, asyncio=True) responses = c.post('/', inputs=async_inputs, 
request_size=1, return_results=True) response_list = [] async for response in responses: @@ -252,7 +254,7 @@ async def test_pods_replicas(port_generator): await asyncio.sleep(1.0) gateway_pod.wait_start_success() - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(return_responses=True, host='localhost', port=port_expose, asyncio=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -320,7 +322,7 @@ async def test_pods_with_executor(port_generator): await asyncio.sleep(1.0) - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(return_responses=True, host='localhost', port=port_expose, asyncio=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -364,7 +366,7 @@ async def test_pods_gateway_worker_direct_connection(port_generator): worker_pod.wait_start_success() gateway_pod.wait_start_success() - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(return_responses=True, host='localhost', port=port_expose, asyncio=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -414,7 +416,7 @@ async def test_pods_with_replicas_advance_faster(port_generator): activate_msg.add_related_entity('worker', '127.0.0.1', pod.args.port_in) GrpcConnectionPool.send_request_sync(activate_msg, f'127.0.0.1:{head_port}') - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(return_responses=True, host='localhost', port=port_expose, asyncio=True) input_docs = [Document(text='slow'), Document(text='fast')] responses = c.post('/', inputs=input_docs, request_size=1, return_results=True) response_list = [] diff --git a/tests/integration/reduce/test_reduce.py b/tests/integration/reduce/test_reduce.py index 07a82f15811f7..7593c6a4f7619 100644 --- a/tests/integration/reduce/test_reduce.py +++ b/tests/integration/reduce/test_reduce.py @@ -57,7 +57,9 @@ def test_reduce_shards(n_docs): with search_flow as f: da = DocumentArray([Document() for _ in range(5)]) - resp = Client(port=exposed_port).post('/search', inputs=da, return_results=True) + resp = Client(port=exposed_port, return_responses=True).post( + '/search', inputs=da, return_results=True + ) assert len(resp[0].docs) == 5 @@ -94,7 +96,9 @@ def test_uses_after_no_reduce(n_shards, n_docs): with search_flow as f: da = DocumentArray([Document() for _ in range(5)]) - resp = Client(port=exposed_port).post('/search', inputs=da, return_results=True) + resp = Client(port=exposed_port, return_responses=True).post( + '/search', inputs=da, return_results=True + ) # assert no reduce happened assert len(resp[0].docs) == 1 @@ -147,7 +151,9 @@ def test_reduce_needs(): with flow as f: da = DocumentArray([Document() for _ in range(5)]) - resp = Client(port=exposed_port).post('/', inputs=da, return_results=True) + resp = Client(port=exposed_port, return_responses=True).post( + '/', inputs=da, return_results=True + ) assert len(resp[0].docs) == 5 for doc in resp[0].docs: @@ -168,7 +174,9 @@ def test_uses_before_reduce(): with flow as f: da = DocumentArray([Document() for _ in range(5)]) - resp = Client(port=exposed_port).post('/', inputs=da, return_results=True) + resp = Client(port=exposed_port, return_responses=True).post( + '/', inputs=da, return_results=True + ) # assert reduce happened because there is only BaseExecutor as uses_before 
assert len(resp[0].docs) == 5 @@ -185,7 +193,9 @@ def test_uses_before_no_reduce_real_executor(): with flow as f: da = DocumentArray([Document() for _ in range(5)]) - resp = Client(port=exposed_port).post('/', inputs=da, return_results=True) + resp = Client(port=exposed_port, return_responses=True).post( + '/', inputs=da, return_results=True + ) # assert no reduce happened assert len(resp[0].docs) == 1 @@ -203,7 +213,9 @@ def test_uses_before_no_reduce_real_executor_uses(): with flow as f: da = DocumentArray([Document() for _ in range(5)]) - resp = Client(port=exposed_port).post('/', inputs=da, return_results=True) + resp = Client(port=exposed_port, return_responses=True).post( + '/', inputs=da, return_results=True + ) # assert no reduce happened assert len(resp[0].docs) == 1 @@ -218,7 +230,7 @@ def test_reduce_status(): with flow as f: da = DocumentArray([Document() for _ in range(5)]) - resp = Client(port=exposed_port).post( + resp = Client(port=exposed_port, return_responses=True).post( '/status', parameters={'foo': 'bar'}, inputs=da, return_results=True ) diff --git a/tests/integration/rolling_update/test_rolling_update.py b/tests/integration/rolling_update/test_rolling_update.py index 6f239dc01930d..9ccac2c7ab8f8 100644 --- a/tests/integration/rolling_update/test_rolling_update.py +++ b/tests/integration/rolling_update/test_rolling_update.py @@ -248,7 +248,9 @@ def test_override_uses_with(docs): ) with flow: # test rolling update does not hang - ret1 = Client(port=exposed_port).search(docs, return_results=True) + ret1 = Client(port=exposed_port, return_responses=True).search( + docs, return_results=True + ) flow.rolling_update( 'executor1', uses_with={ @@ -257,7 +259,9 @@ def test_override_uses_with(docs): 'argument2': 'version2', }, ) - ret2 = Client(port=exposed_port).search(docs, return_results=True) + ret2 = Client(port=exposed_port, return_responses=True).search( + docs, return_results=True + ) assert len(ret1) > 0 assert len(ret1[0].docs) > 0 @@ -286,12 +290,12 @@ def test_scale_after_rolling_update(docs, replicas, scale_to): replicas=replicas, ) with flow: - ret1 = Client(port=exposed_port).search( + ret1 = Client(port=exposed_port, return_responses=True).search( docs, return_results=True, request_size=1 ) flow.rolling_update('executor1', None) flow.scale('executor1', replicas=scale_to) - ret2 = Client(port=exposed_port).search( + ret2 = Client(port=exposed_port, return_responses=True).search( docs, return_results=True, request_size=1 ) @@ -316,7 +320,7 @@ def send_requests( doc_count: int, request_count: int, ): - client = Client(port=port_expose) + client = Client(port=port_expose, return_responses=True) for i in range(request_count): responses = client.search( [Document(id=f'{idx}', text=f'doc{idx}') for idx in range(doc_count)], diff --git a/tests/integration/runtimes/test_runtimes.py b/tests/integration/runtimes/test_runtimes.py index eed94728e5c26..20ce643495e37 100644 --- a/tests/integration/runtimes/test_runtimes.py +++ b/tests/integration/runtimes/test_runtimes.py @@ -71,7 +71,7 @@ async def test_runtimes_trivial_topology(port_generator): GrpcConnectionPool.send_request_sync(activate_msg, f'127.0.0.1:{head_port}') # send requests to the gateway - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -193,7 +193,7 @@ async def 
test_runtimes_flow_topology( ) # send requests to the gateway - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -264,7 +264,7 @@ async def test_runtimes_shards(polling, port_generator): ready_or_shutdown_event=multiprocessing.Event(), ) - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -337,7 +337,7 @@ async def test_runtimes_replicas(port_generator): ready_or_shutdown_event=multiprocessing.Event(), ) - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -425,7 +425,7 @@ async def test_runtimes_with_executor(port_generator): ready_or_shutdown_event=multiprocessing.Event(), ) - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -483,7 +483,7 @@ async def test_runtimes_gateway_worker_direct_connection(port_generator): ready_or_shutdown_event=multiprocessing.Event(), ) - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) responses = c.post('/', inputs=async_inputs, request_size=1, return_results=True) response_list = [] async for response in responses: @@ -548,7 +548,7 @@ async def test_runtimes_with_replicas_advance_faster(port_generator): ready_or_shutdown_event=multiprocessing.Event(), ) - c = Client(host='localhost', port=port_expose, asyncio=True) + c = Client(host='localhost', port=port_expose, asyncio=True, return_responses=True) input_docs = [Document(text='slow'), Document(text='fast')] responses = c.post('/', inputs=input_docs, request_size=1, return_results=True) response_list = [] diff --git a/tests/integration/scale/test_scale.py b/tests/integration/scale/test_scale.py index bee8b851fad2c..c790bcc523485 100644 --- a/tests/integration/scale/test_scale.py +++ b/tests/integration/scale/test_scale.py @@ -89,13 +89,13 @@ def flow_with_runtime(request): def test_scale_success(flow_with_runtime, deployment_params): num_replicas, scale_to, shards = deployment_params with flow_with_runtime as f: - ret1 = Client(port=exposed_port).index( + ret1 = Client(port=exposed_port, return_responses=True).index( inputs=DocumentArray([Document() for _ in range(200)]), return_results=True, request_size=10, ) f.scale(deployment_name='executor', replicas=scale_to) - ret2 = Client(port=exposed_port).index( + ret2 = Client(port=exposed_port, return_responses=True).index( inputs=DocumentArray([Document() for _ in range(200)]), return_results=True, request_size=10, @@ -138,7 +138,7 @@ def test_scale_success(flow_with_runtime, deployment_params): @pytest.mark.parametrize('protocol', ['grpc', 'http', 'websocket']) def test_scale_with_concurrent_client(flow_with_runtime, deployment_params, 
protocol): def peer_client(port, protocol, peer_hash, queue): - rv = Client(protocol=protocol, port=port).index( + rv = Client(protocol=protocol, port=port, return_responses=True).index( [Document(text=peer_hash) for _ in range(NUM_DOCS_SENT_BY_CLIENTS)], request_size=5, return_results=True, @@ -168,7 +168,7 @@ def peer_client(port, protocol, peer_hash, queue): for t in thread_pool: t.join() - c = Client(protocol=protocol, port=port_expose) + c = Client(protocol=protocol, port=port_expose, return_responses=True) rv = c.index( [Document() for _ in range(5)], request_size=1, return_results=True ) diff --git a/tests/integration/streaming/test_clients_streaming.py b/tests/integration/streaming/test_clients_streaming.py index 3879be5bcec5b..a3f0aafd627f1 100644 --- a/tests/integration/streaming/test_clients_streaming.py +++ b/tests/integration/streaming/test_clients_streaming.py @@ -221,7 +221,7 @@ async def malicious_client_gen(): yield get_document(i) def client(gen, port, protocol): - Client(protocol=protocol, port=port).post( + Client(protocol=protocol, port=port, return_responses=True).post( on='/index', inputs=gen, request_size=1 ) @@ -250,7 +250,7 @@ def client(gen, port, protocol): p.join() order_of_ids = list( - Client(protocol=protocol, port=f.port_expose) + Client(protocol=protocol, port=f.port_expose, return_responses=True) .post(on='/status', inputs=[Document()], return_results=True)[0] .docs[0] .tags['ids'] diff --git a/tests/integration/v2_api/test_docs_matrix_tail_pea.py b/tests/integration/v2_api/test_docs_matrix_tail_pea.py index dbca45aa34abd..f44a151dfe932 100644 --- a/tests/integration/v2_api/test_docs_matrix_tail_pea.py +++ b/tests/integration/v2_api/test_docs_matrix_tail_pea.py @@ -60,7 +60,7 @@ def test_sharding_tail_pod(num_replicas, num_shards): uses_after=MatchMerger, ) with f: - results = Client(port=1234).post( + results = Client(port=1234, return_responses=True).post( on='/search', inputs=Document(matches=[Document()]), return_results=True, @@ -89,7 +89,7 @@ def multimodal_generator(): ) ) with f: - results = Client(port=1234).post( + results = Client(port=1234, return_responses=True).post( on='/search', inputs=multimodal_generator(), return_results=True, diff --git a/tests/integration/v2_api/test_func_routing.py b/tests/integration/v2_api/test_func_routing.py index b1a5673659756..a6d262c81ae8d 100644 --- a/tests/integration/v2_api/test_func_routing.py +++ b/tests/integration/v2_api/test_func_routing.py @@ -15,7 +15,7 @@ def foo(self, **kwargs): f = Flow(port_expose=1234).add(uses=MyExecutor) with f: - results = Client(port=1234).post( + results = Client(return_responses=True, port=1234).post( on='/search', inputs=[(Document(), Document()) for _ in range(3)], parameters={'hello': 'world', 'topk': 10}, @@ -25,7 +25,7 @@ def foo(self, **kwargs): assert results[0].data.docs[0].tags['hello'] == 'world' with f: - results = Client(port=1234).post( + results = Client(return_responses=True, port=1234).post( on='/random', inputs=[Document() for _ in range(3)], parameters={'hello': 'world', 'topk': 10}, @@ -43,7 +43,7 @@ def foo(self, **kwargs): f = Flow(port_expose=1234).add(uses=MyExecutor) with f: - results = Client(port=1234).post( + results = Client(return_responses=True, port=1234).post( on='/search', inputs=[(Document(), Document()) for _ in range(3)], return_results=True, @@ -62,7 +62,7 @@ def foo(self, **kwargs): f = Flow(port_expose=1234).add(uses=MyExecutor) with f: - Client(port=1234).post( + Client(return_responses=True, port=1234).post( on='/some_endpoint', 
inputs=[Document() for _ in range(3)], parameters={'hello': 'world', 'topk': 10}, @@ -78,7 +78,7 @@ def foo(self, **kwargs): f = Flow(port_expose=1234).add(uses=MyExecutor) with f: - Client(port=1234).post( + Client(return_responses=True, port=1234).post( on='/some_endpoint', inputs=[Document() for _ in range(3)], parameters={'hello': 'world', 'topk': 10}, @@ -121,7 +121,7 @@ def validate(req): mock() with f: - Client(port=1234).post( + Client(return_responses=True, port=1234).post( on='/some_endpoint', inputs=[Document() for _ in range(3)], parameters={'hello': 'world', 'topk': 10}, @@ -135,7 +135,7 @@ def test_dealer_routing(mocker): f = Flow(port_expose=1234).add(shards=3) mock = mocker.Mock() with f: - Client(port=1234).post( + Client(return_responses=True, port=1234).post( on='/some_endpoint', inputs=[Document() for _ in range(100)], request_size=2, @@ -161,7 +161,7 @@ def bar(self, **kwargs): with f: success_mock = mocker.Mock() fail_mock = mocker.Mock() - Client(port=1234).post( + Client(return_responses=True, port=1234).post( '/hello', target_executor='p0', inputs=Document(), @@ -198,7 +198,7 @@ def success(self, **kwargs): with f: # both deployments are called, create no error mock = mocker.Mock() - Client(port=1234).post( + Client(return_responses=True, port=1234).post( on='/foo', target_executor='foo', inputs=Document(), on_done=mock ) mock.assert_called() @@ -207,7 +207,7 @@ def success(self, **kwargs): def test_target_executor_with_one_pathways(): f = Flow(port_expose=1234).add().add(name='my_target') with f: - results = Client(port=1234).post( + results = Client(return_responses=True, port=1234).post( on='/search', inputs=Document(), return_results=True, @@ -223,7 +223,7 @@ def test_target_executor_with_two_pathways(): .add(needs=['gateway', 'executor0'], name='my_target') ) with f: - results = Client(port=1234).post( + results = Client(return_responses=True, port=1234).post( on='/search', inputs=Document(), return_results=True, @@ -240,7 +240,7 @@ def test_target_executor_with_two_pathways_one_skip(): .add(name='my_target') ) with f: - results = Client(port=1234).post( + results = Client(return_responses=True, port=1234).post( on='/search', inputs=Document(), return_results=True, @@ -252,7 +252,7 @@ def test_target_executor_with_two_pathways_one_skip(): def test_target_executor_with_shards(): f = Flow(port_expose=1234).add(shards=2).add(name='my_target') with f: - results = Client(port=1234).post( + results = Client(return_responses=True, port=1234).post( on='/search', inputs=Document(), return_results=True, diff --git a/tests/integration/v2_api/test_override_requests.py b/tests/integration/v2_api/test_override_requests.py index be40a14fd620c..4f666be459845 100644 --- a/tests/integration/v2_api/test_override_requests.py +++ b/tests/integration/v2_api/test_override_requests.py @@ -13,7 +13,7 @@ def foo(self, docs, **kwargs): with Flow(port_expose=exposed_port).add( uses=FooExecutor, uses_requests={'/non_foo': 'foo'} ) as f: - c = Client(port=exposed_port) + c = Client(port=exposed_port, return_responses=True) resp1 = c.post( on='/foo', inputs=DocumentArray([Document(text='')]), return_results=True ) @@ -46,7 +46,7 @@ def bar(self, docs, **kwargs): uses_after=OtherExecutor, uses_before=OtherExecutor, ) as f: - c = Client(port=exposed_port) + c = Client(port=exposed_port, return_responses=True) resp1 = c.post( on='/foo', inputs=DocumentArray([Document(text='')]), return_results=True ) diff --git a/tests/k8s/test_graceful_request_handling.py 
b/tests/k8s/test_graceful_request_handling.py index eba7987a7aa1f..e708db5915c49 100644 --- a/tests/k8s/test_graceful_request_handling.py +++ b/tests/k8s/test_graceful_request_handling.py @@ -180,6 +180,7 @@ async def test_no_message_lost_during_scaling(logger, docker_images, tmpdir): # send requests and validate time.sleep(0.1) client_kwargs = dict( + return_responses=True, host='localhost', port=flow.port_expose, ) diff --git a/tests/k8s/test_k8s.py b/tests/k8s/test_k8s.py index 9debf9df7eb62..cae726e171d9b 100644 --- a/tests/k8s/test_k8s.py +++ b/tests/k8s/test_k8s.py @@ -121,6 +121,7 @@ async def run_test(flow, core_client, namespace, endpoint, n_docs=10, request_si client_kwargs = dict( host='localhost', port=flow.port_expose, + return_responses=True, asyncio=True, ) client_kwargs.update(flow._common_kwargs) diff --git a/tests/unit/clients/python/test_client.py b/tests/unit/clients/python/test_client.py index b438b751a68f8..30a3f2cce7395 100644 --- a/tests/unit/clients/python/test_client.py +++ b/tests/unit/clients/python/test_client.py @@ -46,13 +46,13 @@ def test_img_2(): 'inputs', [iter([b'1234', b'45467']), iter([Document(), Document()])] ) def test_check_input_success(inputs): - client = Client(host='localhost', port_jinad=12345) + client = Client(return_responses=True, host='localhost', port_jinad=12345) client.check_input(inputs) @pytest.mark.parametrize('inputs', [iter([list(), list(), {12, 2, 3}])]) def test_check_input_fail(inputs): - client = Client(host='localhost', port_jinad=12345) + client = Client(return_responses=True, host='localhost', port_jinad=12345) with pytest.raises(BadClientInput): client.check_input(inputs) @@ -101,6 +101,7 @@ def test_client_websocket(mocker, flow_with_websocket): with flow_with_websocket: time.sleep(0.5) client = Client( + return_responses=True, host='localhost', port=str(flow_with_websocket.port_expose), protocol='websocket', @@ -124,13 +125,18 @@ def test_client_websocket(mocker, flow_with_websocket): @pytest.mark.parametrize('protocol', ['websocket', 'grpc', 'http']) def test_client_from_kwargs(protocol): - Client(port=12345, host='0.0.0.1', protocol=protocol) + Client(return_responses=True, port=12345, host='0.0.0.1', protocol=protocol) @pytest.mark.parametrize('protocol', ['websocket', 'grpc', 'http']) def test_independent_client(protocol): with Flow(protocol=protocol) as f: - c = Client(host='localhost', port=f.port_expose, protocol=protocol) + c = Client( + return_responses=True, + host='localhost', + port=f.port_expose, + protocol=protocol, + ) assert type(c) == type(f.client) c.post('/') @@ -151,7 +157,12 @@ def test_all_sync_clients(protocol, mocker): m3 = mocker.Mock() m4 = mocker.Mock() with f: - c = Client(host='localhost', port=f.port_expose, protocol=protocol) + c = Client( + return_responses=True, + host='localhost', + port=f.port_expose, + protocol=protocol, + ) c.post('/', on_done=m1) c.post('/foo', docs, on_done=m2) c.post('/foo', on_done=m3) diff --git a/tests/unit/orchestrate/flow/flow-orchestrate/test_flow_docarray_return.py b/tests/unit/orchestrate/flow/flow-orchestrate/test_flow_docarray_return.py index e9660584c2e97..f810074f208b6 100644 --- a/tests/unit/orchestrate/flow/flow-orchestrate/test_flow_docarray_return.py +++ b/tests/unit/orchestrate/flow/flow-orchestrate/test_flow_docarray_return.py @@ -76,6 +76,5 @@ def test_flow_client_defaults(): results = c.post(on='/index', inputs=[Document()], return_results=True) assert isinstance(docs, DocumentArray) assert docs[0].text == 'Hello World!' 
- assert isinstance(results, list) - assert isinstance(results[0], types.request.data.DataRequest) - assert results[0].docs[0].text == 'Hello World!' + assert isinstance(results, DocumentArray) + assert results[0].text == 'Hello World!' diff --git a/tests/unit/test_gateway.py b/tests/unit/test_gateway.py index 8690c1fdf9b9f..30f26f1bb715e 100644 --- a/tests/unit/test_gateway.py +++ b/tests/unit/test_gateway.py @@ -42,7 +42,9 @@ def _request(status_codes, durations, index): durations=durations, index=index, ) - results = Client(port=PORT_EXPOSE, protocol=protocol).index( + results = Client( + port=PORT_EXPOSE, protocol=protocol, return_responses=True + ).index( inputs=(Document() for _ in range(256)), return_results=True, _size=16, From 19d77c60707d143ea92c1258811fa3d1f1304654 Mon Sep 17 00:00:00 2001 From: Joan Fontanals Martinez Date: Tue, 15 Feb 2022 15:02:12 +0100 Subject: [PATCH 4/4] test: fix more tests --- tests/distributed/test_topologies/test_topologies.py | 5 ++++- tests/integration/gateway_clients/test_clients_gateways.py | 2 +- tests/integration/v2_api/test_yaml_dump_load.py | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/distributed/test_topologies/test_topologies.py b/tests/distributed/test_topologies/test_topologies.py index 210a9a3b750ee..78be38316f8a9 100644 --- a/tests/distributed/test_topologies/test_topologies.py +++ b/tests/distributed/test_topologies/test_topologies.py @@ -204,7 +204,10 @@ def test_remote_workspace_value(): ) args = client.flows.get(flow_id)['arguments']['object']['arguments'] response = Client( - host=HOST, port=args['port_expose'], protocol=args['protocol'] + host=HOST, + port=args['port_expose'], + protocol=args['protocol'], + return_responses=True, ).post(on='/', inputs=[Document()], show_progress=True, return_results=True) assert ( response[0] diff --git a/tests/integration/gateway_clients/test_clients_gateways.py b/tests/integration/gateway_clients/test_clients_gateways.py index 4a5ec14994c49..fb26f65ffb9db 100644 --- a/tests/integration/gateway_clients/test_clients_gateways.py +++ b/tests/integration/gateway_clients/test_clients_gateways.py @@ -170,7 +170,7 @@ def decompress(self): def client_send(client_id: int, port_in: int, protocol: str): from jina.clients import Client - c = Client(protocol=protocol, port=port_inm, return_responses=True) + c = Client(protocol=protocol, port=port_in, return_responses=True) # send requests return c.post( diff --git a/tests/integration/v2_api/test_yaml_dump_load.py b/tests/integration/v2_api/test_yaml_dump_load.py index 647482e50d38c..c0b1a5b906973 100644 --- a/tests/integration/v2_api/test_yaml_dump_load.py +++ b/tests/integration/v2_api/test_yaml_dump_load.py @@ -66,7 +66,7 @@ def test_load_save_yml(tmp_path): ) def test_load_yaml_route(req_endpoint, doc_text): f = Flow(port_expose=12345).add(uses=y) - c = Client(port=exposed_port) + c = Client(port=exposed_port, return_responses=True) with f: results = c.post(req_endpoint, Document(), return_results=True)
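
For context, a minimal usage sketch of the behaviour these patches exercise (not part of the patch series itself): the adjusted test in test_flow_docarray_return.py asserts that a plain Client.post call returns a reduced DocumentArray, while the call sites above pass return_responses=True to keep getting the raw list of responses. The executor, endpoint and port in the sketch below are made up for illustration; the assertions mirror the ones added and removed in the hunks above.

from jina import Client, Document, DocumentArray, Executor, Flow, requests


class EchoExecutor(Executor):
    # Hypothetical executor, only here to give the Documents some content.
    @requests
    def echo(self, docs, **kwargs):
        for doc in docs:
            doc.text = 'Hello World!'


with Flow(port_expose=12345).add(uses=EchoExecutor):
    # Default client: results come back as one reduced DocumentArray.
    docs = Client(port=12345).post(
        on='/index', inputs=[Document()], return_results=True
    )
    assert isinstance(docs, DocumentArray)
    assert docs[0].text == 'Hello World!'

    # With return_responses=True: results come back as the raw list of
    # responses, with the processed docs nested inside each response.
    responses = Client(port=12345, return_responses=True).post(
        on='/index', inputs=[Document()], return_results=True
    )
    assert isinstance(responses, list)
    assert responses[0].docs[0].text == 'Hello World!'

This is also why the touched tests fall into two groups: those that inspect response-level fields such as .data.docs keep return_responses=True, while test_flow_docarray_return.py switches its assertions to the DocumentArray shape.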