AI-Generated Placeholder Documentation

This documentation page has been automatically generated by a Large Language Model (LLM) and serves as placeholder content. The information provided here may be incomplete, inaccurate, or subject to change.

For accurate and complete information, please refer to the Vanna source code on GitHub.

LLM Middlewares

LLM middlewares intercept and transform requests to and responses from LLM providers for caching, monitoring, content filtering, and more.

Request Path - Middlewares execute in order (1→2→3)
Response Path - Middlewares execute in reverse (3→2→1)
💡 Key Concept: Each middleware can transform the request on the way in and the response on the way out, creating a pipeline pattern.
Loading diagram...

LlmMiddleware Interface

All middlewares extend the LlmMiddleware base class:

from vanna.core.middleware import LlmMiddleware
from vanna.core.llm import LlmRequest, LlmResponse

class LlmMiddleware(ABC):
    """Base class for hooks that wrap a call to the LLM.

    Both hooks are pass-throughs by default, so a subclass only needs to
    override the one(s) it cares about.
    """

    async def before_llm_request(self, request: LlmRequest) -> LlmRequest:
        """Hook that runs before the request is sent to the LLM.

        Returns:
            The request to forward: either the one received or a modified
            version of it. The default forwards it untouched.
        """
        return request

    async def after_llm_response(
        self, request: LlmRequest, response: LlmResponse
    ) -> LlmResponse:
        """Hook that runs after a response has come back from the LLM.

        Returns:
            The response to hand on: either the one received or a modified
            version of it. The default hands it on untouched.
        """
        return response

Registering Middlewares

Add middlewares when creating your agent:

from vanna import Agent

# NOTE: `llm`, `cache_backend` and `cost_tracker` are placeholders for your
# own objects. CachingMiddleware and CostTrackingMiddleware (see the examples
# below) each take a required constructor argument, so calling them with no
# arguments raises TypeError.
agent = Agent(
    llm_service=llm,
    llm_middlewares=[
        CachingMiddleware(cache_backend),
        LoggingMiddleware(),
        CostTrackingMiddleware(cost_tracker),
    ],
)

Middleware Examples

Example 1: Response Caching

import hashlib
import json
from vanna.core.middleware import LlmMiddleware

class CachingMiddleware(LlmMiddleware):
    """Look up and store LLM responses in a cache backend.

    The request hook looks the request up in the cache and parks any hit in
    ``request.metadata``; the response hook then returns that hit in place of
    the response it was given, or stores the fresh response on a miss.

    NOTE(review): these two hooks can only substitute the response after the
    fact. Whether the provider call itself is skipped on a hit depends on the
    agent, which is not visible here - confirm before relying on this to
    save cost.
    """

    def __init__(self, cache_backend, ttl=3600):
        """Create the middleware.

        Args:
            cache_backend: Object exposing awaitable ``get(key)`` and
                ``set(key, value, ttl=...)`` methods.
            ttl: Value passed through as ``ttl`` to ``cache_backend.set``
                (presumably seconds - confirm against the backend).
        """
        self.cache = cache_backend
        self.ttl = ttl

    def _compute_cache_key(self, request: LlmRequest) -> str:
        """Return a SHA-256 hex digest identifying the request.

        Only each message's role and content, the model and the temperature
        are part of the key; any other request field does not influence it.
        """
        key_data = {
            'messages': [
                {'role': m.role, 'content': m.content}
                for m in request.messages
            ],
            'model': request.model,
            'temperature': request.temperature
        }
        # sort_keys makes the serialization (and so the hash) deterministic.
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_str.encode()).hexdigest()

    async def before_llm_request(self, request: LlmRequest) -> LlmRequest:
        """Look the request up in the cache and record the outcome in metadata.

        Always stores ``metadata['cache_key']``; stores
        ``metadata['cached_response']`` only on a cache hit.
        """
        cache_key = self._compute_cache_key(request)
        cached_response = await self.cache.get(cache_key)

        request.metadata = request.metadata or {}
        # Remember the key derived from the request as it looked at lookup
        # time. Middlewares that run later may rewrite the messages, and
        # recomputing the key on the response path would then store the
        # response under a key that is never looked up.
        request.metadata['cache_key'] = cache_key

        # Assumes the backend signals a miss with None - confirm.
        if cached_response is not None:
            request.metadata['cached_response'] = cached_response
        else:
            # Drop a stale hit left behind if this request object is reused.
            request.metadata.pop('cached_response', None)

        return request

    async def after_llm_response(
        self,
        request: LlmRequest,
        response: LlmResponse
    ) -> LlmResponse:
        """Return the cached response on a hit; otherwise cache ``response``."""
        metadata = request.metadata or {}

        if 'cached_response' in metadata:
            return metadata['cached_response']

        # Prefer the key recorded by before_llm_request; fall back to
        # computing it when that hook did not run on this request.
        cache_key = metadata.get('cache_key') or self._compute_cache_key(request)
        await self.cache.set(cache_key, response, ttl=self.ttl)

        return response

