8000 Workflow tutorials for Python by marcduiker · Pull Request #1191 · dapr/quickstarts · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Workflow tutorials for Python #1191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion tutorials/workflow/csharp/workflow-management/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ For more information on workflow management, see the [Dapr docs](https://docs.da

## Inspect the code

Open the `Program.cs` file in the `tutorials/workflow/csharp/child-workflows/WorkflowManagement` folder. This file contains the endpoint definitions that use the workflow management API. The workflow that is being managed is named `NeverEndingWorkflow` and is a counter that will keep running once it's started.
Open the `Program.cs` file in the `tutorials/workflow/csharp/workflow-management/WorkflowManagement` folder. This file contains the endpoint definitions that use the workflow management API. The workflow that is being managed is named `NeverEndingWorkflow` and is a counter that will keep running once it's started.

## Run the tutorial

Expand Down
25 changes: 25 additions & 0 deletions tutorials/workflow/python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Using the Dapr Workflow API with Python

This folder contains tutorials of using the Dapr Workflow API with Python. All examples can be run locally on your machine.

Before you start, it's recommended to read though the Dapr docs to get familiar with the many [Workflow features, concepts, and patterns](https://docs.dapr.io/developing-applications/building-blocks/workflow/).

## Prerequisites

- [Docker Desktop](https://www.docker.com/products/docker-desktop/)
- [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/) & [Initialization](https://docs.dapr.io/getting-started/install-dapr-selfhost/)
- [Python 3](https://www.python.org/downloads/)
- Optional: An IDE such as [VSCode](https://code.visualstudio.com/download) with a [REST client](https://marketplace.visualstudio.com/items?itemName=humao.rest-client).

## Tutorials

- [Workflow Basics](./fundamentals/README.md)
- [Task Chaining](./task-chaining/README.md)
- [Fan-out/Fan-in](./fan-out-fan-in/README.md)
- [Monitor](./monitor-pattern/README.md)
- [External Events](./external-system-interaction/README.md)
- [Child Workflows](./child-workflows/README.md)
- [Resiliency & Compensation](./resiliency-and-compensation/README.md)
- [Combined Patterns](./combined-patterns/README.md)
- [WorkflowManagement](./workflow-management/README.md)
- [Challenges & Tips](./challenges-tips/README.md)
112 changes: 112 additions & 0 deletions tutorials/workflow/python/child-workflows/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Child Workflows

This tutorial demonstrates how a workflow can call child workflows that are part of the same application. Child workflows can be used to break up large workflows into smaller, reusable parts. For more information about child workflows see the [Dapr docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-features-concepts/#child-workflows).

## Inspect the code

Open the `ParentWorkflow.cs` file in the `tutorials/workflow/python/child-workflows/child_workflows` folder. This file contains the definition for the workflows and activities.

The parent workflow iterates over the input array and schedules an instance of the `child_workflow` for each of the input elements. The `child_workflow` is a basic task-chaining workflow that contains a sequence of two activities. When all of the instances of the `child_workflow` complete, then the `parent_workflow` finishes.

### Parent workflow

```mermaid
graph LR
SW((Start
Workflow))
subgraph for each word in the input
GWL[Call child workflow]
end
ALL[Wait until all tasks
are completed]
EW((End
Workflow))
SW --> GWL
GWL --> ALL
ALL --> EW
```

### Child workflow

```mermaid
graph LR
SW((Start
Workflow))
A1[activity1]
A2[activity2]
EW((End
Workflow))
SW --> A1
A1 --> A2
A2 --> EW
```

## Run the tutorial

1. Use a terminal to navigate to the `tutorials/workflow/python/child-workflows/child_workflows` folder.
2. Install the dependencies using pip:

```bash
pip3 install -r requirements.txt
```

3. Navigate one level back to the `child-workflows` folder and use the Dapr CLI to run the Dapr Multi-App run file

<!-- STEP
name: Run multi app run template
expected_stdout_lines:
- 'Started Dapr with app id "childworkflows"'
expected_stderr_lines:
working_dir: .
output_match_mode: substring
background: true
sleep: 15
timeout_seconds: 30
-->
```bash
dapr run -f .
```
<!-- END_STEP -->

4. Use the POST request in the [`childworkflows.http`](./childworkflows.http) file to start the workflow, or use this cURL command:

```bash
curl -i --request POST \
--url http://localhost:5259/start \
--header 'content-type: application/json' \
--data '["Item 1","Item 2"]'
```

The input of the workflow is an array with two strings:

```json
[
"Item 1",
"Item 2"
]
```

The app logs should show both the items in the input values array being processed by each activity in the child workflow as follows:

```text
== APP - childworkflows == activity1: Received input: Item 1.
== APP - childworkflows == activity2: Received input: Item 1 is processed.
== APP - childworkflows == activity1: Received input: Item 2.
== APP - childworkflows == activity2: Received input: Item 2 is processed.
```

5. Use the GET request in the [`childworkflows.http`](./childworkflows.http) file to get the status of the workflow, or use this cURL command:

```bash
curl --request GET --url http://localhost:3559/v1.0/workflows/dapr/<INSTANCEID>
```

Where `<INSTANCEID>` is the workflow instance ID you received in the `instance_id` property in the previous step.

The expected serialized output of the workflow is an array with two strings:

```txt
"[\"Item 1 is processed as a child workflow.\",\"Item 2 is processed as a child workflow.\"]"
```

6. Stop the Dapr Multi-App run process by pressing `Ctrl+C`.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from fastapi import FastAPI, status
from contextlib import asynccontextmanager
from typing import List
from parent_child_workflow import wf_runtime, parent_workflow
import dapr.ext.workflow as wf
import uvicorn

@asynccontextmanager
async def lifespan(app: FastAPI):
wf_runtime.start()
yield
wf_runtime.shutdown()

app = FastAPI(lifespan=lifespan)

@app.post("/start", status_code=status.HTTP_202_ACCEPTED)
async def start_workflow(items: List[str]):
wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(
workflow=parent_workflow,
input=items
)
return {"instance_id": instance_id}

if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=5259)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import List
import dapr.ext.workflow as wf

wf_runtime = wf.WorkflowRuntime()

@wf_runtime.workflow(name='parent_workflow')
def parent_workflow(ctx: wf.DaprWorkflowContext, items: List[str]):

child_wf_tasks = [
ctx.call_child_workflow(child_workflow, input=item) for item in items
]
wf_result = yield wf.when_all(child_wf_tasks)

return wf_result

@wf_runtime.workflow(name='child_workflow')
def child_workflow(ctx: wf.DaprWorkflowContext, wf_input: str):
result1 = yield ctx.call_activity(activity1, input=wf_input)
wf_result = yield ctx.call_activity(activity2, input=result1)
return wf_result

@wf_runtime.activity(name='activity1')
def activity1(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
print(f'activity1: Received input: {act_input}.', flush=True)
return f"{act_input} is processed"

@wf_runtime.activity(name='activity2')
def activity2(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
print(f'activity2: Received input: {act_input}.', flush=True)
return f"{act_input} as a child workflow."
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
dapr>=1.15.0
dapr-ext-workflow>=1.15.0
fastapi>=0.115.0
16 changes: 16 additions & 0 deletions tutorials/workflow/python/child-workflows/childworkflows.http
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
@apphost=http://localhost:5259

### Start the ParentWorkflow workflow
# @name startWorkflowRequest
POST {{ apphost }}/start
Content-Type: application/json

[
"Item 1",
"Item 2"
]

### Get the workflow status
@instanceId={{startWorkflowRequest.response.body.instance_id}}
@daprHost=http://localhost:3559
GET {{ daprHost }}/v1.0/workflows/dapr/{{ instanceId }}
11 changes: 11 additions & 0 deletions tutorials/workflow/python/child-workflows/dapr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: 1
common:
resourcesPath: ../../resources
apps:
- appID: childworkflows
appDirPath: child_workflows
appPort: 5259
daprHTTPPort: 3559
command: ["python3", "app.py"]
appLogDestination: console
daprdLogDestination: console
2 changes: 2 additions & 0 deletions tutorials/workflow/python/child-workflows/makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
include ../../../../docker.mk
include ../../../../validate.mk
145 changes: 145 additions & 0 deletions tutorials/workflow/python/combined-patterns/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Combined Workflow Patterns

This tutorial demonstrates how several workflow patterns can be combined in a single, more realistic, workflow. Some of the workflow activities are using other Dapr APIs, such as state management, service invocation, and Pub/Sub.

## Inspect the code

The demo consist of two applications:

- `workflow_app` is the main application that orchestrates an order process in the `order_workflow`.
- `shipping_app` is a supporting service that is being called by the `order_workflow`.

The `order_workflow` combines task chaining, fan-out/fan-in, and waiting for external event patterns. The workflow contains a number of activities for order processing including checking inventory, register shipment, process payment and more with a final order status being returned with the results of the order. It uses compensating logic in case the shipment fails to get registered and the customer needs to be reimbursed for the payment.

```mermaid
graph LR
SW((Start
Workflow))
EW((End
Workflow))
subgraph order_workflow
direction LR
CHKI[Check inventory]
CHKD[Check shipping
destination]
IF1{Success?}
PAY[Process
payment]
UPD[Update
inventory]
REG[Register
shipment]
WAIT[Wait for
confirmation]
IF2{Success?}
RI[Reimburse
customer]
end
subgraph Shipping
direction LR
REG2[register_shipment]
CHKD2[check_destination]
end
SW --> CHKI
SW --> CHKD <--> CHKD2
CHKI --> IF1
CHKD --> IF1
IF1 --> PAY
PAY --> UPD
UPD --> REG -.->|pub/sub| REG2
REG2 -.->|pub/sub| WAIT
REG --> WAIT
WAIT --> IF2
IF2 -->|Yes| EW
IF2 -->|No| RI
RI --> EW
```

## Run the tutorial

1. Use a terminal to navigate to the `tutorials/workflow/python/combined-patterns` folder.
2. Install the dependencies using pip:

```bash
cd workflow_app
pip3 install -r requirements.txt
cd ..
cd shipping_app
pip3 install -r requirements.txt
cd ..
```

3. Use the Dapr CLI to run the Dapr Multi-App run file. This starts both applications `order-workflow` and `shipping` with the Dapr components in the [resources](./resources) folder.

<!-- STEP
name: Run multi app run template
expected_stdout_lines:
- 'Started Dapr with app id "order-workflow"'
- 'Started Dapr with app id "shipping"'
expected_stderr_lines:
working_dir: .
output_match_mode: substring
background: true
sleep: 15
timeout_seconds: 30
-->
```bash
dapr run -f .
```
<!-- END_STEP -->

4. Use the POST request in the [`order-workflow.http`](./order-workflow.http) file to start the workflow, or use this cURL command:

```bash
curl -i --request POST \
--url http://localhost:5260/start \
--header 'content-type: application/json' \
--data '{"id": "b0d38481-5547-411e-ae7b-255761cce17a","order_item" : {"product_id": "RBD001","product_name": "Rubber Duck","quantity": 10,"total_price": 15.00},"customer_info" : {"id" : "Customer1","country" : "The Netherlands"}}'
```

The input for the workflow is an `Order` object:

```json
{
"id": "{{orderId}}",
"order_item" : {
"product_id": "RBD001",
"product_name": "Rubber Duck",
"quantity": 10,
"total_price": 15.00
},
"customer_info" : {
"id" : "Customer1",
"country" : "The Netherlands"
}
}
```

The app logs should come from both services executing all activities as follows:

```text
== APP - order-workflow == CheckInventory: Received input: OrderItem { ProductId = RBD001, ProductName = Rubber Duck, Quantity = 10, TotalPrice = 15.00 }.
== APP - order-workflow == CheckShippingDestination: Received input: Order { Id = 06d49c54-bf65-427b-90d1-730987e96e61, OrderItem = OrderItem { ProductId = RBD001, ProductName = Rubber Duck, Quantity = 10, TotalPrice = 15.00 }, CustomerInfo = CustomerInfo { Id = Customer1, Country = The Netherlands } }.
== APP - shipping == checkDestination: Received input: Order { Id = 06d49c54-bf65-427b-90d1-730987e96e61, OrderItem = OrderItem { ProductId = RBD001, ProductName = Rubber Duck, Quantity = 10, TotalPrice = 15.00 }, CustomerInfo = CustomerInfo { Id = Customer1, Country = The Netherlands } }.
== APP - order-workflow == ProcessPayment: Received input: Order { Id = 06d49c54-bf65-427b-90d1-730987e96e61, OrderItem = OrderItem { ProductId = RBD001, ProductName = Rubber Duck, Quantity = 10, TotalPrice = 15.00 }, CustomerInfo = CustomerInfo { Id = Customer1, Country = The Netherlands } }.
== APP - order-workflow == UpdateInventory: Received input: OrderItem { ProductId = RBD001, ProductName = Rubber Duck, Quantity = 10, TotalPrice = 15.00 }.
== APP - order-workflow == RegisterShipment: Received input: Order { Id = 06d49c54-bf65-427b-90d1-730987e96e61, OrderItem = OrderItem { ProductId = RBD001, ProductName = Rubber Duck, Quantity = 10, TotalPrice = 15.00 }, CustomerInfo = CustomerInfo { Id = Customer1, Country = The Netherlands } }.
== APP - shipping == registerShipment: Received input: Order { Id = 06d49c54-bf65-427b-90d1-730987e96e61, OrderItem = OrderItem { ProductId = RBD001, ProductName = Rubber Duck, Quantity = 10, TotalPrice = 15.00 }, CustomerInfo = CustomerInfo { Id = Customer1, Country = The Netherlands } }.
== APP - order-workflow == Shipment registered for order ShipmentRegistrationStatus { OrderId = 06d49c54-bf65-427b-90d1-730987e96e61, IsSuccess = True, Message = }
```

5. Use the GET request in the [`order-workflow.http`](./order-workflow.http) file to get the status of the workflow, or use this cURL command:

```bash
curl --request GET --url http://localhost:3560/v1.0/workflows/dapr/06d49c54-bf65-427b-90d1-730987e96e61
```

The expected serialized output of the workflow is:

```txt
{\"is_success\":true,\"message\":\"Order 06d49c54-bf65-427b-90d1-730987e96e61 processed successfully.\"}"
```

*The Order ID is generated when making the request and is different each time.*

6. Stop the Dapr Multi-App run process by pressing `Ctrl+C`.
Loading
Loading
0