
Monitoring and Observability for AI Agents

You cannot improve what you cannot measure. Production AI agents need the same observability as any other service — plus AI-specific metrics.

What to Measure

For AI agents, track:

  • Latency — Time to first token and total response time (a streaming sketch follows this list)
  • Token usage — Input, output, cache read, cache write per request
  • Cost — Calculated from token usage and model pricing
  • Tool call rate — How often tools are called per agent run
  • Success rate — Agent completions vs. failures vs. timeouts
  • Quality score — If you have evaluation data
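
Time to first token is only visible when you stream the response. Below is a minimal sketch of measuring it with the Anthropic SDK's streaming helper; the measure_latency name and the example model choice are illustrative, not part of the SDK.

import time
import anthropic

client = anthropic.Anthropic()

def measure_latency(task: str, model: str = "claude-sonnet-4-5") -> dict:
    """Return time to first token and total latency (ms) for one streamed request."""
    start = time.time()
    first_token_at = None
    with client.messages.stream(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": task}],
    ) as stream:
        for _ in stream.text_stream:
            if first_token_at is None:
                first_token_at = time.time()  # first text chunk arrived
    return {
        "ttft_ms": (first_token_at - start) * 1000 if first_token_at else None,
        "total_ms": (time.time() - start) * 1000,
    }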

Structured Logging

import time
import logging
import json
from dataclasses import dataclass, asdict
import anthropic

client = anthropic.Anthropic()
logging.basicConfig(level=logging.INFO)  # ensure INFO-level metrics records are actually emitted
logger = logging.getLogger("agent")

@dataclass
class AgentMetrics:
    request_id: str
    model: str
    task_type: str
    input_tokens: int
    output_tokens: int
    cache_read_tokens: int
    latency_ms: float
    tool_calls: int
    cost_usd: float
    success: bool
    error: str | None = None

# USD per million tokens — illustrative list prices; verify against Anthropic's current pricing page.
PRICING = {
    "claude-haiku-4-5": {"input": 1.0, "output": 5.0},
    "claude-sonnet-4-5": {"input": 3.0, "output": 15.0},
    "claude-opus-4-5": {"input": 5.0, "output": 25.0},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    # Covers flat input/output pricing only; prompt-cache reads and writes are billed at different rates.
    p = PRICING.get(model, PRICING["claude-sonnet-4-5"])
    return (input_tokens / 1_000_000 * p["input"]) + (output_tokens / 1_000_000 * p["output"])

def tracked_complete(task: str, task_type: str = "general", model: str = "claude-sonnet-4-5") -> str:
    request_id = f"req_{int(time.time() * 1000)}"
    start_time = time.time()
    tool_calls = 0  # no tool use in this minimal example; increment per tool_use block in a full agent loop

    try:
        messages = [{"role": "user", "content": task}]
        response = client.messages.create(model=model, max_tokens=2048, messages=messages)

        latency = (time.time() - start_time) * 1000
        usage = response.usage

        metrics = AgentMetrics(
            request_id=request_id,
            model=model,
            task_type=task_type,
            input_tokens=usage.input_tokens,
            output_tokens=usage.output_tokens,
            cache_read_tokens=getattr(usage, "cache_read_input_tokens", 0) or 0,
            latency_ms=latency,
            tool_calls=tool_calls,
            cost_usd=calculate_cost(model, usage.input_tokens, usage.output_tokens),
            success=True,
        )

        # Serialize the metrics into the log message as JSON so any log pipeline can parse them.
        logger.info("agent_request %s", json.dumps(asdict(metrics)))

        return response.content[0].text

    except Exception as e:
        latency = (time.time() - start_time) * 1000
        logger.error("agent_error %s", json.dumps({
            "request_id": request_id,
            "error": str(e),
            "latency_ms": latency,
        }))
        raise
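
Usage is a single call; the task_type tag is what later lets you slice latency and cost per workload. The example task below is illustrative.

summary = tracked_complete(
    "Summarize the attached incident report in three bullet points.",
    task_type="summarization",
)
print(summary)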

Sending Metrics to a Dashboard

import os
import httpx

def send_to_datadog(metrics: AgentMetrics) -> None:
    """Ship metrics to Datadog's v2 series endpoint."""
    now = int(time.time())
    httpx.post(
        "https://api.datadoghq.com/api/v2/series",
        headers={"DD-API-KEY": os.environ["DD_API_KEY"]},  # read the key from the environment, not source
        json={
            "series": [
                # The v2 intake expects points as {"timestamp", "value"} objects,
                # not the [[ts, value]] pairs used by the v1 endpoint.
                {"metric": "agent.latency_ms", "points": [{"timestamp": now, "value": metrics.latency_ms}], "tags": [f"model:{metrics.model}"]},
                {"metric": "agent.cost_usd", "points": [{"timestamp": now, "value": metrics.cost_usd}], "tags": [f"task:{metrics.task_type}"]},
                {"metric": "agent.tokens.input", "points": [{"timestamp": now, "value": metrics.input_tokens}]},
            ]
        },
        timeout=10.0,
    )
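
One way to wire this into tracked_complete (a sketch; the emit helper and thread pool are illustrative, not part of any SDK): ship metrics off the request path so a slow dashboard API never delays the agent's response.

from concurrent.futures import ThreadPoolExecutor

_metrics_pool = ThreadPoolExecutor(max_workers=2)

def emit(metrics: AgentMetrics) -> None:
    """Log locally, then ship to the dashboard in the background."""
    logger.info("agent_request %s", json.dumps(asdict(metrics)))
    _metrics_pool.submit(send_to_datadog, metrics)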

Observability is not just debugging — it is how you make data-driven decisions about model selection, caching, and architecture.
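
For example, a quick aggregation over logged metrics (a sketch; records stands in for whatever your log store returns) shows whether a cheaper model is holding up on a given task type:

from collections import defaultdict

def avg_cost_by_model_and_task(records: list[AgentMetrics]) -> dict:
    """Average cost per request, grouped by (model, task_type)."""
    totals = defaultdict(lambda: {"cost": 0.0, "count": 0})
    for m in records:
        key = (m.model, m.task_type)
        totals[key]["cost"] += m.cost_usd
        totals[key]["count"] += 1
    return {key: t["cost"] / t["count"] for key, t in totals.items()}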