Real-Time PII Leakage Detection
When a healthcare chatbot sends a patient’s Social Security number to an LLM provider’s API, the organization faces a HIPAA violation with penalties up to $1.8M per incident, mandatory breach notification to affected individuals, and potential loss of healthcare contracts. GDPR violations carry fines up to 4% of global annual revenue. The risk is not hypothetical — users routinely include PII in natural language queries (“My SSN is 123-45-6789, can you check my claim status?”), and without interception, this data reaches third-party servers where it may be logged, cached, or used for model training.
The challenge compounds with LLM responses: even if input is clean, the model might hallucinate PII patterns or echo back sensitive data from its context window. Both directions — request and response — require scanning, and the scanning must happen at the framework level rather than relying on application developers to remember to sanitize every call.
Solution Architecture
Section titled “Solution Architecture”Beluga AI’s guard/ package implements a three-stage pipeline designed specifically for this bidirectional threat: input guards scan requests before LLM calls, output guards scan responses before returning to users, and tool guards scan tool execution parameters. This three-stage architecture is essential because PII can enter the system at any point — user input, tool results, or LLM responses. Pattern matching handles structured PII (SSNs, credit cards, phone numbers) with high confidence, while optional ML classification catches unstructured PII (names mentioned in free text) that regex alone would miss.
┌──────────────┐ ┌──────────────┐ ┌──────────────┐│ LLM Request │───▶│ Input Guard │───▶│ Pattern ││ │ │ (PII Check) │ │ Matcher │└──────────────┘ └──────┬───────┘ └──────┬───────┘ │ │ │ ▼ │ ┌──────────────┐ │ │ ML │ │ │ Classifier │ │ └──────┬───────┘ │ │ ▼ ▼ ┌──────────────┐ ┌──────────────┐ │ Block │◀───│ PII │ │ Request │ │ Detected? │ └──────┬───────┘ └──────────────┘ │ ▼ ┌──────────────┐ │ Alert & │ │ Audit Log │ └──────────────┘PII Detection Implementation
Section titled “PII Detection Implementation”Implement PII detection with pattern matching and audit logging:
package main
import ( "context" "fmt" "regexp"
"github.com/lookatitude/beluga-ai/o11y" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace")
// PIIType represents a type of PIItype PIIType string
const ( PIITypeSSN PIIType = "ssn" PIITypeEmail PIIType = "email" PIITypePhone PIIType = "phone" PIITypeCreditCard PIIType = "credit_card" PIITypeMRN PIIType = "medical_record_number" PIITypePatientID PIIType = "patient_id")
// PIIDetection represents a detected PII instancetype PIIDetection struct { Type PIIType Value string // Redacted value Confidence float64 Location string // "input" or "output"}
// PIIDetector detects PII in texttype PIIDetector struct { patterns map[PIIType]*regexp.Regexp tracer trace.Tracer detectionsCounter metric.Int64Counter blockedCounter metric.Int64Counter}
// NewPIIDetector creates a new PII detectorfunc NewPIIDetector(ctx context.Context) (*PIIDetector, error) { patterns := make(map[PIIType]*regexp.Regexp)
// SSN pattern: XXX-XX-XXXX patterns[PIITypeSSN] = regexp.MustCompile(`\b\d{3}-\d{2}-\d{4}\b`)
// Email pattern patterns[PIITypeEmail] = regexp.MustCompile(`\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b`)
// Phone pattern: (XXX) XXX-XXXX or XXX-XXX-XXXX patterns[PIITypePhone] = regexp.MustCompile(`\b(\(?\d{3}\)?[-.\s])\d{3}[-.\s]\d{4}\b`)
// Credit card pattern patterns[PIITypeCreditCard] = regexp.MustCompile(`\b\d{4}[\s-]\d{4}[\s-]\d{4}[\s-]\d{4}\b`)
// Medical Record Number pattern patterns[PIITypeMRN] = regexp.MustCompile(`\bMRN[:\s-]?\d{6,}\b`)
meter := otel.GetMeterProvider().Meter("pii-detection") detectionsCounter, err := meter.Int64Counter("pii_detections_total") if err != nil { return nil, err } blockedCounter, err := meter.Int64Counter("pii_requests_blocked_total") if err != nil { return nil, err }
return &PIIDetector{ patterns: patterns, tracer: otel.Tracer("pii-detection"), detectionsCounter: detectionsCounter, blockedCounter: blockedCounter, }, nil}
// DetectPII detects PII in textfunc (d *PIIDetector) DetectPII(ctx context.Context, text string, location string) ([]PIIDetection, error) { ctx, span := d.tracer.Start(ctx, "pii.detect") defer span.End()
span.SetAttributes( attribute.String("location", location), attribute.Int("text_length", len(text)), )
var detections []PIIDetection
// Pattern-based detection for piiType, pattern := range d.patterns { matches := pattern.FindAllString(text, -1) for _, match := range matches { // Redact for logging (show only last 4 chars) redacted := d.redactPII(match, piiType)
detections = append(detections, PIIDetection{ Type: piiType, Value: redacted, Confidence: 1.0, Location: location, }) } }
if len(detections) > 0 { span.SetAttributes( attribute.Int("detection_count", len(detections)), )
d.detectionsCounter.Add(ctx, int64(len(detections)), metric.WithAttributes( attribute.String("location", location), ), ) }
return detections, nil}
func (d *PIIDetector) redactPII(value string, piiType PIIType) string { if len(value) <= 4 { return "****" } return "****" + value[len(value)-4:]}Guard Integration
Section titled “Guard Integration”Integrate PII detection with Beluga AI’s guard pipeline:
package main
import ( "context" "fmt"
"github.com/lookatitude/beluga-ai/guard" "github.com/lookatitude/beluga-ai/llm" "github.com/lookatitude/beluga-ai/schema")
// PIIGuard implements guard.Guard for PII detectiontype PIIGuard struct { detector *PIIDetector}
func NewPIIGuard(detector *PIIDetector) guard.Guard { return &PIIGuard{detector: detector}}
func (g *PIIGuard) Check(ctx context.Context, msgs []schema.Message) error { for _, msg := range msgs { // Extract text from message text := extractMessageText(msg)
// Check for PII detections, err := g.detector.DetectPII(ctx, text, "input") if err != nil { return fmt.Errorf("pii detection failed: %w", err) }
if len(detections) > 0 { g.detector.blockedCounter.Add(ctx, 1) return fmt.Errorf("PII detected: %d detection(s) found", len(detections)) } }
return nil}
// Use with LLMfunc createGuardedModel(ctx context.Context) (llm.ChatModel, error) { detector, err := NewPIIDetector(ctx) if err != nil { return nil, err }
model, err := llm.New("openai", llm.ProviderConfig{ APIKey: os.Getenv("OPENAI_API_KEY"), Model: "gpt-4", }) if err != nil { return nil, err }
// Apply PII guard return guard.Apply(model, NewPIIGuard(detector)), nil}Production Considerations
Section titled “Production Considerations”False Positive Tuning
Section titled “False Positive Tuning”Reduce false positives by refining regex patterns and implementing context-aware detection:
// Avoid matching phone numbers that are part of addressespatterns[PIITypePhone] = regexp.MustCompile(`\b(?!\d{5})\(?\d{3}\)?[-.\s]\d{3}[-.\s]\d{4}\b`)
// Avoid matching credit card test numbersfunc isTestCreditCard(number string) bool { testPrefixes := []string{"4111111111111111", "5555555555554444"} for _, prefix := range testPrefixes { if number == prefix { return true } } return false}Audit Logging
Section titled “Audit Logging”Log all PII detections for compliance audits:
func (d *PIIDetector) auditLog(ctx context.Context, detections []PIIDetection, location string) { for _, detection := range detections { log := map[string]interface{}{ "timestamp": time.Now().UTC(), "pii_type": detection.Type, "location": detection.Location, "confidence": detection.Confidence, "value": detection.Value, // Redacted } // Send to audit log system auditLogger.Log(ctx, log) }}Performance Optimization
Section titled “Performance Optimization”Optimize detection for high-volume systems:
// Compile patterns once at initializationvar compiledPatterns = compilePatterns()
// Use sync.Pool for temporary buffersvar bufferPool = sync.Pool{ New: func() interface{} { return new(bytes.Buffer) },}
// Batch process requestsfunc (d *PIIDetector) DetectBatch(ctx context.Context, texts []string) ([][]PIIDetection, error) { results := make([][]PIIDetection, len(texts)) var wg sync.WaitGroup
for i, text := range texts { wg.Add(1) go func(idx int, txt string) { defer wg.Done() results[idx], _ = d.DetectPII(ctx, txt, "input") }(i, text) }
wg.Wait() return results, nil}Related Resources
Section titled “Related Resources”- Guard Pipeline Guide for input/output validation patterns
- Token Cost Attribution for user-level monitoring
- Monitoring Dashboards for observability setup