AI-Generated Placeholder Documentation
This documentation page has been automatically generated by a Large Language Model (LLM) and serves as placeholder content. The information provided here may be incomplete, inaccurate, or subject to change.
For accurate and complete information, please refer to the Vanna source code on GitHub.
Lifecycle Hooks
Lifecycle hooks allow you to intercept and customize agent behavior at key points in the execution flow.
before_message - Execute in order (1 → 2 → 3)
before_tool - Execute in order (1 → 2 → 3)
after_tool - Execute in reverse (3 → 2 → 1)
after_message - Execute in reverse (3 → 2 → 1)
Loading diagram...
LifecycleHook Interface
All lifecycle hooks extend the LifecycleHook base class:
from vanna.core.lifecycle import LifecycleHook
from vanna.core.user import User
from vanna.core.tool import Tool, ToolContext, ToolResult
class LifecycleHook(ABC):
    """Base class that all lifecycle hooks extend.

    Every method is a no-op by default, so a subclass only overrides the
    stages it wants to intercept.
    """

    async def before_message(self, user: User, message: str) -> Optional[str]:
        """Run before a user message is processed.

        Returns:
            A modified message, or ``None``.
        """
        return None

    async def after_message(self, result: Any) -> None:
        """Run after message processing completes."""
        return None

    async def before_tool(self, tool: Tool, context: ToolContext) -> None:
        """Run before a tool is executed."""
        return None

    async def after_tool(self, result: ToolResult) -> Optional[ToolResult]:
        """Run after a tool has executed.

        Returns:
            A modified result, or ``None``.
        """
        return None


# Registering Hooks
Add hooks when creating your agent:
from vanna import Agent
# Hooks are handed to the agent through the lifecycle_hooks argument.
agent = Agent(
    llm_service=llm,
    sql_runner=sql_runner,
    lifecycle_hooks=[
        LoggingHook(),
        # QuotaCheckHook (Example 2 on this page) declares quota_service as
        # a required constructor argument, so it must be supplied here.
        QuotaCheckHook(quota_service),
        MetricsHook(),
    ],
)

# Built-in Hooks
LoggingHook
Basic logging of agent activity:
from vanna.core.lifecycle import LoggingHook
# Supply the logger instance through the hook's `logger` keyword.
hook = LoggingHook(
    logger=my_logger,
)

