Skip to content
Docs

Core API — Streams, Runnable, Errors

import "github.com/lookatitude/beluga-ai/core"

Package core provides the foundational primitives for the Beluga AI framework.

It defines the universal execution model, typed event streaming, batch processing, lifecycle management, multi-tenancy, context propagation, functional options, and structured error handling that all other packages build upon.

Runnable is the universal execution interface. Every component that processes input — LLMs, tools, agents, pipelines — implements Runnable. It supports both synchronous invocation and streaming:

type Runnable interface {
Invoke(ctx context.Context, input any, opts ...Option) (any, error)
Stream(ctx context.Context, input any, opts ...Option) iter.Seq2[any, error]
}

Runnables compose via Pipe (sequential) and Parallel (concurrent):

package main
import (
"context"
"fmt"
"log"
"github.com/lookatitude/beluga-ai/core"
)
func main() {
ctx := context.Background()
pipeline := core.Pipe(tokenizer, llmRunnable)
result, err := pipeline.Invoke(ctx, "Hello, world!")
if err != nil {
log.Fatal(err)
}
fmt.Println(result)
fanOut := core.Parallel(agent1, agent2, agent3)
results, err := fanOut.Invoke(ctx, input)
if err != nil {
log.Fatal(err)
}
// results is []any with one entry per Runnable
fmt.Println(results)
}

Pipe invokes a synchronously, then passes its output as input to b. Parallel fans out to all Runnables concurrently and returns all results as []any. If any Runnable returns an error, the first error is returned and remaining results may be incomplete.

Stream is a pull-based iterator built on Go 1.23+ iter.Seq2. Consumers range over the returned sequence:

stream := r.Stream(ctx, input)
for val, err := range stream {
if err != nil {
break
}
fmt.Println(val)
}

BatchInvoke executes a function over multiple inputs concurrently. It is generic over the input type I and output type O, and returns a []BatchResult[O] with one entry per input at the corresponding index:

package main
import (
"context"
"fmt"
"log"
"time"
"github.com/lookatitude/beluga-ai/core"
)
func embed(ctx context.Context, text string) ([]float32, error) {
// call embedding model
return nil, nil
}
func main() {
ctx := context.Background()
docs := []string{"doc one", "doc two", "doc three"}
results := core.BatchInvoke(ctx, embed, docs, core.BatchOptions{
MaxConcurrency: 10,
Timeout: 5 * time.Second,
})
for i, r := range results {
if r.Err != nil {
log.Printf("item %d failed: %v", i, r.Err)
continue
}
fmt.Printf("item %d: %d dimensions\n", i, len(r.Value))
}
}

BatchOptions fields:

FieldTypeDescription
MaxConcurrencyintMax concurrent executions. 0 = unlimited.
BatchSizeintInformational; BatchInvoke calls fn once per item.
Timeouttime.DurationPer-item timeout. 0 = no per-item timeout.
RetryPolicy*RetryPolicyOptional retry configuration per item.

BatchResult[O] carries Value O and Err error for each input.

The Lifecycle interface provides Start/Stop/Health semantics for components that require explicit initialization and graceful shutdown. App manages a set of Lifecycle components, starting them in registration order and stopping them in reverse:

app := core.NewApp()
app.Register(dbPool, cacheLayer, httpServer)
if err := app.Start(ctx); err != nil {
log.Fatal(err)
}
defer app.Shutdown(ctx)

WithTenant and GetTenant store and retrieve a TenantID from context, enabling tenant-scoped data isolation across all framework operations. TenantID is a named string type:

package main
import (
"context"
"fmt"
"github.com/lookatitude/beluga-ai/core"
)
func main() {
ctx := core.WithTenant(context.Background(), core.TenantID("acme-corp"))
tenantID := core.GetTenant(ctx)
fmt.Println(tenantID) // "acme-corp"
}

WithSessionID, GetSessionID, WithRequestID, and GetRequestID propagate session and request identifiers through context for correlation across distributed traces and logs:

ctx = core.WithSessionID(ctx, "session-abc")
ctx = core.WithRequestID(ctx, "req-xyz")
sessionID := core.GetSessionID(ctx) // "session-abc"
requestID := core.GetRequestID(ctx) // "req-xyz"

Error carries an operation name, ErrorCode, human-readable message, and optional wrapped cause:

type Error struct {
Op string
Code ErrorCode
Message string
Err error
}

Create errors with NewError:

err := core.NewError("llm.generate", core.ErrRateLimit, "quota exceeded", cause)

Error codes and their retryability:

CodeConstantRetryable
rate_limitErrRateLimityes
timeoutErrTimeoutyes
provider_unavailableErrProviderDownyes
auth_errorErrAuthno
invalid_inputErrInvalidInputno
tool_failedErrToolFailedno
guard_blockedErrGuardBlockedno
budget_exhaustedErrBudgetExhaustedno
not_foundErrNotFoundno

Use IsRetryable to decide whether to retry:

if core.IsRetryable(err) {
// safe to retry
}

Error implements errors.Is by matching on Code, and errors.As via Unwrap, so standard Go error chain traversal works as expected.

The Option interface and OptionFunc adapter implement the functional options pattern used throughout the framework for configuration.

  • docs/concepts.md — Design decisions
  • docs/packages.md — Package layout
  • llm — ChatModel built on Runnable
  • agent — Agent built on Runnable