After covering Dapr workflow basics in the previous article, let’s take a look at the different application patterns that can be used with Dapr workflow and .NET. The patterns covered in this post are: chaining, fan-out/fan-in, monitor, and external system interaction.
Knowing which workflow patterns are available and when to use them is essential for using any workflow engine properly. Dapr workflows are stateful due to the underlying event-sourcing implementation. Workflows need to be written in a deterministic way because the workflow code is replayed several times during execution. Some familiar programming constructs, such as while loops, waiting/sleeping, and running parallel tasks, therefore have to be written in a workflow-friendly way, which this post covers in detail.
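To make the determinism requirement a bit more concrete, here is a minimal sketch of a workflow that reads the clock and waits in a replay-safe way. ReplaySafeWorkflow is a hypothetical name and is not part of the demo repo; CurrentUtcDateTime and CreateTimer are members of the WorkflowContext in the Dapr .NET SDK.

```csharp
using System;
using System.Threading.Tasks;
using Dapr.Workflow;

// Hypothetical workflow for illustration only; not part of the demo repo.
public class ReplaySafeWorkflow : Workflow<string, string>
{
    public override async Task<string> RunAsync(WorkflowContext context, string input)
    {
        // DateTime.UtcNow would return a different value on every replay.
        // CurrentUtcDateTime is derived from the workflow history, so it replays identically.
        var startedAt = context.CurrentUtcDateTime;

        // Task.Delay or Thread.Sleep would wait again on every replay.
        // A durable timer is persisted by the workflow engine instead.
        await context.CreateTimer(TimeSpan.FromSeconds(5));

        return $"{input} (started at {startedAt:O})";
    }
}
```

The same idea applies to random numbers and calls to external systems: anything non-deterministic belongs in an activity, not in the workflow itself.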
Prerequisites
All code samples shown in this post are taken from the dapr-workflow-demos GitHub repo. You can fork/clone the repository and run all the workflow samples locally. The following is required to run the samples:
- .NET 7 SDK
- Docker Desktop
- Dapr CLI (v1.11 or higher)
- A REST client, such as cURL, or the VSCode REST client extension.
For simplicity, all workflow samples use the same workflow activity, named CreateGreetingActivity. This activity picks a random greeting from an array of greetings and combines it with the input.
using Dapr.Workflow;

namespace BasicWorkflowSamples
{
    public class CreateGreetingActivity : WorkflowActivity<string, string>
    {
        public override Task<string> RunAsync(WorkflowActivityContext context, string name)
        {
            // Pick a random greeting and prepend it to the input.
            var greetings = new[] { "Hello", "Hi", "Hey", "Hola", "Bonjour", "Ciao", "Guten Tag", "Konnichiwa" };
            var selectedGreeting = greetings[new Random().Next(0, greetings.Length)];
            var message = $"{selectedGreeting} {name}";

            return Task.FromResult(message);
        }
    }
}
Chaining pattern
The purpose of the chaining pattern is to enforce a specific order of activity executions. Usually the output of one activity is required as the input for the next, or the activities call external systems that must be invoked in a strict sequence.
A real-life use case where chaining is essential is an order process. First the inventory is checked, then the payment starts, and finally the shipping is initiated.
ChainingWorkflow sample
The ChainingWorkflow sample in the GitHub repo contains a sequence of three activity calls. In this workflow, a random greeting will be added to the input string. Each time the CreateGreetingActivity is called, the output of the activity is used as the input for the next activity.
using Dapr.Workflow;

namespace BasicWorkflowSamples
{
    public class ChainingWorkflow : Workflow<string, string>
    {
        public override async Task<string> RunAsync(WorkflowContext context, string input)
        {
            // Each call is awaited, so the activities run strictly one after another.
            var message1 = await context.CallActivityAsync<string>(
                nameof(CreateGreetingActivity),
                input);
            var message2 = await context.CallActivityAsync<string>(
                nameof(CreateGreetingActivity),
                message1);
            var message3 = await context.CallActivityAsync<string>(
                nameof(CreateGreetingActivity),
                message2);

            return message3;
        }
    }
}
Note that each CallActivityAsync method is awaited. This means the workflow will wait until that activity is completed before continuing with the next call. Every time an activity is scheduled for execution, the workflow becomes idle, and it resumes (by replaying) when the activity is done.
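Before a workflow can be started, the host application has to register the workflow and its activities with the Dapr workflow runtime. The following is a minimal sketch of such a registration, assuming an ASP.NET Core app that uses the minimal hosting model; the actual Program.cs in the repo may look different and registers more workflows.

```csharp
using BasicWorkflowSamples;
using Dapr.Workflow;

var builder = WebApplication.CreateBuilder(args);

// Register the workflow and the activity it calls with the Dapr workflow runtime.
builder.Services.AddDaprWorkflow(options =>
{
    options.RegisterWorkflow<ChainingWorkflow>();
    options.RegisterActivity<CreateGreetingActivity>();
});

var app = builder.Build();
app.Run();
```

The workflow name used in the HTTP API matches the registered class name, which is why the samples use nameof when calling activities.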
Running the ChainingWorkflow sample
1. Open the BasicWorkflowSamples folder in a terminal and build the project:
cd BasicWorkflowSamples
dotnet build
2. Start the workflow application via the Dapr CLI:
dapr run --app-id basic-workflows --app-port 5065 --dapr-http-port 3500 --resources-path ./ResourcesLocal dotnet run
3. Start the ChainingWorkflow via the Workflow HTTP API using cURL, or use the basicworkflows.http file if you're using VSCode with the REST client:
curl -i -X POST http://localhost:3500/v1.0-alpha1/workflows/dapr/ChainingWorkflow/start?instanceID=1234b \
-H "Content-Type: text/plain" \
-d '"World"'
Note that 1234b in the URL is the workflow instance ID. This can be any string you want.
Expected result:
{
"instanceID": "1234b"
}
4. Check the workflow status via Workflow HTTP API:
curl -i -X GET http://localhost:3500/v1.0-alpha1/workflows/dapr/1234b
Expected result:
{
"instanceID": "1234b",
"workflowName": "ChainingWorkflow",
"createdAt": "2023-06-19T13:21:08.611149200Z",
"lastUpdatedAt": "2023-06-19T13:21:08.647482Z",
"runtimeStatus": "COMPLETED",
"properties": {
"dapr.workflow.custom_status": "",
"dapr.workflow.input": "\"World\"",
"dapr.workflow.output": "\"Hello Hi Konnichiwa World\""
}
}
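The status response shown above can also be inspected from code. Note that the workflow output is itself a JSON-encoded string inside the properties object, so it needs to be decoded twice. The following is a minimal sketch using System.Text.Json; WorkflowStatus and GetOutput are hypothetical names, and the property names are taken from the response above.

```csharp
using System.Text.Json;

// Hypothetical helper for illustration only; not part of the demo repo.
public static class WorkflowStatus
{
    // Returns the decoded workflow output, or null if the workflow has not completed.
    public static string? GetOutput(string statusJson)
    {
        using var doc = JsonDocument.Parse(statusJson);
        var root = doc.RootElement;

        if (root.GetProperty("runtimeStatus").GetString() != "COMPLETED")
        {
            return null;
        }

        // The output is stored as a JSON string, for example "\"Hello World\"".
        var rawOutput = root
            .GetProperty("properties")
            .GetProperty("dapr.workflow.output")
            .GetString();

        return rawOutput is null ? null : JsonSerializer.Deserialize<string>(rawOutput);
    }
}
```

A caller would pass in the body of the GET response and poll again later when the result is null.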
Fan-out/Fan-in
The purpose of the fan-out/fan-in pattern is to execute multiple activities in parallel (fan-out) and combine the result of these activities when all of them have completed (fan-in). The activities are independent of each other, and there’s no requirement of ordered execution.
A use case where fan-out/fan-in is useful is processing arrays of data, like order items. A workflow is started with an array of order items as its input argument. The workflow iterates over the array and processes each order item individually. The workflow ends with an aggregate, such as the total price across all order items.
FanOutFanInWorkflow sample
The FanOutFanInWorkflow sample in the GitHub repo contains a fan-out/fan-in pattern where, for each name in the input, an activity call to execute a CreateGreetingActivity is created, but not immediately scheduled and executed. The result of the workflow is an array with one greeting per input name (three in the sample request further down).
using Dapr.Workflow;

namespace BasicWorkflowSamples
{
    public class FanOutFanInWorkflow : Workflow<string[], string[]>
    {
        public override async Task<string[]> RunAsync(WorkflowContext context, string[] input)
        {
            // Fan-out: create one activity task per name without awaiting it.
            var tasks = new List<Task<string>>();
            foreach (var name in input)
            {
                tasks.Add(context.CallActivityAsync<string>(
                    nameof(CreateGreetingActivity),
                    name));
            }

            // Fan-in: wait for all activity tasks and collect their results.
            var messages = await Task.WhenAll(tasks);

            return messages;
        }
    }
}
Note that in this sample, the activity calls are not directly awaited, as is the case with the chaining pattern. In this case, a list of Task<string> is created, where string is the output type of CreateGreetingActivity, and the CallActivityAsync tasks are added to this list. After the foreach loop, all tasks are scheduled and awaited with this line of code:
var messages = await Task.WhenAll(tasks);
This means that the workflow will wait until all activity tasks have been completed. The results of the individual tasks are combined and assigned to the messages variable. Since the output type of the activity is string, the combined result (messages) is an array of type string.
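The order item use case mentioned earlier follows the same shape, with an aggregation step after the fan-in. The following is a minimal sketch of that idea. OrderItem, CalculateItemTotalActivity and OrderTotalWorkflow are hypothetical names that do not exist in the demo repo.

```csharp
using System.Linq;
using System.Threading.Tasks;
using Dapr.Workflow;

// Hypothetical types for illustration only; not part of the demo repo.
public record OrderItem(string ProductId, int Quantity, decimal UnitPrice);

public class CalculateItemTotalActivity : WorkflowActivity<OrderItem, decimal>
{
    public override Task<decimal> RunAsync(WorkflowActivityContext context, OrderItem item)
    {
        // Price of a single order line.
        return Task.FromResult(item.Quantity * item.UnitPrice);
    }
}

public class OrderTotalWorkflow : Workflow<OrderItem[], decimal>
{
    public override async Task<decimal> RunAsync(WorkflowContext context, OrderItem[] items)
    {
        // Fan-out: one activity task per order item.
        var tasks = items
            .Select(item => context.CallActivityAsync<decimal>(
                nameof(CalculateItemTotalActivity),
                item))
            .ToList();

        // Fan-in: wait for all line totals, then aggregate them.
        var lineTotals = await Task.WhenAll(tasks);

        return lineTotals.Sum();
    }
}
```

The aggregation happens in the workflow here because summing a few numbers is cheap and deterministic. Heavier aggregation work would be a better fit for a separate activity.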
Waiting for the first activity
A variation of the fan-out/fan-in pattern is to run the workflow until the first of the activity calls in the list is completed. An example of this is running competing tasks for the same input. A real-life use case is executing a text search and testing various search algorithms. The workflow schedules activities that use different search algorithms. The input for all activities is the same: a search term and a body of text to search through. The activity that implements the most efficient search algorithm will complete first, and the workflow continues.
In this variation, Task.WhenAny is used. It returns the task that completed first, and awaiting that task yields the single value from that activity.
var firstCompleted = await Task.WhenAny(tasks);
var message = await firstCompleted;
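Task.WhenAny behaves slightly differently from Task.WhenAll: it yields the task that finished first rather than its value, so a second await is needed to get the result. The following sketch shows this with plain .NET tasks outside of a workflow. SearchAsync is a hypothetical stand-in for an activity call, and Task.Delay is only used here because this code does not run inside a workflow.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for an activity call: completes after the given delay.
static async Task<string> SearchAsync(string algorithm, int delayMs)
{
    await Task.Delay(delayMs);
    return $"{algorithm} finished";
}

var tasks = new List<Task<string>>
{
    SearchAsync("linear", 1000),
    SearchAsync("indexed", 10),
};

// WhenAny yields the first completed task, not its value.
Task<string> firstCompleted = await Task.WhenAny(tasks);

// Awaiting the completed task returns its result (or rethrows its exception).
string message = await firstCompleted;

Console.WriteLine(message);
```

The tasks that did not win keep running in the background. In a workflow, that means the remaining activities still execute, and only their results are ignored.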
Running the FanOutFanInWorkflow sample
1. Ensure that the BasicWorkflowSamples app is still running. If not, change to the BasicWorkflowSamples directory, build the app, and run the app using the Dapr CLI:
cd BasicWorkflowSamples
dotnet build
dapr run --app-id basic-workflows --app-port 5065 --dapr-http-port 3500 --resources-path ./ResourcesLocal dotnet run
2. Start the FanOutFanInWorkflow via the Workflow HTTP API using cURL, or use the basicworkflows.http file if you're using VSCode with the REST client:
curl -i -X POST http://localhost:3500/v1.0-alpha1/workflows/dapr/FanOutFanInWorkflow/start?instanceID=1234c \
-H "Content-Type: application/json" \
-d '["Amsterdam", "Chicago", "New York"]'
Note that 1234c in the URL is the workflow instance ID. This can be any string you want.
Expected result:
{
"instanceID": "1234c"
}
3. Check the workflow status via Workflow HTTP API:
curl -i -X GET http://localhost:3500/v1.0-alpha1/workflows/dapr/1234c
Expected result:
{
"instanceID": "1234c",
"workflowName": "FanOutFanInWorkflow",
"createdAt": "2023-06-19T13:22:42.056622400Z",
"lastUpdatedAt": "2023-06-19T13:22:42.093666600Z",
"runtimeStatus": "COMPLETED",
"properties": {
"dapr.workflow.custom_status": "",
"dapr.workflow.input": "[\"Amsterdam\",\"Chicago\",\"New York\"]",
"dapr.workflow.output": "[\"Hi Amsterdam\",\"Hola Chicago\",\"Guten Tag New York\"]"
}
}
Monitor
The purpose of the monitor pattern is to periodically execute one or more activities in a loop at a specified frequency, for example once every hour or daily at midnight. Occasionally, a condition is used that determines if the workflow is restarted or completed.
A real-life use case of the monitor pattern is cleaning up cloud resources at the end of every day.
MonitorWorkflow sample
The MonitorWorkflow sample in the GitHub repo contains a workflow with a numeric input (counter). It calls the CreateGreetingActivity, then checks if the counter is less than 10. If that is true, the counter is incremented, and the workflow waits for one second and restarts (with an updated counter). The workflow is restarted until the counter reaches 10.
using Dapr.Workflow;

namespace BasicWorkflowSamples
{
    public class MonitorWorkflow : Workflow<int, string>
    {
        public override async Task<string> RunAsync(WorkflowContext context, int counter)
        {
            var message = await context.CallActivityAsync<string>(
                nameof(CreateGreetingActivity),
                counter.ToString());

            if (counter < 10)
            {
                counter += 1;
                // Durable timer: the workflow-friendly replacement for sleeping.
                await context.CreateTimer(TimeSpan.FromSeconds(1));
                // Restart the workflow with the updated counter as its input.
                context.ContinueAsNew(counter);
            }

            return message;
        }
    }
}
Note that restarting the workflow is done by calling the ContinueAsNew method. This instructs the workflow engine to restart the workflow as a new instance without the previous (replay) data. The updated counter value is passed in as the input argument of the workflow.
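The sample waits a fixed second between runs. For a schedule such as "daily at midnight", the timer duration has to be calculated on each run. The following is a minimal sketch of that calculation. MonitorSchedule and UntilNextMidnightUtc are hypothetical names, and the schedule is assumed to be in UTC.

```csharp
using System;

// Hypothetical helper for illustration only; not part of the demo repo.
public static class MonitorSchedule
{
    // Time remaining until the next midnight (UTC) after the given moment.
    // At exactly midnight this returns a full day.
    public static TimeSpan UntilNextMidnightUtc(DateTime nowUtc)
    {
        return nowUtc.Date.AddDays(1) - nowUtc;
    }
}

// Inside a workflow, the replay-safe clock would be passed in, for example:
//   var delay = MonitorSchedule.UntilNextMidnightUtc(context.CurrentUtcDateTime);
//   await context.CreateTimer(delay);
//   context.ContinueAsNew(input);
```

The helper takes the current time as an argument instead of reading DateTime.UtcNow, which keeps it deterministic and safe to call from workflow code.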
Running the MonitorWorkflow sample
1. Ensure that the BasicWorkflowSamples app is still running. If not, change to the BasicWorkflowSamples directory, build the app, and run the app using the Dapr CLI:
cd BasicWorkflowSamples
dotnet build
dapr run --app-id basic-workflows --app-port 5065 --dapr-http-port 3500 --resources-path ./ResourcesLocal dotnet run
2. Start the MonitorWorkflow via the Workflow HTTP API using cURL, or use the basicworkflows.http file if you're using VSCode with the REST client:
curl -i -X POST http://localhost:3500/v1.0-alpha1/workflows/dapr/MonitorWorkflow/start?instanceID=1234d \
-H "Content-Type: application/json" \
-d '0'
Note that 1234d in the URL is the workflow instance ID. This can be any string you want.
Expected result:
{
"instanceID": "1234d"
}
3. Check the workflow status via Workflow HTTP API:
curl -i -X GET http://localhost:3500/v1.0-alpha1/workflows/dapr/1234d
Expected result:
{
"instanceID": "1234d",
"workflowName": "MonitorWorkflow",
"createdAt": "2023-06-19T13:24:23.004744700Z",
"lastUpdatedAt": "2023-06-19T13:24:23.016307900Z",
"runtimeStatus": "COMPLETED",
"properties": {
"dapr.workflow.custom_status": "",
"dapr.workflow.input": "10",
"dapr.workflow.output": "\"Hey 10\""
}
}
External system interaction
The purpose of the external system interaction pattern is to pause the execution of part of the workflow until an external system raises an event that is received by the workflow.
A real-life use case of the external system interaction pattern is an approval workflow where a manager needs to approve a purchase by one of their employees. The employee initiates the workflow by making a purchase, a message is sent to the manager, and the workflow waits until it has received the event that the manager sends.
ExternalInteractionWorkflow sample
The ExternalInteractionWorkflow sample in the GitHub repo contains a workflow that starts waiting as soon as it begins. The WaitForExternalEventAsync<T> method is used to instruct the workflow engine to pause further execution until an event named “approval-event” has been received or the timeout of 20 seconds has expired. ApprovalEvent is used as the generic type parameter. This is the type of event payload the workflow expects to receive. It can be any user-defined serializable object.
Once the event has been received, the IsApproved property of the event is checked to determine if the CreateGreetingActivity will be scheduled. If the timeout expires first, a TaskCanceledException is thrown, which the workflow catches in order to set a custom status.
using Dapr.Workflow;

namespace BasicWorkflowSamples
{
    public class ExternalInteractionWorkflow : Workflow<string, string>
    {
        public override async Task<string> RunAsync(WorkflowContext context, string input)
        {
            var message = string.Empty;

            try
            {
                // Wait for the event, but no longer than the timeout.
                var timeOut = TimeSpan.FromSeconds(20);
                var approvalEvent = await context.WaitForExternalEventAsync<ApprovalEvent>(
                    "approval-event",
                    timeOut);

                if (approvalEvent.IsApproved)
                {
                    message = await context.CallActivityAsync<string>(
                        nameof(CreateGreetingActivity),
                        input);
                }
            }
            catch (TaskCanceledException)
            {
                // Thrown when the timeout expires before the event arrives.
                context.SetCustomStatus("Wait for external event is cancelled due to timeout.");
            }

            return message;
        }
    }

    public record ApprovalEvent(bool IsApproved);
}
Although the timeout parameter is optional, it is considered a best practice to provide one. Otherwise, the workflow will be waiting indefinitely. The timeout in the sample is short for demonstration purposes, but a timeout in a real-life use case can be hours, days or longer.
If a try/catch block is not used and the timeout expires, the workflow will terminate with a FAILED status because of the TaskCanceledException.
Running the ExternalInteractionWorkflow sample
1. Ensure that the BasicWorkflowSamples app is still running. If not, change to the BasicWorkflowSamples directory, build the app, and run the app using the Dapr CLI:
cd BasicWorkflowSamples
dotnet build
dapr run --app-id basic-workflows --app-port 5065 --dapr-http-port 3500 --resources-path ./ResourcesLocal dotnet run
2. Start the ExternalInteractionWorkflow via the Workflow HTTP API using cURL, or use the basicworkflows.http file if you're using VSCode with the REST client:
curl -i -X POST http://localhost:3500/v1.0-alpha1/workflows/dapr/ExternalInteractionWorkflow/start?instanceID=1234f \
-H "Content-Type: text/plain" \
-d '"World"'
Note that 1234f in the URL is the workflow instance ID. This can be any string you want.
Expected result:
{
"instanceID": "1234f"
}
3. Check the workflow status via Workflow HTTP API before the timeout expires (make the request within 20 seconds):
curl -i -X GET http://localhost:3500/v1.0-alpha1/workflows/dapr/1234f
The runtimeStatus should be RUNNING as the workflow is waiting for an event or the timeout:
{
"instanceID": "1234f",
"workflowName": "ExternalInteractionWorkflow",
"createdAt": "2023-07-27T11:35:54.446941200Z",
"lastUpdatedAt": "2023-07-27T11:35:55.694310900Z",
"runtimeStatus": "RUNNING",
"properties": {
"dapr.workflow.custom_status": "",
"dapr.workflow.input": "\"World\""
}
}
4. Let the timeout expire (wait 20 seconds) and check the workflow status again via Workflow HTTP API:
curl -i -X GET http://localhost:3500/v1.0-alpha1/workflows/dapr/1234f
The runtimeStatus will be COMPLETED, with a custom_status stating that the wait for the external event was cancelled due to the timeout:
{
"instanceID": "1234f",
"workflowName": "ExternalInteractionWorkflow",
"createdAt": "2023-07-27T14:29:37.084711800Z",
"lastUpdatedAt": "2023-07-27T14:29:57.011878800Z",
"runtimeStatus": "COMPLETED",
"properties": {
"dapr.workflow.custom_status": "\"Wait for external event is cancelled due to timeout.\"",
"dapr.workflow.input": "\"World\"",
"dapr.workflow.output": "\"\""
}
}
5. Now start the workflow again, raise an event within the timeout duration (20 seconds), and retrieve the status of the workflow.
curl -i -X POST http://localhost:3500/v1.0-alpha1/workflows/dapr/ExternalInteractionWorkflow/start?instanceID=1234f \
-H "Content-Type: text/plain" \
-d '"World"'
6. Raising an event is done by calling the raiseEvent endpoint of the workflow management API. Note that both the workflow instance ID and the event name (approval-event) are part of the URL. The payload is a JSON object with one property: "IsApproved":true.
curl -i -X POST http://localhost:3500/v1.0-alpha1/workflows/dapr/1234f/raiseEvent/approval-event \
-H "Content-Type: application/json" \
-d '{"IsApproved":true}'
7. Check the workflow status again via Workflow HTTP API:
curl -i -X GET http://localhost:3500/v1.0-alpha1/workflows/dapr/1234f
The runtimeStatus should now be COMPLETED and the output of the workflow should be present:
{
"instanceID": "1234f",
"workflowName": "ExternalInteractionWorkflow",
"createdAt": "2023-07-27T11:50:16.526722900Z",
"lastUpdatedAt": "2023-07-27T11:50:19.172489800Z",
"runtimeStatus": "COMPLETED",
"properties": {
"dapr.workflow.custom_status": "",
"dapr.workflow.input": "\"World\"",
"dapr.workflow.output": "\"Hello World\""
}
}
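In a real approval flow, the event would typically be raised by another application instead of cURL. The following is a minimal sketch that raises the approval event from .NET with HttpClient against the same HTTP API. WorkflowEvents and its methods are hypothetical names. The sketch assumes the Dapr sidecar listens on the HTTP port used in this post and that the event name is the last segment of the raiseEvent URL.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical helper for illustration only; not part of the demo repo.
public static class WorkflowEvents
{
    // Builds the raiseEvent URL for the alpha workflow HTTP API used in this post.
    public static Uri BuildRaiseEventUri(string daprBaseUrl, string instanceId, string eventName)
    {
        var baseUrl = daprBaseUrl.TrimEnd('/');
        var id = Uri.EscapeDataString(instanceId);
        var name = Uri.EscapeDataString(eventName);

        return new Uri($"{baseUrl}/v1.0-alpha1/workflows/dapr/{id}/raiseEvent/{name}");
    }

    // Posts {"IsApproved":true|false} as the payload of the "approval-event" event.
    public static async Task RaiseApprovalAsync(
        HttpClient http, string daprBaseUrl, string instanceId, bool isApproved)
    {
        var json = JsonSerializer.Serialize(new { IsApproved = isApproved });
        using var content = new StringContent(json, Encoding.UTF8, "application/json");

        var uri = BuildRaiseEventUri(daprBaseUrl, instanceId, "approval-event");
        using var response = await http.PostAsync(uri, content);

        // Throws if the sidecar did not accept the event.
        response.EnsureSuccessStatusCode();
    }
}
```

A caller would use it like this: await WorkflowEvents.RaiseApprovalAsync(new HttpClient(), "http://localhost:3500", "1234f", true). The Dapr .NET SDK also offers client classes for workflow management, which may be a better fit than raw HTTP calls in production code.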
Next steps
In this post, you’ve seen how to write Dapr workflows in .NET and use patterns such as chaining, fan-out/fan-in, monitor, and external system interaction. These patterns are the building blocks for most workflows, and it’s very common to combine them to achieve the desired behavior. Take a look at the CheckoutService sample in the dapr-workflow-demos repository for a real-life example.
Do you have any questions or comments about this blog post or the code? Join the Dapr discord and post a message in the #workflow channel. Have you made something with Dapr, and created a blog post or video? Post a message in the #show-and-tell channel. We love to see what you’re doing with Dapr!
Resources
- Dapr Workflow Demos on GitHub: github.com/diagrid-labs/dapr-workflow-demos
- Dapr Workflow Pattern Docs: docs.dapr.io/developing-applications/building-blocks/workflow/workflow-patterns/