From b847483ceae2ce9da58fc77bdf1bd60cafad9540 Mon Sep 17 00:00:00 2001 From: Deepankar Mahapatro Date: Mon, 4 Jan 2021 16:50:56 +0530 Subject: [PATCH 1/7] chore: check ci tests --- .../jinad/test_index_query_with_shards/test_integration.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/jinad/test_index_query_with_shards/test_integration.sh b/tests/integration/jinad/test_index_query_with_shards/test_integration.sh index 8d4a026d84454..258508f0a5498 100755 --- a/tests/integration/jinad/test_index_query_with_shards/test_integration.sh +++ b/tests/integration/jinad/test_index_query_with_shards/test_integration.sh @@ -7,6 +7,7 @@ if [ "${PWD##*/}" != "jina" ] exit 1 fi + docker-compose -f tests/integration/jinad/test_index_query_with_shards/docker-compose.yml --project-directory . up --build -d sleep 10 From c0d8bf4ca53b98000e9da0e42e9968208a7a79a7 Mon Sep 17 00:00:00 2001 From: Deepankar Mahapatro Date: Mon, 4 Jan 2021 20:50:59 +0530 Subject: [PATCH 2/7] fix(jinad): de;ete call in teardown --- jina/excepts.py | 4 -- jina/peapods/runtimes/jinad/__init__.py | 5 ++- jina/peapods/runtimes/jinad/api.py | 54 ++++++++++++------------- 3 files changed, 29 insertions(+), 34 deletions(-) diff --git a/jina/excepts.py b/jina/excepts.py index 5816ea679f336..3512c65a05332 100644 --- a/jina/excepts.py +++ b/jina/excepts.py @@ -175,10 +175,6 @@ class BadNamedScoreType(TypeError): """ Exception when can not construct a named score from the given data """ -class RemotePodClosed(Exception): - """ Exception when remote pod is closed and log streaming needs to exit """ - - class LengthMismatchException(Exception): """ Exception when length of two items should be identical while not """ diff --git a/jina/peapods/runtimes/jinad/__init__.py b/jina/peapods/runtimes/jinad/__init__.py index 2288e46f0fde5..f6f70e4e77e9b 100644 --- a/jina/peapods/runtimes/jinad/__init__.py +++ b/jina/peapods/runtimes/jinad/__init__.py @@ -20,7 +20,10 @@ def setup(self): self.logger.success(f'created remote {self.api.kind} with id {colored(self._remote_id, "cyan")}') def run_forever(self): - self.api.log(self._remote_id, None) + self.api.log(self._remote_id) + + def teardown(self): + self.api.delete(remote_id=self._remote_id) @cached_property def _remote_id(self) -> Optional[str]: diff --git a/jina/peapods/runtimes/jinad/api.py b/jina/peapods/runtimes/jinad/api.py index 88466ceed9e39..d5b63af362b89 100644 --- a/jina/peapods/runtimes/jinad/api.py +++ b/jina/peapods/runtimes/jinad/api.py @@ -8,7 +8,6 @@ from ....jaml import JAML from ....logging import JinaLogger from ....enums import RemotePeapodType -from ....excepts import RemotePodClosed from ....importer import ImportExtensions @@ -162,7 +161,7 @@ def create(self, args: Dict, **kwargs) -> Optional[str]: except requests.exceptions.RequestException as ex: self.logger.error(f'couldn\'t create pod with remote jinad {repr(ex)}') - async def wslogs(self, remote_id: 'str', stop_event: Event, current_line: int = 0): + async def wslogs(self, remote_id: 'str', current_line: int = 0): """ websocket log stream from remote pea/pod :param remote_id: the identity of that pea/pod :param stop_event: the multiprocessing event which marks if stop event is set @@ -172,51 +171,48 @@ async def wslogs(self, remote_id: 'str', stop_event: Event, current_line: int = with ImportExtensions(required=True): import websockets + remote_loggers = {} try: # sleeping for few seconds to allow the logs to be written in remote await asyncio.sleep(3) - async with websockets.connect(f'{self.log_url}/{remote_id}?timeout=20') as websocket: + async with websockets.connect(f'{self.log_url}/{remote_id}?timeout=5') as websocket: await websocket.send(json.dumps({'from': current_line})) - remote_loggers = {} - while True: - log_line = await websocket.recv() - if log_line: - try: - log_line = json.loads(log_line) - current_line = int(list(log_line.keys())[0]) - log_line_dict = list(log_line.values())[0] - log_line_dict = json.loads(log_line_dict.split('\t')[-1].strip()) - name = log_line_dict['name'] - if name not in remote_loggers: - remote_loggers[name] = JinaLogger(context=f'🌏 {name}') - # TODO: change logging level, process name in local logger - remote_loggers[name].info(f'{log_line_dict["message"].strip()}') - except json.decoder.JSONDecodeError: - continue + async for log_line in websocket: + try: + log_line = json.loads(log_line) + current_line = int(list(log_line.keys())[0]) + log_line_dict = list(log_line.values())[0] + log_line_dict = json.loads(log_line_dict.split('\t')[-1].strip()) + name = log_line_dict['name'] + + if name not in remote_loggers: + remote_loggers[name] = JinaLogger(context=f'🌏 {name}') + # TODO(Deepankar): change logging level, process name in local logger + remote_loggers[name].info(f'{log_line_dict["message"].strip()}') + except json.decoder.JSONDecodeError: + continue await websocket.send(json.dumps({})) - if stop_event.is_set(): - for logger in remote_loggers.values(): - logger.close() - raise RemotePodClosed except websockets.exceptions.ConnectionClosedOK: self.logger.debug(f'Client got disconnected from server') return current_line except websockets.exceptions.WebSocketException as e: self.logger.error(f'Got following error while streaming logs via websocket {repr(e)}') return 0 + finally: + if remote_loggers: + for logger in remote_loggers.values(): + logger.close() - def log(self, remote_id: 'str', stop_event: Event, **kwargs) -> None: + def log(self, remote_id: 'str', **kwargs) -> None: """ Start the log stream from remote pea/pod, will use local logger for output :param remote_id: the identity of that pea/pod :return: """ try: - self.logger.info(f'fetching streamed logs from remote id: {remote_id}') - asyncio.run(self.wslogs(remote_id=remote_id, stop_event=stop_event, current_line=0)) - except RemotePodClosed: - self.logger.debug(f'🌏 remote closed') + self.logger.info(f'🌏 Fetching streamed logs from remote id: {remote_id}') + asyncio.run(self.wslogs(remote_id=remote_id, current_line=0)) finally: - self.logger.info(f'🌏 exiting from remote logger') + self.logger.info(f'🌏 Exiting from remote logger') def delete(self, remote_id: 'str', **kwargs) -> bool: """ Delete a remote pea/pod From 3f03846fa00d585658589e25056e901e87cb6ce8 Mon Sep 17 00:00:00 2001 From: Deepankar Mahapatro Date: Mon, 4 Jan 2021 20:56:14 +0530 Subject: [PATCH 3/7] test(docker): add remove orphans option --- tests/integration/jinad/Dockerfiles/Dockerfile | 7 ++++--- tests/integration/jinad/Dockerfiles/entrypoint.sh | 12 ++++++++++++ .../jinad/test_index_query/test_integration.sh | 4 ++-- .../test_index_query_with_shards/test_integration.sh | 4 ++-- .../test_simple_distributed/test_integration.sh | 4 ++-- .../test_integration.sh | 4 ++-- .../jinad/test_simple_hub_pods/test_integration.sh | 4 ++-- 7 files changed, 26 insertions(+), 13 deletions(-) create mode 100644 tests/integration/jinad/Dockerfiles/entrypoint.sh diff --git a/tests/integration/jinad/Dockerfiles/Dockerfile b/tests/integration/jinad/Dockerfiles/Dockerfile index 03d53f527ced8..a527137853225 100644 --- a/tests/integration/jinad/Dockerfiles/Dockerfile +++ b/tests/integration/jinad/Dockerfiles/Dockerfile @@ -7,11 +7,12 @@ WORKDIR /jina/ ADD setup.py MANIFEST.in requirements.txt extra-requirements.txt README.md ./ ADD cli ./cli/ ADD jina ./jina/ +ADD tests/integration/jinad/Dockerfiles/entrypoint.sh ./ ARG PIP_TAG=[test] +RUN chmod +x entrypoint.sh RUN pip install ."$PIP_TAG" -# This doesn't start fluentd in the background -# add entrypoint script if fluentd needs to be enabled for tests -ENTRYPOINT ["jinad"] +# This starts both jinad & fluentd in the background +ENTRYPOINT ["bash", "-c", "./entrypoint.sh"] diff --git a/tests/integration/jinad/Dockerfiles/entrypoint.sh b/tests/integration/jinad/Dockerfiles/entrypoint.sh new file mode 100644 index 0000000000000..6224048c772a2 --- /dev/null +++ b/tests/integration/jinad/Dockerfiles/entrypoint.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +CONF_PATH=$(python3 -c "import pkg_resources; print(pkg_resources.resource_filename('jina', 'resources/fluent.conf'))") + +# Start fluentd in the background +nohup fluentd -c $CONF_PATH & + +# Allowing fluentd conf to load by sleeping for 2secs +sleep 2 + +# Start jinad (uvicorn) server +jinad diff --git a/tests/integration/jinad/test_index_query/test_integration.sh b/tests/integration/jinad/test_index_query/test_integration.sh index b247a05f0ac5b..bfeb1a7022d37 100755 --- a/tests/integration/jinad/test_index_query/test_integration.sh +++ b/tests/integration/jinad/test_index_query/test_integration.sh @@ -6,7 +6,7 @@ if [ "${PWD##*/}" != "jina" ] exit 1 fi -docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . up --build -d +docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . up --build -d --remove-orphans sleep 10 #Indexing part @@ -43,7 +43,7 @@ echo "document matched has the text: ${TEXT_INDEXED}" curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" -docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down +docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down --remove-orphans EXPECTED_TEXT='"text:hey, dude"' diff --git a/tests/integration/jinad/test_index_query_with_shards/test_integration.sh b/tests/integration/jinad/test_index_query_with_shards/test_integration.sh index 258508f0a5498..0acec390d53b8 100755 --- a/tests/integration/jinad/test_index_query_with_shards/test_integration.sh +++ b/tests/integration/jinad/test_index_query_with_shards/test_integration.sh @@ -8,7 +8,7 @@ if [ "${PWD##*/}" != "jina" ] fi -docker-compose -f tests/integration/jinad/test_index_query_with_shards/docker-compose.yml --project-directory . up --build -d +docker-compose -f tests/integration/jinad/test_index_query_with_shards/docker-compose.yml --project-directory . up --build -d --remove-orphans sleep 10 #Indexing part @@ -59,7 +59,7 @@ echo "found ${COUNT} matches" curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" -docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down +docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down --remove-orphans if [ $COUNT = 10 ]; then echo "Success" diff --git a/tests/integration/jinad/test_simple_distributed/test_integration.sh b/tests/integration/jinad/test_simple_distributed/test_integration.sh index c15e955c2f83e..599c46f486de9 100755 --- a/tests/integration/jinad/test_simple_distributed/test_integration.sh +++ b/tests/integration/jinad/test_simple_distributed/test_integration.sh @@ -6,7 +6,7 @@ if [ "${PWD##*/}" != "jina" ] exit 1 fi -docker-compose -f tests/integration/jinad/test_simple_distributed/docker-compose.yml --project-directory . up --build -d +docker-compose -f tests/integration/jinad/test_simple_distributed/docker-compose.yml --project-directory . up --build -d --remove-orphans sleep 10 @@ -27,7 +27,7 @@ curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: appli curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" -docker-compose -f tests/integration/jinad/test_simple_distributed/docker-compose.yml --project-directory . down +docker-compose -f tests/integration/jinad/test_simple_distributed/docker-compose.yml --project-directory . down --remove-orphans EXPECTED_TEXT='"text:hey, dude"' diff --git a/tests/integration/jinad/test_simple_distributed_with_shards/test_integration.sh b/tests/integration/jinad/test_simple_distributed_with_shards/test_integration.sh index 133fbf111937f..0fe0560f1caf3 100755 --- a/tests/integration/jinad/test_simple_distributed_with_shards/test_integration.sh +++ b/tests/integration/jinad/test_simple_distributed_with_shards/test_integration.sh @@ -6,7 +6,7 @@ if [ "${PWD##*/}" != "jina" ] exit 1 fi -docker-compose -f tests/integration/jinad/test_simple_distributed_with_shards/docker-compose.yml --project-directory . up --build -d +docker-compose -f tests/integration/jinad/test_simple_distributed_with_shards/docker-compose.yml --project-directory . up --build -d --remove-orphans sleep 10 @@ -27,7 +27,7 @@ curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: appli curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" -docker-compose -f tests/integration/jinad/test_simple_distributed_with_shards/docker-compose.yml --project-directory . down +docker-compose -f tests/integration/jinad/test_simple_distributed_with_shards/docker-compose.yml --project-directory . down --remove-orphans EXPECTED_TEXT='"text:hey, dude"' diff --git a/tests/integration/jinad/test_simple_hub_pods/test_integration.sh b/tests/integration/jinad/test_simple_hub_pods/test_integration.sh index 975b368468720..79f135f49fcef 100755 --- a/tests/integration/jinad/test_simple_hub_pods/test_integration.sh +++ b/tests/integration/jinad/test_simple_hub_pods/test_integration.sh @@ -6,7 +6,7 @@ if [ "${PWD##*/}" != "jina" ] exit 1 fi -docker-compose -f tests/integration/jinad/test_simple_hub_pods/docker-compose.yml --project-directory . up --build -d +docker-compose -f tests/integration/jinad/test_simple_hub_pods/docker-compose.yml --project-directory . up --build -d --remove-orphans sleep 10 @@ -30,7 +30,7 @@ curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: appli curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" -docker-compose -f tests/integration/jinad/test_simple_hub_pods/docker-compose.yml --project-directory . down +docker-compose -f tests/integration/jinad/test_simple_hub_pods/docker-compose.yml --project-directory . down --remove-orphans EXPECTED_TEXT='"text:hey, dude"' From 20a2e1a77d2a57fda9825f45841035e89776ec96 Mon Sep 17 00:00:00 2001 From: Deepankar Mahapatro Date: Tue, 5 Jan 2021 18:26:59 +0530 Subject: [PATCH 4/7] fix(jinad): log streaming --- jina/peapods/runtimes/jinad/__init__.py | 13 +++++- jina/peapods/runtimes/jinad/api.py | 56 +++++++++++++------------ 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/jina/peapods/runtimes/jinad/__init__.py b/jina/peapods/runtimes/jinad/__init__.py index f6f70e4e77e9b..9ff0bea822285 100644 --- a/jina/peapods/runtimes/jinad/__init__.py +++ b/jina/peapods/runtimes/jinad/__init__.py @@ -1,4 +1,5 @@ import argparse +from multiprocessing import Event from typing import Union, Dict, Optional from .api import get_jinad_api @@ -10,19 +11,29 @@ class JinadRuntime(ZMQManyRuntime): def __init__(self, args: Union['argparse.Namespace', Dict]): super().__init__(args) + self.exit_event = Event() + self.exit_event.clear() self.api = get_jinad_api(kind=self.remote_type, host=self.host, port=self.port_expose, logger=self.logger) def setup(self): + # Uploads Pod/Pea context to remote & Creates remote Pod/Pea using :class:`JinadAPI` if self._remote_id: self.logger.success(f'created remote {self.api.kind} with id {colored(self._remote_id, "cyan")}') def run_forever(self): - self.api.log(self._remote_id) + # Streams log messages using websocket from remote server. + # Waits for an `asyncio.Event` to be set to exit out of streaming loop + self.api.log(remote_id=self._remote_id, event=self.exit_event) + + def cancel(self): + # Indicates :meth:`run_forever` to exit by setting the `asyncio.Event` + self.exit_event.set() def teardown(self): + # Closes the remote Pod/Pea using :class:`JinadAPI` self.api.delete(remote_id=self._remote_id) @cached_property diff --git a/jina/peapods/runtimes/jinad/api.py b/jina/peapods/runtimes/jinad/api.py index d5b63af362b89..ed1ed19dbe991 100644 --- a/jina/peapods/runtimes/jinad/api.py +++ b/jina/peapods/runtimes/jinad/api.py @@ -2,7 +2,7 @@ import asyncio from pathlib import Path from contextlib import ExitStack -from multiprocessing.synchronize import Event +from multiprocessing import Event from typing import Dict, Tuple, Set, List, Optional from ....jaml import JAML @@ -68,6 +68,7 @@ def fetch_files_from_yaml(pea_args: Dict, logger: 'JinaLogger') -> Tuple[Set[str class JinadAPI: kind = 'pea' # select from pea/pod, TODO: enum + TIMEOUT_ERROR_CODE = 4000 def __init__(self, host: str, @@ -97,7 +98,7 @@ def __init__(self, self.upload_url = f'{rest_url}/upload' self.pea_url = f'{rest_url}/pea' self.pod_url = f'{rest_url}/pod' - self.log_url = f'{websocket_url}/wslog' + self.log_url = f'{websocket_url}/logstream' @property def is_alive(self) -> bool: @@ -161,11 +162,10 @@ def create(self, args: Dict, **kwargs) -> Optional[str]: except requests.exceptions.RequestException as ex: self.logger.error(f'couldn\'t create pod with remote jinad {repr(ex)}') - async def wslogs(self, remote_id: 'str', current_line: int = 0): + async def logstream(self, remote_id: 'str', event: Event): """ websocket log stream from remote pea/pod :param remote_id: the identity of that pea/pod - :param stop_event: the multiprocessing event which marks if stop event is set - :param current_line: the line number from which logs would be streamed + :param event: the multiprocessing event which marks if stop event is set :return: """ with ImportExtensions(required=True): @@ -175,42 +175,46 @@ async def wslogs(self, remote_id: 'str', current_line: int = 0): try: # sleeping for few seconds to allow the logs to be written in remote await asyncio.sleep(3) + async with websockets.connect(f'{self.log_url}/{remote_id}?timeout=5') as websocket: - await websocket.send(json.dumps({'from': current_line})) - async for log_line in websocket: - try: - log_line = json.loads(log_line) - current_line = int(list(log_line.keys())[0]) - log_line_dict = list(log_line.values())[0] - log_line_dict = json.loads(log_line_dict.split('\t')[-1].strip()) - name = log_line_dict['name'] - - if name not in remote_loggers: - remote_loggers[name] = JinaLogger(context=f'🌏 {name}') - # TODO(Deepankar): change logging level, process name in local logger - remote_loggers[name].info(f'{log_line_dict["message"].strip()}') - except json.decoder.JSONDecodeError: - continue - await websocket.send(json.dumps({})) + current_line_number = -1 + + while not event.is_set(): + self.logger.warning(f'fetching logs from line# {int(current_line_number) + 1}, event: {event}') + await websocket.send(json.dumps({'from': int(current_line_number) + 1})) + async for log_line in websocket: + try: + log_line = json.loads(log_line) + if 'code' in log_line and log_line['code'] == self.TIMEOUT_ERROR_CODE: + self.logger.info(f'Received timeout from the log server. Breaking') + break + current_line_number = list(log_line.keys())[0] + complete_log_message = log_line[current_line_number] + log_line_dict = json.loads(complete_log_message.split('\t')[-1].strip()) + name = log_line_dict['name'] + if name not in remote_loggers: + remote_loggers[name] = JinaLogger(context=f'🌏 {name}') + # TODO(Deepankar): change logging level, process name in local logger + remote_loggers[name].info(f'{log_line_dict["message"].strip()}') + except json.decoder.JSONDecodeError: + continue except websockets.exceptions.ConnectionClosedOK: self.logger.debug(f'Client got disconnected from server') - return current_line except websockets.exceptions.WebSocketException as e: self.logger.error(f'Got following error while streaming logs via websocket {repr(e)}') - return 0 finally: if remote_loggers: for logger in remote_loggers.values(): logger.close() - def log(self, remote_id: 'str', **kwargs) -> None: + def log(self, remote_id: 'str', event: Event, **kwargs) -> None: """ Start the log stream from remote pea/pod, will use local logger for output :param remote_id: the identity of that pea/pod :return: """ try: - self.logger.info(f'🌏 Fetching streamed logs from remote id: {remote_id}') - asyncio.run(self.wslogs(remote_id=remote_id, current_line=0)) + self.logger.info(f'🌏 Fetching streamed logs from remote id: {remote_id}, event: {event}') + asyncio.run(self.logstream(remote_id=remote_id, event=event)) finally: self.logger.info(f'🌏 Exiting from remote logger') From 937df8e34815112e7aa0487f2729a8937601f149 Mon Sep 17 00:00:00 2001 From: Deepankar Mahapatro Date: Tue, 5 Jan 2021 23:13:30 +0530 Subject: [PATCH 5/7] fix(jinad): moved jinad runtime to async --- jina/peapods/runtimes/jinad/__init__.py | 25 ++++++++++++++----------- jina/peapods/runtimes/jinad/api.py | 25 ++++++++----------------- 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/jina/peapods/runtimes/jinad/__init__.py b/jina/peapods/runtimes/jinad/__init__.py index 9ff0bea822285..9d58ecd187a4e 100644 --- a/jina/peapods/runtimes/jinad/__init__.py +++ b/jina/peapods/runtimes/jinad/__init__.py @@ -1,18 +1,22 @@ import argparse -from multiprocessing import Event +import asyncio from typing import Union, Dict, Optional +from ...zmq import Zmqlet from .api import get_jinad_api -from ..zmq.base import ZMQManyRuntime +from ..asyncio.base import AsyncZMQRuntime from ....helper import cached_property, ArgNamespace, colored -class JinadRuntime(ZMQManyRuntime): +class JinadRuntime(AsyncZMQRuntime): def __init__(self, args: Union['argparse.Namespace', Dict]): super().__init__(args) - self.exit_event = Event() - self.exit_event.clear() + self.ctrl_addr = Zmqlet.get_ctrl_address(None, None, True)[0] + self.timeout_ctrl = args.timeout_ctrl + self.host = args.host + self.port_expose = args.port_expose + self.remote_type = args.remote_type self.api = get_jinad_api(kind=self.remote_type, host=self.host, port=self.port_expose, @@ -23,14 +27,13 @@ def setup(self): if self._remote_id: self.logger.success(f'created remote {self.api.kind} with id {colored(self._remote_id, "cyan")}') - def run_forever(self): + async def async_run_forever(self): # Streams log messages using websocket from remote server. - # Waits for an `asyncio.Event` to be set to exit out of streaming loop - self.api.log(remote_id=self._remote_id, event=self.exit_event) + self.logging_task = asyncio.create_task(self.api.logstream(self._remote_id)) - def cancel(self): - # Indicates :meth:`run_forever` to exit by setting the `asyncio.Event` - self.exit_event.set() + async def async_cancel(self): + # Cancels the logging task + self.logging_task.cancel() def teardown(self): # Closes the remote Pod/Pea using :class:`JinadAPI` diff --git a/jina/peapods/runtimes/jinad/api.py b/jina/peapods/runtimes/jinad/api.py index ed1ed19dbe991..c393589fe5bd6 100644 --- a/jina/peapods/runtimes/jinad/api.py +++ b/jina/peapods/runtimes/jinad/api.py @@ -162,15 +162,15 @@ def create(self, args: Dict, **kwargs) -> Optional[str]: except requests.exceptions.RequestException as ex: self.logger.error(f'couldn\'t create pod with remote jinad {repr(ex)}') - async def logstream(self, remote_id: 'str', event: Event): + async def logstream(self, remote_id: 'str'): """ websocket log stream from remote pea/pod :param remote_id: the identity of that pea/pod - :param event: the multiprocessing event which marks if stop event is set :return: """ with ImportExtensions(required=True): import websockets + self.logger.info(f'🌏 Fetching streamed logs from remote id: {remote_id}') remote_loggers = {} try: # sleeping for few seconds to allow the logs to be written in remote @@ -179,8 +179,7 @@ async def logstream(self, remote_id: 'str', event: Event): async with websockets.connect(f'{self.log_url}/{remote_id}?timeout=5') as websocket: current_line_number = -1 - while not event.is_set(): - self.logger.warning(f'fetching logs from line# {int(current_line_number) + 1}, event: {event}') + while True: await websocket.send(json.dumps({'from': int(current_line_number) + 1})) async for log_line in websocket: try: @@ -199,25 +198,17 @@ async def logstream(self, remote_id: 'str', event: Event): except json.decoder.JSONDecodeError: continue except websockets.exceptions.ConnectionClosedOK: - self.logger.debug(f'Client got disconnected from server') + self.logger.error(f'🌏 Client got disconnected from server') except websockets.exceptions.WebSocketException as e: - self.logger.error(f'Got following error while streaming logs via websocket {repr(e)}') + self.logger.error(f'🌏 Got following error while streaming logs via websocket {repr(e)}') + except asyncio.CancelledError: + self.logger.info(f'🌏 Logging task cancelled successfully') finally: + self.logger.info(f'🌏 Exiting from remote loggers') if remote_loggers: for logger in remote_loggers.values(): logger.close() - def log(self, remote_id: 'str', event: Event, **kwargs) -> None: - """ Start the log stream from remote pea/pod, will use local logger for output - :param remote_id: the identity of that pea/pod - :return: - """ - try: - self.logger.info(f'🌏 Fetching streamed logs from remote id: {remote_id}, event: {event}') - asyncio.run(self.logstream(remote_id=remote_id, event=event)) - finally: - self.logger.info(f'🌏 Exiting from remote logger') - def delete(self, remote_id: 'str', **kwargs) -> bool: """ Delete a remote pea/pod :param kind: pea/pod From d92794ff07c82eade6ec9180829a186d443ddddf Mon Sep 17 00:00:00 2001 From: Deepankar Mahapatro Date: Tue, 5 Jan 2021 23:15:15 +0530 Subject: [PATCH 6/7] test: adding extra log lines --- .../jinad/test_index_query/test_integration.sh | 8 ++++++++ .../test_index_query_with_shards/test_integration.sh | 6 ++++++ .../jinad/test_simple_distributed/test_integration.sh | 2 ++ .../test_integration.sh | 2 ++ 4 files changed, 18 insertions(+) diff --git a/tests/integration/jinad/test_index_query/test_integration.sh b/tests/integration/jinad/test_index_query/test_integration.sh index bfeb1a7022d37..86af8cf4d680a 100755 --- a/tests/integration/jinad/test_index_query/test_integration.sh +++ b/tests/integration/jinad/test_index_query/test_integration.sh @@ -23,7 +23,11 @@ echo "Successfully started the flow: ${FLOW_ID}. Let's index some data" TEXT_INDEXED=$(curl -s --request POST -d '{"top_k": 10, "data": ["text:hey, dude"]}' -H 'Content-Type: application/json' '0.0.0.0:45678/api/index' | \ jq -e ".index.docs[] | .text") echo "Indexed document has the text: ${TEXT_INDEXED}" + +echo "Getting status code of the Flow: " curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" + +echo "Closing Flow context.." curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" #Query part @@ -40,7 +44,11 @@ echo "Successfully started the flow: ${FLOW_ID}. Let's send some query" TEXT_MATCHED=$(curl -s --request POST -d '{"top_k": 10, "data": ["text:anything will match the same"]}' -H 'Content-Type: application/json' '0.0.0.0:45678/api/search' | \ jq -e ".search.docs[] | .matches[] | .text") echo "document matched has the text: ${TEXT_INDEXED}" + +echo "Getting status code of the Flow: " curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" + +echo "Closing Flow context.." curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down --remove-orphans diff --git a/tests/integration/jinad/test_index_query_with_shards/test_integration.sh b/tests/integration/jinad/test_index_query_with_shards/test_integration.sh index 0acec390d53b8..ef5585eddc318 100755 --- a/tests/integration/jinad/test_index_query_with_shards/test_integration.sh +++ b/tests/integration/jinad/test_index_query_with_shards/test_integration.sh @@ -30,7 +30,10 @@ for i in {1..100}; echo "Indexed document has the text: ${TEXT_INDEXED}" done +echo "Getting status code of the Flow: " curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" + +echo "Closing Flow context.." curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" #Query part @@ -56,7 +59,10 @@ rm count.txt echo "found ${COUNT} matches" +echo "Getting status code of the Flow: " curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" + +echo "Closing Flow context.." curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down --remove-orphans diff --git a/tests/integration/jinad/test_simple_distributed/test_integration.sh b/tests/integration/jinad/test_simple_distributed/test_integration.sh index 599c46f486de9..c4b909948524d 100755 --- a/tests/integration/jinad/test_simple_distributed/test_integration.sh +++ b/tests/integration/jinad/test_simple_distributed/test_integration.sh @@ -23,8 +23,10 @@ TEXT_BACK=$(curl -s --request POST -d '{"top_k": 10, "data": ["text:hey, dude"]} echo "Returned document has the text: ${TEXT_BACK}" +echo "Getting status code of the Flow: " curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" +echo "Closing Flow context.." curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" docker-compose -f tests/integration/jinad/test_simple_distributed/docker-compose.yml --project-directory . down --remove-orphans diff --git a/tests/integration/jinad/test_simple_distributed_with_shards/test_integration.sh b/tests/integration/jinad/test_simple_distributed_with_shards/test_integration.sh index 0fe0560f1caf3..c3b21f1d09a94 100755 --- a/tests/integration/jinad/test_simple_distributed_with_shards/test_integration.sh +++ b/tests/integration/jinad/test_simple_distributed_with_shards/test_integration.sh @@ -23,8 +23,10 @@ TEXT_BACK=$(curl -s --request POST -d '{"top_k": 10, "data": ["text:hey, dude"]} echo "Returned document has the text: ${TEXT_BACK}" +echo "Getting status code of the Flow: " curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" +echo "Closing Flow context.." curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code" docker-compose -f tests/integration/jinad/test_simple_distributed_with_shards/docker-compose.yml --project-directory . down --remove-orphans From b3d42e1c2211edbaa5f53bb11b5526d8de3c474f Mon Sep 17 00:00:00 2001 From: Deepankar Mahapatro Date: Wed, 6 Jan 2021 00:00:14 +0530 Subject: [PATCH 7/7] test: add polling to flow yml --- tests/integration/jinad/test_index_query_with_shards/flow.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/jinad/test_index_query_with_shards/flow.yml b/tests/integration/jinad/test_index_query_with_shards/flow.yml index 41d8b161f81f6..cf83a1f65d95f 100644 --- a/tests/integration/jinad/test_index_query_with_shards/flow.yml +++ b/tests/integration/jinad/test_index_query_with_shards/flow.yml @@ -14,6 +14,7 @@ pods: uses: index.yml read_only: False parallel: 3 + polling: all separated_workspace: True host: $JINA_INDEXER_HOST port_expose: 8000