# Custom Hook Examples
Example 1: Logging Hook
import logging
from vanna.core.lifecycle import LifecycleHook
class DetailedLoggingHook(LifecycleHook):
    """Log each lifecycle stage without changing messages or results."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def before_message(self, user: User, message: str) -> None:
        """Log the sender and attach the message length as extra data."""
        text = f"User {user.username} ({user.id}) sent message"
        self.logger.info(text, extra={"message_length": len(message)})
        # Falling off the end returns None, i.e. the message is not modified.

    async def after_message(self, result: Any) -> None:
        """Log that message processing has finished."""
        self.logger.info("Message processing complete")

    async def before_tool(self, tool: Tool, context: ToolContext) -> None:
        """Log the tool name together with the user and conversation ids."""
        text = f"Executing tool: {tool.name}"
        details = {
            "user_id": context.user.id,
            "conversation_id": context.conversation_id,
        }
        self.logger.info(text, extra=details)

    async def after_tool(self, result: ToolResult) -> None:
        """Log whether the tool succeeded and whether it produced a UI component."""
        outcome = "succeeded" if result.success else "failed"
        has_component = result.ui_component is not None
        self.logger.info(
            f"Tool execution {outcome}",
            extra={"has_ui_component": has_component},
        )
        # Falling off the end returns None, i.e. the result is not modified.


# Example 2: Usage Quota Hook
from vanna.core.lifecycle import LifecycleHook
from vanna.core.errors import AgentError
class QuotaCheckHook(LifecycleHook):
    """Block messages from users without remaining quota and record tool usage."""

    def __init__(self, quota_service):
        self.quota_service = quota_service

    async def before_message(self, user: User, message: str) -> None:
        """Check the user's remaining quota, then decrement it.

        Raises:
            AgentError: If the remaining quota is zero or negative.
        """
        quota_left = await self.quota_service.get_remaining(user.id)
        if quota_left <= 0:
            raise AgentError("Usage quota exceeded. Please upgrade your plan.")

        # NOTE(review): the check above and this decrement are two separate
        # awaited calls; whether concurrent messages can both pass the check
        # depends on the quota service - confirm.
        await self.quota_service.decrement(user.id)
        return None

    async def before_tool(self, tool: Tool, context: ToolContext) -> None:
        """Record which tool the user is about to run (for billing)."""
        usage = {"user_id": context.user.id, "tool_name": tool.name}
        await self.quota_service.record_tool_usage(**usage)


# Example 3: Content Moderation Hook
class ModerationHook(LifecycleHook):
    """Run every incoming message through a moderation API."""

    def __init__(self, moderation_api):
        self.moderation_api = moderation_api

    async def before_message(self, user: User, message: str) -> Optional[str]:
        """Reject flagged messages and optionally substitute sanitized text.

        Returns:
            The sanitized text when the moderation result asks for
            sanitization, otherwise ``None`` (no changes needed).

        Raises:
            AgentError: If the moderation result is flagged.
        """
        verdict = await self.moderation_api.check(message)

        if verdict.flagged:
            raise AgentError("Message contains inappropriate content")

        return verdict.sanitized_text if verdict.needs_sanitization else None


# Example 4: Performance Monitoring Hook
import time
from vanna.core.lifecycle import LifecycleHook
class PerformanceHook(LifecycleHook):
    """Measure how long each tool call takes and report it to a metrics service."""

    def __init__(self, metrics_service):
        self.metrics = metrics_service
        # "<request_id>:<tool_name>" -> clock reading taken in before_tool.
        self.start_times = {}

    async def before_tool(self, tool: Tool, context: ToolContext) -> None:
        """Record the start time of this tool call."""
        key = f"{context.request_id}:{tool.name}"
        # perf_counter() is monotonic, so a system clock adjustment between
        # before_tool and after_tool cannot distort the measured duration.
        self.start_times[key] = time.perf_counter()

    async def after_tool(self, result: ToolResult) -> None:
        """Report the duration of the finished tool call.

        Nothing is reported when the result carries no ``request_id`` /
        ``tool_name`` metadata or when no matching start time was recorded.
        """
        # metadata may be missing or None; treat both as "no metadata".
        metadata = getattr(result, 'metadata', None) or {}
        if 'request_id' not in metadata or 'tool_name' not in metadata:
            # NOTE(review): a start time recorded for this call stays in
            # start_times because it cannot be matched without metadata.
            return None

        tool_name = metadata['tool_name']
        key = f"{metadata['request_id']}:{tool_name}"

        # pop() releases the entry before reporting, so a failure in the
        # metrics service does not leave a stale start time behind.
        started_at = self.start_times.pop(key, None)
        if started_at is None:
            return None

        duration = time.perf_counter() - started_at
        await self.metrics.record_duration(
            tool_name=tool_name,
            duration_ms=duration * 1000,
            success=result.success,
        )
        return None


# Example 5: Result Transformation Hook
class ResultEnhancementHook(LifecycleHook):
    """Annotate successful tool results with processing metadata."""

    async def after_tool(self, result: ToolResult) -> Optional[ToolResult]:
        """Add metadata and an LLM-facing note to successful results.

        Returns:
            The same ``result`` object; it is modified in place only when
            ``result.success`` is true.
        """
        # Imported here because this example uses datetime without
        # importing it anywhere else.
        from datetime import datetime

        if result.success:
            if result.metadata is None:
                result.metadata = {}

            result.metadata['processed_at'] = datetime.now().isoformat()
            result.metadata['enhanced_by_hook'] = True

            # Modify the result for LLM
            result.result_for_llm += "\n\n[Result verified and enhanced]"

        return result  # Return modified result


# Example 6: Caching Hook
import hashlib
import json
class CachingHook(LifecycleHook):
    """Serve tool results from a cache backend and store new successful results."""

    def __init__(self, cache_backend):
        self.cache = cache_backend
        # request_id -> bookkeeping written by before_tool, consumed by after_tool.
        # NOTE(review): keyed by request_id alone; if one request can run
        # several tools, later calls overwrite earlier entries - confirm.
        self.pending_calls = {}

    async def before_tool(self, tool: Tool, context: ToolContext) -> None:
        """Look up a cached result and stash the outcome for after_tool."""
        # Generate cache key from tool name + args
        cache_key = self._generate_cache_key(tool.name, context)

        cached_result = await self.cache.get(cache_key)
        if cached_result:
            # We can't short-circuit here, but can in after_tool
            self.pending_calls[context.request_id] = {
                'cached': True,
                'result': cached_result,
            }
        else:
            self.pending_calls[context.request_id] = {
                'cached': False,
                'cache_key': cache_key,
            }

    async def after_tool(self, result: ToolResult) -> Optional[ToolResult]:
        """Swap in the cached result, or cache the fresh one.

        Returns:
            The cached result when before_tool found one, otherwise ``None``.
        """
        request_id = result.metadata.get('request_id') if result.metadata else None

        # pop() releases the bookkeeping entry on every path, including the
        # early return for a cache hit, so pending_calls cannot grow unbounded.
        pending = self.pending_calls.pop(request_id, None)
        if pending is None:
            return None

        if pending['cached']:
            return pending['result']

        if result.success:
            await self.cache.set(
                pending['cache_key'],
                result,
                ttl=3600,  # 1 hour
            )
        return None

    def _generate_cache_key(self, tool_name: str, context: ToolContext) -> str:
        """Return a SHA-256 hex digest over tool name, user id, and args."""
        key_data = {
            'tool': tool_name,
            'user_id': context.user.id,
            'args': context.metadata.get('args', {}),
        }
        # sort_keys makes the serialized form, and thus the key, deterministic.
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_str.encode()).hexdigest()


# Hook Execution Order
Hooks are executed in the order they're registered:
# The position in lifecycle_hooks determines when each hook runs.
agent = Agent(
    llm_service=llm,
    lifecycle_hooks=[
        Hook1(),  # Runs first
        Hook2(),  # Runs second
        Hook3(),  # Runs third
    ],
)

# Error Handling
Hooks can raise errors to halt processing:
class ValidationHook(LifecycleHook):
    """Halt processing when a request fails validation."""

    async def before_tool(self, tool: Tool, context: ToolContext) -> None:
        """Validate the request before the tool runs.

        Raises:
            AgentError: If ``self.is_valid_request(context)`` is falsy.
        """
        # is_valid_request is not defined in this example; it has to be
        # provided by the hook itself.
        request_ok = self.is_valid_request(context)
        if not request_ok:
            raise AgentError("Invalid request")


# Best Practices
- Keep hooks focused - One responsibility per hook
- Handle errors gracefully - Don't crash the agent
- Be mindful of performance - Hooks run on every request
- Use async properly - All hooks are async
- Don't modify unless needed - Return None to pass through unchanged
- Log hook activity - For debugging and monitoring
- Test hooks independently - Unit test each hook
Combining with Other Features
Hooks work well with other extensibility points:
# Lifecycle hooks are configured alongside the other extension points.
agent = Agent(
    llm_service=llm,
    workflow_handler=CommandHandler(),
    # Uses the QuotaCheckHook defined in Example 2 on this page, which
    # requires the quota service as a constructor argument.
    lifecycle_hooks=[QuotaCheckHook(quota_service), LoggingHook()],
    llm_middlewares=[CachingMiddleware()],
    context_enrichers=[UserPreferencesEnricher()],
    conversation_filters=[ContextWindowFilter()],
)

# See Also
- Workflow Handlers - Intercept messages before LLM processing
- LLM Middlewares
- Context Enrichers
- Observability
- Audit Logging