refactor(jinad): log streaming from remote by deepankarm · Pull Request #1584 · jina-ai/serve · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

refactor(jinad): log streaming from remote #1584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions jina/excepts.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,6 @@ class BadNamedScoreType(TypeError):
""" Exception when can not construct a named score from the given data """


class RemotePodClosed(Exception):
""" Exception when remote pod is closed and log streaming needs to exit """


class LengthMismatchException(Exception):
""" Exception when length of two items should be identical while not """

Expand Down
25 changes: 21 additions & 4 deletions jina/peapods/runtimes/jinad/__init__.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,43 @@
import argparse
import asyncio
from typing import Union, Dict, Optional

from ...zmq import Zmqlet
from .api import get_jinad_api
from ..zmq.base import ZMQManyRuntime
from ..asyncio.base import AsyncZMQRuntime
from ....helper import cached_property, ArgNamespace, colored


class JinadRuntime(ZMQManyRuntime):
class JinadRuntime(AsyncZMQRuntime):

def __init__(self, args: Union['argparse.Namespace', Dict]):
super().__init__(args)
self.ctrl_addr = Zmqlet.get_ctrl_address(None, None, True)[0]
self.timeout_ctrl = args.timeout_ctrl
self.host = args.host
self.port_expose = args.port_expose
self.remote_type = args.remote_type
self.api = get_jinad_api(kind=self.remote_type,
host=self.host,
port=self.port_expose,
logger=self.logger)

def setup(self):
# Uploads Pod/Pea context to remote & Creates remote Pod/Pea using :class:`JinadAPI`
if self._remote_id:
self.logger.success(f'created remote {self.api.kind} with id {colored(self._remote_id, "cyan")}')

def run_forever(self):
self.api.log(self._remote_id, None)
async def async_run_forever(self):
# Streams log messages using websocket from remote server.
self.logging_task = asyncio.create_task(self.api.logstream(self._remote_id))

async def async_cancel(self):
# Cancels the logging task
self.logging_task.cancel()

def teardown(self):
# Closes the remote Pod/Pea using :class:`JinadAPI`
self.api.delete(remote_id=self._remote_id)

@cached_property
def _remote_id(self) -> Optional[str]:
Expand Down
63 changes: 27 additions & 36 deletions jina/peapods/runtimes/jinad/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
import asyncio
from pathlib import Path
from contextlib import ExitStack
from multiprocessing.synchronize import Event
from multiprocessing import Event
from typing import Dict, Tuple, Set, List, Optional

from ....jaml import JAML
from ....logging import JinaLogger
from ....enums import RemotePeapodType
from ....excepts import RemotePodClosed
from ....importer import ImportExtensions


