Core API — Streams, Runnable, Errors

import "github.com/lookatitude/beluga-ai/core"

Package core provides the foundational primitives for the Beluga AI framework.

It defines the universal execution model, typed event streaming, batch processing, lifecycle management, multi-tenancy, context propagation, functional options, and structured error handling that all other packages build upon.

Runnable is the universal execution interface. Every component that processes input — LLMs, tools, agents, pipelines — implements Runnable. It supports both synchronous invocation and streaming:

type Runnable interface {
	Invoke(ctx context.Context, input any, opts ...Option) (any, error)
	Stream(ctx context.Context, input any, opts ...Option) iter.Seq2[any, error]
}

Runnables compose via Pipe (sequential) and Parallel (concurrent):

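// Pipe runs stages sequentially, feeding each output into the next stage.
pipeline := core.Pipe(tokenizer, llm)
result, err := pipeline.Invoke(ctx, "Hello, world!")

// Parallel runs the given Runnables concurrently on the same input.
parallel := core.Parallel(agent1, agent2, agent3)
results, err := parallel.Invoke(ctx, input)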

Stream is a pull-based event iterator built on the Go 1.23+ iter.Seq2 type. Events are generic: the Event type carries a typed payload, an event type, an optional error, and metadata:

for event, err := range stream {
	if err != nil {
		break
	}
	switch event.Type {
	case core.EventData:
		fmt.Print(event.Payload)
	case core.EventToolCall:
		// handle tool invocation
	}
}

Stream utilities include CollectStream, MapStream, FilterStream, MergeStreams, and FanOut for transforming and combining streams. BufferedStream adds backpressure control between fast producers and slow consumers, and FlowController provides semaphore-based concurrency limiting.

BatchInvoke executes a function over multiple inputs concurrently with configurable concurrency limits, per-item timeouts, and retry policies:

results := core.BatchInvoke(ctx, embedFn, documents, core.BatchOptions{
	MaxConcurrency: 10,
	Timeout:        5 * time.Second,
})

The Lifecycle interface provides Start/Stop/Health semantics for components that require explicit initialization and graceful shutdown. App manages a set of Lifecycle components, starting them in registration order and stopping them in reverse:

app := core.NewApp()
app.Register(dbPool, cacheLayer, httpServer)
if err := app.Start(ctx); err != nil {
	log.Fatal(err)
}
defer app.Shutdown(ctx)

WithTenant and GetTenant store and retrieve a TenantID from context, enabling tenant-scoped data isolation across all framework operations.

WithSessionID, GetSessionID, WithRequestID, and GetRequestID propagate session and request identifiers through context for correlation across distributed traces and logs.

Error carries an operation name, ErrorCode, human-readable message, and optional wrapped cause. Error codes like ErrRateLimit, ErrTimeout, and ErrProviderDown enable programmatic retry decisions via IsRetryable:

if core.IsRetryable(err) {
	// safe to retry
}

The Option interface and OptionFunc adapter implement the functional options pattern used throughout the framework for configuration.
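
The pattern can be sketched as follows. The config struct, the apply method, and the withMaxTokens and withTemperature constructors are hypothetical; the method set of the real Option interface is not shown in this document.

```go
package main

import "fmt"

// config is a hypothetical settings struct that options modify.
type config struct {
	temperature float64
	maxTokens   int
}

// Option is anything that can adjust a config.
type Option interface {
	apply(*config)
}

// OptionFunc adapts a plain function to the Option interface.
type OptionFunc func(*config)

func (f OptionFunc) apply(c *config) { f(c) }

// withMaxTokens and withTemperature are example option constructors.
func withMaxTokens(n int) Option {
	return OptionFunc(func(c *config) { c.maxTokens = n })
}

func withTemperature(t float64) Option {
	return OptionFunc(func(c *config) { c.temperature = t })
}

// newConfig starts from defaults and applies the options in order,
// so later options override earlier ones.
func newConfig(opts ...Option) config {
	c := config{temperature: 0.7, maxTokens: 256}
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(withMaxTokens(10), withTemperature(0.2)))
}
```

Callers pass only the settings they want to change, and new options can be added later without breaking existing call sites.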