Workflow Providers — Durable Execution
Beluga AI v2 includes a durable execution engine that manages long-running, fault-tolerant workflows with activity execution, signal handling, retry policies, and event-sourced state persistence. Workflow state providers implement the WorkflowStore interface for persisting execution history, while durable executor providers implement the DurableExecutor interface for executing workflows with production-grade guarantees.
Core Interfaces
DurableExecutor
The primary interface for executing and managing workflows:

    type DurableExecutor interface {
        Execute(ctx context.Context, fn WorkflowFunc, opts WorkflowOptions) (WorkflowHandle, error)
        Signal(ctx context.Context, workflowID string, signal Signal) error
        Query(ctx context.Context, workflowID string, queryType string) (any, error)
        Cancel(ctx context.Context, workflowID string) error
    }

WorkflowStore
The interface for persisting workflow state:

    type WorkflowStore interface {
        Save(ctx context.Context, state WorkflowState) error
        Load(ctx context.Context, workflowID string) (*WorkflowState, error)
        List(ctx context.Context, filter WorkflowFilter) ([]WorkflowState, error)
        Delete(ctx context.Context, workflowID string) error
    }

WorkflowContext
Extended context for deterministic workflow execution:

    type WorkflowContext interface {
        context.Context

        ExecuteActivity(fn ActivityFunc, input any, opts ...ActivityOption) (any, error)
        ReceiveSignal(name string) <-chan any
        Sleep(d time.Duration) error
    }

WorkflowHandle
Access to running or completed workflow executions:

    type WorkflowHandle interface {
        ID() string
        RunID() string
        Status() WorkflowStatus
        Result(ctx context.Context) (any, error)
    }

Registry Usage
    import (
        "context"
        "log"

        "github.com/lookatitude/beluga-ai/workflow"

        // Register the executor provider via blank import.
        _ "github.com/lookatitude/beluga-ai/workflow/providers/temporal"
    )

    func main() {
        ctx := context.Background()

        // temporalClient and myWorkflow are assumed to be defined elsewhere.
        executor, err := workflow.New("temporal", workflow.Config{
            Extra: map[string]any{
                "client":     temporalClient,
                "task_queue": "my-workflows",
            },
        })
        if err != nil {
            log.Fatal(err)
        }

        // Start the workflow under a caller-chosen ID.
        handle, err := executor.Execute(ctx, myWorkflow, workflow.WorkflowOptions{
            ID: "order-123",
        })
        if err != nil {
            log.Fatal(err)
        }

        // Block until the workflow finishes, then report its result.
        result, err := handle.Result(ctx)
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("workflow result: %v", result)
    }

Available Providers
Durable Executors

| Provider | Registry Name | Type | Best For |
|---|---|---|---|
| Default (built-in) | default | In-process goroutine | Development, testing, simple workflows |
| Temporal | temporal | External orchestrator | Production distributed workflows |
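The registry names in the table are plain strings that are resolved to a provider at runtime, which is why a blank import is enough to make a provider available. The sketch below illustrates that general pattern in a self-contained form. It is not Beluga's implementation: `Executor`, `Factory`, `Register`, `New`, `List`, and `namedExecutor` are hypothetical stand-ins defined only for this example.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Executor is a hypothetical stand-in for a durable executor.
type Executor interface {
	Name() string
}

// Factory builds an Executor from free-form configuration.
type Factory func(cfg map[string]any) (Executor, error)

var (
	mu        sync.RWMutex
	factories = map[string]Factory{}
)

// Register associates a registry name with a factory. A provider package
// would typically call this from init, so a blank import registers it.
func Register(name string, f Factory) {
	mu.Lock()
	defer mu.Unlock()
	factories[name] = f
}

// New looks up a factory by name and builds an executor from cfg.
func New(name string, cfg map[string]any) (Executor, error) {
	mu.RLock()
	f, ok := factories[name]
	mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("workflow provider %q is not registered", name)
	}
	return f(cfg)
}

// List returns the registered names in sorted order.
func List() []string {
	mu.RLock()
	defer mu.RUnlock()
	names := make([]string, 0, len(factories))
	for n := range factories {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

// namedExecutor is a trivial Executor used only to exercise the registry.
type namedExecutor struct{ name string }

func (e namedExecutor) Name() string { return e.name }

func init() {
	Register("temporal", func(map[string]any) (Executor, error) {
		return namedExecutor{"temporal"}, nil
	})
	Register("default", func(map[string]any) (Executor, error) {
		return namedExecutor{"default"}, nil
	})
}

func main() {
	fmt.Println(List())
	if _, err := New("missing", nil); err != nil {
		fmt.Println(err)
	}
}
```

Keeping the lookup behind a string key means application code depends only on the executor interface, and an unknown name surfaces as an ordinary error rather than a compile-time dependency on a provider package.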
Workflow Stores
| Provider | Type | Durability | Best For |
|---|---|---|---|
| In-Memory | In-process map | None (process-local) | Development, testing |
| Dapr | Dapr state store | Depends on backend | Cloud-native, multi-cloud |
| Inngest | HTTP API | Durable | Event-driven serverless |
| Kafka | Compacted topic | Durable | Event streaming architectures |
| NATS | JetStream KV | Durable | Lightweight distributed |
| Temporal | Temporal visibility | Managed by Temporal | Temporal-native workflows |
Default Executor
Beluga provides a built-in DefaultExecutor that runs workflows in goroutines with optional state persistence:

    import (
        "context"
        "log"

        "github.com/lookatitude/beluga-ai/workflow"
        "github.com/lookatitude/beluga-ai/workflow/providers/inmemory"
    )

    // The in-memory store is process-local; state is lost on restart.
    store := inmemory.New()

    executor := workflow.NewExecutor(
        workflow.WithStore(store),
        workflow.WithExecutorHooks(workflow.Hooks{
            OnWorkflowStart: func(ctx context.Context, wfID string, input any) {
                log.Printf("Workflow %s started", wfID)
            },
            OnWorkflowComplete: func(ctx context.Context, wfID string, result any) {
                log.Printf("Workflow %s completed: %v", wfID, result)
            },
        }),
    )

Workflow Patterns
Defining a Workflow

    func orderWorkflow(ctx workflow.WorkflowContext, input any) (any, error) {
        // Check the input type instead of panicking on a failed assertion.
        orderID, ok := input.(string)
        if !ok {
            return nil, fmt.Errorf("unexpected input type %T", input)
        }

        // Execute activities within the workflow context.
        validated, err := ctx.ExecuteActivity(validateOrder, orderID,
            workflow.WithActivityRetry(workflow.DefaultRetryPolicy()),
            workflow.WithActivityTimeout(30*time.Second),
        )
        if err != nil {
            return nil, fmt.Errorf("validate: %w", err)
        }

        // Wait for human approval via signal, or stop if the context ends.
        approvalCh := ctx.ReceiveSignal("approve")
        select {
        case approval := <-approvalCh:
            if approval != "approved" {
                return nil, fmt.Errorf("order rejected")
            }
        case <-ctx.Done():
            return nil, ctx.Err()
        }

        // Process the order.
        result, err := ctx.ExecuteActivity(processOrder, validated)
        if err != nil {
            return nil, fmt.Errorf("process: %w", err)
        }

        return result, nil
    }

Executing a Workflow
    handle, err := executor.Execute(ctx, orderWorkflow, workflow.WorkflowOptions{
        ID:      "order-456",
        Input:   "ORD-456",
        Timeout: 30 * time.Minute,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Send a signal to the running workflow.
    err = executor.Signal(ctx, "order-456", workflow.Signal{
        Name:    "approve",
        Payload: "approved",
    })
    if err != nil {
        log.Fatal(err)
    }

    // Wait for the result.
    result, err := handle.Result(ctx)
    if err != nil {
        log.Fatal(err)
    }

Activity Helpers
Beluga provides factory functions for common activity patterns:

    // LLM-powered activity
    llmAct := workflow.LLMActivity(func(ctx context.Context, prompt string) (string, error) {
        return model.Generate(ctx, prompt)
    })

    // Tool execution activity
    toolAct := workflow.ToolActivity(func(ctx context.Context, name string, args map[string]any) (any, error) {
        return registry.Execute(ctx, name, args)
    })

    // Human-in-the-loop activity
    humanAct := workflow.HumanActivity(hitlManager)

Retry Policies
Activities support configurable retry policies with exponential backoff and jitter:

    policy := workflow.RetryPolicy{
        MaxAttempts:        5,
        InitialInterval:    200 * time.Millisecond,
        BackoffCoefficient: 2.0,
        MaxInterval:        30 * time.Second,
    }

    result, err := ctx.ExecuteActivity(riskyActivity, input,
        workflow.WithActivityRetry(policy),
    )

The default retry policy makes 3 attempts with a 100ms initial interval and a 2x backoff coefficient.
Hooks
Hooks observe workflow lifecycle events without modifying execution:

    hooks := workflow.Hooks{
        OnWorkflowStart:    func(ctx context.Context, wfID string, input any) { /* ... */ },
        OnWorkflowComplete: func(ctx context.Context, wfID string, result any) { /* ... */ },
        OnWorkflowFail:     func(ctx context.Context, wfID string, err error) { /* ... */ },
        OnActivityStart:    func(ctx context.Context, wfID string, input any) { /* ... */ },
        OnActivityComplete: func(ctx context.Context, wfID string, result any) { /* ... */ },
        OnSignal:           func(ctx context.Context, wfID string, signal workflow.Signal) { /* ... */ },
        OnRetry:            func(ctx context.Context, wfID string, err error) { /* ... */ },
    }

    // Compose multiple hook sets.
    combined := workflow.ComposeHooks(loggingHooks, metricsHooks)

Middleware
Middleware wraps a DurableExecutor to add cross-cutting behavior:

    type Middleware func(DurableExecutor) DurableExecutor

    // Apply middleware (first in list is outermost).
    executor = workflow.ApplyMiddleware(executor, loggingMW, metricsMW)

    // Use the WithHooks middleware.
    executor = workflow.ApplyMiddleware(executor,
        workflow.WithHooks(hooks),
    )

Provider Discovery
List all registered executor providers at runtime:

    names := workflow.List()
    // Returns a sorted list: ["default", "temporal"]

Choosing a Provider
| Use Case | Recommended |
|---|---|
| Development and testing | Default executor + In-Memory store |
| Production distributed workflows | Temporal executor |
| Cloud-native microservices | Default executor + Dapr store |
| Event-driven serverless | Default executor + Inngest store |
| Event streaming architectures | Default executor + Kafka store |
| Lightweight distributed systems | Default executor + NATS store |