Document Loaders
Before documents can be embedded and searched, they need to be loaded from wherever they live — local filesystems, cloud storage buckets, databases, APIs, or SaaS platforms. Document loaders handle this ingestion step, converting source data into Beluga AI’s schema.Document type with content and metadata preserved.
This guide covers the available loader integrations and shows how to implement custom loaders for specialized sources.
Available Loaders
Section titled “Available Loaders”Filesystem Loaders
Section titled “Filesystem Loaders”Directory Loader
Section titled “Directory Loader”Load documents recursively from local filesystem.
import ( "github.com/lookatitude/beluga-ai/pkg/documentloaders")
fsys := os.DirFS("/path/to/documents")

loader, err := documentloaders.NewDirectoryLoader(fsys,
    documentloaders.WithMaxDepth(10),
    documentloaders.WithExtensions(".md", ".txt", ".pdf"),
    documentloaders.WithExclusions("**/node_modules/**", "**/.git/**"),
)
docs, err := loader.Load(ctx)

Supported formats: txt, md, html, pdf, docx, csv, json
PDF Loader
Section titled “PDF Loader”Extract text from PDF files.
import ( "github.com/lookatitude/beluga-ai/pkg/documentloaders/providers/pdf")
loader := pdf.NewLoader("/path/to/file.pdf", pdf.WithExtractImages(false), pdf.WithPreserveFormatting(true), pdf.WithPageSeparator("\n---\n"),)
docs, err := loader.Load(ctx)

Cloud Storage Loaders
Section titled “Cloud Storage Loaders”AWS S3 Loader
Section titled “AWS S3 Loader”Load documents from S3 buckets.
import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/lookatitude/beluga-ai/pkg/documentloaders/providers/s3"
)
sess, _ := session.NewSession(&aws.Config{ Region: aws.String("us-west-2"),})
loader := s3.NewLoader(sess, "my-bucket", s3.WithPrefix("documents/"), s3.WithRecursive(true), s3.WithIncludeExtensions(".txt", ".md", ".pdf"),)
docs, err := loader.Load(ctx)

Configuration:
- Supports IAM roles and access keys
- Handles large files via multipart downloads
- Supports S3-compatible storage (MinIO, DigitalOcean Spaces)
Google Cloud Storage
Section titled “Google Cloud Storage”Load from GCS buckets.
import ( "github.com/lookatitude/beluga-ai/pkg/documentloaders/providers/gcs" "cloud.google.com/go/storage")
client, _ := storage.NewClient(ctx)
loader := gcs.NewLoader(client, "my-bucket", gcs.WithPrefix("documents/"), gcs.WithRecursive(true),)
docs, err := loader.Load(ctx)

Azure Blob Storage
Section titled “Azure Blob Storage”Load from Azure containers.
import ( "github.com/lookatitude/beluga-ai/pkg/documentloaders/providers/azureblob")
loader := azureblob.NewLoader( "account-name", "container-name", azureblob.WithAccountKey(os.Getenv("AZURE_STORAGE_KEY")), azureblob.WithPrefix("documents/"),)
docs, err := loader.Load(ctx)

Web Loaders
Section titled “Web Loaders”HTML Loader
Section titled “HTML Loader”Scrape and parse HTML content.
import ( "github.com/lookatitude/beluga-ai/pkg/documentloaders/providers/html")
loader := html.NewLoader("https://example.com", html.WithRemoveScripts(true), html.WithRemoveStyles(true), html.WithExtractMetadata(true), html.WithFollowLinks(false),)
docs, err := loader.Load(ctx)

Sitemap Crawler
Section titled “Sitemap Crawler”Crawl entire websites via sitemaps.
import ( "github.com/lookatitude/beluga-ai/pkg/documentloaders/providers/sitemap")
loader := sitemap.NewLoader("https://example.com/sitemap.xml",
    sitemap.WithMaxDepth(3),
    sitemap.WithConcurrency(5),
    sitemap.WithRateLimit(10), // requests per second
)
docs, err := loader.Load(ctx)

Database Loaders
Section titled “Database Loaders”SQL Loader
Section titled “SQL Loader”Load data from SQL databases.
import (
    "database/sql"

    sqlloader "github.com/lookatitude/beluga-ai/pkg/documentloaders/providers/sql" // aliased to avoid clashing with database/sql
    _ "github.com/lib/pq"
)

