feat: input filters for executors by JohannesMessner · Pull Request #4472 · jina-ai/serve · GitHub

feat: input filters for executors #4472


Merged
merged 61 commits into from
Mar 15, 2022
5c3e772
feat: add switch-like behavior
Feb 18, 2022
c436a47
style: fix overload and cli autocomplete
jina-bot Feb 18, 2022
26d8a2f
test: add tests of topology graph
Feb 21, 2022
c57356e
Merge branch 'master' of https://github.com/jina-ai/jina into switch-…
Feb 22, 2022
3c3ab4f
test: add basic tests of conditions in graph
Feb 22, 2022
5db6fb6
Merge branch 'master' of https://github.com/jina-ai/jina into switch-…
Feb 23, 2022
1c2002e
feat: add condition arg to deployment
Feb 23, 2022
79758da
style: fix overload and cli autocomplete
jina-bot Feb 23, 2022
595d105
style: fix overload and cli autocomplete
jina-bot Feb 23, 2022
0c22ed6
feat: condition is singular
Feb 23, 2022
21ba842
Merge branch 'switch-condition' of https://github.com/jina-ai/jina in…
Feb 23, 2022
2785ea1
style: fix overload and cli autocomplete
jina-bot Feb 23, 2022
d6859ff
refactor: change conditions to graph conditions
Feb 23, 2022
108550e
style: fix overload and cli autocomplete
jina-bot Feb 23, 2022
7722da9
refactor: refactor the gateway naming
Feb 23, 2022
8dedc48
refactor: dict not well handled by parser
Feb 23, 2022
2a4788e
test: add integration test for feature
Feb 23, 2022
60bb7d3
docs: add docs for feature
Feb 23, 2022
a7a90b4
feat: implement filter in reducing
Feb 23, 2022
bf5fc69
fix: use docarrays query language for filtering
JohannesMessner Mar 7, 2022
08e3dc4
feat: implement sorting logic for docs returned after filtering
JohannesMessner Mar 10, 2022
06306dd
test: add test for sorting response docs
JohannesMessner Mar 10, 2022
09abd5b
refactor: remove wrapper around filter condition
JohannesMessner Mar 10, 2022
8fe9e38
test: rename test function
JohannesMessner Mar 10, 2022
a6508f5
docs: add description for condition
JohannesMessner Mar 10, 2022
74dd9d6
docs: first draft of switch how-to
JohannesMessner Mar 11, 2022
72e9bda
docs: add howto to toctree
JohannesMessner Mar 11, 2022
f04eef8
chore: fix merge conflict
JohannesMessner Mar 11, 2022
0cf5f2e
style: fix overload and cli autocomplete
jina-bot Mar 11, 2022
f271751
docs: add link to how-to
JohannesMessner Mar 11, 2022
715ba7b
Merge remote-tracking branch 'origin/switch-condition' into switch-co…
JohannesMessner Mar 11, 2022
b5c54cf
docs: fix link
JohannesMessner Mar 11, 2022
798f681
docs: fix flow image
JohannesMessner Mar 11, 2022
289f8e8
docs: improve explanation
JohannesMessner Mar 11, 2022
721664f
docs: add line emphasis to code blocks
JohannesMessner Mar 11, 2022
4318ff3
test: adapt tests to filter query language
JohannesMessner Mar 11, 2022
2f58ff5
test: remove test that checks that requests with empty docarray are n…
JohannesMessner Mar 11, 2022
75ee441
docs: fix codeblock line highlighting
JohannesMessner Mar 11, 2022
491f05e
docs: add heading
JohannesMessner Mar 11, 2022
b905930
docs: make prettier
JohannesMessner Mar 11, 2022
d979fb4
fix(gateway): handle sorting of newly added / unknown response docs
JohannesMessner Mar 11, 2022
7dfd14d
docs: add flow topolgy diagram
JohannesMessner Mar 11, 2022
36a9cf8
docs: make wording more precise
JohannesMessner Mar 14, 2022
307a7fc
perf(gateway): avoid deserialising docs if no filter conditions are p…
JohannesMessner Mar 14, 2022
53dd99f
docs: update how-to with new exists syntax
JohannesMessner Mar 14, 2022
63d219f
docs: make wording more precise
JohannesMessner Mar 15, 2022
fa76486
docs: fix rendering
JohannesMessner Mar 15, 2022
c2da312
docs: make added filters explicit
JohannesMessner Mar 15, 2022
6ff1d80
refactor: bring env vars in recommended form/syntax
JohannesMessner Mar 15, 2022
7ed23c2
refactor: fix typo in class name
JohannesMessner Mar 15, 2022
9a1642f
docs: add links to docarray documentation
JohannesMessner Mar 15, 2022
8e6a39b
chore: bump docarray version for filter condition compatability
JohannesMessner Mar 15, 2022
b7cad95
refactor: add link to help string
JohannesMessner Mar 15, 2022
091ea1f
refactor: rename condition to input_condition
JohannesMessner Mar 15, 2022
a17462d
style: fix overload and cli autocomplete
jina-bot Mar 15, 2022
6c28a4f
docs: specify minimum jina version for filter feature
JohannesMessner Mar 15, 2022
f70d718
Merge remote-tracking branch 'origin/switch-condition' into switch-co…
JohannesMessner Mar 15, 2022
4dbc8bf
docs: remove line about docs being sent to every executor
JohannesMessner Mar 15, 2022
c2f34da
chore: add pydantic to script requirements
JohannesMessner Mar 15, 2022
aa13dde
Merge branch 'master' into switch-condition
JohannesMessner Mar 15, 2022
6cd0acf
Merge branch 'master' into switch-condition
Mar 15, 2022
2 changes: 2 additions & 0 deletions cli/autocomplete.py
@@ -154,6 +154,7 @@
'--proxy',
'--port-expose',
'--graph-description',
'--graph-conditions',
'--deployments-addresses',
'--runtime-backend',
'--runtime',
@@ -303,6 +304,7 @@
'--disable-reduce',
'--uses-before',
'--uses-after',
'--input-condition',
'--external',
'--deployment-role',
],
1 change: 1 addition & 0 deletions docs/fundamentals/executor/hub/index.md
@@ -1,3 +1,4 @@
(jina-hub)=
# Share Executors via Jina Hub

Now that you understand that Executor is a building block in Jina, the following questions may arise:
172 changes: 169 additions & 3 deletions docs/fundamentals/flow/create-flow.md
@@ -423,7 +423,7 @@ Flow().add(host='123.45.67.89', port=12345, external=True)
```
This adds an external Executor to the Flow. The Flow will not start or stop this Executor and assumes that it is externally managed and available at `123.45.67.89:12345`.


(flow-complex-topologies)=
## Complex Flow topologies
Flows are not restricted to sequential execution. Internally they are modelled as graphs and as such can represent any complex, non-cyclic topology.
A typical use case for such a Flow is a topology with a common pre-processing part, but different indexers separating embeddings and data.
@@ -479,9 +479,175 @@ This will get you the following output:
['foo was here and got 0 document', 'bar was here and got 1 document', 'baz was here and got 1 document']
```

So both `BarExecutor` and `BazExecutor` only received a single `Document` from `FooExecutor`, because they run in parallel. The last Executor, `executor3`, receives both DocumentArrays and merges them automatically.
The automated merging can be disabled by setting `disable_reduce=True`. This can be useful when you need to provide your custom merge logic in a separate Executor. In this case the last `.add()` call would look like `.add(needs=['barExecutor', 'bazExecutor'], uses=CustomMergeExecutor, disable_reduce=True)`. This feature requires Jina >= 3.0.2.
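
With `disable_reduce=True`, merging becomes the job of your own Executor. The framework-free sketch below shows one possible merge strategy: it combines the outputs of parallel branches by Document ID and unions their tags. The function name `merge_branches` and the representation of Documents as plain dicts with `id` and `tags` keys are assumptions for illustration; this is neither Jina's API nor its built-in reduce behavior.

```python
from typing import Any, Dict, List

Doc = Dict[str, Any]  # hypothetical stand-in for a Document: {'id': ..., 'tags': {...}}


def merge_branches(branch_results: List[List[Doc]]) -> List[Doc]:
    """Merge per-branch Document lists by ID, combining their tags.

    Output order follows the first appearance of each ID across branches.
    """
    merged: Dict[str, Doc] = {}
    for docs in branch_results:
        for doc in docs:
            doc_id = doc['id']
            if doc_id not in merged:
                # First time this ID is seen: copy it so the inputs stay untouched.
                merged[doc_id] = {'id': doc_id, 'tags': dict(doc.get('tags', {}))}
            else:
                # Seen before: fold in the tags added by this branch.
                merged[doc_id]['tags'].update(doc.get('tags', {}))
    return list(merged.values())


bar_out = [{'id': 'a', 'tags': {'bar': True}}]
baz_out = [{'id': 'a', 'tags': {'baz': True}}, {'id': 'b', 'tags': {}}]
print(merge_branches([bar_out, baz_out]))
# [{'id': 'a', 'tags': {'bar': True, 'baz': True}}, {'id': 'b', 'tags': {}}]
```

Keying on the Document ID is just one design choice; a real merge Executor could equally concatenate, re-rank, or pick one branch's result.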

(flow-filter)=
### Add filter conditions to Executors

Starting from `Jina 3.2`, you can filter the input to each
Executor.

To define a filter condition, you can use [DocArray's rich query language](https://docarray.jina.ai/fundamentals/documentarray/find/#query-by-conditions).
You can set a filter for each individual Executor, and every Document that does not satisfy the filter condition will be
removed before reaching that Executor.
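
To make the semantics of a condition such as `{'tags__key': {'$eq': 5}}` concrete, here is a small, self-contained sketch that evaluates it against a plain dict standing in for a Document. It assumes that `__` separates nested field names (so `tags__key` reads `doc['tags']['key']`) and supports only `$eq`, `$ne`, `$gt`, `$lt` and `$exists`. It is an illustration, not DocArray's implementation, which supports more operators.

```python
from typing import Any, Dict

_MISSING = object()  # sentinel meaning "field not present"

# A handful of operators, for illustration only.
_OPS = {
    '$eq': lambda value, arg: value == arg,
    '$ne': lambda value, arg: value != arg,
    '$gt': lambda value, arg: value is not _MISSING and value > arg,
    '$lt': lambda value, arg: value is not _MISSING and value < arg,
    '$exists': lambda value, arg: (value is not _MISSING) == arg,
}


def get_field(doc: Dict[str, Any], path: str) -> Any:
    """Follow a 'tags__key'-style path into nested dicts."""
    current: Any = doc
    for part in path.split('__'):
        if not isinstance(current, dict) or part not in current:
            return _MISSING
        current = current[part]
    return current


def matches(doc: Dict[str, Any], condition: Dict[str, Dict[str, Any]]) -> bool:
    """True if the doc satisfies every field/operator pair in the condition."""
    return all(
        _OPS[op](get_field(doc, path), arg)
        for path, ops in condition.items()
        for op, arg in ops.items()
    )


docs = [{'tags': {'key': 5}}, {'tags': {'key': 4}}]
condition = {'tags__key': {'$eq': 5}}
print([d for d in docs if matches(d, condition)])  # [{'tags': {'key': 5}}]
```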

To add a filter condition to an Executor, you pass it to the `input_condition` parameter of `flow.add()`:

````{tab} Python

```{code-block} python
---
emphasize-lines: 4, 9
---
from docarray import DocumentArray, Document
from jina import Flow

f = Flow().add().add(input_condition={'tags__key': {'$eq': 5}})  # Two Executors; filter the second one's input

with f:  # Using it as a Context Manager will start the Flow
    ret = f.post(
        on='/search',
        inputs=DocumentArray([Document(tags={'key': 5}), Document(tags={'key': 4})]),
    )

print(
    ret[:, 'tags']
)  # only the Document fulfilling the condition is processed and therefore returned.
```

```console
[{'key': 5.0}]
```

````

````{tab} Load from YAML
`flow.yml`:

```yaml
jtype: Flow
executors:
  - name: executor
    input_condition:
      tags__key:
        $eq: 5
```

```{code-block} python
---
emphasize-lines: 9
---
from docarray import DocumentArray, Document
from jina import Flow

f = Flow.load_config('flow.yml')  # Load the Flow definition from a YAML file

with f:  # Using it as a Context Manager will start the Flow
    ret = f.post(
        on='/search',
        inputs=DocumentArray([Document(tags={'key': 5}), Document(tags={'key': 4})]),
    )

print(
    ret[:, 'tags']
)  # only the Document fulfilling the condition is processed and therefore returned.
```

```console
[{'key': 5.0}]
```
````

Note that whenever a Document does not satisfy an Executor's `input_condition`, the filter removes it *for the entire branch of the Flow*.
This means that every Executor located behind the filter is affected, not just the Executor that defines the condition.
As with a real-life filter, once something does not pass through, it does not re-appear behind the filter.
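
Because a removed Document never re-appears downstream, a chain of filtered Executors behaves like the logical AND of their conditions. The sketch below models one branch as a list of plain Python predicates; using predicates instead of `input_condition` dicts, and the helper name `key_is`, are simplifying assumptions for illustration.

```python
from typing import Any, Callable, Dict, List

Doc = Dict[str, Any]
Predicate = Callable[[Doc], bool]


def run_branch(docs: List[Doc], filters: List[Predicate]) -> List[Doc]:
    """Pass docs through each filter in order; a dropped doc never comes back."""
    for keep in filters:
        docs = [d for d in docs if keep(d)]
    return docs


def key_is(value: Any) -> Predicate:
    """Hypothetical helper standing in for {'tags__key': {'$eq': value}}."""
    return lambda d: d.get('tags', {}).get('key') == value


docs = [{'tags': {'key': 5}}, {'tags': {'key': 4}}]
print(run_branch(docs, [key_is(5)]))             # [{'tags': {'key': 5}}]
print(run_branch(docs, [key_is(5), key_is(4)]))  # []
```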

Naturally, parallel branches in a Flow do not affect each other. So if a Document is filtered out in only one branch, it can
still be used in the other branch, and also after the branches are joined again:

````{tab} Parallel Executors

```{code-block} python
---
emphasize-lines: 7, 8, 21
---
from docarray import DocumentArray, Document
from jina import Flow

f = (
    Flow()
    .add(name='first')
    .add(input_condition={'tags__key': {'$eq': 5}}, needs='first', name='exec1')
    .add(input_condition={'tags__key': {'$eq': 4}}, needs='first', name='exec2')
    .needs_all(name='join')
)  # Create Flow with parallel Executors

#                                   exec1
#                                 /       \
# Flow topology: Gateway --> first         join --> Gateway
#                                 \       /
#                                   exec2

with f:
    ret = f.post(
        on='/search',
        inputs=DocumentArray([Document(tags={'key': 5}), Document(tags={'key': 4})]),
    )

print(ret[:, 'tags'])  # Each Document satisfies one parallel branch/filter
```

```console
[{'key': 5.0}, {'key': 4.0}]
```

````

````{tab} Sequential Executors
```{code-block} python
---
emphasize-lines: 7, 8, 16
---
from docarray import DocumentArray, Document
from jina import Flow

f = (
    Flow()
    .add(name='first')
    .add(input_condition={'tags__key': {'$eq': 5}}, name='exec1', needs='first')
    .add(input_condition={'tags__key': {'$eq': 4}}, needs='exec1', name='exec2')
)  # Create Flow with sequential Executors

# Flow topology: Gateway --> first --> exec1 --> exec2 --> Gateway

with f:
    ret = f.post(
        on='/search',
        inputs=DocumentArray([Document(tags={'key': 5}), Document(tags={'key': 4})]),
    )

print(ret[:, 'tags'])  # No Document satisfies both sequential filters
```

```console
[]
```
````
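
The behavior of the parallel topology can be mimicked in plain Python. In this sketch, which is an assumption about the semantics rather than Jina's gateway code, each branch filters its own view of the full input, and the join returns every Document that survived at least one branch, in the order of the original request. That matches the output shown for the parallel Flow. Documents are plain dicts with an `id` key, and `key_is` is a hypothetical helper.

```python
from typing import Any, Callable, Dict, List

Doc = Dict[str, Any]
Predicate = Callable[[Doc], bool]


def run_parallel(docs: List[Doc], branches: List[Predicate]) -> List[Doc]:
    """Filter docs independently per branch, then join the survivors."""
    survivors = set()
    for keep in branches:
        # Every branch sees the full input; siblings do not affect each other.
        survivors.update(d['id'] for d in docs if keep(d))
    # Join: restore the original request order, listing each doc only once.
    return [d for d in docs if d['id'] in survivors]


def key_is(value: Any) -> Predicate:
    """Hypothetical helper standing in for {'tags__key': {'$eq': value}}."""
    return lambda d: d['tags'].get('key') == value


docs = [{'id': 'a', 'tags': {'key': 5}}, {'id': 'b', 'tags': {'key': 4}}]
print([d['tags'] for d in run_parallel(docs, [key_is(4), key_is(5)])])
# [{'key': 5}, {'key': 4}]
```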

This feature is useful to prevent some specialized Executors from processing certain Documents.
It can also be used to build *switch-like nodes*, where some Documents pass through one parallel branch of the Flow,
while other Documents pass through a different branch.
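
A switch-like node comes from giving parallel branches complementary conditions, so that every Document matches exactly one branch. The routing sketch below uses plain predicates; the comments show the kind of `input_condition` each predicate stands in for (assuming DocArray's `$ne` operator), and the function name `route` is hypothetical.

```python
from typing import Any, Callable, Dict, List

Doc = Dict[str, Any]


def route(docs: List[Doc], branches: Dict[str, Callable[[Doc], bool]]) -> Dict[str, List[Doc]]:
    """Return, for each branch name, the docs its filter lets through."""
    return {name: [d for d in docs if keep(d)] for name, keep in branches.items()}


docs = [
    {'id': 'a', 'tags': {'lang': 'en'}},
    {'id': 'b', 'tags': {'lang': 'de'}},
    {'id': 'c', 'tags': {'lang': 'en'}},
]
# Complementary conditions: every doc matches exactly one branch, like a switch.
routes = route(
    docs,
    {
        'english': lambda d: d['tags']['lang'] == 'en',  # cf. {'tags__lang': {'$eq': 'en'}}
        'other': lambda d: d['tags']['lang'] != 'en',  # cf. {'tags__lang': {'$ne': 'en'}}
    },
)
print({name: [d['id'] for d in ds] for name, ds in routes.items()})
# {'english': ['a', 'c'], 'other': ['b']}
```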

Also note that whenever a Document does not satisfy the condition of an Executor, it is not even sent to that Executor.
Instead, only a lightweight Request without any payload is transferred.
This means that you can use this feature not only to build complex logic, but also to minimize your networking overhead.
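
The sketch below illustrates the idea of building, per Executor, the request it actually receives. The `Request` dataclass and `request_for_executor` function are hypothetical stand-ins, not Jina internals. When no condition is set, the request is forwarded as-is without inspecting its Documents. Otherwise only the matching Documents are attached, and if none match, the Executor gets a request with an empty payload.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

Doc = Dict[str, Any]


@dataclass
class Request:
    """Hypothetical stand-in for a request travelling through the Flow."""

    endpoint: str
    docs: List[Doc] = field(default_factory=list)


def request_for_executor(req: Request, condition: Optional[Callable[[Doc], bool]]) -> Request:
    """Build the request that one Executor actually receives."""
    if condition is None:
        # No filter configured: forward unchanged, without looking at the docs.
        return req
    kept = [d for d in req.docs if condition(d)]
    # If nothing passes, this is a lightweight request with an empty payload.
    return Request(endpoint=req.endpoint, docs=kept)


req = Request('/search', [{'key': 5}, {'key': 4}])
print(request_for_executor(req, None) is req)  # True
print(request_for_executor(req, lambda d: d['key'] == 3))
# Request(endpoint='/search', docs=[])
```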

````{admonition} See Also
:class: seealso

For a hands-on example on how to leverage these filter conditions, see {ref}`this how-to <flow-switch>`.
````

### Replicate Executors

1 change: 1 addition & 0 deletions docs/how-to/executor.md
@@ -7,5 +7,6 @@ async-executors
scale-out
gpu-executor
external-executor
flow-switch
debug-executor
```