AI-Generated Placeholder Documentation

This documentation page has been automatically generated by a Large Language Model (LLM) and serves as placeholder content. The information provided here may be incomplete, inaccurate, or subject to change.

For accurate and complete information, please refer to the Vanna source code on GitHub.

Audit Logging

Vanna 2.0 provides comprehensive audit logging to track user actions, tool execution, and access control decisions for compliance and security.

AuditLogger Interface

from vanna.core.audit import AuditLogger, AuditEvent

class AuditLogger(ABC):
    """Abstract interface implemented by every audit logger."""

    @abstractmethod
    async def log_event(self, event: AuditEvent) -> None:
        """Record a single audit event.

        Args:
            event: The audit event to record.
        """

Configuration

Enable and configure audit logging through AuditConfig:

from vanna.core.agent import AgentConfig, AuditConfig

# Build an agent configuration whose audit_config switches audit logging on
# and selects which categories of events are recorded.
config = AgentConfig(
    audit_config=AuditConfig(
        enabled=True,
        log_tool_access_checks=True,
        log_tool_invocations=True,
        log_tool_results=True,
        log_ui_feature_checks=False,  # Left off here because it can be noisy
        log_ai_responses=True,
        include_full_ai_responses=False,  # Left off here as a privacy precaution
        sanitize_tool_parameters=True  # Redact sensitive params
    )
)

# Attach the configuration and an audit logger to the agent.
# NOTE(review): Agent, PostgresAuditLogger, llm and connection_string are not
# imported or defined in this snippet — bring them into scope before running it.
agent = Agent(
    llm_service=llm,
    config=config,
    audit_logger=PostgresAuditLogger(connection_string)
)

Built-in Audit Loggers

PostgresAuditLogger

Store audit logs in PostgreSQL:

from vanna.core.audit import PostgresAuditLogger

# Create a PostgreSQL-backed audit logger that writes to the given table.
# NOTE(review): the connection string below uses placeholder credentials —
# load the real value from configuration/secrets instead of hard-coding it.
logger = PostgresAuditLogger(
    connection_string="postgresql://user:pass@host:5432/db",
    table_name="vanna_audit_log"
)

# Hand the logger to the agent.
# NOTE(review): Agent and llm are not imported or defined in this snippet.
agent = Agent(
    llm_service=llm,
    audit_logger=logger
)

The logger automatically creates the necessary table schema.

Custom Audit Loggers

Example 1: File-Based Logging

import json
from datetime import datetime
from vanna.core.audit import AuditLogger, AuditEvent

class FileAuditLogger(AuditLogger):
    """Audit logger that appends each event to a file as one JSON line."""

    def __init__(self, log_file_path: str):
        """Remember the path of the file that events are appended to."""
        self.log_file = log_file_path

    async def log_event(self, event: AuditEvent) -> None:
        """Append ``event`` to the log file as a single JSON object.

        Args:
            event: The audit event to serialize and write.
        """
        # The timestamp is the only field that needs converting for JSON.
        record = {'timestamp': event.timestamp.isoformat()}
        # The remaining fields are copied verbatim, in this key order.
        for field in (
            'event_type',
            'user_id',
            'user_email',
            'action',
            'resource',
            'result',
            'metadata',
        ):
            record[field] = getattr(event, field)

        with open(self.log_file, 'a') as sink:
            line = json.dumps(record) + '\n'
            sink.write(line)

Example 2: CloudWatch Logs

import asyncio
import json

import boto3
from vanna.core.audit import AuditLogger, AuditEvent

class CloudWatchAuditLogger(AuditLogger):
    """Audit logger that ships each event to an Amazon CloudWatch Logs stream.

    NOTE(review): the log group and log stream are assumed to exist already;
    nothing in this class creates them.
    """

    def __init__(self, log_group: str, log_stream: str):
        """Create the CloudWatch Logs client and remember the destination.

        Args:
            log_group: Name of the CloudWatch log group to write to.
            log_stream: Name of the log stream inside ``log_group``.
        """
        self.client = boto3.client('logs')
        self.log_group = log_group
        self.log_stream = log_stream

    async def log_event(self, event: AuditEvent) -> None:
        """Send ``event`` to CloudWatch as one JSON-encoded log record.

        Args:
            event: The audit event to publish.
        """
        record = {
            # CloudWatch expects milliseconds since the Unix epoch.
            'timestamp': int(event.timestamp.timestamp() * 1000),
            'message': json.dumps({
                'event_type': event.event_type,
                'user_id': event.user_id,
                # Included so records match the fields the other loggers keep.
                'user_email': event.user_email,
                'action': event.action,
                'resource': event.resource,
                'result': event.result,
                'metadata': event.metadata
            })
        }

        def _put() -> None:
            self.client.put_log_events(
                logGroupName=self.log_group,
                logStreamName=self.log_stream,
                logEvents=[record]
            )

        # The boto3 call is a blocking network request; running it in the
        # default executor keeps the event loop free while it completes.
        await asyncio.get_running_loop().run_in_executor(None, _put)

Example 3: Elasticsearch Integration

from elasticsearch import AsyncElasticsearch
from vanna.core.audit import AuditLogger

class ElasticsearchAuditLogger(AuditLogger):
    """Audit logger that indexes each event as a document in Elasticsearch."""

    def __init__(self, es_client: AsyncElasticsearch, index: str):
        """Keep the async client and the name of the target index."""
        self.es, self.index = es_client, index

    async def log_event(self, event: AuditEvent) -> None:
        """Index ``event`` as one document, nesting user details under ``user``.

        Args:
            event: The audit event to index.
        """
        meta = event.metadata
        user = {
            'id': event.user_id,
            'email': event.user_email,
            # Group memberships come from the metadata; default to none.
            'groups': meta.get('user_groups', []),
        }
        doc = dict(
            timestamp=event.timestamp,
            event_type=event.event_type,
            user=user,
            action=event.action,
            resource=event.resource,
            result=event.result,
            metadata=meta,
        )

        await self.es.index(index=self.index, document=doc)

Audit Event Types

Vanna logs several types of audit events:

Tool Access Checks

# Illustrative shape of the event recorded for a tool access check.
# `user` and `tool` stand for the requesting user and the tool being checked;
# they are not defined in this snippet.
ToolAccessCheckEvent(
    timestamp=datetime.now(),  # NOTE(review): naive local time — confirm whether UTC is expected
    user_id=user.id,
    user_email=user.email,
    tool_name=tool.name,
    access_granted=True,
    user_groups=user.group_memberships,
    required_groups=tool.access_groups
)

Tool Invocations

# Illustrative shape of the event recorded when a tool is invoked.
# `user`, `tool`, `sanitized_args` and `context` are placeholders that are not
# defined in this snippet.
ToolInvocationEvent(
    timestamp=datetime.now(),
    user_id=user.id,
    tool_name=tool.name,
    arguments=sanitized_args,  # Sensitive params redacted
    conversation_id=context.conversation_id
)

Tool Results

# Illustrative shape of the event recorded after a tool finishes.
# `user`, `tool`, `result` and `execution_time` are placeholders that are not
# defined in this snippet.
ToolResultEvent(
    timestamp=datetime.now(),
    user_id=user.id,
    tool_name=tool.name,
    success=result.success,
    error=result.error,
    duration_ms=execution_time  # presumably milliseconds, per the field name — confirm
)

UI Feature Access

# Illustrative shape of the event recorded for a UI feature access check.
# `user`, `feature` and `can_access` are placeholders that are not defined in
# this snippet.
UiFeatureAccessCheckEvent(
    timestamp=datetime.now(),
    user_id=user.id,
    feature_name=feature,
    access_granted=can_access,
    user_groups=user.group_memberships
)

AI Responses

# Illustrative shape of the event recorded for an AI response.
# `user`, `conversation_id`, `model_name`, `total_tokens` and `summary` are
# placeholders that are not defined in this snippet.
AiResponseEvent(
    timestamp=datetime.now(),
    user_id=user.id,
    conversation_id=conversation_id,
    model=model_name,
    token_count=total_tokens,
    response_summary=summary  # Full response optional
)

Querying Audit Logs

SQL Queries (PostgreSQL)

-- NOTE(review): these queries assume vanna_audit_log exposes the columns they
-- reference (timestamp, user_email, tool_name, error, event_type, result,
-- resource, metadata) — verify against the table the logger actually creates.

-- Find all failed tool executions in last 24 hours
SELECT 
    timestamp,
    user_email,
    tool_name,
    error
FROM vanna_audit_log
WHERE event_type = 'tool_result'
    AND result = 'failure'
    AND timestamp > NOW() - INTERVAL '24 hours'
ORDER BY timestamp DESC;

-- User activity summary: actions and distinct tools per user over the last 7 days
SELECT 
    user_email,
    COUNT(*) as total_actions,
    COUNT(DISTINCT tool_name) as unique_tools_used
FROM vanna_audit_log
WHERE event_type = 'tool_invocation'
    AND timestamp > NOW() - INTERVAL '7 days'
GROUP BY user_email
ORDER BY total_actions DESC;

-- Access denied events (the 100 most recent)
-- ->> extracts the named JSON field from metadata as text.
SELECT 
    timestamp,
    user_email,
    resource,
    metadata->>'required_groups' as required_groups,
    metadata->>'user_groups' as user_groups
FROM vanna_audit_log
WHERE event_type = 'access_check'
    AND result = 'denied'
ORDER BY timestamp DESC
LIMIT 100;

Compliance Use Cases

SOC 2 Compliance

Track who accessed what data:

# Audit settings for this use case: record access checks, tool invocations and
# tool results, with tool parameters sanitized.
# NOTE(review): this builds an AuditConfig only; the Configuration example
# passes it to the agent via AgentConfig(audit_config=...).
config = AuditConfig(
    enabled=True,
    log_tool_access_checks=True,
    log_tool_invocations=True,
    log_tool_results=True,
    sanitize_tool_parameters=True
)

GDPR Data Access Requests

Find all data accessed by a user:

-- List every tool invocation and tool result recorded for one user, oldest first.
-- 'user_12345' is a placeholder: substitute the data subject's user id
-- (preferably as a bound parameter rather than by string concatenation).
-- NOTE(review): assumes tool_name and arguments are columns of
-- vanna_audit_log — verify against the actual table schema.
SELECT 
    timestamp,
    tool_name,
    arguments,
    result
FROM vanna_audit_log
WHERE user_id = 'user_12345'
    AND event_type IN ('tool_invocation', 'tool_result')
ORDER BY timestamp;

HIPAA Audit Trails

Track access to protected health information:

class HIPAAAuditLogger(AuditLogger):
    """Audit logger wrapper that annotates PHI access before forwarding events.

    The PHI detection and justification policy is deployment-specific, so
    subclasses must override ``_contains_phi`` and ``_get_justification``.
    """

    def __init__(self, base_logger):
        """Wrap the logger that actually stores events.

        Args:
            base_logger: Audit logger that receives every event after it has
                been annotated.
        """
        self.base_logger = base_logger

    async def log_event(self, event: AuditEvent) -> None:
        """Flag events that touch PHI, then forward them to the wrapped logger.

        Args:
            event: The audit event to annotate and forward. Its ``metadata``
                mapping is updated in place when PHI is involved.
        """
        # Enhanced logging for HIPAA
        if self._contains_phi(event):
            event.metadata['phi_accessed'] = True
            event.metadata['access_justification'] = self._get_justification(event)

        await self.base_logger.log_event(event)

    def _contains_phi(self, event: AuditEvent) -> bool:
        """Return True when ``event`` involves protected health information.

        Raises:
            NotImplementedError: Always, until a subclass provides the policy.
        """
        raise NotImplementedError(
            "Override _contains_phi() with your PHI detection policy"
        )

    def _get_justification(self, event: AuditEvent):
        """Return the access justification to store alongside a PHI event.

        Raises:
            NotImplementedError: Always, until a subclass provides the policy.
        """
        raise NotImplementedError(
            "Override _get_justification() with your justification lookup"
        )

Security Monitoring

Detect Suspicious Activity

-- Unusual number of failed access attempts
-- Reports users with more than 10 denied access checks in the last hour.
SELECT 
    user_email,
    COUNT(*) as failed_attempts
FROM vanna_audit_log
WHERE event_type = 'access_check'
    AND result = 'denied'
    AND timestamp > NOW() - INTERVAL '1 hour'
GROUP BY user_email
HAVING COUNT(*) > 10;

-- Access outside business hours
-- Flags invocations before 06:00 or from 21:00 onward (hours 6-20 count as
-- business hours).
-- NOTE(review): the hour is taken in whatever time zone the timestamp column
-- / session uses — confirm it matches local business hours.
SELECT *
FROM vanna_audit_log
WHERE (EXTRACT(HOUR FROM timestamp) < 6 OR EXTRACT(HOUR FROM timestamp) > 20)
    AND event_type = 'tool_invocation'
ORDER BY timestamp DESC;

Data Retention

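Enforce a retention policy by wrapping a logger and calling cleanup_old_logs() (for example, from a scheduled job) to remove entries older than the retention period: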

class RetentionPolicyLogger(AuditLogger):
    """Audit logger wrapper that can purge entries older than a retention period.

    Events are forwarded unchanged to the wrapped logger; ``cleanup_old_logs``
    must be called explicitly (nothing here schedules it).
    """

    def __init__(self, base_logger, retention_days=90):
        """Wrap ``base_logger`` and remember the retention period.

        Args:
            base_logger: Audit logger that stores events.
                NOTE(review): it must also provide an async
                ``delete_before(cutoff)`` method, which is not part of the
                basic ``AuditLogger`` interface.
            retention_days: Number of days of history to keep.

        Raises:
            ValueError: If ``retention_days`` is negative, which would put the
                cutoff in the future.
        """
        if retention_days < 0:
            raise ValueError("retention_days must be non-negative")
        self.base_logger = base_logger
        self.retention_days = retention_days

    async def log_event(self, event: AuditEvent) -> None:
        """Forward ``event`` to the wrapped logger.

        Implementing the abstract method is what makes this class
        instantiable.
        """
        await self.base_logger.log_event(event)

    async def cleanup_old_logs(self):
        """Remove logs older than retention period"""
        # Imported locally so the snippet is self-contained.
        from datetime import datetime, timedelta

        # Naive local time, matching the datetime.now() used for event timestamps.
        cutoff_date = datetime.now() - timedelta(days=self.retention_days)
        await self.base_logger.delete_before(cutoff_date)

Best Practices

  1. Enable audit logging in production - Required for compliance
  2. Sanitize sensitive parameters - Protect passwords, tokens, PII
  3. Set appropriate retention - Balance compliance needs vs storage
  4. Monitor audit logs - Set up alerts for security events
  5. Secure audit logs - Restrict access to audit data
  6. Regular reviews - Periodically audit the audit logs
  7. Test compliance queries - Ensure you can answer audit questions

Performance Considerations

For high-volume systems, buffer events in memory and hand them to the underlying logger in batches:

class BufferedAuditLogger(AuditLogger):
    """Audit logger wrapper that collects events and writes them in batches.

    Events stay in memory until the buffer fills or ``flush`` is called, so
    call ``flush`` before shutting down to avoid losing the tail of the log.
    """

    def __init__(self, base_logger, buffer_size=100):
        """Wrap ``base_logger`` and set the batch threshold.

        Args:
            base_logger: Audit logger that receives the buffered events.
            buffer_size: Number of buffered events that triggers a flush.
        """
        self.base_logger = base_logger
        self.buffer = []
        self.buffer_size = buffer_size

    async def log_event(self, event: AuditEvent) -> None:
        """Buffer ``event`` and flush once the buffer reaches ``buffer_size``."""
        self.buffer.append(event)

        if len(self.buffer) >= self.buffer_size:
            await self.flush()

    async def flush(self):
        """Send all buffered events to the wrapped logger.

        Uses the wrapped logger's ``log_batch`` when it has one and falls back
        to one ``log_event`` call per event otherwise. If sending fails, the
        unsent events are put back at the front of the buffer and the
        exception propagates.
        """
        if not self.buffer:
            return

        # Detach the batch before awaiting: events logged while the write is
        # in flight land in the fresh list instead of being discarded.
        pending, self.buffer = self.buffer, []
        sent = 0
        try:
            log_batch = getattr(self.base_logger, 'log_batch', None)
            if log_batch is not None:
                await log_batch(pending)
                sent = len(pending)
            else:
                for event in pending:
                    await self.base_logger.log_event(event)
                    sent += 1
        finally:
            if sent < len(pending):
                # Keep whatever was not delivered, ahead of newer events.
                self.buffer = pending[sent:] + self.buffer

See Also