Rate Limiting and Error Handling
Production AI systems fail in ways that development systems do not: rate limits, network timeouts, and model overload errors. Robust retry logic and error handling are not optional.
Understanding Anthropic API Errors
import anthropic
from anthropic import APIConnectionError, APIStatusError, RateLimitError
client = anthropic.Anthropic()

# Issue one request and report each failure category separately.
try:
    response = client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}],
    )
except RateLimitError as e:
    # HTTP 429: the rate limit was exceeded.
    # Handled before APIStatusError so it gets its own branch.
    print(f"Rate limited: {e}")
    # This is where exponential backoff belongs.
except APIStatusError as e:
    # Any other HTTP 4xx / 5xx response from the API.
    print(f"API error {e.status_code}: {e.message}")
except APIConnectionError as e:
    # The request failed at the network level; retrying is safe.
    print(f"Connection error: {e}")
Exponential Backoff with Jitter
import time
import random
import anthropic
from anthropic import RateLimitError, APIConnectionError
# Shared client instance; create_with_retry sends every attempt through it.
client = anthropic.Anthropic()
def create_with_retry(
    max_retries: int = 5,
    initial_delay: float = 1.0,
    **kwargs,
) -> anthropic.types.Message:
    """Call ``client.messages.create`` with exponential backoff and jitter.

    Rate-limit errors, connection errors, and server-side errors (HTTP
    status >= 500) are retried. Any other API status error is raised
    immediately because repeating the same request would not help.

    Args:
        max_retries: Total number of attempts, including the first call.
            Must be at least 1.
        initial_delay: Seconds to wait after the first failed attempt. The
            wait doubles after every failure and is capped at 60 seconds.
        **kwargs: Forwarded unchanged to ``client.messages.create``.

    Returns:
        The message returned by the first successful attempt.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        RateLimitError, APIConnectionError, anthropic.APIStatusError: The
            last error once all attempts are used up, or the first
            non-retriable error.
    """
    # Without this guard the loop body never runs and the function would
    # silently return None instead of a Message.
    if max_retries < 1:
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    # NOTE(review): the SDK client may also retry internally; confirm the
    # combined number of requests is acceptable for your rate limits.
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            return client.messages.create(**kwargs)
        except (RateLimitError, APIConnectionError, anthropic.APIStatusError) as e:
            # Connection errors carry no status code, so the isinstance
            # check must short-circuit before status_code is read.
            retriable = (
                isinstance(e, (RateLimitError, APIConnectionError))
                or e.status_code >= 500
            )
            if not retriable or attempt == max_retries - 1:
                raise  # Non-retriable error, or the final attempt failed

            # Add jitter (up to 10% of the delay) to prevent thundering herd
            jitter = random.uniform(0, delay * 0.1)
            sleep_time = delay + jitter
            print(f"Attempt {attempt + 1} failed ({type(e).__name__}). Retrying in {sleep_time:.1f}s...")
            time.sleep(sleep_time)

            # Exponential backoff: with the defaults the waits are roughly
            # 1s, 2s, 4s, 8s; longer runs are capped at 60 seconds.
            delay = min(delay * 2, 60.0)

    # Every iteration either returns or raises, so this is never reached;
    # it only makes the "always returns a Message" contract explicit.
    raise RuntimeError("retry loop exited without a result")
# Usage — pass the same keyword arguments you would give to
# client.messages.create; max_retries / initial_delay keep their defaults.
response = create_with_retry(
    messages=[{"role": "user", "content": "Explain recursion."}],
    model="claude-sonnet-4-5",
    max_tokens=1024,
)
JavaScript Retry Logic
import Anthropic from "@anthropic-ai/sdk";
// Shared SDK client used by createWithRetry below.
const client = new Anthropic({
  timeout: 30_000, // Give up on a request after 30 seconds
  maxRetries: 3, // The SDK has built-in retry support
});
/**
 * Call `client.messages.create` with exponential backoff and jitter.
 *
 * Rate-limit errors, connection errors, and server-side errors
 * (HTTP status >= 500) are retried; every other error is re-thrown
 * immediately. The wait starts at 1 second, doubles after each failed
 * attempt, and is capped at 60 seconds.
 *
 * NOTE(review): `client` is configured with `maxRetries: 3`, so the SDK may
 * also retry inside each attempt — confirm the combined request count is
 * acceptable.
 *
 * @param params - Request parameters forwarded unchanged to the SDK.
 * @param maxAttempts - Total number of attempts, including the first call.
 *   Must be a positive integer.
 * @returns The message from the first successful attempt.
 * @throws RangeError if `maxAttempts` is not a positive integer.
 * @throws The last error once all attempts are used up, or the first
 *   non-retriable error.
 */
async function createWithRetry(
  params: Anthropic.Messages.MessageCreateParamsNonStreaming,
  maxAttempts = 5
): Promise<Anthropic.Message> {
  // A fractional or non-positive value would skip the "final attempt"
  // re-throw below and hide the real error behind a generic message.
  if (!Number.isInteger(maxAttempts) || maxAttempts < 1) {
    throw new RangeError(
      `maxAttempts must be a positive integer, got ${maxAttempts}`
    );
  }

  let delay = 1000;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await client.messages.create(params);
    } catch (error) {
      // Connection errors have no HTTP status, so they are matched by class;
      // server-side failures are matched by status code.
      const retriable =
        error instanceof Anthropic.RateLimitError ||
        error instanceof Anthropic.APIConnectionError ||
        (error instanceof Anthropic.APIError &&
          typeof error.status === "number" &&
          error.status >= 500);

      // Non-retriable error, or the final attempt failed.
      if (!retriable || attempt === maxAttempts - 1) throw error;

      // Jitter scales with the delay (up to 10%) so that later, longer
      // waits are still spread out across clients.
      const jitter = Math.random() * delay * 0.1;
      await new Promise((r) => setTimeout(r, delay + jitter));
      delay = Math.min(delay * 2, 60_000);
    }
  }
  // Not reached: every iteration returns or throws. Kept so the function
  // provably never resolves to undefined.
  throw new Error("Max retries exceeded");
}
Circuit Breaker Pattern
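For high-traffic production systems, add a circuit breaker to prevent cascading failures. After a configurable number of consecutive failures it rejects requests immediately instead of calling the API, then allows a trial request once a recovery timeout has passed: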
import time
from enum import Enum
class CircuitState(Enum):
    """States a CircuitBreaker moves through."""

    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failing — reject requests immediately
    HALF_OPEN = "half_open"  # Testing if service recovered


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open.

    Subclasses ``Exception`` so existing ``except Exception`` handlers keep
    working, while new code can tell a rejection apart from a failure of the
    wrapped function.
    """


class CircuitBreaker:
    """Stop calling a failing dependency until it has had time to recover.

    The breaker starts CLOSED and counts consecutive failures. Once
    ``failure_threshold`` is reached it becomes OPEN and rejects calls
    without running them. After ``recovery_timeout`` seconds the next call
    is let through as a probe (HALF_OPEN): success closes the circuit and
    resets the count, failure opens it again.

    Not synchronized; guard it externally if shared between threads.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60):
        """Create a closed breaker.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            recovery_timeout: Seconds to stay open before allowing a probe.
        """
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        # Reading of time.monotonic() at the most recent failure; only
        # differences between readings are meaningful.
        self.last_failure_time = 0

    def call(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitOpenError: If the circuit is open and the recovery
                timeout has not elapsed; ``func`` is not called.
            Exception: Any exception raised by ``func`` is recorded as a
                failure and re-raised unchanged.
        """
        if self.state is CircuitState.OPEN:
            # Monotonic clock: wall-clock adjustments must not shorten or
            # extend the open period.
            elapsed = time.monotonic() - self.last_failure_time
            if elapsed <= self.recovery_timeout:
                raise CircuitOpenError("Circuit breaker OPEN — request rejected")
            self.state = CircuitState.HALF_OPEN

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.on_failure()
            raise
        self.on_success()
        return result

    def on_success(self):
        """Record a successful call: reset the count and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        """Record a failed call and open the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        # A failed half-open probe re-opens the circuit immediately, whatever
        # the count; otherwise open once the threshold is reached.
        if (
            self.state is CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN
Defensive error handling is what separates prototypes from production systems.