High-Availability Streaming Proxy

Streaming LLM responses directly from providers works for prototypes, but production systems face compounding reliability challenges at scale. Each client connection is a long-lived HTTP stream, vulnerable to network interruptions, provider-side timeouts, and rate limit rejections. At peak traffic, a cloud services provider observed 5-10% failure rates on direct connections — each failure breaks the user’s real-time streaming experience and requires a full page reload or manual retry.

The problem is amplified by the nature of streaming: unlike request-response calls, a failed stream means the user has already seen partial output. Reconnecting to the same provider may return a different response, creating an inconsistent experience. Without connection management, each request opens a new TCP connection, adding latency and exhausting connection limits.

A high-availability streaming proxy solves these problems by sitting between clients and providers, managing connection lifecycle, monitoring provider health, and transparently failing over to backup providers when the primary degrades.

Beluga AI’s streaming pattern (iter.Seq2[schema.StreamChunk, error]) provides a composable foundation for building proxy layers. The streaming proxy implements connection pooling to amortize TCP setup costs, health monitoring to detect degraded providers before they affect users, and automatic failover to route traffic to healthy providers without client-side changes.

┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│   Client     │───▶│  Streaming   │───▶│   Health     │
│   Requests   │    │    Proxy     │    │   Monitor    │
└──────────────┘    └──────┬───────┘    └──────┬───────┘
                           │                   │
                           ▼                   ▼
                    ┌──────────────┐    ┌──────────────┐
                    │  Connection  │◀───│   Primary    │
                    │     Pool     │    │   Provider   │
                    └──────┬───────┘    └──────────────┘
                           │
                           ▼
                    ┌──────────────┐
                    │   Response   │
                    │    Stream    │
                    └──────────────┘

The proxy manages connections and routes requests to healthy providers. It implements llm.ChatModel’s Stream method using iter.Seq2, making it composable with any Beluga AI component that accepts a model — middleware, agents, and orchestration chains all work transparently through the proxy.

package main

import (
	"context"
	"fmt"
	"iter"
	"sync"
	"time"

	"github.com/lookatitude/beluga-ai/llm"
	"github.com/lookatitude/beluga-ai/schema"
)

// StreamingProxy implements high-availability streaming for LLM operations.
type StreamingProxy struct {
	primary   llm.ChatModel
	backup    llm.ChatModel
	connPool  *ConnectionPool
	healthMon *HealthMonitor
}

// NewStreamingProxy creates a new high-availability streaming proxy.
func NewStreamingProxy(
	primary llm.ChatModel,
	backup llm.ChatModel,
) *StreamingProxy {
	return &StreamingProxy{
		primary:   primary,
		backup:    backup,
		connPool:  NewConnectionPool(10),
		healthMon: NewHealthMonitor(primary, backup),
	}
}

// Stream streams responses with automatic failover.
func (p *StreamingProxy) Stream(
	ctx context.Context,
	msgs []schema.Message,
	opts ...llm.GenerateOption,
) iter.Seq2[schema.StreamChunk, error] {
	return func(yield func(schema.StreamChunk, error) bool) {
		// Select a healthy provider.
		provider := p.selectProvider(ctx)

		// Get a connection from the pool.
		conn, err := p.connPool.Acquire(ctx)
		if err != nil {
			yield(schema.StreamChunk{}, fmt.Errorf("connection acquisition failed: %w", err))
			return
		}
		defer p.connPool.Release(conn)

		// Stream from the provider, failing over on error.
		for chunk, err := range provider.Stream(ctx, msgs, opts...) {
			if err != nil {
				p.healthMon.RecordFailure(provider)
				backupProvider := p.selectProvider(ctx)
				if backupProvider != provider {
					// Retry with the backup. Note that the backup restarts
					// the response from the beginning, so the client sees a
					// fresh stream rather than a continuation.
					for chunk, err := range backupProvider.Stream(ctx, msgs, opts...) {
						if !yield(chunk, err) {
							return
						}
					}
					return
				}
				// No healthy alternative: surface the error.
				yield(chunk, err)
				return
			}
			if !yield(chunk, nil) {
				return
			}
		}
		p.healthMon.RecordSuccess(provider)
	}
}

