AWS S3 Event-Driven Loader
In many organizations, documents arrive continuously — uploaded by users, generated by internal systems, or synced from external sources into S3 buckets. Manually triggering re-indexing is fragile and introduces lag between document availability and searchability. An event-driven loader eliminates this gap by processing documents automatically as they arrive.
This guide demonstrates how to build a custom S3 event-driven loader that connects S3 upload events to your Beluga AI RAG pipeline, enabling real-time document ingestion without manual intervention.
Overview
The S3 event-driven loader listens for S3 object creation events and automatically loads new documents into your RAG pipeline. This pattern is common in enterprise environments where documents are uploaded to S3 by external systems and need to be processed without manual intervention.
The loader implements Beluga AI’s DocumentLoader interface from the rag/loader package, making it compatible with the full RAG pipeline including splitters, embedders, and vector stores.
Prerequisites
- Go 1.23 or later
- Beluga AI framework installed
- AWS account with S3 and SQS/Lambda access
- AWS credentials configured (IAM role or access keys)
Installation
Install the AWS SDK for Go v2:
```shell
go get github.com/aws/aws-sdk-go-v2/service/s3
go get github.com/aws/aws-sdk-go-v2/config
```

Configure your AWS credentials. In production, use IAM roles attached to your compute resource (EC2, ECS, Lambda). For local development:

```shell
export AWS_REGION="us-east-1"
export AWS_ACCESS_KEY_ID="your-key"
export AWS_SECRET_ACCESS_KEY="your-secret"
```

The DocumentLoader Interface
Before building the custom loader, review the interface it must satisfy:
```go
// From github.com/lookatitude/beluga-ai/rag/loader
type DocumentLoader interface {
	Load(ctx context.Context, source string) ([]schema.Document, error)
}
```

Each document returned uses schema.Document with a Content field for the text body and Metadata for key-value pairs such as the S3 source URI, object key, and size.
Basic S3 Loader
Build a loader that reads all objects from an S3 bucket and returns them as documents:
```go
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"strings"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"

	"github.com/lookatitude/beluga-ai/schema"
)

// S3Loader loads documents from an S3 bucket. It implements a Load method
// compatible with Beluga AI's document loading patterns.
type S3Loader struct {
	client *s3.Client
	bucket string
}

// NewS3Loader creates a new S3 loader for the given bucket.
func NewS3Loader(ctx context.Context, bucket string) (*S3Loader, error) {
	cfg, err := awsconfig.LoadDefaultConfig(ctx)
	if err != nil {
		return nil, fmt.Errorf("load aws config: %w", err)
	}
	return &S3Loader{
		client: s3.NewFromConfig(cfg),
		bucket: bucket,
	}, nil
}

// Load reads all objects from the configured bucket and returns them as
// documents. The source parameter can specify a key prefix to filter objects.
// A paginator is used so buckets with more than 1,000 objects are read fully
// (a single ListObjectsV2 call returns at most 1,000 keys).
func (l *S3Loader) Load(ctx context.Context, prefix string) ([]schema.Document, error) {
	input := &s3.ListObjectsV2Input{
		Bucket: aws.String(l.bucket),
	}
	if prefix != "" {
		input.Prefix = aws.String(prefix)
	}

	var docs []schema.Document
	paginator := s3.NewListObjectsV2Paginator(l.client, input)
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("list objects: %w", err)
		}
		for _, obj := range page.Contents {
			doc, err := l.loadObject(ctx, *obj.Key)
			if err != nil {
				log.Printf("skipping %s: %v", *obj.Key, err)
				continue
			}
			docs = append(docs, doc)
		}
	}
	return docs, nil
}

func (l *S3Loader) loadObject(ctx context.Context, key string) (schema.Document, error) {
	resp, err := l.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(l.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return schema.Document{}, fmt.Errorf("get object %s: %w", key, err)
	}
	defer resp.Body.Close()

	var b strings.Builder
	if _, err := io.Copy(&b, resp.Body); err != nil {
		return schema.Document{}, fmt.Errorf("read object %s: %w", key, err)
	}

	return schema.Document{
		Content: b.String(),
		Metadata: map[string]any{
			"source": fmt.Sprintf("s3://%s/%s", l.bucket, key),
			"key":    key,
		},
	}, nil
}
```

Handling S3 Events
For real-time ingestion, process S3 event notifications delivered via SQS or Lambda. Define the event structure and a handler:
```go
// S3Event represents an S3 event notification. This structure matches the
// JSON format sent by S3 to SQS or Lambda.
type S3Event struct {
	Records []struct {
		S3 struct {
			Bucket struct {
				Name string `json:"name"`
			} `json:"bucket"`
			Object struct {
				Key string `json:"key"`
			} `json:"object"`
		} `json:"s3"`
	} `json:"Records"`
}
```
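Note that S3 URL-encodes object keys in event payloads: an object named `red flower.jpg` arrives as `red+flower.jpg`. Decode keys before calling GetObject. A self-contained sketch of parsing a notification and recovering the original key (the payload and the `eventRecordKey` helper are illustrative, not part of the framework):

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// eventRecordKey extracts and URL-decodes the object key from the first
// record of a raw S3 event notification payload.
func eventRecordKey(payload []byte) (string, error) {
	var ev struct {
		Records []struct {
			S3 struct {
				Object struct {
					Key string `json:"key"`
				} `json:"object"`
			} `json:"s3"`
		} `json:"Records"`
	}
	if err := json.Unmarshal(payload, &ev); err != nil {
		return "", err
	}
	if len(ev.Records) == 0 {
		return "", fmt.Errorf("no records in event")
	}
	// S3 URL-encodes keys in notifications: "red flower.jpg" -> "red+flower.jpg".
	return url.QueryUnescape(ev.Records[0].S3.Object.Key)
}

func main() {
	payload := []byte(`{"Records":[{"s3":{"object":{"key":"documents/red+flower.jpg"}}}]}`)
	key, err := eventRecordKey(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // documents/red flower.jpg
}
```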
```go
// ProcessEvent handles an S3 event by downloading each referenced object
// and returning the resulting documents.
func (l *S3Loader) ProcessEvent(ctx context.Context, event S3Event) ([]schema.Document, error) {
	var docs []schema.Document
	for _, record := range event.Records {
		// Keys in event notifications are URL-encoded; decode before fetching.
		// Requires "net/url" in this file's imports.
		key, err := url.QueryUnescape(record.S3.Object.Key)
		if err != nil {
			return nil, fmt.Errorf("decode key %q: %w", record.S3.Object.Key, err)
		}
		doc, err := l.loadObject(ctx, key)
		if err != nil {
			return nil, fmt.Errorf("process event for %s: %w", key, err)
		}
		docs = append(docs, doc)
	}
	return docs, nil
}
```

Lambda Handler
Deploy the loader as an AWS Lambda function to automatically process uploads:
```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, event S3Event) error {
	if len(event.Records) == 0 {
		return nil
	}

	bucket := event.Records[0].S3.Bucket.Name
	loader, err := NewS3Loader(ctx, bucket)
	if err != nil {
		return fmt.Errorf("create loader: %w", err)
	}

	docs, err := loader.ProcessEvent(ctx, event)
	if err != nil {
		return fmt.Errorf("process event: %w", err)
	}

	log.Printf("processed %d documents from %s", len(docs), bucket)

	// Pass documents to your RAG pipeline:
	// vectorstore, splitter, embedder, etc.

	return nil
}
```
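The handler above assumes S3 invokes the function directly. When notifications are routed through SQS instead, each SQS message body carries the S3 event as a JSON string, so the handler must unwrap one level first. A minimal sketch of that step, using local structs for illustration rather than the `aws-lambda-go` events types:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sqsMessage mirrors the part of an SQS-triggered Lambda payload we need:
// each record's body is itself a JSON-encoded S3 event notification.
type sqsMessage struct {
	Records []struct {
		Body string `json:"body"`
	} `json:"Records"`
}

// s3Event mirrors the S3 notification fields used by this example.
type s3Event struct {
	Records []struct {
		S3 struct {
			Bucket struct {
				Name string `json:"name"`
			} `json:"bucket"`
			Object struct {
				Key string `json:"key"`
			} `json:"object"`
		} `json:"s3"`
	} `json:"Records"`
}

// unwrapS3Events decodes the S3 events embedded in an SQS payload.
func unwrapS3Events(payload []byte) ([]s3Event, error) {
	var msg sqsMessage
	if err := json.Unmarshal(payload, &msg); err != nil {
		return nil, err
	}
	events := make([]s3Event, 0, len(msg.Records))
	for _, r := range msg.Records {
		var ev s3Event
		if err := json.Unmarshal([]byte(r.Body), &ev); err != nil {
			return nil, err
		}
		events = append(events, ev)
	}
	return events, nil
}

func main() {
	payload := []byte(`{"Records":[{"body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"docs\"},\"object\":{\"key\":\"a.txt\"}}}]}"}]}`)
	evs, err := unwrapS3Events(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(evs[0].Records[0].S3.Bucket.Name, evs[0].Records[0].S3.Object.Key)
}
```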
```go
func main() {
	lambda.Start(handler)
}
```

Advanced Topics
Observability with OpenTelemetry
Add tracing to the loader for production visibility:
```go
import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace" // needed for the trace.Tracer field below
)

type TracedS3Loader struct {
	*S3Loader
	tracer trace.Tracer
}

func NewTracedS3Loader(ctx context.Context, bucket string) (*TracedS3Loader, error) {
	base, err := NewS3Loader(ctx, bucket)
	if err != nil {
		return nil, err
	}
	return &TracedS3Loader{
		S3Loader: base,
		tracer:   otel.Tracer("beluga.loader.s3"),
	}, nil
}

func (l *TracedS3Loader) Load(ctx context.Context, prefix string) ([]schema.Document, error) {
	ctx, span := l.tracer.Start(ctx, "s3.load")
	defer span.End()

	span.SetAttributes(
		attribute.String("s3.bucket", l.bucket),
		attribute.String("s3.prefix", prefix),
	)

	docs, err := l.S3Loader.Load(ctx, prefix)
	if err != nil {
		span.RecordError(err)
		return nil, err
	}

	span.SetAttributes(attribute.Int("s3.document_count", len(docs)))
	return docs, nil
}
```

Integrating with a RAG Pipeline
Connect the S3 loader to splitters, embedders, and vector stores:
```go
import (
	"log"
	"os"

	"github.com/lookatitude/beluga-ai/config"
	"github.com/lookatitude/beluga-ai/rag/embedding"
	"github.com/lookatitude/beluga-ai/rag/splitter"

	_ "github.com/lookatitude/beluga-ai/rag/embedding/providers/openai"
)

// Load documents from S3
docs, err := loader.Load(ctx, "documents/")
if err != nil {
	log.Fatal(err)
}

// Split into chunks
split, err := splitter.New("recursive", config.ProviderConfig{
	Options: map[string]any{
		"chunk_size":    1000.0,
		"chunk_overlap": 200.0,
	},
})
if err != nil {
	log.Fatal(err)
}

// Embed and store
emb, err := embedding.New("openai", config.ProviderConfig{
	APIKey: os.Getenv("OPENAI_API_KEY"),
	Model:  "text-embedding-3-small",
})
if err != nil {
	log.Fatal(err)
}

// split and emb are now ready to chunk and embed docs before indexing
// them into your vector store.
```

Configuration
| Option | Description | Default | Required |
|---|---|---|---|
| Bucket | S3 bucket name | - | Yes |
| Region | AWS region (set via AWS_REGION) | us-east-1 | No |
| Prefix | Object key prefix filter | - | No |
Troubleshooting
“Access Denied” errors — Ensure the IAM role or user has s3:GetObject and s3:ListBucket permissions on the target bucket. In Lambda, attach the policy to the execution role.
“NoSuchBucket” errors — Verify the bucket name and that the AWS region matches the bucket’s region.
Large object handling — For objects exceeding available memory, consider streaming the content or using the S3 Transfer Manager for multipart downloads.
Production Considerations
- Use IAM roles instead of access keys for authentication in production environments
- Configure S3 event notifications to SQS for durable, at-least-once delivery
- Implement dead-letter queues for events that fail processing
- Apply object key prefix filters to limit which uploads trigger processing
- Monitor S3 request rates and implement exponential backoff for throttling
- Use appropriate S3 storage classes (Standard, Intelligent-Tiering) based on access patterns
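The SQS-delivery and prefix-filter recommendations above are both expressed in the bucket's notification configuration. A sketch of the JSON passed to `aws s3api put-bucket-notification-configuration` (the queue ARN, account ID, and prefix are placeholders for your own values):

```json
{
  "QueueConfigurations": [
    {
      "QueueArn": "arn:aws:sqs:us-east-1:123456789012:doc-ingest-queue",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            { "Name": "prefix", "Value": "documents/" }
          ]
        }
      }
    }
  ]
}
```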
Related Resources
- Document Loaders — All document loader integrations
- Embedding Providers — Generating embeddings for loaded documents
- Google Drive Scraper — Google Drive document loading