Monitoring and Observability for AI Agents
You cannot improve what you cannot measure. Production AI agents need the same observability as any other service — plus AI-specific metrics.
What to Measure
For AI agents, track:
- Latency — Time to first token and total response time (see the streaming sketch after this list)
- Token usage — Input, output, cache read, cache write per request
- Cost — Calculated from token usage and model pricing
- Tool call rate — How often tools are called per agent run
- Success rate — Agent completions vs. failures vs. timeouts
- Quality score — If you have evaluation data
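The tracking code in the next section measures total latency only. Time to first token requires streaming, so here is a minimal sketch using the Anthropic Python SDK's streaming helper; the `measure_latency` name and the returned keys are illustrative rather than anything defined later in this article.

```python
import time
import anthropic

client = anthropic.Anthropic()

def measure_latency(task: str, model: str = "claude-sonnet-4-5") -> dict:
    """Return time-to-first-token and total latency (ms) for one streamed request."""
    start = time.time()
    first_token_ms = None
    chunks = []
    with client.messages.stream(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": task}],
    ) as stream:
        for text in stream.text_stream:
            if first_token_ms is None:
                # First text delta marks time to first token.
                first_token_ms = (time.time() - start) * 1000
            chunks.append(text)
    return {
        "time_to_first_token_ms": first_token_ms,
        "total_ms": (time.time() - start) * 1000,
        "text": "".join(chunks),
    }
```

In practice you would fold these two numbers into the same per-request metrics record shown below rather than returning them separately.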
Structured Logging
```python
import time
import logging
from dataclasses import dataclass, asdict

import anthropic

client = anthropic.Anthropic()
logger = logging.getLogger("agent")
@dataclass
class AgentMetrics:
    request_id: str
    model: str
    task_type: str
    input_tokens: int
    output_tokens: int
    cache_read_tokens: int
    latency_ms: float
    tool_calls: int
    cost_usd: float
    success: bool
    error: str | None = None
# USD per million tokens. Verify against the current pricing page before relying
# on these numbers; rates change, and cache reads/writes are billed separately.
PRICING = {
    "claude-haiku-4-5": {"input": 1.00, "output": 5.00},
    "claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
    "claude-opus-4-5": {"input": 5.00, "output": 25.00},
}
def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate USD cost from token counts (cache tokens are excluded for simplicity)."""
    p = PRICING.get(model, PRICING["claude-sonnet-4-5"])
    return (input_tokens / 1_000_000 * p["input"]) + (output_tokens / 1_000_000 * p["output"])
def tracked_complete(task: str, task_type: str = "general", model: str = "claude-sonnet-4-5") -> str:
    request_id = f"req_{int(time.time() * 1000)}"
    start_time = time.time()
    # A single completion makes no tool calls; in a full agent loop, increment this
    # each time a tool_use block from the response is executed.
    tool_calls = 0

    try:
        messages = [{"role": "user", "content": task}]
        response = client.messages.create(model=model, max_tokens=2048, messages=messages)
        latency = (time.time() - start_time) * 1000

        usage = response.usage
        metrics = AgentMetrics(
            request_id=request_id,
            model=model,
            task_type=task_type,
            input_tokens=usage.input_tokens,
            output_tokens=usage.output_tokens,
            # May be missing or None when prompt caching is not in use.
            cache_read_tokens=getattr(usage, "cache_read_input_tokens", None) or 0,
            latency_ms=latency,
            tool_calls=tool_calls,
            cost_usd=calculate_cost(model, usage.input_tokens, usage.output_tokens),
            success=True,
        )
        logger.info("agent_request", extra={"metrics": asdict(metrics)})
        return response.content[0].text
    except Exception as e:
        latency = (time.time() - start_time) * 1000
        logger.error("agent_error", extra={
            "request_id": request_id,
            "error": str(e),
            "latency_ms": latency,
        })
        raise
```
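One thing the snippet above leaves implicit: the stock logging formatter ignores the `extra={"metrics": ...}` payload, so the output is not actually structured yet. Below is a minimal sketch of a JSON formatter that includes it; the `JsonFormatter` class and the chosen field names are illustrative, not part of the standard library.

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line, including extra fields."""
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "event": record.getMessage(),
        }
        # Extras passed via logger.info(..., extra={...}) become record attributes.
        for key in ("metrics", "request_id", "error", "latency_ms"):
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload, default=str)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger("agent").addHandler(handler)
logging.getLogger("agent").setLevel(logging.INFO)
```

With one JSON object per line, these logs can be tailed locally or shipped to any aggregator without extra parsing.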
Sending Metrics to a Dashboard
```python
import os
import time

import httpx

def send_to_datadog(metrics: AgentMetrics) -> None:
    """Ship per-request metrics to Datadog's v2 series endpoint."""
    now = int(time.time())
    httpx.post(
        "https://api.datadoghq.com/api/v2/series",
        # Read the API key from the environment rather than hard-coding it.
        headers={"DD-API-KEY": os.environ["DD_API_KEY"]},
        json={
            "series": [
                {
                    "metric": "agent.latency_ms",
                    "points": [{"timestamp": now, "value": metrics.latency_ms}],
                    "tags": [f"model:{metrics.model}"],
                },
                {
                    "metric": "agent.cost_usd",
                    "points": [{"timestamp": now, "value": metrics.cost_usd}],
                    "tags": [f"task:{metrics.task_type}"],
                },
                {
                    "metric": "agent.tokens.input",
                    "points": [{"timestamp": now, "value": metrics.input_tokens}],
                },
            ]
        },
        timeout=5.0,
    )
```
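To wire this up, call the sender right after the `logger.info` line in `tracked_complete`. Because the HTTP request adds latency and can fail, keep it off the request path; the sketch below uses a fire-and-forget thread, and the `emit_metrics_async` helper is an illustration only. A real deployment would more likely go through the Datadog agent, a queue, or a metrics library.

```python
import threading

def emit_metrics_async(metrics: AgentMetrics) -> None:
    """Fire-and-forget metric shipping so dashboard outages never break the agent."""
    def _send() -> None:
        try:
            send_to_datadog(metrics)
        except Exception:
            logger.warning("metrics_emit_failed", exc_info=True)

    threading.Thread(target=_send, daemon=True).start()

# Inside tracked_complete, right after logging:
#   logger.info("agent_request", extra={"metrics": asdict(metrics)})
#   emit_metrics_async(metrics)
```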
Observability is not just debugging — it is how you make data-driven decisions about model selection, caching, and architecture.