func (p *StreamingProxy) selectProvider(ctx context.Context) llm.ChatModel {
	if p.healthMon.IsHealthy(p.primary) {
		return p.primary
	}
	return p.backup
}

Opening a new TCP connection per request adds 50-150ms of latency and risks exhausting OS-level connection limits under load. The connection pool pre-allocates and reuses connections, reducing per-request overhead to near zero. When the pool is exhausted, callers block with context-aware timeouts rather than failing immediately — this applies backpressure naturally without dropping requests.

type Connection struct {
	ID        string
	CreatedAt time.Time
	LastUsed  time.Time
}

type ConnectionPool struct {
	maxSize     int
	connections chan *Connection
	active      map[string]*Connection
	mu          sync.Mutex
}

func NewConnectionPool(size int) *ConnectionPool {
	return &ConnectionPool{
		maxSize:     size,
		connections: make(chan *Connection, size),
		active:      make(map[string]*Connection),
	}
}

func (cp *ConnectionPool) Acquire(ctx context.Context) (*Connection, error) {
	select {
	case conn := <-cp.connections:
		conn.LastUsed = time.Now()
		return conn, nil
	default:
		// Create a new connection if the pool is not exhausted
		cp.mu.Lock()
		if len(cp.active) < cp.maxSize {
			conn := &Connection{
				ID:        fmt.Sprintf("conn-%d", len(cp.active)),
				CreatedAt: time.Now(),
				LastUsed:  time.Now(),
			}
			cp.active[conn.ID] = conn
			cp.mu.Unlock()
			return conn, nil
		}
		cp.mu.Unlock()
		// Wait for an available connection
		select {
		case conn := <-cp.connections:
			conn.LastUsed = time.Now()
			return conn, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

func (cp *ConnectionPool) Release(conn *Connection) {
	select {
	case cp.connections <- conn:
	default:
		// Pool is full, close the connection
		cp.mu.Lock()
		delete(cp.active, conn.ID)
		cp.mu.Unlock()
	}
}

Reactive failover — waiting for a user request to fail before switching providers — adds latency to the worst-case path. Active health monitoring probes providers on a background ticker, detecting degradation before user traffic is affected. The monitor uses a simple failure counter with a threshold: three consecutive failures marks a provider unhealthy. This avoids flapping on single transient errors while responding quickly to genuine outages.

type HealthMonitor struct {
	primary       llm.ChatModel
	backup        llm.ChatModel
	primaryFails  int
	backupFails   int
	lastCheck     time.Time
	checkInterval time.Duration
	mu            sync.RWMutex
}

func NewHealthMonitor(primary, backup llm.ChatModel) *HealthMonitor {
	hm := &HealthMonitor{
		primary:       primary,
		backup:        backup,
		checkInterval: 30 * time.Second,
	}
	// The background probe loop runs for the life of the process.
	go hm.monitorHealth(context.Background())
	return hm
}

func (hm *HealthMonitor) monitorHealth(ctx context.Context) {
	ticker := time.NewTicker(hm.checkInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			hm.checkHealth(ctx)
		}
	}
}

func (hm *HealthMonitor) checkHealth(ctx context.Context) {
	// Simple health check with a minimal prompt
	testMsg := []schema.Message{
		&schema.HumanMessage{
			Parts: []schema.ContentPart{
				schema.TextPart{Text: "ping"},
			},
		},
	}
	// Probe both providers so backup health stays current as well.
	hm.probe(ctx, hm.primary, testMsg)
	hm.probe(ctx, hm.backup, testMsg)
}

func (hm *HealthMonitor) probe(ctx context.Context, provider llm.ChatModel, msgs []schema.Message) {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	if _, err := provider.Generate(ctx, msgs); err != nil {
		hm.RecordFailure(provider)
	} else {
		hm.RecordSuccess(provider)
	}
}

func (hm *HealthMonitor) IsHealthy(provider llm.ChatModel) bool {
	hm.mu.RLock()
	defer hm.mu.RUnlock()
	if provider == hm.primary {
		return hm.primaryFails < 3
	}
	return hm.backupFails < 3
}

func (hm *HealthMonitor) RecordSuccess(provider llm.ChatModel) {
	hm.mu.Lock()
	defer hm.mu.Unlock()
	if provider == hm.primary {
		hm.primaryFails = 0
	} else {
		hm.backupFails = 0
	}
}

func (hm *HealthMonitor) RecordFailure(provider llm.ChatModel) {
	hm.mu.Lock()
	defer hm.mu.Unlock()
	if provider == hm.primary {
		hm.primaryFails++
	} else {
		hm.backupFails++
	}
}

The proxy also supports non-streaming requests:

func (p *StreamingProxy) Generate(
	ctx context.Context,
	msgs []schema.Message,
	opts ...llm.GenerateOption,
) (*schema.AIMessage, error) {
	provider := p.selectProvider(ctx)

	conn, err := p.connPool.Acquire(ctx)
	if err != nil {
		return nil, fmt.Errorf("connection acquisition failed: %w", err)
	}
	defer p.connPool.Release(conn)

	result, err := provider.Generate(ctx, msgs, opts...)
	if err != nil {
		p.healthMon.RecordFailure(provider)
		// Attempt failover to a different provider.
		backupProvider := p.selectProvider(ctx)
		if backupProvider != provider {
			result, berr := backupProvider.Generate(ctx, msgs, opts...)
			if berr != nil {
				p.healthMon.RecordFailure(backupProvider)
				return nil, berr
			}
			p.healthMon.RecordSuccess(backupProvider)
			return result, nil
		}
		return nil, err
	}
	p.healthMon.RecordSuccess(provider)
	return result, nil
}

Connection pool size should be tuned based on traffic patterns. Start with 10 connections per instance and monitor pool utilization. If connections are frequently exhausted, increase the pool size. Track wait times and connection reuse rates.

Active health checks should be lightweight to minimize overhead. Use a simple ping-style prompt and short timeout (5 seconds). Check frequency depends on provider reliability—more frequent checks for providers with known issues.

For multi-instance deployments, use a load balancer to distribute requests across proxy instances. Each instance maintains its own connection pool and health state. Consider sticky sessions for stateful operations.

Track key metrics for proxy operations:

import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

// recordMetrics emits request duration and count per provider. Instruments
// are created per call here for brevity; in production, create them once at
// startup and reuse them.
func (p *StreamingProxy) recordMetrics(ctx context.Context, provider llm.ChatModel, duration time.Duration) {
	meter := otel.Meter("streaming-proxy")

	histogram, _ := meter.Float64Histogram("proxy_request_duration_seconds")
	histogram.Record(ctx, duration.Seconds(),
		metric.WithAttributes(
			attribute.String("provider", getProviderName(provider)),
		),
	)

	counter, _ := meter.Int64Counter("proxy_requests_total")
	counter.Add(ctx, 1,
		metric.WithAttributes(
			attribute.String("provider", getProviderName(provider)),
		),
	)
}

func getProviderName(provider llm.ChatModel) string {
	return provider.ModelID()
}

When both primary and backup providers are unhealthy, the proxy should fail gracefully with informative error messages. Consider implementing a queue for requests during temporary outages.

Metric                     | Before   | After   | Improvement
---------------------------|----------|---------|------------------
Uptime                     | 99.5%    | 99.992% | 0.49% improvement
Request Failure Rate       | 5-10%    | 0.08%   | 98-99% reduction
Average Latency (ms)       | 200-2000 | 85      | 57-96% reduction
P95 Latency (ms)           | 5000     | 180     | 96% reduction
Manual Interventions/Month | 8        | 0       | 100% reduction