feat(daemon): async interaction with children by deepankarm · Pull Request #2948 · jina-ai/serve · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat(daemon): async interaction with children #2948

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/2.0/cookbooks/Daemon.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ Table of Contents
- [Run](#run)
- [Example Usage](#example-usage)
- [Development using JinaD](#development-using-jinad)
- [Build](#build)
- [Run](#run-1)
- [Why?](#why)
- [Metaworks](#metaworks)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

Expand Down Expand Up @@ -149,6 +153,7 @@ docker build -f Dockerfiles/debianx.Dockerfile --build-arg PIP_TAG=daemon -t jin
docker run --add-host host.docker.internal:host-gateway \
--name jinad \
-e JINA_DAEMON_BUILD=DEVEL \
-e JINA_LOG_LEVEL=DEBUG \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /tmp/jinad:/tmp/jinad \
-p 8000:8000 \
Expand Down
57 changes: 37 additions & 20 deletions daemon/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import asyncio
import pathlib
import subprocess
from pathlib import Path
Expand Down Expand Up @@ -120,20 +121,10 @@ def _get_app(mode=None):
return app


def _start_uvicorn(app: 'FastAPI'):
config = Config(
app=app,
host=jinad_args.host,
port=jinad_args.port_expose,
loop='uvloop',
log_level='error',
)
server = Server(config=config)
server.run()

from jina import __stop_msg__

daemon_logger.success(__stop_msg__)
def _update_default_args():
global jinad_args, __root_workspace__
jinad_args = _get_run_args()
__root_workspace__ = '/workspace' if jinad_args.mode else jinad_args.workspace


def _start_fluentd():
Expand All @@ -160,17 +151,43 @@ def _start_consumer():
ConsumerThread().start()


def _update_default_args():
global jinad_args, __root_workspace__
jinad_args = _get_run_args()
__root_workspace__ = '/workspace' if jinad_args.mode else jinad_args.workspace
def _start_uvicorn(app: 'FastAPI'):
    """Serve the JinaD FastAPI application with uvicorn.

    Host and port are taken from the module-level ``jinad_args``
    (``host`` and ``port_expose``); the ``uvloop`` loop is requested and
    uvicorn's log level is set to ``error``.

    :param app: the FastAPI application to serve
    """
    uvicorn_settings = dict(
        app=app,
        host=jinad_args.host,
        port=jinad_args.port_expose,
        loop='uvloop',
        log_level='error',
    )
    uvicorn_server = Server(config=Config(**uvicorn_settings))
    uvicorn_server.run()


def main():
"""Entrypoint for jinad"""
def setup():
    """Prepare the JinaD environment and start serving.

    Steps, in order: refresh the module-level run arguments, make sure the
    root workspace directory exists, optionally start fluentd in a daemon
    thread (skipped when ``no_fluentd`` is set), start the consumer, and
    finally hand the FastAPI app to uvicorn.
    """
    _update_default_args()

    workspace_root = pathlib.Path(__root_workspace__)
    workspace_root.mkdir(parents=True, exist_ok=True)

    if not jinad_args.no_fluentd:
        fluentd_thread = Thread(target=_start_fluentd, daemon=True)
        fluentd_thread.start()

    _start_consumer()

    daemon_app = _get_app(mode=jinad_args.mode)
    _start_uvicorn(app=daemon_app)


def teardown():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we stop the consumer thread and uvicorn here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer thread is a daemon thread, so there is no need to stop it manually.
Uvicorn handles its own shutdown inside `serve`, with handlers for the relevant signals.

"""Cleanup steps for JinaD"""
from jina import __stop_msg__

daemon_logger.success(__stop_msg__)
daemon_logger.close()


def main():
    """Entrypoint for JinaD.

    Runs :func:`setup` (which ends by starting uvicorn) and always runs
    :func:`teardown` afterwards, whether the server stopped normally, was
    interrupted with Ctrl+C, or failed with an unexpected error.
    """
    try:
        setup()
    except KeyboardInterrupt:
        # Ctrl+C is an expected way to stop the daemon, not an error.
        pass
    except Exception as e:
        # An unexpected failure that brought the daemon down must be visible
        # even when the configured log level is above INFO.
        daemon_logger.error(f'error while server was running {e!r}')
    finally:
        teardown()
8 changes: 4 additions & 4 deletions daemon/api/endpoints/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def _fetch_flow_params():
)
async def _create(flow: FlowDepends = Depends(FlowDepends)):
try:
return store.add(
return await store.add(
id=flow.id,
workspace_id=flow.workspace_id,
params=flow.params,
Expand All @@ -53,7 +53,7 @@ async def _update(
pod_name: str,
shards: int = None,
):
return store.update(id, kind, dump_path, pod_name, shards)
return await store.update(id, kind, dump_path, pod_name, shards)


# order matters! this must be put in front of del {id}
Expand All @@ -63,7 +63,7 @@ async def _update(
summary='Terminate all running Flows',
)
async def _clear_all():
store.clear()
await store.clear()


@router.delete(
Expand All @@ -73,7 +73,7 @@ async def _clear_all():
)
async def _delete(id: DaemonID):
try:
store.delete(id=id)
await store.delete(id=id)
except KeyError:
raise HTTPException(status_code=404, detail=f'{id} not found in {store!r}')

Expand Down
6 changes: 3 additions & 3 deletions daemon/api/endpoints/peas.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async def _fetch_pea_params():
)
async def _create(pea: PeaDepends = Depends(PeaDepends)):
try:
return store.add(
return await store.add(
id=pea.id,
workspace_id=pea.workspace_id,
params=pea.params,
Expand All @@ -49,7 +49,7 @@ async def _create(pea: PeaDepends = Depends(PeaDepends)):
summary='Terminate all running Peas',
)
async def _clear_all():
store.clear()
await store.clear()


@router.delete(
Expand All @@ -59,7 +59,7 @@ async def _clear_all():
)
async def _delete(id: DaemonID, workspace: bool = False):
try:
store.delete(id=id, workspace=workspace)
await store.delete(id=id, workspace=workspace)
except KeyError:
raise HTTPException(status_code=404, detail=f'{id} not found in {store!r}')

Expand Down
7 changes: 4 additions & 3 deletions daemon/api/endpoints/pods.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def _fetch_pod_params():
)
async def _create(pod: PodDepends = Depends(PodDepends)):
try:
return store.add(
return await store.add(
id=pod.id,
workspace_id=pod.workspace_id,
params=pod.params,
Expand All @@ -45,7 +45,7 @@ async def _create(pod: PodDepends = Depends(PodDepends)):
summary='Terminate all running Pods',
)
async def _clear_all():
store.clear()
await store.clear()


@router.delete(
Expand All @@ -55,7 +55,7 @@ async def _clear_all():
)
async def _delete(id: DaemonID, workspace: bool = False):
try:
store.delete(id=id, workspace=workspace)
await store.delete(id=id, workspace=workspace)
except KeyError:
raise HTTPException(status_code=404, detail=f'{id} not found in {store!r}')

Expand All @@ -73,6 +73,7 @@ async def _status(id: DaemonID):
@router.put(path='/{id}/rolling_update', summary='Trigger a rolling update on this Pod')
async def _rolling_update(id: DaemonID, dump_path: str):
try:
# TODO: This logic should move to store
return requests.put(
f'{store[id].metadata.rest_api_uri}/pod/rolling_update',
params={'dump_path': dump_path},
Expand Down
25 changes: 24 additions & 1 deletion daemon/helper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import os
import re
from typing import TYPE_CHECKING, Tuple
from typing import Callable, TYPE_CHECKING, Tuple

import aiohttp

from .excepts import Runtime400Exception

if TYPE_CHECKING:
from .models import DaemonID
Expand Down Expand Up @@ -65,3 +69,22 @@ def get_log_file_path(log_id: 'DaemonID') -> Tuple[str, 'DaemonID']:
workspace_id = get_store_from_id(log_id)[log_id].workspace_id
filepath = get_workspace_path(workspace_id, 'logs', log_id, 'logging.log')
return filepath, workspace_id


def raise_if_not_alive(func: Callable):
    """Decorator to be used in store for connection validation

    Wraps an ``async`` store method so that an ``aiohttp.ClientConnectionError``
    raised while it runs is logged via ``self._logger`` and re-raised as a
    :class:`Runtime400Exception` naming the method and ``self._kind``.

    :param func: async method to be wrapped; its ``self`` must expose
        ``_logger`` and ``_kind``
    :return: wrapped function, preserving the name and docstring of ``func``
    """
    import functools

    # functools.wraps keeps __name__/__doc__/__wrapped__ of the store method,
    # so introspection and docs of decorated methods stay correct.
    @functools.wraps(func)
    async def wrapper(self, *args, **kwargs):
        try:
            return await func(self, *args, **kwargs)
        except aiohttp.ClientConnectionError as e:
            self._logger.error(f'connection to server failed: {e!r}')
            # Chain the original error so the root cause stays in tracebacks.
            raise Runtime400Exception(
                f'connection to server failed during {func.__name__} for {self._kind.title()}'
            ) from e

    return wrapper
2 changes: 1 addition & 1 deletion daemon/models/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ContainerMetadata(BaseModel):
image_id: str
network: str
ports: Dict
host: str
uri: str


class ContainerItem(StoreItem):
Expand Down
25 changes: 9 additions & 16 deletions daemon/stores/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import os
import pickle
import shutil
from collections.abc import MutableMapping
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from datetime import datetime
from collections.abc import MutableMapping
from typing import Callable, Dict, Sequence, TYPE_CHECKING, Tuple, Union

from jina.logging.logger import JinaLogger
Expand Down Expand Up @@ -45,6 +44,13 @@ def delete(self, *args, **kwargs) -> DaemonID:
"""Deletes an element from the store. This method needs to be overridden by the subclass


.. #noqa: DAR101"""
raise NotImplementedError

def clear(self) -> None:
    """Deletes all elements from the store. This method needs to be overridden by the subclass


    .. #noqa: DAR101"""
    raise NotImplementedError

Expand Down Expand Up @@ -154,21 +160,8 @@ def load(cls) -> Union[Dict, 'BaseStore']:
else:
return cls()

def clear(self, **kwargs) -> None:
"""Delete all the objects in the store

:param kwargs: keyward args
"""

_status = deepcopy(self.status)
for k in _status.items.keys():
self.delete(id=k, workspace=True, **kwargs)

def reset(self) -> None:
"""Calling :meth:`clear` and reset all stats """

self.clear()
self.status = self._status_model()

def __len__(self):
return len(self.items())
Loading
0