AI-Generated Placeholder Documentation
This documentation page has been automatically generated by a Large Language Model (LLM) and serves as placeholder content. The information provided here may be incomplete, inaccurate, or subject to change.
For accurate and complete information, please refer to the Vanna source code on GitHub.
Error Recovery
Vanna 2.0 provides error recovery strategies to handle transient failures and improve reliability.
Retry Path - Exponential backoff and retry
Fallback Path - Alternative approach
Fail Path - Give up and return error
💡 Recovery Strategy: Determines whether to retry, use fallback, or fail based on error type and retry count.
Loading diagram...
ErrorRecoveryStrategy Interface
Custom recovery strategies extend the ErrorRecoveryStrategy base class, which defines the following interface:
from vanna.core.recovery import ErrorRecoveryStrategy, RecoveryAction
class ErrorRecoveryStrategy(ABC):
@abstractmethod
async def should_retry(
self,
error: Exception,
attempt: int,
context: Dict[str, Any]
) -> RecoveryAction:
"""Determine if and how to retry after an error"""
pass Built-in Strategies
ExponentialBackoffStrategy
Retry with exponentially increasing wait times:
from vanna.core.recovery import ExponentialBackoffStrategy
# Configure the built-in backoff strategy. Only `initial_delay` is documented
# here as being in seconds; presumably `max_delay` uses the same unit and
# `backoff_factor` multiplies the delay after each attempt — TODO confirm
# against the Vanna source.
strategy = ExponentialBackoffStrategy(
    max_retries=3,
    initial_delay=1.0,  # seconds
    max_delay=30.0,
    backoff_factor=2.0
)

# NOTE(review): `Agent` and `llm` are not imported or defined in this snippet;
# they are assumed to come from the surrounding application code.
agent = Agent(
    llm_service=llm,
    error_recovery_strategy=strategy
)

Custom Recovery Strategies
Example 1: Simple Retry Strategy
from vanna.core.recovery import ErrorRecoveryStrategy, RecoveryAction, RecoveryActionType
class SimpleRetryStrategy(ErrorRecoveryStrategy):
def __init__(self, max_retries=3):
self.max_retries = max_retries
async def should_retry(
self,
error: Exception,
attempt: int,
context: Dict[str, Any]
) -> RecoveryAction:
# Retry on specific error types
retryable_errors = [
'timeout',
'connection_error',
'rate_limit'
]
error_type = type(error).__name__.lower()
if any(e in str(error).lower() for e in retryable_errors):
if attempt < self.max_retries:
return RecoveryAction(
action_type=RecoveryActionType.RETRY,
delay_seconds=2.0 ** attempt # Exponential backoff
)
return RecoveryAction(action_type=RecoveryActionType.FAIL) Example 2: Fallback Model Strategy
class FallbackModelStrategy(ErrorRecoveryStrategy):
def __init__(self, fallback_models: List[str]):
self.fallback_models = fallback_models
async def should_retry(
self,
error: Exception,
attempt: int,
context: Dict[str, Any]
) -> RecoveryAction:
# On model error, try fallback models
if 'model' in str(error).lower() and attempt < len(self.fallback_models):
return RecoveryAction(
action_type=RecoveryActionType.RETRY,
metadata={'use_model': self.fallback_models[attempt]}
)
return RecoveryAction(action_type=RecoveryActionType.FAIL) Example 3: Circuit Breaker
import time
from collections import defaultdict
class CircuitBreakerStrategy(ErrorRecoveryStrategy):
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failures = defaultdict(list)
async def should_retry(
self,
error: Exception,
attempt: int,
context: Dict[str, Any]
) -> RecoveryAction:
service = context.get('service', 'default')
now = time.time()
# Clean old failures
self.failures[service] = [
ts for ts in self.failures[service]
if now - ts < self.timeout
]
# Check if circuit should be open
if len(self.failures[service]) >= self.failure_threshold:
return RecoveryAction(
action_type=RecoveryActionType.FAIL,
metadata={'reason': 'circuit_breaker_open'}
)
# Record failure
self.failures[service].append(now)
# Retry with backoff
if attempt < 3:
return RecoveryAction(
action_type=RecoveryActionType.RETRY,
delay_seconds=2.0 ** attempt
)
return RecoveryAction(action_type=RecoveryActionType.FAIL) RecoveryAction Types
class RecoveryActionType(Enum):
    """Kinds of action a recovery strategy can ask the caller to take."""
    RETRY = "retry"  # Try again
    FAIL = "fail"  # Give up
    FALLBACK = "fallback"  # Use alternative approach
    SKIP = "skip"  # Skip and continue

Error Handling in Tools
Tools can implement their own error recovery:
from vanna.core.tool import Tool, ToolResult
class ResilientApiTool(Tool):
async def execute(self, context: ToolContext, args: MyArgs) -> ToolResult:
max_retries = 3
for attempt in range(max_retries):
try:
result = await self.call_api(args)
return ToolResult(success=True, result_for_llm=result)
except ConnectionError as e:
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)
continue
else:
return ToolResult(
success=False,
error=f"API call failed after {max_retries} attempts: {e}"
) Integrating with Observability
Track recovery attempts for monitoring:
class ObservableRetryStrategy(ErrorRecoveryStrategy):
def __init__(self, metrics_service, max_retries=3):
self.metrics = metrics_service
self.max_retries = max_retries
async def should_retry(
self,
error: Exception,
attempt: int,
context: Dict[str, Any]
) -> RecoveryAction:
# Record the error
await self.metrics.record_error(
error_type=type(error).__name__,
attempt=attempt,
context=context
)
if attempt < self.max_retries:
# Record retry
await self.metrics.increment_counter('retries')
return RecoveryAction(
action_type=RecoveryActionType.RETRY,
delay_seconds=2.0 ** attempt
)
# Record failure
await self.metrics.increment_counter('failures')
return RecoveryAction(action_type=RecoveryActionType.FAIL) Best Practices
- Identify retryable errors - Not all errors should be retried
- Use exponential backoff - Avoid overwhelming failing services
- Set maximum retries - Prevent infinite loops
- Log recovery attempts - For debugging and monitoring
- Consider circuit breakers - Fail fast when services are down
- Provide fallbacks - Alternative models or approaches
- Test failure scenarios - Ensure recovery works as expected
Common Error Scenarios
Network Timeouts
async def should_retry(self, error, attempt, context):
if isinstance(error, asyncio.TimeoutError):
if attempt < 3:
return RecoveryAction(
action_type=RecoveryActionType.RETRY,
delay_seconds=5.0
)
return RecoveryAction(action_type=RecoveryActionType.FAIL) Rate Limits
async def should_retry(self, error, attempt, context):
if 'rate_limit' in str(error).lower():
# Extract retry-after from error if available
retry_after = self._extract_retry_after(error)
return RecoveryAction(
action_type=RecoveryActionType.RETRY,
delay_seconds=retry_after or 60.0
)
return RecoveryAction(action_type=RecoveryActionType.FAIL) Model Overload
async def should_retry(self, error, attempt, context):
    """Retry an "overloaded" error once, naming a lighter model to use.

    Only the first attempt (``attempt == 0``) of an error whose message
    contains "overloaded" (case-insensitive) is retried; the model name is
    passed in the action metadata under ``use_model``. Everything else
    yields FAIL.
    """
    overloaded = 'overloaded' in str(error).lower()
    if not (overloaded and attempt == 0):
        return RecoveryAction(action_type=RecoveryActionType.FAIL)

    # Try a lighter model
    return RecoveryAction(
        action_type=RecoveryActionType.RETRY,
        metadata={'use_model': 'claude-3-haiku'}
    )