AI-Generated Placeholder Documentation
This documentation page has been automatically generated by a Large Language Model (LLM) and serves as placeholder content. The information provided here may be incomplete, inaccurate, or subject to change.
For accurate and complete information, please refer to the Vanna source code on GitHub.
Audit Logging
Vanna 2.0 provides comprehensive audit logging to track user actions, tool execution, and access control decisions for compliance and security.
AuditLogger Interface
from vanna.core.audit import AuditLogger, AuditEvent
class AuditLogger(ABC):
@abstractmethod
async def log_event(self, event: AuditEvent) -> None:
"""Log an audit event"""
pass Configuration
Enable and configure audit logging through AuditConfig:
from vanna.core.agent import AgentConfig, AuditConfig
config = AgentConfig(
audit_config=AuditConfig(
enabled=True,
log_tool_access_checks=True,
log_tool_invocations=True,
log_tool_results=True,
log_ui_feature_checks=False, # Can be noisy
log_ai_responses=True,
include_full_ai_responses=False, # Privacy concern
sanitize_tool_parameters=True # Redact sensitive params
)
)
agent = Agent(
llm_service=llm,
config=config,
audit_logger=PostgresAuditLogger(connection_string)
) Built-in Audit Loggers
PostgresAuditLogger
Store audit logs in PostgreSQL:
from vanna.core.audit import PostgresAuditLogger
logger = PostgresAuditLogger(
connection_string="postgresql://user:pass@host:5432/db",
table_name="vanna_audit_log"
)
agent = Agent(
llm_service=llm,
audit_logger=logger
) The logger automatically creates the necessary table schema.
Custom Audit Loggers
Example 1: File-Based Logging
import json
from datetime import datetime
from vanna.core.audit import AuditLogger, AuditEvent
class FileAuditLogger(AuditLogger):
def __init__(self, log_file_path: str):
self.log_file = log_file_path
async def log_event(self, event: AuditEvent) -> None:
log_entry = {
'timestamp': event.timestamp.isoformat(),
'event_type': event.event_type,
'user_id': event.user_id,
'user_email': event.user_email,
'action': event.action,
'resource': event.resource,
'result': event.result,
'metadata': event.metadata
}
with open(self.log_file, 'a') as f:
f.write(json.dumps(log_entry) + '\n') Example 2: CloudWatch Logs
import boto3
from vanna.core.audit import AuditLogger
class CloudWatchAuditLogger(AuditLogger):
def __init__(self, log_group: str, log_stream: str):
self.client = boto3.client('logs')
self.log_group = log_group
self.log_stream = log_stream
async def log_event(self, event: AuditEvent) -> None:
self.client.put_log_events(
logGroupName=self.log_group,
logStreamName=self.log_stream,
logEvents=[{
'timestamp': int(event.timestamp.timestamp() * 1000),
'message': json.dumps({
'event_type': event.event_type,
'user_id': event.user_id,
'action': event.action,
'resource': event.resource,
'result': event.result,
'metadata': event.metadata
})
}]
) Example 3: Elasticsearch Integration
from elasticsearch import AsyncElasticsearch
from vanna.core.audit import AuditLogger
class ElasticsearchAuditLogger(AuditLogger):
def __init__(self, es_client: AsyncElasticsearch, index: str):
self.es = es_client
self.index = index
async def log_event(self, event: AuditEvent) -> None:
doc = {
'timestamp': event.timestamp,
'event_type': event.event_type,
'user': {
'id': event.user_id,
'email': event.user_email,
'groups': event.metadata.get('user_groups', [])
},
'action': event.action,
'resource': event.resource,
'result': event.result,
'metadata': event.metadata
}
await self.es.index(
index=self.index,
document=doc
) Audit Event Types
Vanna logs several types of audit events:
Tool Access Checks
ToolAccessCheckEvent(
timestamp=datetime.now(),
user_id=user.id,
user_email=user.email,
tool_name=tool.name,
access_granted=True,
user_groups=user.group_memberships,
required_groups=tool.access_groups
) Tool Invocations
ToolInvocationEvent(
timestamp=datetime.now(),
user_id=user.id,
tool_name=tool.name,
arguments=sanitized_args, # Sensitive params redacted
conversation_id=context.conversation_id
) Tool Results
ToolResultEvent(
timestamp=datetime.now(),
user_id=user.id,
tool_name=tool.name,
success=result.success,
error=result.error,
duration_ms=execution_time
) UI Feature Access
UiFeatureAccessCheckEvent(
timestamp=datetime.now(),
user_id=user.id,
feature_name=feature,
access_granted=can_access,
user_groups=user.group_memberships
) AI Responses
AiResponseEvent(
timestamp=datetime.now(),
user_id=user.id,
conversation_id=conversation_id,
model=model_name,
token_count=total_tokens,
response_summary=summary # Full response optional
) Querying Audit Logs
SQL Queries (PostgreSQL)
-- Find all failed tool executions in last 24 hours
SELECT
timestamp,
user_email,
tool_name,
error
FROM vanna_audit_log
WHERE event_type = 'tool_result'
AND result = 'failure'
AND timestamp > NOW() - INTERVAL '24 hours'
ORDER BY timestamp DESC;
-- User activity summary
SELECT
user_email,
COUNT(*) as total_actions,
COUNT(DISTINCT tool_name) as unique_tools_used
FROM vanna_audit_log
WHERE event_type = 'tool_invocation'
AND timestamp > NOW() - INTERVAL '7 days'
GROUP BY user_email
ORDER BY total_actions DESC;
-- Access denied events
SELECT
timestamp,
user_email,
resource,
metadata->>'required_groups' as required_groups,
metadata->>'user_groups' as user_groups
FROM vanna_audit_log
WHERE event_type = 'access_check'
AND result = 'denied'
ORDER BY timestamp DESC
LIMIT 100; Compliance Use Cases
SOC 2 Compliance
Track who accessed what data:
config = AuditConfig(
enabled=True,
log_tool_access_checks=True,
log_tool_invocations=True,
log_tool_results=True,
sanitize_tool_parameters=True
) GDPR Data Access Requests
Find all data accessed by a user:
SELECT
timestamp,
tool_name,
arguments,
result
FROM vanna_audit_log
WHERE user_id = 'user_12345'
AND event_type IN ('tool_invocation', 'tool_result')
ORDER BY timestamp; HIPAA Audit Trails
Track access to protected health information:
class HIPAAAuditLogger(AuditLogger):
async def log_event(self, event: AuditEvent) -> None:
# Enhanced logging for HIPAA
if self._contains_phi(event):
event.metadata['phi_accessed'] = True
event.metadata['access_justification'] = self._get_justification(event)
await self.base_logger.log_event(event) Security Monitoring
Detect Suspicious Activity
-- Unusual number of failed access attempts
SELECT
user_email,
COUNT(*) as failed_attempts
FROM vanna_audit_log
WHERE event_type = 'access_check'
AND result = 'denied'
AND timestamp > NOW() - INTERVAL '1 hour'
GROUP BY user_email
HAVING COUNT(*) > 10;
-- Access outside business hours
SELECT *
FROM vanna_audit_log
WHERE (EXTRACT(HOUR FROM timestamp) < 6 OR EXTRACT(HOUR FROM timestamp) > 20)
AND event_type = 'tool_invocation'
ORDER BY timestamp DESC; Data Retention
Configure retention policies:
class RetentionPolicyLogger(AuditLogger):
def __init__(self, base_logger, retention_days=90):
self.base_logger = base_logger
self.retention_days = retention_days
async def cleanup_old_logs(self):
"""Remove logs older than retention period"""
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
await self.base_logger.delete_before(cutoff_date) Best Practices
- Enable audit logging in production - Required for compliance
- Sanitize sensitive parameters - Protect passwords, tokens, PII
- Set appropriate retention - Balance compliance needs vs storage
- Monitor audit logs - Set up alerts for security events
- Secure audit logs - Restrict access to audit data
- Regular reviews - Periodically audit the audit logs
- Test compliance queries - Ensure you can answer audit questions
Performance Considerations
For high-volume systems:
class BufferedAuditLogger(AuditLogger):
def __init__(self, base_logger, buffer_size=100):
self.base_logger = base_logger
self.buffer = []
self.buffer_size = buffer_size
async def log_event(self, event: AuditEvent) -> None:
self.buffer.append(event)
if len(self.buffer) >= self.buffer_size:
await self.flush()
async def flush(self):
if self.buffer:
await self.base_logger.log_batch(self.buffer)
self.buffer = []