AI-Generated Placeholder Documentation
This documentation page has been automatically generated by a Large Language Model (LLM) and serves as placeholder content. The information provided here may be incomplete, inaccurate, or subject to change.
For accurate and complete information, please refer to the Vanna source code on GitHub.
Observability
Monitor and debug your Vanna agents with built-in observability features.
ObservabilityProvider Interface
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

from vanna.core.observability import ObservabilityProvider, Span, Metric

class ObservabilityProvider(ABC):
    @abstractmethod
    async def record_metric(
        self,
        name: str,
        value: float,
        unit: str = "",
        tags: Optional[Dict[str, str]] = None
    ) -> None:
        """Record a metric measurement"""
        pass

    @abstractmethod
    async def create_span(
        self,
        name: str,
        attributes: Optional[Dict[str, Any]] = None
    ) -> Span:
        """Create a new span for tracing"""
        pass
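The examples on this page construct Span objects directly and call set_attribute() and end() on them. The real definition lives in vanna.core.observability; the sketch below is only an assumption of its rough shape, inferred from how it is used here:

from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class Span:
    """Assumed shape of a span; see vanna.core.observability for the real class."""
    name: str
    attributes: Dict[str, Any] = field(default_factory=dict)

    def set_attribute(self, key: str, value: Any) -> None:
        self.attributes[key] = value

    def end(self) -> None:
        # A real implementation would record the end timestamp and flush the span
        pass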
Registering an Observability Provider
from vanna import Agent
agent = Agent(
    llm_service=llm,
    observability_provider=PrometheusProvider(registry)
)

Built-in Providers
PrometheusProvider
Expose metrics for Prometheus scraping:
from vanna.core.observability import PrometheusProvider
from prometheus_client import CollectorRegistry
registry = CollectorRegistry()
provider = PrometheusProvider(registry)
agent = Agent(
    llm_service=llm,
    observability_provider=provider
)
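For Prometheus to actually scrape these metrics, the registry also has to be exposed over HTTP. A minimal sketch using prometheus_client's built-in server (the port is an arbitrary choice):

from prometheus_client import start_http_server

# Serve the registry at http://localhost:8000/metrics for Prometheus to scrape
start_http_server(8000, registry=registry)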
Custom Provider Examples
Example 1: Prometheus Integration
from typing import Any, Dict, Optional

from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry
from vanna.core.observability import ObservabilityProvider, Span

class PrometheusProvider(ObservabilityProvider):
    def __init__(self, registry: CollectorRegistry):
        self.registry = registry

        # Define metrics
        self.request_counter = Counter(
            'agent_requests_total',
            'Total agent requests',
            ['user_group', 'status'],
            registry=registry
        )
        self.tool_duration = Histogram(
            'agent_tool_duration_seconds',
            'Tool execution duration',
            ['tool_name', 'success'],
            registry=registry
        )
        self.active_conversations = Gauge(
            'agent_active_conversations',
            'Number of active conversations',
            registry=registry
        )
        self.llm_tokens = Counter(
            'agent_llm_tokens_total',
            'Total LLM tokens used',
            ['model', 'token_type'],
            registry=registry
        )

    async def record_metric(
        self,
        name: str,
        value: float,
        unit: str = "",
        tags: Optional[Dict[str, str]] = None
    ) -> None:
        tags = tags or {}
        if name == "agent.request":
            self.request_counter.labels(
                user_group=tags.get('user_group', 'unknown'),
                status=tags.get('status', 'success')
            ).inc()
        elif name == "agent.tool.duration":
            self.tool_duration.labels(
                tool_name=tags.get('tool_name', 'unknown'),
                success=tags.get('success', 'true')
            ).observe(value)
        elif name == "agent.llm.tokens":
            self.llm_tokens.labels(
                model=tags.get('model', 'unknown'),
                token_type=tags.get('type', 'total')
            ).inc(value)

    async def create_span(
        self,
        name: str,
        attributes: Optional[Dict[str, Any]] = None
    ) -> Span:
        return Span(name=name, attributes=attributes or {})
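Note that this provider only maps the metric names it knows about ("agent.request", "agent.tool.duration", "agent.llm.tokens"); measurements recorded under any other name are silently dropped, so keep the name-to-collector mapping in sync with the metric names your agent actually emits.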
Example 2: DataDog Integration
from typing import Dict, Optional

from datadog import statsd
from vanna.core.observability import ObservabilityProvider

class DataDogProvider(ObservabilityProvider):
    def __init__(self, tags=None):
        self.default_tags = tags or []

    async def record_metric(
        self,
        name: str,
        value: float,
        unit: str = "",
        tags: Optional[Dict[str, str]] = None
    ) -> None:
        # Convert the tags dict to DataDog's "key:value" list format
        tag_list = self.default_tags + [
            f"{k}:{v}" for k, v in (tags or {}).items()
        ]

        # Send to DataDog: durations as histograms, counts as increments,
        # everything else as gauges
        if 'duration' in name:
            statsd.histogram(name, value, tags=tag_list)
        elif 'count' in name:
            statsd.increment(name, value, tags=tag_list)
        else:
            statsd.gauge(name, value, tags=tag_list)

    async def create_span(self, name, attributes=None):
        # DataDog tracing integration
        from ddtrace import tracer
        span = tracer.trace(name)
        if attributes:
            for k, v in attributes.items():
                span.set_tag(k, v)
        return span
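One caveat: the object returned by tracer.trace() exposes set_tag() and finish(), not the set_attribute()/end() methods used in the tracing example later on this page. If your calling code expects the latter, a small adapter is one option; this is a hedged sketch, not part of Vanna or ddtrace:

class DataDogSpanAdapter:
    """Wraps a ddtrace span so callers can use set_attribute()/end()."""

    def __init__(self, dd_span):
        self._dd_span = dd_span

    def set_attribute(self, key, value):
        self._dd_span.set_tag(key, value)

    def end(self):
        self._dd_span.finish()

create_span could then return DataDogSpanAdapter(span) instead of the raw span.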
Example 3: CloudWatch Integration
from typing import Dict, Optional

import boto3
from vanna.core.observability import ObservabilityProvider, Span

class CloudWatchProvider(ObservabilityProvider):
    def __init__(self, namespace='VannaAgent'):
        self.cloudwatch = boto3.client('cloudwatch')
        self.namespace = namespace

    async def record_metric(
        self,
        name: str,
        value: float,
        unit: str = "",
        tags: Optional[Dict[str, str]] = None
    ) -> None:
        # CloudWatch represents tags as metric dimensions
        dimensions = [
            {'Name': k, 'Value': v}
            for k, v in (tags or {}).items()
        ]
        self.cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=[{
                'MetricName': name,
                'Value': value,
                'Unit': unit or 'None',
                'Dimensions': dimensions
            }]
        )

    async def create_span(self, name, attributes=None):
        # create_span is also required by the interface; CloudWatch has no
        # native span concept here, so return a plain Span object
        return Span(name=name, attributes=attributes or {})
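put_metric_data is a blocking boto3 call, so invoking it directly inside an async method stalls the event loop while the request is in flight. If that matters for your workload, one option is to push the call onto a worker thread; a sketch, assuming Python 3.9+ for asyncio.to_thread:

import asyncio

# Inside record_metric, instead of calling put_metric_data directly:
await asyncio.to_thread(
    self.cloudwatch.put_metric_data,
    Namespace=self.namespace,
    MetricData=[{
        'MetricName': name,
        'Value': value,
        'Unit': unit or 'None',
        'Dimensions': dimensions
    }]
)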
Example 4: Custom Logging Provider
import logging
from typing import Dict, Optional

from vanna.core.observability import ObservabilityProvider, Span

class LoggingProvider(ObservabilityProvider):
    def __init__(self):
        self.logger = logging.getLogger('vanna.observability')

    async def record_metric(
        self,
        name: str,
        value: float,
        unit: str = "",
        tags: Optional[Dict[str, str]] = None
    ) -> None:
        self.logger.info(
            "Metric recorded",
            extra={
                'metric_name': name,
                'value': value,
                'unit': unit,
                'tags': tags
            }
        )

    async def create_span(self, name, attributes=None):
        self.logger.debug(
            f"Span started: {name}",
            extra={'attributes': attributes}
        )
        return Span(name=name, attributes=attributes or {})
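The extra fields above end up on the LogRecord but are not shown by logging's default formatter. If you want each metric as a machine-readable log line, one option is a small JSON formatter on the 'vanna.observability' logger; the JsonMetricFormatter class below is a hypothetical sketch, not part of Vanna:

import json
import logging

class JsonMetricFormatter(logging.Formatter):
    """Render metric log records as single JSON lines."""

    def format(self, record):
        payload = {
            'message': record.getMessage(),
            'metric_name': getattr(record, 'metric_name', None),
            'value': getattr(record, 'value', None),
            'unit': getattr(record, 'unit', None),
            'tags': getattr(record, 'tags', None),
        }
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonMetricFormatter())
logging.getLogger('vanna.observability').addHandler(handler)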
Key Metrics to Track
Request Metrics
# Total requests
await provider.record_metric(
    "agent.requests.total",
    1,
    tags={'user_group': user.group, 'status': 'success'}
)

# Request duration
await provider.record_metric(
    "agent.request.duration",
    duration_seconds,
    unit="seconds",
    tags={'user_id': user.id}
)

Tool Metrics
# Tool execution count
await provider.record_metric(
    "agent.tool.executions",
    1,
    tags={'tool_name': tool.name, 'success': str(result.success)}
)

# Tool duration
await provider.record_metric(
    "agent.tool.duration",
    duration,
    unit="seconds",
    tags={'tool_name': tool.name}
)

LLM Metrics
# Token usage
await provider.record_metric(
    "agent.llm.tokens",
    input_tokens,
    tags={'model': model, 'type': 'input'}
)
await provider.record_metric(
    "agent.llm.tokens",
    output_tokens,
    tags={'model': model, 'type': 'output'}
)

# LLM latency
await provider.record_metric(
    "agent.llm.latency",
    latency_ms,
    unit="milliseconds",
    tags={'model': model}
)

Error Metrics
# Error rate
await provider.record_metric(
    "agent.errors.total",
    1,
    tags={'error_type': type(error).__name__}
)
Distributed Tracing
Use spans to trace requests across services:
# In your agent code
async def handle_request(user, message):
    span = await observability.create_span(
        "agent.handle_request",
        attributes={
            'user_id': user.id,
            'message_length': len(message)
        }
    )
    try:
        # Process the request
        result = await process(message)
        span.set_attribute('success', True)
        return result
    except Exception as e:
        span.set_attribute('error', str(e))
        raise
    finally:
        span.end()
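Because the span must be closed in a finally block, it can be convenient to wrap create_span in an async context manager so spans are always ended, even on errors. A sketch, assuming only the create_span/set_attribute/end API shown above:

from contextlib import asynccontextmanager

@asynccontextmanager
async def traced(observability, name, **attributes):
    span = await observability.create_span(name, attributes=attributes)
    try:
        yield span
    except Exception as e:
        span.set_attribute('error', str(e))
        raise
    finally:
        span.end()

# Usage:
# async with traced(observability, "agent.handle_request", user_id=user.id) as span:
#     result = await process(message)
#     span.set_attribute('success', True)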
Dashboards
Prometheus + Grafana
# Example Grafana dashboard queries (PromQL)

# Request rate
rate(agent_requests_total[5m])

# Tool success rate
sum(rate(agent_tool_duration_seconds_count{success="true"}[5m])) /
sum(rate(agent_tool_duration_seconds_count[5m]))

# P95 tool latency
histogram_quantile(0.95, sum(rate(agent_tool_duration_seconds_bucket[5m])) by (le))

# Token usage by model
sum(rate(agent_llm_tokens_total[5m])) by (model)

Best Practices
- Use consistent naming - Follow a single naming convention for metrics
- Add relevant tags - Tags make filtering and aggregation easier
- Track business metrics - Not just technical metrics
- Set up alerts - Monitor error rates, latency, etc.
- Use sampling for high-volume metrics - Avoid overwhelming your monitoring system
- Correlate metrics - Link requests, tool executions, and LLM calls via shared tags
- Monitor costs - Track LLM token usage and API costs (see the sketch below)
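As an illustration of the last point, the token counts recorded above can be turned into an approximate cost metric. The per-1K-token prices below are placeholders rather than real pricing, and record_llm_cost is a hypothetical helper, not a Vanna API:

# Placeholder prices per 1K tokens; substitute your provider's actual rates
PRICE_PER_1K = {'input': 0.0, 'output': 0.0}

async def record_llm_cost(provider, model, input_tokens, output_tokens):
    cost = (
        input_tokens / 1000 * PRICE_PER_1K['input']
        + output_tokens / 1000 * PRICE_PER_1K['output']
    )
    await provider.record_metric(
        "agent.llm.cost",
        cost,
        unit="usd",
        tags={'model': model}
    )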