db, _ := sql.Open("postgres", "postgres://user:pass@localhost/db")

loader := sqlloader.NewLoader(db,
    sqlloader.WithQuery("SELECT id, title, content FROM articles"),
    sqlloader.WithContentColumn("content"),
    sqlloader.WithMetadataColumns("id", "title"),
)
docs, err := loader.Load(ctx)

MongoDB Loader
Section titled “MongoDB Loader”Load documents from MongoDB collections.
import (
    "github.com/lookatitude/beluga-ai/pkg/documentloaders/providers/mongodb"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
)
client, _ := mongo.Connect(ctx)
loader := mongodb.NewLoader( client.Database("mydb").Collection("documents"), mongodb.WithFilter(bson.M{"status": "published"}), mongodb.WithContentField("content"), mongodb.WithMetadataFields("title", "author", "created_at"),)
docs, err := loader.Load(ctx)

API Loaders
Section titled “API Loaders”REST API Loader
Section titled “REST API Loader”Load data from REST APIs.
import ( "github.com/lookatitude/beluga-ai/pkg/documentloaders/providers/restapi")
loader := restapi.NewLoader("https://api.example.com/documents", restapi.WithHeaders(map[string]string{ "Authorization": "Bearer " + os.Getenv("API_TOKEN"), }), restapi.WithPagination(restapi.PaginationConfig{ Type: "offset", PageSize: 100, PageParam: "offset", }), restapi.WithContentPath("data.content"),)
docs, err := loader.Load(ctx)

Google Drive Loader
Section titled “Google Drive Loader”Load documents from Google Drive.
import ( "github.com/lookatitude/beluga-ai/pkg/documentloaders/providers/googledrive" "google.golang.org/api/drive/v3")
service, _ := drive.NewService(ctx)
loader := googledrive.NewLoader(service, googledrive.WithFolderID("folder-id"), googledrive.WithRecursive(true), googledrive.WithMimeTypes("application/pdf", "text/plain"),)
docs, err := loader.Load(ctx)

Implementing Custom Loaders
Section titled “Implementing Custom Loaders”Create custom loaders for specialized sources.
package custom
import (
    "context"
    "fmt"

    "github.com/lookatitude/beluga-ai/pkg/schema"
)
// CustomLoader loads documents from a bespoke external source via an API client.
type CustomLoader struct {
	source     string     // source identifier passed through to the API client
	apiClient  *APIClient // client used to fetch items from the custom source
	maxResults int        // upper bound on items fetched by a single Load call
}
// NewCustomLoader returns a CustomLoader for the given source, authenticated
// with apiKey. Load fetches at most 1000 items per call by default.
func NewCustomLoader(source string, apiKey string) *CustomLoader {
	loader := &CustomLoader{
		source:     source,
		apiClient:  NewAPIClient(apiKey),
		maxResults: 1000, // default cap on items per Load
	}
	return loader
}
// Load fetches up to maxResults items from the custom source and converts
// each one into a schema.Document, preserving source, id, created_at, and
// author in the document metadata.
func (cl *CustomLoader) Load(ctx context.Context) ([]schema.Document, error) {
	items, err := cl.apiClient.FetchItems(ctx, cl.source, cl.maxResults)
	if err != nil {
		return nil, fmt.Errorf("fetch items: %w", err)
	}

	// Convert each fetched item into a Beluga document.
	docs := make([]schema.Document, 0, len(items))
	for _, item := range items {
		docs = append(docs, schema.Document{
			PageContent: item.Content,
			Metadata: map[string]interface{}{
				"source":     cl.source,
				"id":         item.ID,
				"created_at": item.CreatedAt,
				"author":     item.Author,
			},
		})
	}

	return docs, nil
}
// LazyLoad streams documents from the custom source as they are fetched,
// paging through the API 100 items at a time. Results — or a terminal
// error — are delivered on the returned channel, which is closed when
// pagination finishes, an error occurs, or ctx is cancelled.
func (cl *CustomLoader) LazyLoad(ctx context.Context) (chan schema.DocumentResult, error) {
	ch := make(chan schema.DocumentResult, 10) // small buffer decouples producer from consumer

	go func() {
		defer close(ch) // always signal completion to the receiver

		offset := 0
		pageSize := 100

		for {
			items, err := cl.apiClient.FetchItemsPaginated(ctx, cl.source, offset, pageSize)
			if err != nil {
				// Report the error in-band and stop paging.
				ch <- schema.DocumentResult{Error: err}
				return
			}

			// An empty page means the source is exhausted.
			if len(items) == 0 {
				return
			}

			for _, item := range items {
				doc := schema.Document{
					PageContent: item.Content,
					Metadata: map[string]interface{}{
						"source": cl.source,
						"id":     item.ID,
					},
				}

				// Respect cancellation while sending so this goroutine
				// cannot leak if the consumer stops reading.
				select {
				case <-ctx.Done():
					ch <- schema.DocumentResult{Error: ctx.Err()}
					return
				case ch <- schema.DocumentResult{Document: doc}:
				}
			}

			offset += pageSize
		}
	}()
	return ch, nil
}

