Skip to content
Docs

Dapr Workflow Provider

The Dapr provider implements the workflow.WorkflowStore interface using Dapr’s state management API. It persists workflow state as JSON through Dapr’s pluggable state store abstraction, enabling deployment across any Dapr-supported backend (Redis, PostgreSQL, CosmosDB, DynamoDB, and others) without code changes.

Choose Dapr when you are already running a Dapr sidecar and want to reuse your existing state store configuration for workflow persistence. Dapr’s pluggable architecture means you can switch between Redis, PostgreSQL, CosmosDB, or DynamoDB without code changes. For full workflow orchestration with replay and signals, consider Temporal. For lightweight key-value persistence, consider NATS.

Terminal window
go get github.com/lookatitude/beluga-ai/workflow/providers/dapr

Ensure a Dapr sidecar is running with a configured state store component.

package main
import (
"context"
"fmt"
"log"
"github.com/lookatitude/beluga-ai/workflow"
"github.com/lookatitude/beluga-ai/workflow/providers/dapr"
)
func main() {
store, err := dapr.New(dapr.Config{
Client: daprClient, // your Dapr StateClient implementation
StoreName: "statestore",
})
if err != nil {
log.Fatal(err)
}
executor := workflow.NewExecutor(
workflow.WithStore(store),
)
ctx := context.Background()
handle, err := executor.Execute(ctx, myWorkflow, workflow.WorkflowOptions{
ID: "order-789",
Input: "process order",
})
if err != nil {
log.Fatal(err)
}
result, err := handle.Result(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println("Result:", result)
}
func myWorkflow(ctx workflow.WorkflowContext, input any) (any, error) {
return fmt.Sprintf("completed: %v", input), nil
}
ParameterTypeDefaultDescription
Clientdapr.StateClient(required)Dapr state client for state operations
StoreNamestring"statestore"Dapr state store component name

The provider defines a StateClient interface matching the subset of the Dapr SDK used for state operations:

type StateClient interface {
SaveState(ctx context.Context, storeName, key string, data []byte, meta map[string]string, so ...any) error
GetState(ctx context.Context, storeName, key string, meta map[string]string) (*StateItem, error)
DeleteState(ctx context.Context, storeName, key string, meta map[string]string) error
}

The StateItem type returned by GetState:

type StateItem struct {
Key string
Value []byte
Etag string
}

Use New for validated construction with a Config struct:

store, err := dapr.New(dapr.Config{
Client: daprClient,
StoreName: "my-state-store",
})
if err != nil {
log.Fatal(err)
}

Use NewWithClient for testing with mock implementations:

store := dapr.NewWithClient(mockClient, "statestore")
func (s *Store) Save(ctx context.Context, state workflow.WorkflowState) error

Serializes the workflow state to JSON and persists it via SaveState. The workflow ID serves as the state key. Returns an error if WorkflowID is empty or if the Dapr call fails.

func (s *Store) Load(ctx context.Context, workflowID string) (*workflow.WorkflowState, error)

Retrieves and deserializes workflow state by ID. Returns nil, nil if the key does not exist (empty value from Dapr).

func (s *Store) List(ctx context.Context, filter workflow.WorkflowFilter) ([]workflow.WorkflowState, error)

Returns workflows matching the filter. The store maintains an in-memory index of known workflow IDs and loads each from Dapr individually. Supports filtering by Status and limiting results with Limit.

func (s *Store) Delete(ctx context.Context, workflowID string) error

Removes the workflow state from the Dapr state store and the in-memory index.

The Dapr state management API does not natively support listing all keys. The provider maintains an in-memory index of workflow IDs that have been saved through this store instance. This means:

  • Workflows saved through a different instance or directly via the Dapr API are not discoverable through List.
  • The index is rebuilt from scratch on process restart.
  • For comprehensive listing, consider using a provider with native key enumeration (such as NATS or Kafka).

All Dapr API errors are wrapped with a descriptive prefix for traceability:

state, err := store.Load(ctx, "wf-123")
if err != nil {
// Error format: "dapr/load: <underlying error>"
log.Printf("Failed to load workflow: %v", err)
}

Any Dapr state store component works with this provider. Common choices include:

Dapr ComponentBackend
state.redisRedis
state.postgresqlPostgreSQL
state.cosmosdbAzure Cosmos DB
state.dynamodbAWS DynamoDB
state.mongodbMongoDB
state.in-memoryIn-process (development)

Configure the state store component in your Dapr configuration (typically components/statestore.yaml).