Real-Time AI Data Analysis Agent
Business teams need answers to data questions — “What were our top products last quarter?” or “Which regions show declining retention?” — but getting these answers requires SQL knowledge, API access, statistical computation, and report formatting. Each question becomes a multi-hour request to the analytics team, creating a bottleneck where business decisions wait on data availability.
The gap is not data access but data reasoning: the information exists across databases and APIs, but synthesizing it into an actionable answer requires combining queries, calculations, and domain context. Traditional dashboards answer pre-defined questions; novel questions still require human analysts.
An AI agent with tool access bridges this gap by autonomously deciding which data sources to query, what calculations to perform, and how to synthesize findings into a natural language report — turning ad-hoc data questions into self-service insights.
Solution Architecture
Beluga AI’s agent framework provides `tool.NewFuncTool[T]` to create typed tools that the agent can invoke autonomously. The agent uses a ReAct planning strategy to reason about which tools to call, in what order, and how to combine results. Tools are defined with JSON schema descriptions so the LLM understands their capabilities and input requirements. The guard pipeline protects the database with read-only SQL validation.
```
┌──────────────┐
│   Analyst    │
│  (Natural    │
│   Language   │
│    Query)    │
└──────┬───────┘
       │
       ▼
┌──────────────┐     ┌──────────────┐   ┌──────────────┐
│    Agent     │────▶│    Tool:     │   │    Tool:     │
│   (ReAct     │     │  SQL Query   │   │  Calculator  │
│   Planner)   │     └──────────────┘   └──────────────┘
│              │────▶┌──────────────┐   ┌──────────────┐
│              │     │    Tool:     │   │    Tool:     │
│              │     │   REST API   │   │  Report Gen  │
└──────────────┘     └──────────────┘   └──────────────┘
```

Building the Analysis Agent
Define tools that connect to your data sources, then let the agent reason about how to answer the question.
```go
package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "log"
    "math" // used by the statistics helpers
    "os"

    "github.com/lookatitude/beluga-ai/agent"
    "github.com/lookatitude/beluga-ai/llm"
    "github.com/lookatitude/beluga-ai/schema"
    "github.com/lookatitude/beluga-ai/tool"

    _ "github.com/lib/pq" // Postgres driver, required by sql.Open("postgres", ...)
    _ "github.com/lookatitude/beluga-ai/llm/providers/openai"
)

// SQLQueryInput defines the input schema for the SQL tool.
type SQLQueryInput struct {
    Query string `json:"query" jsonschema:"description=Read-only SQL query to execute"`
}

// StatsInput defines the input schema for the statistics tool.
type StatsInput struct {
    Values []float64 `json:"values" jsonschema:"description=Numeric values to analyze"`
}

// StatsOutput holds computed statistics.
type StatsOutput struct {
    Mean   float64 `json:"mean"`
    Median float64 `json:"median"`
    StdDev float64 `json:"std_dev"`
    Min    float64 `json:"min"`
    Max    float64 `json:"max"`
    Count  int     `json:"count"`
}

func createAnalysisAgent(ctx context.Context, db *sql.DB) (agent.Agent, error) {
    model, err := llm.New("openai", nil)
    if err != nil {
        return nil, fmt.Errorf("create model: %w", err)
    }

    // SQL query tool — read-only access to the database
    sqlTool := tool.NewFuncTool[SQLQueryInput](
        "query_database",
        "Execute a read-only SQL query against the analytics database",
        func(ctx context.Context, input SQLQueryInput) (*tool.Result, error) {
            rows, err := db.QueryContext(ctx, input.Query)
            if err != nil {
                return tool.ErrorResult(err), nil
            }
            defer rows.Close()

            results, err := scanRowsToJSON(rows)
            if err != nil {
                return tool.ErrorResult(err), nil
            }

            data, err := json.MarshalIndent(results, "", "  ")
            if err != nil {
                return tool.ErrorResult(err), nil
            }
            return tool.TextResult(string(data)), nil
        },
    )

    // Statistics calculator tool
    statsTool := tool.NewFuncTool[StatsInput](
        "calculate_statistics",
        "Calculate mean, median, standard deviation, min, and max for a set of values",
        func(ctx context.Context, input StatsInput) (*tool.Result, error) {
            stats := computeStats(input.Values)
            data, err := json.MarshalIndent(stats, "", "  ")
            if err != nil {
                return tool.ErrorResult(err), nil
            }
            return tool.TextResult(string(data)), nil
        },
    )

    analysisAgent, err := agent.New(
        agent.WithID("data-analyst"),
        agent.WithPersona(agent.Persona{
            Role: "Senior Data Analyst",
            Goal: "Provide accurate, data-driven insights from available sources",
            Backstory: "You analyze data by querying databases, computing statistics, " +
                "and synthesizing findings into clear, actionable reports. " +
                "Always show your methodology and cite the data behind conclusions.",
        }),
        agent.WithModel(model),
        agent.WithTools(sqlTool, statsTool),
    )
    if err != nil {
        return nil, fmt.Errorf("create agent: %w", err)
    }

    return analysisAgent, nil
}
```
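The tool handlers above call two helpers, `scanRowsToJSON` and `computeStats`, that are not shown. A minimal sketch of what they might look like follows; the names match the snippet, but the implementations are assumptions, and `StatsOutput` is repeated so the block is self-contained.

```go
package main

import (
    "database/sql"
    "fmt"
    "math"
    "sort"
)

// StatsOutput mirrors the struct used by the statistics tool.
type StatsOutput struct {
    Mean   float64 `json:"mean"`
    Median float64 `json:"median"`
    StdDev float64 `json:"std_dev"`
    Min    float64 `json:"min"`
    Max    float64 `json:"max"`
    Count  int     `json:"count"`
}

// computeStats calculates summary statistics (population standard deviation).
func computeStats(values []float64) StatsOutput {
    n := len(values)
    if n == 0 {
        return StatsOutput{}
    }
    sorted := append([]float64(nil), values...)
    sort.Float64s(sorted)

    var sum float64
    for _, v := range values {
        sum += v
    }
    mean := sum / float64(n)

    var variance float64
    for _, v := range values {
        variance += (v - mean) * (v - mean)
    }
    variance /= float64(n)

    median := sorted[n/2]
    if n%2 == 0 {
        median = (sorted[n/2-1] + sorted[n/2]) / 2
    }
    return StatsOutput{
        Mean:   mean,
        Median: median,
        StdDev: math.Sqrt(variance),
        Min:    sorted[0],
        Max:    sorted[n-1],
        Count:  n,
    }
}

// scanRowsToJSON converts sql.Rows into a slice of column-name → value maps,
// suitable for JSON marshaling back to the LLM.
func scanRowsToJSON(rows *sql.Rows) ([]map[string]any, error) {
    cols, err := rows.Columns()
    if err != nil {
        return nil, err
    }
    var out []map[string]any
    for rows.Next() {
        vals := make([]any, len(cols))
        ptrs := make([]any, len(cols))
        for i := range vals {
            ptrs[i] = &vals[i]
        }
        if err := rows.Scan(ptrs...); err != nil {
            return nil, err
        }
        row := make(map[string]any, len(cols))
        for i, c := range cols {
            row[c] = vals[i]
        }
        out = append(out, row)
    }
    return out, rows.Err()
}

func main() {
    s := computeStats([]float64{1, 2, 3, 4})
    fmt.Printf("mean=%.1f median=%.1f count=%d\n", s.Mean, s.Median, s.Count) // mean=2.5 median=2.5 count=4
}
```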
Executing Analysis Queries

The agent autonomously decides which tools to use based on the question:
```go
func main() {
    ctx := context.Background()

    db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    analysisAgent, err := createAnalysisAgent(ctx, db)
    if err != nil {
        log.Fatal(err)
    }

    // The agent will: 1) query the database, 2) compute stats, 3) generate report
    answer, err := analysisAgent.Invoke(ctx,
        "What were our top 5 products by revenue last quarter? "+
            "Include growth rates compared to the previous quarter.",
    )
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(answer)
}
```
Streaming Analysis Results

For long-running analyses, stream intermediate results so the user sees progress:
```go
for event, err := range analysisAgent.Stream(ctx, "Analyze customer churn trends") {
    if err != nil {
        log.Fatal(err)
    }

    switch event.Type {
    case agent.EventToolCall:
        fmt.Printf("Querying: %s\n", event.ToolCall.Name)
    case agent.EventToolResult:
        fmt.Printf("Got data (%d chars)\n", len(event.ToolResult.Content[0].(schema.TextPart).Text))
    case agent.EventText:
        fmt.Print(event.Text)
    }
}
```
Structured Report Output

When you need structured reports rather than free-form text, combine the agent with structured output:
```go
type AnalysisReport struct {
    Title       string    `json:"title"`
    Summary     string    `json:"summary"`
    KeyFindings []Finding `json:"key_findings"`
    Methodology string    `json:"methodology"`
    DataSources []string  `json:"data_sources"`
}

type Finding struct {
    Metric     string  `json:"metric"`
    Value      string  `json:"value"`
    Trend      string  `json:"trend" jsonschema:"enum=up,down,flat"`
    Confidence float64 `json:"confidence"`
}

func generateReport(ctx context.Context, model llm.ChatModel, rawAnalysis string) (AnalysisReport, error) {
    structured := llm.NewStructured[AnalysisReport](model)

    msgs := []schema.Message{
        &schema.SystemMessage{Parts: []schema.ContentPart{
            schema.TextPart{Text: "Structure this analysis into a formal report."},
        }},
        &schema.HumanMessage{Parts: []schema.ContentPart{
            schema.TextPart{Text: rawAnalysis},
        }},
    }

    return structured.Generate(ctx, msgs)
}
```
Adding External API Tools

Extend the agent’s capabilities by adding tools for external data sources:
```go
type APIQueryInput struct {
    Endpoint string            `json:"endpoint" jsonschema:"description=API endpoint path"`
    Params   map[string]string `json:"params" jsonschema:"description=Query parameters"`
}

// analyticsClient is assumed to be an HTTP client wrapper defined elsewhere.
apiTool := tool.NewFuncTool[APIQueryInput](
    "query_analytics_api",
    "Fetch data from the analytics REST API",
    func(ctx context.Context, input APIQueryInput) (*tool.Result, error) {
        resp, err := analyticsClient.Get(ctx, input.Endpoint, input.Params)
        if err != nil {
            return tool.ErrorResult(err), nil
        }
        return tool.TextResult(string(resp)), nil
    },
)
```
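Since the agent chooses the endpoint itself, the handler is also a natural place to enforce an allow-list before making any outbound call. A minimal sketch, with made-up example paths:

```go
package main

import "fmt"

// approvedEndpoints is a hypothetical allow-list of analytics API paths;
// the tool handler would reject any input.Endpoint not present here.
var approvedEndpoints = map[string]bool{
    "/v1/metrics/revenue":   true,
    "/v1/metrics/retention": true,
}

// endpointAllowed reports whether the agent may call the given path.
func endpointAllowed(endpoint string) bool {
    return approvedEndpoints[endpoint]
}

func main() {
    fmt.Println(endpointAllowed("/v1/metrics/revenue")) // true
    fmt.Println(endpointAllowed("/admin/users"))        // false
}
```

Inside the tool handler, a disallowed endpoint would return `tool.ErrorResult` instead of reaching the network.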
Production Considerations

Safety Guards
An agent with SQL access could potentially execute destructive queries (DROP TABLE, DELETE). Beluga AI’s guard pipeline applies a tool guard that validates SQL input before execution, blocking write operations at the framework level rather than relying on database permissions alone. This defense-in-depth approach catches prompt injection attempts that try to trick the agent into running unauthorized queries:
```go
import (
    "strings"

    "github.com/lookatitude/beluga-ai/guard"
)

// Guard that blocks write operations in SQL. Substring matching is
// deliberately conservative: it may also reject legitimate queries that
// merely mention a keyword inside an identifier.
sqlGuard := guard.GuardFunc("sql-read-only", func(ctx context.Context, input guard.GuardInput) (guard.GuardResult, error) {
    // Check for write keywords
    upper := strings.ToUpper(input.Content)
    for _, keyword := range []string{"INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE"} {
        if strings.Contains(upper, keyword) {
            return guard.GuardResult{
                Allowed: false,
                Reason:  "Write operations are not permitted",
            }, nil
        }
    }
    return guard.GuardResult{Allowed: true}, nil
})
```
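One caveat with `strings.Contains`: it also matches keywords inside identifiers, so a harmless query like `SELECT updated_at FROM orders` would be blocked. If that matters, a word-boundary check is stricter. This sketch uses only the standard library and is still not a full SQL parser (it would, for instance, flag a keyword inside a string literal):

```go
package main

import (
    "fmt"
    "regexp"
)

// writeStmt matches write keywords only as whole words, so identifiers
// such as "updated_at" do not trigger a block.
var writeStmt = regexp.MustCompile(`(?i)\b(INSERT|UPDATE|DELETE|DROP|ALTER|TRUNCATE)\b`)

// isReadOnly reports whether the query contains no write statements.
func isReadOnly(query string) bool {
    return !writeStmt.MatchString(query)
}

func main() {
    fmt.Println(isReadOnly("SELECT updated_at FROM orders")) // true: word boundary
    fmt.Println(isReadOnly("DELETE FROM orders"))            // false
}
```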
Resilience

Wrap database and API calls with retry logic for transient failures:
```go
import (
    "time"

    "github.com/lookatitude/beluga-ai/resilience"
)

policy := resilience.RetryPolicy{
    MaxAttempts:    3,
    InitialBackoff: 200 * time.Millisecond,
    MaxBackoff:     2 * time.Second,
    BackoffFactor:  2.0,
    Jitter:         true,
}

result, err := resilience.Retry(ctx, policy, func(ctx context.Context) (string, error) {
    return analysisAgent.Invoke(ctx, question)
})
```
Observability

Track agent execution, tool calls, and query performance:
```go
import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
)

tracer := otel.Tracer("data-analyst")
ctx, span := tracer.Start(ctx, "analysis.query")
defer span.End()

span.SetAttributes(
    attribute.String("analysis.question", question),
    attribute.Int("analysis.tool_calls", toolCallCount),
    attribute.Float64("analysis.duration_ms", durationMs),
)
```
Scaling

- Connection pooling: Use `sql.DB` connection pooling for database queries. Set `MaxOpenConns` and `MaxIdleConns` appropriately.
- Query timeouts: Set `context.WithTimeout` on each database query to prevent long-running queries from blocking the agent.
- Caching: Cache frequently requested analyses with TTL-based expiration. Use Beluga AI’s `cache/` package for semantic caching.
- Rate limiting: Limit the number of concurrent agent executions to prevent database overload.
Security
- Use read-only database credentials for the SQL tool
- Validate and sanitize SQL queries through the guard pipeline before execution
- Restrict API tool access to approved endpoints
- Log all queries for audit purposes using OpenTelemetry
Related Resources
- Building Your First Agent for planner strategy selection
- Tools & MCP for building custom tools
- LLM Recipes for typed responses