Event-Driven Loading
Section titled “Event-Driven Loading”Load documents reactively based on events.
S3 Event-Driven Loader
Section titled “S3 Event-Driven Loader”Process new files as they arrive in S3.
import (
    "log"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"
    "github.com/lookatitude/beluga-ai/pkg/documentloaders/providers/s3"
)
// S3EventLoader consumes S3 object-created notifications from an SQS queue
// and loads the referenced objects as documents.
type S3EventLoader struct {
	sqsClient *sqs.SQS   // SQS client used to poll for S3 event notifications
	queueURL  string     // URL of the queue receiving the bucket's events
	s3Loader  *s3.Loader // loader used to fetch the objects named in events
}
// NewS3EventLoader builds an event-driven loader that polls queueURL for
// S3 notifications and loads the corresponding objects from bucket.
func NewS3EventLoader(sess *session.Session, queueURL string, bucket string) *S3EventLoader {
	loader := &S3EventLoader{queueURL: queueURL}
	loader.sqsClient = sqs.New(sess)
	loader.s3Loader = s3.NewLoader(sess, bucket)
	return loader
}
// ProcessEvents long-polls the SQS queue for S3 events and, for each message,
// parses the event, loads the referenced object, and hands the document off
// for processing. Messages are deleted only after successful processing so
// failed documents are redelivered. Returns on receive error or when ctx is
// cancelled.
func (sel *S3EventLoader) ProcessEvents(ctx context.Context) error {
	for {
		// Poll SQS for S3 events (20s long polling, up to 10 messages).
		result, err := sel.sqsClient.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
			QueueUrl:            aws.String(sel.queueURL),
			MaxNumberOfMessages: aws.Int64(10),
			WaitTimeSeconds:     aws.Int64(20),
		})
		if err != nil {
			return err
		}

		for _, msg := range result.Messages {
			// Parse the S3 event payload.
			event, err := parseS3Event(msg.Body)
			if err != nil {
				log.Printf("Parse event error: %v", err)
				continue
			}

			// Load the referenced object as a document.
			doc, err := sel.s3Loader.LoadSingleFile(ctx, event.ObjectKey)
			if err != nil {
				log.Printf("Load document error: %v", err)
				continue
			}

			// Process the document (e.g. embed and index).
			if err := processDocument(ctx, doc); err != nil {
				log.Printf("Process document error: %v", err)
				continue
			}

			// Delete only after successful processing. Use the context-aware
			// variant and log (rather than silently drop) delete failures, so
			// unexpected redeliveries can be diagnosed.
			if _, err := sel.sqsClient.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{
				QueueUrl:      aws.String(sel.queueURL),
				ReceiptHandle: msg.ReceiptHandle,
			}); err != nil {
				log.Printf("Delete message error: %v", err)
			}
		}
		// Stop cleanly between polls if the caller cancelled.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
	}
}

Best Practices
Section titled “Best Practices”When working with document loaders:
- Use lazy loading for large datasets
- Implement retry logic for network failures
- Add rate limiting to respect API limits
- Cache credentials securely
- Validate document content before processing
- Track loading metrics (throughput, errors)
- Handle pagination correctly
- Process in batches for efficiency
- Add comprehensive metadata for filtering
- Implement checkpointing for resumability
Next Steps
Section titled “Next Steps”- Learn about Text Splitter Integrations for chunking
- Explore Document Processing for complete pipelines
- Read RAG Pipeline for integration patterns