Expand Down Expand Up @@ -69,6 +68,7 @@ def fetch_files_from_yaml(pea_args: Dict, logger: 'JinaLogger') -> Tuple[Set[str

class JinadAPI:
kind = 'pea' # select from pea/pod, TODO: enum
TIMEOUT_ERROR_CODE = 4000

def __init__(self,
host: str,
Expand Down Expand Up @@ -98,7 +98,7 @@ def __init__(self,
self.upload_url = f'{rest_url}/upload'
self.pea_url = f'{rest_url}/pea'
self.pod_url = f'{rest_url}/pod'
self.log_url = f'{websocket_url}/wslog'
self.log_url = f'{websocket_url}/logstream'

@property
def is_alive(self) -> bool:
Expand Down Expand Up @@ -162,61 +162,52 @@ def create(self, args: Dict, **kwargs) -> Optional[str]:
except requests.exceptions.RequestException as ex:
self.logger.error(f'couldn\'t create pod with remote jinad {repr(ex)}')

async def wslogs(self, remote_id: 'str', stop_event: Event, current_line: int = 0):
async def logstream(self, remote_id: 'str'):
""" websocket log stream from remote pea/pod
:param remote_id: the identity of that pea/pod
:param stop_event: the multiprocessing event which marks if stop event is set
:param current_line: the line number from which logs would be streamed
:return:
"""
with ImportExtensions(required=True):
import websockets

self.logger.info(f'🌏 Fetching streamed logs from remote id: {remote_id}')
remote_loggers = {}
try:
# sleeping for few seconds to allow the logs to be written in remote
await asyncio.sleep(3)
async with websockets.connect(f'{self.log_url}/{remote_id}?timeout=20') as websocket:
await websocket.send(json.dumps({'from': current_line}))
remote_loggers = {}

async with websockets.connect(f'{self.log_url}/{remote_id}?timeout=5') as websocket:
current_line_number = -1

while True:
log_line = await websocket.recv()
if log_line:
await websocket.send(json.dumps({'from': int(current_line_number) + 1}))
async for log_line in websocket:
try:
log_line = json.loads(log_line)
current_line = int(list(log_line.keys())[0])
log_line_dict = list(log_line.values())[0]
log_line_dict = json.loads(log_line_dict.split('\t')[-1].strip())
if 'code' in log_line and log_line['code'] == self.TIMEOUT_ERROR_CODE:
self.logger.info(f'Received timeout from the log server. Breaking')
break
current_line_number = list(log_line.keys())[0]
complete_log_message = log_line[current_line_number]
log_line_dict = json.loads(complete_log_message.split('\t')[-1].strip())
name = log_line_dict['name']
if name not in remote_loggers:
remote_loggers[name] = JinaLogger(context=f'🌏 {name}')
# TODO: change logging level, process name in local logger
# TODO(Deepankar): change logging level, process name in local logger
remote_loggers[name].info(f'{log_line_dict["message"].strip()}')
except json.decoder.JSONDecodeError:
continue
await websocket.send(json.dumps({}))
if stop_event.is_set():
for logger in remote_loggers.values():
logger.close()
raise RemotePodClosed
except websockets.exceptions.ConnectionClosedOK:
self.logger.debug(f'Client got disconnected from server')
return current_line
self.logger.error(f'🌏 Client got disconnected from server')
except websockets.exceptions.WebSocketException as e:
self.logger.error(f'Got following error while streaming logs via websocket {repr(e)}')
return 0

def log(self, remote_id: 'str', stop_event: Event, **kwargs) -> None:
""" Start the log stream from remote pea/pod, will use local logger for output
:param remote_id: the identity of that pea/pod
:return:
"""
try:
self.logger.info(f'fetching streamed logs from remote id: {remote_id}')
asyncio.run(self.wslogs(remote_id=remote_id, stop_event=stop_event, current_line=0))
except RemotePodClosed:
self.logger.debug(f'🌏 remote closed')
self.logger.error(f'🌏 Got following error while streaming logs via websocket {repr(e)}')
except asyncio.CancelledError:
self.logger.info(f'🌏 Logging task cancelled successfully')
finally:
self.logger.info(f'🌏 exiting from remote logger')
self.logger.info(f'🌏 Exiting from remote loggers')
if remote_loggers:
for logger in remote_loggers.values():
logger.close()

def delete(self, remote_id: 'str', **kwargs) -> bool:
""" Delete a remote pea/pod
Expand Down
7 changes: 4 additions & 3 deletions tests/integration/jinad/Dockerfiles/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ WORKDIR /jina/
ADD setup.py MANIFEST.in requirements.txt extra-requirements.txt README.md ./
ADD cli ./cli/
ADD jina ./jina/
ADD tests/integration/jinad/Dockerfiles/entrypoint.sh ./

ARG PIP_TAG=[test]

RUN chmod +x entrypoint.sh
RUN pip install ."$PIP_TAG"

# This doesn't start fluentd in the background
# add entrypoint script if fluentd needs to be enabled for tests
ENTRYPOINT ["jinad"]
# This starts both jinad & fluentd in the background
ENTRYPOINT ["bash", "-c", "./entrypoint.sh"]
12 changes: 12 additions & 0 deletions tests/integration/jinad/Dockerfiles/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
# Entrypoint for the jinad integration-test image: starts fluentd in the
# background, waits briefly, then runs the jinad server in the foreground.

# Resolve the fluentd config shipped inside the installed `jina` package.
CONF_PATH=$(python3 -c "import pkg_resources; print(pkg_resources.resource_filename('jina', 'resources/fluent.conf'))")

# Fail fast if the lookup failed or the file is missing; otherwise fluentd
# would be started with a bad `-c` argument and die silently in the background.
if [ ! -f "$CONF_PATH" ]; then
    echo "entrypoint.sh: fluentd config not found (resolved path: '${CONF_PATH}')" >&2
    exit 1
fi

# Start fluentd in the background (path quoted in case it contains spaces)
nohup fluentd -c "$CONF_PATH" &

# Sleep for 2 seconds to give fluentd time to load its config
sleep 2

# Start jinad (uvicorn) server. `exec` replaces this shell with jinad, so
# signals sent to this process (e.g. SIGTERM from `docker stop`) reach jinad
# directly instead of stopping at the shell.
exec jinad
12 changes: 10 additions & 2 deletions tests/integration/jinad/test_index_query/test_integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ if [ "${PWD##*/}" != "jina" ]
exit 1
fi

docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . up --build -d
docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . up --build -d --remove-orphans

sleep 10
#Indexing part
Expand All @@ -23,7 +23,11 @@ echo "Successfully started the flow: ${FLOW_ID}. Let's index some data"
TEXT_INDEXED=$(curl -s --request POST -d '{"top_k": 10, "data": ["text:hey, dude"]}' -H 'Content-Type: application/json' '0.0.0.0:45678/api/index' | \
jq -e ".index.docs[] | .text")
echo "Indexed document has the text: ${TEXT_INDEXED}"

echo "Getting status code of the Flow: "
curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

echo "Closing Flow context.."
curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

#Query part
Expand All @@ -40,10 +44,14 @@ echo "Successfully started the flow: ${FLOW_ID}. Let's send some query"
TEXT_MATCHED=$(curl -s --request POST -d '{"top_k": 10, "data": ["text:anything will match the same"]}' -H 'Content-Type: application/json' '0.0.0.0:45678/api/search' | \
jq -e ".search.docs[] | .matches[] | .text")
echo "document matched has the text: ${TEXT_INDEXED}"

echo "Getting status code of the Flow: "
curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

echo "Closing Flow context.."
curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down
docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down --remove-orphans

EXPECTED_TEXT='"text:hey, dude"'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pods:
uses: index.yml
read_only: False
parallel: 3
polling: all
separated_workspace: True
host: $JINA_INDEXER_HOST
port_expose: 8000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ if [ "${PWD##*/}" != "jina" ]
exit 1
fi

docker-compose -f tests/integration/jinad/test_index_query_with_shards/docker-compose.yml --project-directory . up --build -d

docker-compose -f tests/integration/jinad/test_index_query_with_shards/docker-compose.yml --project-directory . up --build -d --remove-orphans

sleep 10
#Indexing part
Expand All @@ -29,7 +30,10 @@ for i in {1..100};
echo "Indexed document has the text: ${TEXT_INDEXED}"
done

echo "Getting status code of the Flow: "
curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

echo "Closing Flow context.."
curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

#Query part
Expand All @@ -55,10 +59,13 @@ rm count.txt

echo "found ${COUNT} matches"

echo "Getting status code of the Flow: "
curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

echo "Closing Flow context.."
curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down
docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down --remove-orphans

if [ $COUNT = 10 ]; then
echo "Success"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ if [ "${PWD##*/}" != "jina" ]
exit 1
fi

docker-compose -f tests/integration/jinad/test_simple_distributed/docker-compose.yml --project-directory . up --build -d
docker-compose -f tests/integration/jinad/test_simple_distributed/docker-compose.yml --project-directory . up --build -d --remove-orphans

sleep 10

Expand All @@ -23,11 +23,13 @@ TEXT_BACK=$(curl -s --request POST -d '{"top_k": 10, "data": ["text:hey, dude"]}

echo "Returned document has the text: ${TEXT_BACK}"

echo "Getting status code of the Flow: "
curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

echo "Closing Flow context.."
curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

docker-compose -f tests/integration/jinad/test_simple_distributed/docker-compose.yml --project-directory . down
docker-compose -f tests/integration/jinad/test_simple_distributed/docker-compose.yml --project-directory . down --remove-orphans

EXPECTED_TEXT='"text:hey, dude"'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ if [ "${PWD##*/}" != "jina" ]
exit 1
fi

docker-compose -f tests/integration/jinad/test_simple_distributed_with_shards/docker-compose.yml --project-directory . up --build -d
docker-compose -f tests/integration/jinad/test_simple_distributed_with_shards/docker-compose.yml --project-directory . up --build -d --remove-orphans

sleep 10

Expand All @@ -23,11 +23,13 @@ TEXT_BACK=$(curl -s --request POST -d '{"top_k": 10, "data": ["text:hey, dude"]}

echo "Returned document has the text: ${TEXT_BACK}"

echo "Getting status code of the Flow: "
curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

echo "Closing Flow context.."
curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

docker-compose -f tests/integration/jinad/test_simple_distributed_with_shards/docker-compose.yml --project-directory . down
docker-compose -f tests/integration/jinad/test_simple_distributed_with_shards/docker-compose.yml --project-directory . down --remove-orphans

EXPECTED_TEXT='"text:hey, dude"'

Expand Down
4 changes: 2 additions & 2 deletions tests/integration/jinad/test_simple_hub_pods/test_integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ if [ "${PWD##*/}" != "jina" ]
exit 1
fi

docker-compose -f tests/integration/jinad/test_simple_hub_pods/docker-compose.yml --project-directory . up --build -d
docker-compose -f tests/integration/jinad/test_simple_hub_pods/docker-compose.yml --project-directory . up --build -d --remove-orphans

sleep 10

Expand All @@ -30,7 +30,7 @@ curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: appli

curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

docker-compose -f tests/integration/jinad/test_simple_hub_pods/docker-compose.yml --project-directory . down
docker-compose -f tests/integration/jinad/test_simple_hub_pods/docker-compose.yml --project-directory . down --remove-orphans

EXPECTED_TEXT='"text:hey, dude"'

Expand Down