AI-Generated Placeholder Documentation

This documentation page has been automatically generated by a Large Language Model (LLM) and serves as placeholder content. The information provided here may be incomplete, inaccurate, or subject to change.

For accurate and complete information, please refer to the Vanna source code on GitHub.

Error Recovery

Vanna 2.0 provides error recovery strategies to handle transient failures and improve reliability.

A recovery strategy decides whether to retry, fall back, or fail based on the error type and the retry count:

Retry Path - Retry after an exponential backoff delay
Fallback Path - Switch to an alternative approach
Fail Path - Give up and return the error
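
The three paths can be pictured as a small driver loop. The sketch below is an illustration only: `run_with_recovery`, `RetryTwiceThenFallback`, and the stand-in `RecoveryAction` and `RecoveryActionType` classes are hypothetical and defined locally so the example runs without Vanna installed. The loop inside Vanna's agent may work differently.

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, List, Optional


# Local stand-ins for illustration; the real classes live in
# vanna.core.recovery and their exact fields may differ.
class RecoveryActionType(Enum):
    RETRY = "retry"
    FAIL = "fail"
    FALLBACK = "fallback"


@dataclass
class RecoveryAction:
    action_type: RecoveryActionType
    delay_seconds: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)


async def run_with_recovery(
    operation: Callable[[], Awaitable[Any]],
    strategy: Any,
    fallback: Optional[Callable[[], Awaitable[Any]]] = None,
) -> Any:
    """Hypothetical driver: run `operation`, asking `strategy` what to do on each error."""
    attempt = 0
    while True:
        try:
            return await operation()
        except Exception as error:
            action = await strategy.should_retry(error, attempt, {})
            if action.action_type is RecoveryActionType.RETRY:
                await asyncio.sleep(action.delay_seconds)  # retry path
                attempt += 1
            elif action.action_type is RecoveryActionType.FALLBACK and fallback:
                return await fallback()  # fallback path
            else:
                raise  # fail path: surface the original error


class RetryTwiceThenFallback:
    """Toy strategy: two immediate retries, then ask for the fallback."""

    async def should_retry(self, error, attempt, context):
        if attempt < 2:
            return RecoveryAction(RecoveryActionType.RETRY, delay_seconds=0.0)
        return RecoveryAction(RecoveryActionType.FALLBACK)


calls: List[str] = []


async def always_fails() -> str:
    calls.append("primary")
    raise ConnectionError("service unavailable")


async def cached_answer() -> str:
    return "cached"


result = asyncio.run(
    run_with_recovery(always_fails, RetryTwiceThenFallback(), cached_answer)
)
print(result, len(calls))  # cached 3
```

The primary operation runs three times (the first call plus two retries) before the fallback supplies the result.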

ErrorRecoveryStrategy Interface

Implement custom recovery strategies by extending the base class:

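from abc import ABC, abstractmethod
from typing import Any, Dict

from vanna.core.recovery import RecoveryAction

# Shape of the base class, shown for reference. In your own code, import
# ErrorRecoveryStrategy from vanna.core.recovery instead of redefining it.
class ErrorRecoveryStrategy(ABC):
    @abstractmethod
    async def should_retry(
        self,
        error: Exception,
        attempt: int,
        context: Dict[str, Any]
    ) -> RecoveryAction:
        """Determine if and how to retry after an error"""
        pass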

Built-in Strategies

ExponentialBackoffStrategy

Retry with exponentially increasing wait times:

from vanna.core.recovery import ExponentialBackoffStrategy

strategy = ExponentialBackoffStrategy(
    max_retries=3,
    initial_delay=1.0,  # seconds
    max_delay=30.0,     # seconds
    backoff_factor=2.0
)

# Agent and llm are assumed to be imported and configured elsewhere.
agent = Agent(
    llm_service=llm,
    error_recovery_strategy=strategy
)
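
To see what these parameters imply, the sketch below computes a delay schedule using the common formula `initial_delay * backoff_factor ** n`, capped at `max_delay`. The formula and the helper name `backoff_delays` are assumptions for illustration; the Vanna source is the authority on how `ExponentialBackoffStrategy` actually computes its delays.

```python
from typing import List


def backoff_delays(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    backoff_factor: float = 2.0,
) -> List[float]:
    """Delay in seconds before each retry, assuming capped exponential growth."""
    return [
        min(max_delay, initial_delay * backoff_factor ** n)
        for n in range(max_retries)
    ]


print(backoff_delays())               # [1.0, 2.0, 4.0]
print(backoff_delays(max_retries=7))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

With the settings shown above, the cap of 30 seconds only comes into play from the sixth retry onward.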

Custom Recovery Strategies

Example 1: Simple Retry Strategy

from typing import Any, Dict

from vanna.core.recovery import ErrorRecoveryStrategy, RecoveryAction, RecoveryActionType

class SimpleRetryStrategy(ErrorRecoveryStrategy):
    def __init__(self, max_retries=3):
        self.max_retries = max_retries
    
    async def should_retry(
        self,
        error: Exception,
        attempt: int,
        context: Dict[str, Any]
    ) -> RecoveryAction:
        # Substrings that mark an error as transient and worth retrying
        retryable_errors = [
            'timeout',
            'connection_error',
            'rate_limit'
        ]
        
        # Check both the exception class name and the error message
        error_type = type(error).__name__.lower()
        error_message = str(error).lower()
        
        if any(e in error_type or e in error_message for e in retryable_errors):
            if attempt < self.max_retries:
                return RecoveryAction(
                    action_type=RecoveryActionType.RETRY,
                    delay_seconds=2.0 ** attempt  # Exponential backoff
                )
        
        # Not retryable, or retries exhausted
        return RecoveryAction(action_type=RecoveryActionType.FAIL)

Example 2: Fallback Model Strategy

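from typing import Any, Dict, List

from vanna.core.recovery import ErrorRecoveryStrategy, RecoveryAction, RecoveryActionType

class FallbackModelStrategy(ErrorRecoveryStrategy):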
    def __init__(self, fallback_models: List[str]):
        self.fallback_models = fallback_models
    
    async def should_retry(
        self,
        error: Exception,
        attempt: int,
        context: Dict[str, Any]
    ) -> RecoveryAction:
        # On model error, try fallback models
        if 'model' in str(error).lower() and attempt < len(self.fallback_models):
            return RecoveryAction(
                action_type=RecoveryActionType.RETRY,
                metadata={'use_model': self.fallback_models[attempt]}
            )
        
        return RecoveryAction(action_type=RecoveryActionType.FAIL)
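
The strategy above only suggests a model through `metadata['use_model']`; something on the calling side has to act on it. The sketch below shows one way a caller might do that. `pick_model`, `fallback_sequence`, and the model names are hypothetical, and this page does not confirm whether Vanna's agent reads the `use_model` key on its own.

```python
from typing import Any, Dict, List


def pick_model(current_model: str, metadata: Dict[str, Any]) -> str:
    """Return the model requested by a recovery action, else keep the current one."""
    return metadata.get("use_model", current_model)


def fallback_sequence(primary: str, fallback_models: List[str]) -> List[str]:
    """Models tried in order when every attempt fails with a model error.

    Attempt 0 runs on `primary`; after failure number n the strategy
    suggests fallback_models[n] for the next attempt.
    """
    tried = [primary]
    for attempt in range(len(fallback_models)):
        # Metadata in the shape the strategy returns for this attempt
        metadata = {"use_model": fallback_models[attempt]}
        tried.append(pick_model(tried[-1], metadata))
    return tried


print(fallback_sequence("model-a", ["model-b", "model-c"]))
# ['model-a', 'model-b', 'model-c']
```

Once the list of fallback models is exhausted, the strategy returns `FAIL`, so the sequence never grows beyond the primary model plus the configured fallbacks.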

Example 3: Circuit Breaker

import time
from collections import defaultdict
from typing import Any, Dict

from vanna.core.recovery import ErrorRecoveryStrategy, RecoveryAction, RecoveryActionType

class CircuitBreakerStrategy(ErrorRecoveryStrategy):
    def __init__(self, failure_threshold=5, timeout=60, max_retries=3):
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # seconds a failure counts toward the threshold
        self.max_retries = max_retries
        self.failures = defaultdict(list)  # service name -> failure timestamps
    
    async def should_retry(
        self,
        error: Exception,
        attempt: int,
        context: Dict[str, Any]
    ) -> RecoveryAction:
        service = context.get('service', 'default')
        # Monotonic clock: unaffected by system clock adjustments
        now = time.monotonic()
        
        # Drop failures that fall outside the time window
        self.failures[service] = [
            ts for ts in self.failures[service]
            if now - ts < self.timeout
        ]
        
        # Circuit is open: too many recent failures, so fail fast
        if len(self.failures[service]) >= self.failure_threshold:
            return RecoveryAction(
                action_type=RecoveryActionType.FAIL,
                metadata={'reason': 'circuit_breaker_open'}
            )
        
        # Record this failure
        self.failures[service].append(now)
        
        # Circuit is closed: retry with exponential backoff
        if attempt < self.max_retries:
            return RecoveryAction(
                action_type=RecoveryActionType.RETRY,
                delay_seconds=2.0 ** attempt
            )
        
        return RecoveryAction(action_type=RecoveryActionType.FAIL)

RecoveryAction Types

from enum import Enum

class RecoveryActionType(Enum):
    RETRY = "retry"           # Try again
    FAIL = "fail"             # Give up
    FALLBACK = "fallback"     # Use alternative approach
    SKIP = "skip"             # Skip and continue

Error Handling in Tools

Tools can implement their own error recovery:

import asyncio

from vanna.core.tool import Tool, ToolResult

# ToolContext, MyArgs, and self.call_api are assumed to be defined elsewhere.
class ResilientApiTool(Tool):
    async def execute(self, context: ToolContext, args: MyArgs) -> ToolResult:
        max_retries = 3
        
        for attempt in range(max_retries):
            try:
                result = await self.call_api(args)
                return ToolResult(success=True, result_for_llm=result)
            
            except ConnectionError as e:
                if attempt < max_retries - 1:
                    # Wait 1s, then 2s, before the next attempt
                    await asyncio.sleep(2 ** attempt)
                    continue
                # Last attempt failed: report the error to the caller
                return ToolResult(
                    success=False,
                    error=f"API call failed after {max_retries} attempts: {e}"
                )

Integrating with Observability

Track recovery attempts for monitoring:

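from typing import Any, Dict

from vanna.core.recovery import ErrorRecoveryStrategy, RecoveryAction, RecoveryActionType

class ObservableRetryStrategy(ErrorRecoveryStrategy):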
    def __init__(self, metrics_service, max_retries=3):
        self.metrics = metrics_service
        self.max_retries = max_retries
    
    async def should_retry(
        self,
        error: Exception,
        attempt: int,
        context: Dict[str, Any]
    ) -> RecoveryAction:
        # Record the error
        await self.metrics.record_error(
            error_type=type(error).__name__,
            attempt=attempt,
            context=context
        )
        
        if attempt < self.max_retries:
            # Record retry
            await self.metrics.increment_counter('retries')
            
            return RecoveryAction(
                action_type=RecoveryActionType.RETRY,
                delay_seconds=2.0 ** attempt
            )
        
        # Record failure
        await self.metrics.increment_counter('failures')
        return RecoveryAction(action_type=RecoveryActionType.FAIL)
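
The `metrics_service` passed to `ObservableRetryStrategy` only needs two async methods, `record_error` and `increment_counter`. For local testing, an in-memory stand-in is often enough. The `InMemoryMetrics` class below is a hypothetical sketch, not part of Vanna; a production service would forward these calls to a real metrics backend.

```python
import asyncio
from collections import Counter
from typing import Any, Dict, List


class InMemoryMetrics:
    """Hypothetical metrics service exposing the two async methods used above."""

    def __init__(self) -> None:
        self.counters: Counter = Counter()
        self.errors: List[Dict[str, Any]] = []

    async def record_error(
        self, error_type: str, attempt: int, context: Dict[str, Any]
    ) -> None:
        self.errors.append(
            {"error_type": error_type, "attempt": attempt, "context": context}
        )

    async def increment_counter(self, name: str) -> None:
        self.counters[name] += 1


async def demo() -> InMemoryMetrics:
    metrics = InMemoryMetrics()
    # Simulate what the strategy records for two retried errors and a final failure.
    for attempt in range(3):
        await metrics.record_error("TimeoutError", attempt, {"service": "llm"})
        await metrics.increment_counter("retries" if attempt < 2 else "failures")
    return metrics


metrics = asyncio.run(demo())
print(dict(metrics.counters))  # {'retries': 2, 'failures': 1}
```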

Best Practices

  1. Identify retryable errors - Not all errors should be retried
  2. Use exponential backoff - Avoid overwhelming failing services
  3. Set maximum retries - Prevent infinite loops
  4. Log recovery attempts - For debugging and monitoring
  5. Consider circuit breakers - Fail fast when services are down
  6. Provide fallbacks - Alternative models or approaches
  7. Test failure scenarios - Ensure recovery works as expected
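
For the first practice in the list, matching on exception types is usually more robust than searching error messages for substrings. The sketch below is one possible classifier built on Python's standard exception classes. The helper name `is_retryable` and the choice of types are assumptions; extend the tuple with the exceptions your LLM client or database driver actually raises.

```python
import asyncio

# Exception types treated as transient in this sketch; adjust for your stack.
RETRYABLE_TYPES = (TimeoutError, asyncio.TimeoutError, ConnectionError)


def is_retryable(error: Exception) -> bool:
    """True for transient failures; validation and programming errors are not retried."""
    return isinstance(error, RETRYABLE_TYPES)


print(is_retryable(ConnectionResetError("peer reset")))  # True (subclass of ConnectionError)
print(is_retryable(ValueError("bad SQL")))               # False
```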

Common Error Scenarios

Network Timeouts

async def should_retry(self, error, attempt, context):
    if isinstance(error, asyncio.TimeoutError):
        if attempt < 3:
            return RecoveryAction(
                action_type=RecoveryActionType.RETRY,
                delay_seconds=5.0
            )
    return RecoveryAction(action_type=RecoveryActionType.FAIL)

Rate Limits

async def should_retry(self, error, attempt, context):
    # Cap the retries so a persistent rate limit cannot loop forever
    if 'rate_limit' in str(error).lower() and attempt < 3:
        # Extract retry-after from error if available.
        # _extract_retry_after is a helper you supply; it returns seconds or None.
        retry_after = self._extract_retry_after(error)
        return RecoveryAction(
            action_type=RecoveryActionType.RETRY,
            delay_seconds=retry_after or 60.0
        )
    return RecoveryAction(action_type=RecoveryActionType.FAIL)
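
The `_extract_retry_after` helper is not defined on this page. One possible implementation is sketched below as a plain function. Both the `retry_after` attribute name and the message format it parses are assumptions; check how your client library reports rate limits before relying on either.

```python
import re
from typing import Optional

# Matches text such as "Retry-After: 12" or "retry after 2.5"
_RETRY_AFTER_PATTERN = re.compile(
    r"retry[-_ ]after[:= ]+(\d+(?:\.\d+)?)", re.IGNORECASE
)


def extract_retry_after(error: Exception) -> Optional[float]:
    """Best-effort wait time in seconds, or None if the error carries no hint."""
    # Some client libraries attach the value as an attribute (assumed name).
    value = getattr(error, "retry_after", None)
    if isinstance(value, (int, float)):
        return float(value)
    # Otherwise look for the value in the error message.
    match = _RETRY_AFTER_PATTERN.search(str(error))
    return float(match.group(1)) if match else None


print(extract_retry_after(Exception("rate_limit exceeded, Retry-After: 12")))  # 12.0
print(extract_retry_after(Exception("rate_limit exceeded")))                   # None
```

Returning `None` when no hint is found lets the caller fall back to a default delay, as the `retry_after or 60.0` expression above does.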

Model Overload

async def should_retry(self, error, attempt, context):
    if 'overloaded' in str(error).lower():
        if attempt == 0:
            # Try a lighter model
            return RecoveryAction(
                action_type=RecoveryActionType.RETRY,
                metadata={'use_model': 'claude-3-haiku'}
            )
    return RecoveryAction(action_type=RecoveryActionType.FAIL)
