
Temporal Workflow Provider

The Temporal provider implements both the workflow.DurableExecutor and workflow.WorkflowStore interfaces using Temporal. Temporal is a distributed, durable execution platform that provides workflow orchestration with fault tolerance, signal handling, activity execution, and replay-based recovery. This provider maps Beluga workflow concepts directly to Temporal primitives.

Choose Temporal when you need production-grade durable execution with full workflow orchestration: activity retries, signal-based human-in-the-loop, deterministic replay for recovery, and distributed task queues. Temporal is the right choice for long-running, mission-critical workflows where fault tolerance is essential. For lightweight development and testing, use the In-Memory store instead.

This is the only provider that registers into the workflow.DurableExecutor registry via init(), making it available through workflow.New("temporal", cfg).

go get github.com/lookatitude/beluga-ai/workflow/providers/temporal

Start a local Temporal server for development:

temporal server start-dev

Then create an executor through the registry and run a workflow:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.temporal.io/sdk/client"

	"github.com/lookatitude/beluga-ai/workflow"
	_ "github.com/lookatitude/beluga-ai/workflow/providers/temporal"
)

func main() {
	// Create a Temporal SDK client
	c, err := client.Dial(client.Options{
		HostPort: "localhost:7233",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Create executor via the registry
	executor, err := workflow.New("temporal", workflow.Config{
		Extra: map[string]any{
			"client":     c,
			"task_queue": "my-workflows",
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	handle, err := executor.Execute(ctx, myWorkflow, workflow.WorkflowOptions{
		ID:      "order-123",
		Input:   "process payment",
		Timeout: 30 * time.Minute,
	})
	if err != nil {
		log.Fatal(err)
	}

	result, err := handle.Result(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("Result:", result)
}

func myWorkflow(ctx workflow.WorkflowContext, input any) (any, error) {
	result, err := ctx.ExecuteActivity(processPayment, input)
	if err != nil {
		return nil, fmt.Errorf("process payment: %w", err)
	}
	return result, nil
}

func processPayment(ctx context.Context, input any) (any, error) {
	return fmt.Sprintf("paid: %v", input), nil
}

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| Client | client.Client | (required) | Temporal SDK client |
| TaskQueue | string | "beluga-workflows" | Task queue for workflows and activities |
| DefaultTimeout | time.Duration | 10 * time.Minute | Default workflow execution timeout |

When using the registry (workflow.New), pass configuration via Extra:

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| "client" | client.Client | (required) | Temporal SDK client |
| "task_queue" | string | "beluga-workflows" | Task queue name |

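To make the registry path more concrete, here is a minimal, standard-library-only sketch of how a provider could register a factory in init() and read its settings from Extra. Everything in it is a stand-in written for illustration: the Config, Executor, Factory, Register, New, and fakeExecutor names are assumptions and are not the actual Beluga types. Only the "client" and "task_queue" keys and the "beluga-workflows" default come from the table above.

```go
package main

import (
	"errors"
	"fmt"
)

// Config mirrors the shape used on this page: provider settings travel in Extra.
type Config struct {
	Extra map[string]any
}

// Executor is a reduced, hypothetical stand-in for workflow.DurableExecutor.
type Executor interface {
	TaskQueue() string
}

// Factory builds an Executor from a Config.
type Factory func(Config) (Executor, error)

var registry = map[string]Factory{}

// Register stores a factory under a provider name.
func Register(name string, f Factory) { registry[name] = f }

// New looks up a provider by name and builds an executor from cfg.
func New(name string, cfg Config) (Executor, error) {
	f, ok := registry[name]
	if !ok {
		return nil, fmt.Errorf("unknown provider %q", name)
	}
	return f(cfg)
}

type fakeExecutor struct{ queue string }

func (e fakeExecutor) TaskQueue() string { return e.queue }

// init runs when the package is loaded, which is why a blank import
// is enough to make a provider available by name.
func init() {
	Register("temporal", func(cfg Config) (Executor, error) {
		if cfg.Extra["client"] == nil {
			return nil, errors.New(`temporal: "client" is required`)
		}
		queue, ok := cfg.Extra["task_queue"].(string)
		if !ok || queue == "" {
			queue = "beluga-workflows" // documented default
		}
		return fakeExecutor{queue: queue}, nil
	})
}

func main() {
	ex, err := New("temporal", Config{Extra: map[string]any{"client": struct{}{}}})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ex.TaskQueue())
}
```

Because Extra is a map[string]any, a value of the wrong type (for example a non-string "task_queue") only surfaces at run time. In this sketch it silently falls back to the default; a real provider may choose to return an error instead.
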
Create an executor directly without the registry:

import (
	"log"
	"time"

	"go.temporal.io/sdk/client"

	temporalwf "github.com/lookatitude/beluga-ai/workflow/providers/temporal"
)

c, err := client.Dial(client.Options{
	HostPort: "localhost:7233",
})
if err != nil {
	log.Fatal(err)
}
defer c.Close()

executor, err := temporalwf.NewExecutor(temporalwf.Config{
	Client:         c,
	TaskQueue:      "my-queue",
	DefaultTimeout: 1 * time.Hour,
})
if err != nil {
	log.Fatal(err)
}

func (e *Executor) Execute(ctx context.Context, fn workflow.WorkflowFunc, opts workflow.WorkflowOptions) (workflow.WorkflowHandle, error)

Starts a new Temporal workflow execution. The Beluga WorkflowFunc is wrapped in a Temporal-compatible function that creates a WorkflowContext bridging Beluga operations to Temporal primitives. If no ID is provided in opts, one is generated using a timestamp.
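The ID fallback described above can be pictured with a small sketch. The resolveID helper, the injectable clock variable, and the "wf-" prefix with a nanosecond suffix are all assumptions made for illustration; the provider's actual ID format is not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// clock is injectable so the fallback can be tested deterministically.
var clock = time.Now

// resolveID returns id unchanged when it is set; otherwise it derives an ID
// from the current time. The "wf-" prefix and nanosecond precision are
// assumptions for this sketch, not the provider's documented format.
func resolveID(id string) string {
	if id != "" {
		return id
	}
	return fmt.Sprintf("wf-%d", clock().UnixNano())
}

func main() {
	fmt.Println(resolveID("order-123"))
	fmt.Println(resolveID(""))
}
```

Since Signal, Query, and Cancel all take a workflow ID, supplying an explicit, meaningful ID (such as an order number) is usually more practical than relying on a generated one.
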

func (e *Executor) Signal(ctx context.Context, workflowID string, signal workflow.Signal) error

Sends a signal to a running Temporal workflow. The Signal.Name maps to the Temporal signal name and Signal.Payload carries the data.

func (e *Executor) Query(ctx context.Context, workflowID string, queryType string) (any, error)

Queries a running Temporal workflow. The queryType is passed as the Temporal query type. The response is deserialized into any.

func (e *Executor) Cancel(ctx context.Context, workflowID string) error

Requests cancellation of a running Temporal workflow.

The Temporal provider bridges the Beluga WorkflowContext interface to Temporal’s workflow primitives:

func (c *temporalContext) ExecuteActivity(fn workflow.ActivityFunc, input any, opts ...workflow.ActivityOption) (any, error)

Executes an activity within the Temporal workflow context. Activities run on the configured task queue with a default 5-minute start-to-close timeout. The result is decoded from Temporal’s response payload.
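The variadic opts parameter suggests the functional options pattern. The sketch below shows how such options could be folded over a configuration that starts from the documented 5-minute default. The activityConfig struct and the WithTimeout and WithMaxAttempts options are hypothetical names for illustration; the only activity option this page names is WithActivityRetry.

```go
package main

import (
	"fmt"
	"time"
)

// activityConfig is a hypothetical stand-in for the provider's internal settings.
type activityConfig struct {
	startToClose time.Duration
	maxAttempts  int // 0 means "not set by the caller"
}

// ActivityOption mutates the configuration (functional options pattern).
type ActivityOption func(*activityConfig)

// WithTimeout is a hypothetical option that overrides the start-to-close timeout.
func WithTimeout(d time.Duration) ActivityOption {
	return func(c *activityConfig) { c.startToClose = d }
}

// WithMaxAttempts is a hypothetical option that sets a retry attempt limit.
func WithMaxAttempts(n int) ActivityOption {
	return func(c *activityConfig) { c.maxAttempts = n }
}

// resolve starts from the documented 5-minute default and applies opts in order.
func resolve(opts ...ActivityOption) activityConfig {
	cfg := activityConfig{startToClose: 5 * time.Minute}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	fmt.Println(resolve().startToClose)
	fmt.Println(resolve(WithTimeout(30 * time.Second)).startToClose)
}
```

Options are applied in order, so when two options touch the same field the later one wins.
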

func (c *temporalContext) ReceiveSignal(name string) <-chan any

Returns a Go channel that delivers signal payloads. Internally, this launches a Temporal coroutine (workflow.Go) that listens on the named signal channel and bridges payloads to a standard Go channel.
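The bridging idea can be sketched with plain Go channels. This is only an analogy: the bridge function and the src channel are stand-ins, and an ordinary goroutine is used here, whereas code running inside a Temporal workflow must use workflow.Go rather than the go statement.

```go
package main

import "fmt"

// bridge forwards every payload from src to a new channel and closes that
// channel once src is closed. In this sketch src plays the role of the
// Temporal signal channel.
func bridge(src <-chan any) <-chan any {
	out := make(chan any)
	go func() {
		defer close(out)
		for v := range src {
			out <- v
		}
	}()
	return out
}

func main() {
	src := make(chan any, 1)
	src <- "approved"
	close(src)

	for v := range bridge(src) {
		fmt.Println(v)
	}
}
```

From the caller's side, the result is an ordinary receive-only channel, which matches how ReceiveSignal is used in workflow code.
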

func (c *temporalContext) Sleep(d time.Duration) error

Pauses workflow execution for the specified duration using Temporal’s deterministic timer. Unlike time.Sleep, this is recorded in the workflow history and replayed correctly during recovery.

The WorkflowHandle returned by Execute wraps a Temporal WorkflowRun:

handle, err := executor.Execute(ctx, myWorkflow, opts)
// Get the workflow ID
id := handle.ID()
// Get the Temporal run ID
runID := handle.RunID()
// Check status (always "running" until Result() is called)
status := handle.Status()
// Wait for the workflow to complete
result, err := handle.Result(ctx)

The Temporal provider also implements workflow.WorkflowStore for scenarios where you need the store interface. Since Temporal manages state internally, most operations are lightweight:

store := temporalwf.NewStore(temporalClient, "default")

| Method | Behavior |
| --- | --- |
| Save | No-op (Temporal manages state) |
| Load | Retrieves the workflow run and returns basic state |
| List | Returns nil (requires Temporal visibility features) |
| Delete | No-op (Temporal manages lifecycle) |

Temporal’s signal mechanism integrates with Beluga’s workflow patterns for human-in-the-loop scenarios:

func approvalWorkflow(ctx workflow.WorkflowContext, input any) (any, error) {
	// Execute the initial processing activity
	processed, err := ctx.ExecuteActivity(preProcess, input)
	if err != nil {
		return nil, err
	}

	// Wait for human approval via signal
	approvalCh := ctx.ReceiveSignal("approval")
	decision := <-approvalCh
	if decision != "approved" {
		return nil, fmt.Errorf("rejected by reviewer")
	}

	// Continue processing after approval
	return ctx.ExecuteActivity(postProcess, processed)
}

// Send approval from an external system
err := executor.Signal(ctx, "wf-123", workflow.Signal{
	Name:    "approval",
	Payload: "approved",
})

Temporal retry policies can be configured at the workflow level via Beluga’s RetryPolicy:

// The Temporal provider maps Beluga retry policies to Temporal retry policies:
// - MaxAttempts -> MaximumAttempts
// - InitialInterval -> InitialInterval
// - BackoffCoefficient -> BackoffCoefficient
// - MaxInterval -> MaximumInterval
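
To show what the four mapped fields mean in practice, the following sketch computes the wait before each retry using the usual exponential backoff rule: the initial interval multiplied by the coefficient raised to the retry index, capped at the maximum interval. The RetryPolicy struct here is a local stand-in that reuses the field names listed above, and delayBefore is a hypothetical helper; the sketch assumes Temporal applies the rule in this form.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// RetryPolicy is a local stand-in that reuses the four field names listed above.
type RetryPolicy struct {
	MaxAttempts        int
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaxInterval        time.Duration
}

// delayBefore returns the wait before the given retry (1 = first retry):
// InitialInterval * BackoffCoefficient^(retry-1), capped at MaxInterval.
func delayBefore(p RetryPolicy, retry int) time.Duration {
	d := float64(p.InitialInterval) * math.Pow(p.BackoffCoefficient, float64(retry-1))
	if p.MaxInterval > 0 && d > float64(p.MaxInterval) {
		return p.MaxInterval
	}
	return time.Duration(d)
}

func main() {
	p := RetryPolicy{
		MaxAttempts:        5,
		InitialInterval:    time.Second,
		BackoffCoefficient: 2,
		MaxInterval:        5 * time.Second,
	}
	// MaxAttempts counts the first attempt, so 5 attempts allow 4 retries.
	for retry := 1; retry < p.MaxAttempts; retry++ {
		fmt.Println(retry, delayBefore(p, retry))
	}
}
```

With these values the waits are 1s, 2s, 4s, and then 5s, because the fourth retry would otherwise wait 8s and is capped by MaxInterval.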

Activity-level retries use Beluga’s WithActivityRetry option, which is applied via the Temporal activity options.

This provider depends on the Temporal Go SDK:

  • go.temporal.io/sdk/client
  • go.temporal.io/sdk/workflow
  • go.temporal.io/sdk/temporal

The Temporal WorkflowContext bridges Temporal’s deterministic execution model to Go’s context.Context. Note the following:

  • Done() returns nil because Temporal uses its own internal channel type for cancellation. A receive from a nil channel blocks forever, so check Err() for cancellation instead of selecting on Done().
  • Sleep is durable: it survives process restarts, unlike time.Sleep.
  • Activities run within the configured task queue and are distributed across Temporal workers.
  • Signal channels are bridged via a Temporal coroutine (workflow.Go), so they behave like standard Go channels from the caller’s perspective.

Production considerations:

  • Task Queue: Use dedicated task queues per workflow type for isolation.
  • Timeouts: Set appropriate workflow and activity timeouts to prevent resource leaks.
  • Workers: Deploy Temporal workers to execute the registered workflows and activities.
  • Namespace: Use separate Temporal namespaces for development, staging, and production.
  • Retry Policies: Configure retry policies on activities to handle transient failures gracefully.
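
Returning to the note on Done() above, the sketch below illustrates why code written against this context should poll Err() between steps. The wfContext type and the runSteps helper are hypothetical stand-ins that only reproduce the documented behavior (Done() returns nil, Err() reports cancellation); they are not the provider's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// wfContext is a hypothetical stand-in with the documented behavior:
// Done() returns nil and Err() reports cancellation.
type wfContext struct{ cancelled bool }

func (c *wfContext) Done() <-chan struct{} { return nil }

func (c *wfContext) Err() error {
	if c.cancelled {
		return errors.New("workflow cancelled")
	}
	return nil
}

// runSteps checks Err() before each step instead of selecting on Done().
// It returns the number of steps that ran and the cancellation error, if any.
func runSteps(ctx *wfContext, steps []func()) (int, error) {
	for i, step := range steps {
		if err := ctx.Err(); err != nil {
			return i, err
		}
		step()
	}
	return len(steps), nil
}

func main() {
	ctx := &wfContext{}
	steps := []func(){
		func() { fmt.Println("step 1") },
		func() { ctx.cancelled = true }, // simulate cancellation arriving mid-run
		func() { fmt.Println("step 3 (never runs)") },
	}
	n, err := runSteps(ctx, steps)
	fmt.Println(n, err)

	// A receive from a nil channel is never ready, so only default can fire.
	select {
	case <-ctx.Done():
		fmt.Println("unreachable")
	default:
		fmt.Println("Done() is nil; a select without default would block")
	}
}
```

The same caution applies to generic helpers that wait on ctx.Done(): with this context they would never observe cancellation.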