Skip to content
Docs

Workflow Providers — Durable Execution

Beluga AI v2 includes a durable execution engine that manages long-running, fault-tolerant workflows with activity execution, signal handling, retry policies, and event-sourced state persistence. Workflow state providers implement the WorkflowStore interface for persisting execution history, while durable executor providers implement the DurableExecutor interface for executing workflows with production-grade guarantees.

The primary interface for executing and managing workflows:

type DurableExecutor interface {
Execute(ctx context.Context, fn WorkflowFunc, opts WorkflowOptions) (WorkflowHandle, error)
Signal(ctx context.Context, workflowID string, signal Signal) error
Query(ctx context.Context, workflowID string, queryType string) (any, error)
Cancel(ctx context.Context, workflowID string) error
}

The interface for persisting workflow state:

type WorkflowStore interface {
Save(ctx context.Context, state WorkflowState) error
Load(ctx context.Context, workflowID string) (*WorkflowState, error)
List(ctx context.Context, filter WorkflowFilter) ([]WorkflowState, error)
Delete(ctx context.Context, workflowID string) error
}

Extended context for deterministic workflow execution:

type WorkflowContext interface {
context.Context
ExecuteActivity(fn ActivityFunc, input any, opts ...ActivityOption) (any, error)
ReceiveSignal(name string) <-chan any
Sleep(d time.Duration) error
}

Access to running or completed workflow executions:

type WorkflowHandle interface {
ID() string
RunID() string
Status() WorkflowStatus
Result(ctx context.Context) (any, error)
}
import (
"context"
"log"
"github.com/lookatitude/beluga-ai/workflow"
// Register the executor provider via blank import
_ "github.com/lookatitude/beluga-ai/workflow/providers/temporal"
)
func main() {
executor, err := workflow.New("temporal", workflow.Config{
Extra: map[string]any{
"client": temporalClient,
"task_queue": "my-workflows",
},
})
if err != nil {
log.Fatal(err)
}
handle, err := executor.Execute(ctx, myWorkflow, workflow.WorkflowOptions{
ID: "order-123",
})
if err != nil {
log.Fatal(err)
}
result, err := handle.Result(ctx)
if err != nil {
log.Fatal(err)
}
}
ProviderRegistry NameTypeBest For
Default (built-in)defaultIn-process goroutineDevelopment, testing, simple workflows
TemporaltemporalExternal orchestratorProduction distributed workflows
ProviderTypeDurabilityBest For
In-MemoryIn-process mapNone (process-local)Development, testing
DaprDapr state storeDepends on backendCloud-native, multi-cloud
InngestHTTP APIDurableEvent-driven serverless
KafkaCompacted topicDurableEvent streaming architectures
NATSJetStream KVDurableLightweight distributed
TemporalTemporal visibilityManaged by TemporalTemporal-native workflows

Beluga provides a built-in DefaultExecutor that runs workflows in goroutines with optional state persistence:

import (
"github.com/lookatitude/beluga-ai/workflow"
"github.com/lookatitude/beluga-ai/workflow/providers/inmemory"
)
store := inmemory.New()
executor := workflow.NewExecutor(
workflow.WithStore(store),
workflow.WithExecutorHooks(workflow.Hooks{
OnWorkflowStart: func(ctx context.Context, wfID string, input any) {
log.Printf("Workflow %s started", wfID)
},
OnWorkflowComplete: func(ctx context.Context, wfID string, result any) {
log.Printf("Workflow %s completed: %v", wfID, result)
},
}),
)
func orderWorkflow(ctx workflow.WorkflowContext, input any) (any, error) {
orderID := input.(string)
// Execute activities within the workflow context
validated, err := ctx.ExecuteActivity(validateOrder, orderID,
workflow.WithActivityRetry(workflow.DefaultRetryPolicy()),
workflow.WithActivityTimeout(30*time.Second),
)
if err != nil {
return nil, fmt.Errorf("validate: %w", err)
}
// Wait for human approval via signal
approvalCh := ctx.ReceiveSignal("approve")
select {
case approval := <-approvalCh:
if approval != "approved" {
return nil, fmt.Errorf("order rejected")
}
case <-ctx.Done():
return nil, ctx.Err()
}
// Process the order
result, err := ctx.ExecuteActivity(processOrder, validated)
if err != nil {
return nil, fmt.Errorf("process: %w", err)
}
return result, nil
}
handle, err := executor.Execute(ctx, orderWorkflow, workflow.WorkflowOptions{
ID: "order-456",
Input: "ORD-456",
Timeout: 30 * time.Minute,
})
if err != nil {
log.Fatal(err)
}
// Send a signal to the running workflow
err = executor.Signal(ctx, "order-456", workflow.Signal{
Name: "approve",
Payload: "approved",
})
if err != nil {
log.Fatal(err)
}
// Wait for the result
result, err := handle.Result(ctx)
if err != nil {
log.Fatal(err)
}

Beluga provides factory functions for common activity patterns:

// LLM-powered activity
llmAct := workflow.LLMActivity(func(ctx context.Context, prompt string) (string, error) {
return model.Generate(ctx, prompt)
})
// Tool execution activity
toolAct := workflow.ToolActivity(func(ctx context.Context, name string, args map[string]any) (any, error) {
return registry.Execute(ctx, name, args)
})
// Human-in-the-loop activity
humanAct := workflow.HumanActivity(hitlManager)

Activities support configurable retry policies with exponential backoff and jitter:

policy := workflow.RetryPolicy{
MaxAttempts: 5,
InitialInterval: 200 * time.Millisecond,
BackoffCoefficient: 2.0,
MaxInterval: 30 * time.Second,
}
result, err := ctx.ExecuteActivity(riskyActivity, input,
workflow.WithActivityRetry(policy),
)

The default retry policy uses 3 attempts with 100ms initial interval and 2x backoff.

Hooks observe workflow lifecycle events without modifying execution:

hooks := workflow.Hooks{
OnWorkflowStart: func(ctx context.Context, wfID string, input any) { ... },
OnWorkflowComplete: func(ctx context.Context, wfID string, result any) { ... },
OnWorkflowFail: func(ctx context.Context, wfID string, err error) { ... },
OnActivityStart: func(ctx context.Context, wfID string, input any) { ... },
OnActivityComplete: func(ctx context.Context, wfID string, result any) { ... },
OnSignal: func(ctx context.Context, wfID string, signal workflow.Signal) { ... },
OnRetry: func(ctx context.Context, wfID string, err error) { ... },
}
// Compose multiple hook sets
combined := workflow.ComposeHooks(loggingHooks, metricsHooks)

Middleware wraps a DurableExecutor to add cross-cutting behavior:

type Middleware func(DurableExecutor) DurableExecutor
// Apply middleware (first in list is outermost)
executor = workflow.ApplyMiddleware(executor, loggingMW, metricsMW)
// Use the WithHooks middleware
executor = workflow.ApplyMiddleware(executor,
workflow.WithHooks(hooks),
)

List all registered executor providers at runtime:

names := workflow.List()
// Returns sorted list: ["default", "temporal"]
Use CaseRecommended
Development and testingDefault executor + In-Memory store
Production distributed workflowsTemporal executor
Cloud-native microservicesDefault executor + Dapr store
Event-driven serverlessDefault executor + Inngest store
Event streaming architecturesDefault executor + Kafka store
Lightweight distributed systemsDefault executor + NATS store