8000 feat: connect to pred config by maximilianwerk · Pull Request #3051 · jina-ai/serve · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: connect to pred config #3051

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cli/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
'--memory-hwm',
'--on-error-strategy',
'--num-part',
'--dynamic-routing',
'--dynamic-routing-out',
'--dynamic-routing-in',
'--entrypoint',
Expand Down Expand Up @@ -114,6 +113,8 @@
'--external',
'--peas-hosts',
'--pod-role',
'--no-dynamic-routing',
'--connect-to-predecessor',
],
'pod': [
'--help',
Expand Down Expand Up @@ -147,7 +148,6 @@
'--memory-hwm',
'--on-error-strategy',
'--num-part',
'--dynamic-routing',
'--dynamic-routing-out',
'--dynamic-routing-in',
'--entrypoint',
Expand Down Expand Up @@ -182,6 +182,8 @@
'--external',
'--peas-hosts',
'--pod-role',
'--no-dynamic-routing',
'--connect-to-predecessor',
],
'flow': [
'--help',
Expand Down Expand Up @@ -228,7 +230,6 @@
'--memory-hwm',
'--on-error-strategy',
'--num-part',
'--dynamic-routing',
'--dynamic-routing-out',
'--dynamic-routing-in',
'--prefetch',
Expand Down Expand Up @@ -259,6 +260,8 @@
'--noblock-on-start',
'--runs-in-docker',
'--routing-table',
'--dynamic-routing',
'--connect-to-predecessor',
],
'hub new': ['--help'],
'hub push': [
Expand Down Expand Up @@ -304,7 +307,6 @@
'--memory-hwm',
'--on-error-strategy',
'--num-part',
'--dynamic-routing',
'--dynamic-routing-out',
'--dynamic-routing-in',
'--entrypoint',
Expand Down
9 changes: 8 additions & 1 deletion jina/flow/base.py
8000
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def __init__(
compress: Optional[str] = 'NONE',
compress_min_bytes: Optional[int] = 1024,
compress_min_ratio: Optional[float] = 1.1,
connect_to_predecessor: Optional[bool] = False,
cors: Optional[bool] = False,
ctrl_with_ipc: Optional[bool] = True,
daemon: Optional[bool] = False,
Expand Down Expand Up @@ -144,6 +145,7 @@ def __init__(
it depends on the settings of `--compress-min-bytes` and `compress-min-ratio`
:param compress_min_bytes: The original message size must be larger than this number to trigger the compress algorithm, -1 means disable compression.
:param compress_min_ratio: The compression ratio (uncompressed_size/compressed_size) must be higher than this number to trigger the compress algorithm.
:param connect_to_predecessor: The head Pea of this Pod will connect to the TailPea of the predecessor Pod.
:param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
:param ctrl_with_ipc: If set, use ipc protocol for control socket
:param daemon: The Pea attempts to terminate all of its Runtime child processes/threads on existing. setting it to true basically tell the Pea do not wait on the Runtime when closing
Expand Down Expand Up @@ -431,6 +433,7 @@ def needs_all(self, name: str = 'joiner', *args, **kwargs) -> 'Flow':
@overload
def add(
self,
connect_to_predecessor: Optional[bool] = False,
ctrl_with_ipc: Optional[bool] = False,
daemon: Optional[bool] = False,
docker_kwargs: Optional[dict] = None,
Expand Down Expand Up @@ -484,6 +487,7 @@ def add(
) -> Union['Flow', 'AsyncFlow']:
"""Add an Executor to the current Flow object.

:param connect_to_predecessor: The head Pea of this Pod will connect to the TailPea of the predecessor Pod.
:param ctrl_with_ipc: If set, use ipc protocol for control socket
:param daemon: The Pea attempts to terminate all of its Runtime child processes/threads on existing. setting it to true basically tell the Pea do not wait on the Runtime when closing
:param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker '
Expand Down Expand Up @@ -804,7 +808,10 @@ def _get_routing_table(self) -> RoutingTable:
start = f'start-{GATEWAY_NAME}'

start_pod = graph._get_target_pod(start)
if is_remote_local_connection(start_pod.host, pod.head_host):

if pod.connect_to_predecessor or is_remote_local_connection(
start_pod.host, pod.head_host
):
pod.head_args.hosts_in_connect.append(
graph._get_target_pod(start).full_out_address
)
Expand Down
16 changes: 16 additions & 0 deletions jina/parsers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,22 @@ def set_gateway_parser(parser=None):
help='Routing graph for the gateway' if _SHOW_ALL_ARGS else argparse.SUPPRESS,
)

parser.add_argument(
'--dynamic-routing',
action='store_true',
default=True,
help='The Pod will setup the socket types of the HeadPea and TailPea depending on this argument.'
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)

parser.add_argument(
'--connect-to-predecessor',
action='store_true',
default=False,
help='The head Pea of this Pod will connect to the TailPea of the predecessor Pod.',
)

return parser


Expand Down
15 changes: 15 additions & 0 deletions jina/parsers/peapods/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,18 @@ def mixin_base_pod_parser(parser):
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)

parser.add_argument(
'--no-dynamic-routing',
action='store_false',
dest='dynamic_routing',
default=True,
help='The Pod will setup the socket types of the HeadPea and TailPea depending on this argument.',
)

parser.add_argument(
'--connect-to-predecessor',
action='store_true',
default=False,
help='The head Pea of this Pod will connect to the TailPea of the predecessor Pod.',
)
9 changes: 0 additions & 9 deletions jina/parsers/peapods/runtimes/zed.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,6 @@ def mixin_zed_runtime_parser(parser):
else argparse.SUPPRESS,
)

parser.add_argument(
'--dynamic-routing',
action='store_true',
default=True,
help='The Pod will setup the socket types of the HeadPea and TailPea depending on this argument.'
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)

gp.add_argument(
'--dynamic-routing-out',
action='store_true',
Expand Down
7 changes: 7 additions & 0 deletions jina/peapods/pods/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ def name(self) -> str:
"""
return self.args.name

@property
def connect_to_predecessor(self) -> str:
"""True, if the Pod should open a connect socket in the HeadPea to the predecessor Pod.
.. # noqa: DAR201
"""
return self.args.connect_to_predecessor

@property
def head_host(self) -> str:
"""Get the host of the HeadPea of this pod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ services:
ipv4_address: 10.1.0.100
volumes:
- /var/run/docker.sock:/var/run/docker.sock
entrypoint: "jina pod --port-in 45678 --port-out 45679 --parallel 2 --dynamic-routing"
entrypoint: "jina pod --port-in 45678 --port-out 45679 --parallel 2"
networks:
test:
driver: bridge
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ services:
ipv4_address: 10.1.0.100
volumes:
- /var/run/docker.sock:/var/run/docker.sock
entrypoint: "jina pod --port-in 45678 --port-out 45679 --dynamic-routing"
entrypoint: "jina pod --port-in 45678 --port-out 45679"
networks:
test:
driver: bridge
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def external_pod_args():
del args['external']
del args['pod_role']
del args['host']
del args['dynamic_routing']
return args


Expand Down
7 changes: 6 additions & 1 deletion tests/integration/external_pod/test_external_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def test_flow_with_external_pod(
del external_args['name']
del external_args['external']
del external_args['pod_role']
del external_args['dynamic_routing']
flow = Flow().add(
**external_args,
name='external_fake',
Expand Down Expand Up @@ -132,6 +133,7 @@ def test_two_flow_with_shared_external_pod(
del external_args['name']
del external_args['external']
del external_args['pod_role']
del external_args['dynamic_routing']
flow1 = Flow().add(
**external_args,
name='external_fake',
Expand Down Expand Up @@ -253,10 +255,11 @@ def test_flow_with_external_pod_parallel(
del external_args_1['name']
del external_args_1['external']
del external_args_1['pod_role']
del external_args_1['dynamic_routing']
del external_args_2['name']
del external_args_2['external']
del external_args_2['pod_role']

del external_args_2['dynamic_routing']
flow = (
Flow()
.add(name='pod1')
Expand Down Expand Up @@ -318,6 +321,7 @@ def test_flow_with_external_pod_pre_parallel(
del external_args['name']
del external_args['external']
del external_args['pod_role']
del external_args['dynamic_routing']
flow = (
Flow()
.add(
Expand Down Expand Up @@ -380,6 +384,7 @@ def test_flow_with_external_pod_join(
del external_args['name']
del external_args['external']
del external_args['pod_role']
del external_args['dynamic_routing']
flow = (
Flow()
.add(
Expand Down
10 changes: 10 additions & 0 deletions tests/unit/flow/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,3 +817,13 @@ def validate_routes(x):
)

parallel_flow.index(inputs=Document(), on_done=validate_routes)


def test_connect_to_predecessor():
f = Flow().add(name='pod1').add(name='pod2', connect_to_predecessor=True)

f.build()

assert len(f._pod_nodes['gateway'].head_args.hosts_in_connect) == 0
assert len(f._pod_nodes['pod1'].head_args.hosts_in_connect) == 0
assert len(f._pod_nodes['pod2'].head_args.hosts_in_connect) == 1
0