Example 2: Request/Response Logging

import logging
from vanna.core.middleware import LlmMiddleware

class LoggingMiddleware(LlmMiddleware):
    """Write one INFO log record per LLM request and one per LLM response."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def before_llm_request(self, request: LlmRequest) -> LlmRequest:
        """Log the model, message count and temperature; return the request as-is."""
        request_fields = {
            'model': request.model,
            'message_count': len(request.messages),
            'temperature': request.temperature,
        }
        # The fields travel as `extra`, i.e. as attributes on the log record.
        self.logger.info("LLM request", extra=request_fields)
        return request

    async def after_llm_response(
        self, request: LlmRequest, response: LlmResponse
    ) -> LlmResponse:
        """Log the model, finish reason and tool-call count; return the response as-is."""
        response_fields = {
            'model': request.model,
            'finish_reason': response.finish_reason,
            # A missing/empty tool_calls value is reported as 0.
            'tool_calls': len(response.tool_calls or []),
        }
        self.logger.info("LLM response received", extra=response_fields)
        return response

Example 3: Cost Tracking

class CostTrackingMiddleware(LlmMiddleware):
    """Compute the cost of each LLM response and report it to a tracker."""

    # Token costs per million tokens (example pricing)
    COSTS = {
        'claude-3-opus': {'input': 15.0, 'output': 75.0},
        'claude-3-sonnet': {'input': 3.0, 'output': 15.0},
        'gpt-4': {'input': 30.0, 'output': 60.0},
        'gpt-3.5-turbo': {'input': 0.5, 'output': 1.5}
    }

    def __init__(self, cost_tracker):
        """Create the middleware.

        Args:
            cost_tracker: Object exposing an awaitable ``record_cost(user_id=,
                model=, cost=, input_tokens=, output_tokens=)`` method.
        """
        self.tracker = cost_tracker

    async def after_llm_response(
        self,
        request: LlmRequest,
        response: LlmResponse
    ) -> LlmResponse:
        """Record the cost of ``response`` and return it unchanged.

        Models without an entry in ``COSTS`` are recorded with a cost of 0.
        Missing usage data is treated as zero tokens, and missing request
        metadata yields ``user_id=None``.
        """
        model_costs = self.COSTS.get(request.model, {'input': 0, 'output': 0})

        # usage and metadata may be absent (None); fall back to empty
        # mappings instead of failing the whole request over bookkeeping.
        usage = response.usage or {}
        metadata = request.metadata or {}

        input_tokens = usage.get('input_tokens', 0) or 0
        output_tokens = usage.get('output_tokens', 0) or 0

        # Prices in COSTS are per million tokens.
        cost = (
            (input_tokens / 1_000_000) * model_costs['input'] +
            (output_tokens / 1_000_000) * model_costs['output']
        )

        await self.tracker.record_cost(
            user_id=metadata.get('user_id'),
            model=request.model,
            cost=cost,
            input_tokens=input_tokens,
            output_tokens=output_tokens
        )

        return response

Example 4: Content Filtering

class ContentFilterMiddleware(LlmMiddleware):
    """Reject requests containing blocked text and redact it from responses.

    Patterns are plain substrings (not regular expressions) and are matched
    case-insensitively in both directions.
    """

    def __init__(self, blocked_patterns):
        """Create the middleware.

        Args:
            blocked_patterns: Iterable of substrings to block. Empty strings
                are ignored, since an empty substring matches every text.
        """
        self.blocked_patterns = blocked_patterns

    async def before_llm_request(self, request: LlmRequest) -> LlmRequest:
        """Return the request unchanged if no message contains a blocked pattern.

        Raises:
            ValueError: If any message content contains a blocked pattern.
        """
        # Lowercase the patterns as well as the content; comparing a
        # lowercased message against a mixed-case pattern can never match.
        patterns = [(p, p.lower()) for p in self.blocked_patterns if p]

        for message in request.messages:
            if not message.content:
                # Nothing to scan in a message without content.
                continue
            content = message.content.lower()
            for pattern, lowered in patterns:
                if lowered in content:
                    raise ValueError(f"Blocked pattern detected: {pattern}")

        return request

    async def after_llm_response(
        self,
        request: LlmRequest,
        response: LlmResponse
    ) -> LlmResponse:
        """Replace every blocked pattern in the response content with ``[REDACTED]``."""
        import re

        if response.content:
            for pattern in self.blocked_patterns:
                if not pattern:
                    continue
                # re.escape keeps the pattern a literal substring; IGNORECASE
                # makes redaction agree with the request-side check.
                response.content = re.sub(
                    re.escape(pattern),
                    "[REDACTED]",
                    response.content,
                    flags=re.IGNORECASE,
                )

        return response

Example 5: Retry Logic

import asyncio
from vanna.core.middleware import LlmMiddleware

class RetryMiddleware(LlmMiddleware):
    """Signal a retry, with exponential backoff, for rate-limited responses.

    This middleware cannot re-issue the request itself: it sleeps, bumps a
    counter in ``request.metadata`` and raises ``RetryableError``. The agent
    would have to catch that and resend the request.

    NOTE(review): ``RetryableError`` is not defined or imported anywhere in
    this example; it must be supplied by the surrounding code.
    """

    def __init__(self, max_retries=3, backoff_factor=2):
        """Create the middleware.

        Args:
            max_retries: Maximum number of retries signalled per request.
            backoff_factor: Base of the exponential wait; the wait before
                retry number ``n`` (counting from 0) is
                ``backoff_factor ** n`` seconds.
        """
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

    @staticmethod
    def _ensure_metadata(request):
        """Return ``request.metadata``, creating an empty dict if missing or None."""
        metadata = getattr(request, 'metadata', None)
        if metadata is None:
            metadata = {}
            request.metadata = metadata
        return metadata

    async def before_llm_request(self, request: LlmRequest) -> LlmRequest:
        """Make sure the request carries a retry counter."""
        # setdefault rather than assignment: if the agent sends a retried
        # request back through this hook, resetting the counter to 0 would
        # mean max_retries is never reached.
        self._ensure_metadata(request).setdefault('retry_count', 0)
        return request

    async def after_llm_response(
        self,
        request: LlmRequest,
        response: LlmResponse
    ) -> LlmResponse:
        """Return the response, or back off and raise to request a retry.

        Raises:
            RetryableError: If the response is retryable and fewer than
                ``max_retries`` retries have been signalled so far.
        """
        if self._should_retry(response):
            metadata = self._ensure_metadata(request)
            retry_count = metadata.get('retry_count', 0)

            if retry_count < self.max_retries:
                # Wait with exponential backoff
                wait_time = self.backoff_factor ** retry_count
                await asyncio.sleep(wait_time)

                metadata['retry_count'] = retry_count + 1

                # Retry the request (would need to be implemented in the agent)
                # This is a simplified example
                raise RetryableError(f"Retrying after {wait_time}s")

        # Not retryable, or retries exhausted: hand the response on as-is.
        return response

    def _should_retry(self, response: LlmResponse) -> bool:
        """Return True when the response's error text mentions ``rate_limit``."""
        error = getattr(response, 'error', None)
        if not error:
            return False
        # str() tolerates an error that is not already a string.
        return 'rate_limit' in str(error).lower()

Example 6: Token Limit Enforcement

class TokenLimitMiddleware(LlmMiddleware):
    """Cap the estimated input size and the requested output size of a request.

    Token counts are a rough estimate (about 4 characters per token), not the
    provider's real tokenization.
    """

    def __init__(self, max_input_tokens=8000, max_output_tokens=2000):
        """Create the middleware.

        Args:
            max_input_tokens: Estimated-token budget for all request messages.
            max_output_tokens: Upper bound applied to ``request.max_tokens``.
        """
        self.max_input_tokens = max_input_tokens
        self.max_output_tokens = max_output_tokens

    @staticmethod
    def _estimate_tokens(message) -> int:
        """Roughly estimate a message's tokens (~4 chars per token; 0 if no content)."""
        content = message.content
        return len(content) // 4 if content else 0

    async def before_llm_request(self, request: LlmRequest) -> LlmRequest:
        """Truncate over-long input and clamp ``max_tokens``; return the request."""
        total_tokens = sum(self._estimate_tokens(msg) for msg in request.messages)

        if total_tokens > self.max_input_tokens:
            request.messages = self._truncate_messages(
                request.messages,
                self.max_input_tokens
            )

        # Enforce output limit (an unset or zero max_tokens gets the cap too).
        if not request.max_tokens or request.max_tokens > self.max_output_tokens:
            request.max_tokens = self.max_output_tokens

        return request

    def _truncate_messages(self, messages, max_tokens):
        """Keep system messages plus the most recent messages that fit.

        System messages and the latest message are always kept, even if they
        alone exceed ``max_tokens``. Older messages are then added from
        newest to oldest until one no longer fits, so the retained history is
        one contiguous run ending at the latest message. The original order
        is preserved.

        NOTE(review): assumes ``role`` is the string ``'system'`` for system
        messages - confirm. Truncation may also separate a tool result from
        the message that requested it; check whether the provider accepts
        that.

        Args:
            messages: The request's messages, oldest first.
            max_tokens: Estimated-token budget for the returned messages.

        Returns:
            A new list containing the retained messages.
        """
        messages = list(messages)
        if not messages:
            return messages

        keep = [False] * len(messages)
        budget = max_tokens

        for index, message in enumerate(messages):
            if message.role == 'system':
                keep[index] = True
                budget -= self._estimate_tokens(message)

        # Never send a conversation without its latest message.
        last = len(messages) - 1
        if not keep[last]:
            keep[last] = True
            budget -= self._estimate_tokens(messages[last])

        for index in range(last - 1, -1, -1):
            if keep[index]:
                continue
            cost = self._estimate_tokens(messages[index])
            if cost > budget:
                # Stop at the first message that does not fit so the kept
                # history has no gaps.
                break
            keep[index] = True
            budget -= cost

        return [message for message, kept in zip(messages, keep) if kept]

Middleware Execution Order

Middlewares execute in order for requests, and reverse order for responses:

agent = Agent(
    llm_middlewares=[
        # Request hooks run top to bottom; response hooks run bottom to top.
        Middleware1(),  # request hook: first,  response hook: last
        Middleware2(),  # request hook: second, response hook: second
        Middleware3(),  # request hook: third,  response hook: first
    ],
)

Best Practices

  1. Keep middlewares focused - One responsibility per middleware
  2. Don't modify unless necessary - Pass through unchanged when possible
  3. Handle errors gracefully - Don't crash the request
  4. Be mindful of performance - Middlewares run on every LLM call
  5. Use metadata for state - Store temporary data in request.metadata
  6. Log middleware activity - For debugging and monitoring
  7. Test independently - Unit test each middleware

See Also