Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

第14章:可观测性与调试

“能观测的系统才能调试,能调试的系统才能信任。”

凌晨3点,你的Self-healing Server Agent突然停止响应。日志里只有一条:“Task failed”。没有堆栈跟踪,没有上下文,没有任何线索。你只能祈祷白天它自己恢复,或者手动登录服务器逐个检查。

这就是没有可观测性的Agent系统的真实写照。

与传统软件不同,Agent系统的失败模式更加微妙:

  • 不报错但答非所问(模型理解偏差)
  • 突然变慢(上下文膨胀)
  • 输出质量下降(模型降级或prompt漂移)
  • 多Agent协调失败(状态不同步)

本章将帮你建立一套实用的可观测性体系,让Agent的行为可见、可追踪、可调试。


14.1 Agent行为的可观测性

传统软件的“三大支柱“(Logs、Metrics、Traces)在Agent系统中依然适用,但需要针对性扩展。

结构化日志:记录Agent的“思考过程“

传统日志(不够用)

# ❌ 信息不足
logging.info("Agent started")
logging.info("Task completed")

这样的日志在Agent失败时毫无用处。你不知道它收到了什么输入,做了什么推理,输出了什么。

Agent专用结构化日志

import json
import logging
from datetime import datetime

class AgentLogger:
    def __init__(self, agent_name, log_file="agent.jsonl"):
        self.agent_name = agent_name
        self.log_file = log_file
        
    def log_invocation(self, task_id, input_data, model, prompt_tokens):
        """记录Agent调用"""
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "agent": self.agent_name,
            "task_id": task_id,
            "event": "invocation",
            "input": input_data,
            "model": model,
            "prompt_tokens": prompt_tokens
        }
        self._write(entry)
        
    def log_decision(self, task_id, reasoning, decision):
        """记录决策过程"""
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "agent": self.agent_name,
            "task_id": task_id,
            "event": "decision",
            "reasoning": reasoning,
            "decision": decision
        }
        self._write(entry)
        
    def log_completion(self, task_id, output, duration_ms, success):
        """记录完成状态"""
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "agent": self.agent_name,
            "task_id": task_id,
            "event": "completion",
            "output": output,
            "duration_ms": duration_ms,
            "success": success
        }
        self._write(entry)
        
    def log_error(self, task_id, error_type, error_message, context):
        """记录错误"""
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "agent": self.agent_name,
            "task_id": task_id,
            "event": "error",
            "error_type": error_type,
            "error_message": error_message,
            "context": context
        }
        self._write(entry)
        
    def _write(self, entry):
        with open(self.log_file, "a") as f:
            f.write(json.dumps(entry) + "\n")

使用示例

logger = AgentLogger("research_agent")

def research_task(query):
    task_id = generate_task_id()
    
    try:
        # 记录开始
        logger.log_invocation(
            task_id=task_id,
            input_data={"query": query},
            model="gemini-flash",
            prompt_tokens=len(query.split()) * 1.3
        )
        
        # 执行检索
        results = search_web(query)
        
        # 记录决策
        logger.log_decision(
            task_id=task_id,
            reasoning=f"找到{len(results)}条结果,选择前3条",
            decision={"selected": results[:3]}
        )
        
        # 生成摘要
        summary = summarize(results[:3])
        
        # 记录完成
        logger.log_completion(
            task_id=task_id,
            output=summary,
            duration_ms=1250,
            success=True
        )
        
        return summary
        
    except Exception as e:
        logger.log_error(
            task_id=task_id,
            error_type=type(e).__name__,
            error_message=str(e),
            context={"query": query, "stage": "search"}
        )
        raise

现在当出错时,你可以精确追踪

# 查看某个任务的完整生命周期
$ cat agent.jsonl | jq 'select(.task_id == "task_12345")'

# 查看所有错误
$ cat agent.jsonl | jq 'select(.event == "error")'

# 统计每个Agent的成功率
$ cat agent.jsonl | jq -s 'group_by(.agent) | map({agent: .[0].agent, total: length, success: map(select(.success == true)) | length})'

💡 AI辅助提示 不熟悉jq命令行工具?问AI: “jq是什么?给我10个常用的jq命令示例,用于分析JSON日志文件。” jq是处理JSONL日志的瑞士军刀。

Metrics:Agent健康度指标

关键指标

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# 调用次数
agent_invocations = Counter(
    'agent_invocations_total',
    'Total agent invocations',
    ['agent_name', 'model']
)

# 响应时间分布
agent_duration = Histogram(
    'agent_duration_seconds',
    'Agent execution duration',
    ['agent_name']
)

# Token消耗
agent_tokens = Counter(
    'agent_tokens_total',
    'Total tokens consumed',
    ['agent_name', 'model', 'token_type']
)

# 成功率
agent_success = Counter(
    'agent_success_total',
    'Successful completions',
    ['agent_name']
)

agent_failure = Counter(
    'agent_failure_total',
    'Failed completions',
    ['agent_name', 'error_type']
)

# 当前运行的任务数
agent_active_tasks = Gauge(
    'agent_active_tasks',
    'Currently running tasks',
    ['agent_name']
)

集成到Agent

def execute_with_metrics(agent_name, task_func, *args):
    agent_invocations.labels(agent_name=agent_name, model="claude-sonnet").inc()
    agent_active_tasks.labels(agent_name=agent_name).inc()
    
    start_time = time.time()
    try:
        result = task_func(*args)
        agent_success.labels(agent_name=agent_name).inc()
        return result
    except Exception as e:
        agent_failure.labels(
            agent_name=agent_name,
            error_type=type(e).__name__
        ).inc()
        raise
    finally:
        duration = time.time() - start_time
        agent_duration.labels(agent_name=agent_name).observe(duration)
        agent_active_tasks.labels(agent_name=agent_name).dec()

启动Metrics服务

# 在Agent启动时
start_http_server(8000)  # Prometheus可以从http://localhost:8000/metrics抓取

用Grafana可视化

# grafana-dashboard.json(简化版)
{
  "panels": [
    {
      "title": "Agent调用QPS",
      "targets": [{
        "expr": "rate(agent_invocations_total[5m])"
      }]
    },
    {
      "title": "成功率",
      "targets": [{
        "expr": "rate(agent_success_total[5m]) / rate(agent_invocations_total[5m])"
      }]
    },
    {
      "title": "P95响应时间",
      "targets": [{
        "expr": "histogram_quantile(0.95, agent_duration_seconds)"
      }]
    }
  ]
}

🔧 遇到错误? Prometheus/Grafana配置复杂?试试这个快捷方式: “给我一个最简单的Prometheus + Grafana Docker Compose配置,用于监控Python应用。” 5分钟内就能跑起来一个基础监控系统。

Traces:多Agent调用链追踪

当Multi-Agent系统涉及多个Agent协作时,单一日志不够用,你需要分布式追踪

OpenTelemetry集成

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# 初始化
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# 添加Exporter(可以换成Jaeger、Zipkin等)
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(ConsoleSpanExporter())
)

def agent_with_trace(agent_name, task_func):
    with tracer.start_as_current_span(f"{agent_name}.execute") as span:
        span.set_attribute("agent.name", agent_name)
        span.set_attribute("agent.version", "1.0")
        
        try:
            result = task_func()
            span.set_attribute("success", True)
            return result
        except Exception as e:
            span.set_attribute("success", False)
            span.set_attribute("error.type", type(e).__name__)
            span.set_attribute("error.message", str(e))
            raise

Multi-Agent场景

def content_pipeline(topic):
    with tracer.start_as_current_span("content_pipeline") as parent_span:
        parent_span.set_attribute("topic", topic)
        
        # 策略Agent
        with tracer.start_as_current_span("strategy_agent"):
            strategy = strategy_agent.plan(topic)
        
        # 研究Agent
        with tracer.start_as_current_span("research_agent"):
            research = research_agent.gather(strategy)
        
        # 写作Agent
        with tracer.start_as_current_span("writer_agent"):
            draft = writer_agent.write(research)
        
        # 编辑Agent
        with tracer.start_as_current_span("editor_agent"):
            final = editor_agent.review(draft)
        
        return final

Trace可视化(Jaeger UI)

content_pipeline [总耗时: 45s]
├─ strategy_agent [2s]
├─ research_agent [15s]  ← 瓶颈!
├─ writer_agent [20s]
└─ editor_agent [8s]

一眼就能看出研究阶段是瓶颈。

📚 深入学习 想了解更多分布式追踪?问AI: “OpenTelemetry和Jaeger是什么关系?如何在Python中快速集成?”


14.2 常见问题诊断

即使有完善的可观测性,问题还是会发生。以下是Agent系统最常见的故障模式和诊断方法。

问题1:Agent不响应

症状:Agent调用后长时间无返回,或直接超时。

可能原因与诊断

# 诊断脚本
def diagnose_no_response(agent_name, task_id):
    # 1. 检查日志
    logs = get_logs(agent_name, task_id)
    
    if not logs:
        return "❌ 完全没有日志 → Agent可能未启动或配置错误"
    
    if logs[-1]["event"] == "invocation":
        # 有调用记录但没有后续
        return "⚠️ Agent收到请求但未响应 → 可能API超时或死循环"
    
    if logs[-1]["event"] == "error":
        return f"❌ 出现错误:{logs[-1]['error_message']}"
    
    # 2. 检查API状态
    api_status = check_api_health(logs[-1]["model"])
    if api_status != "ok":
        return f"⚠️ API服务异常:{api_status}"
    
    # 3. 检查Token限制
    if logs[-1]["prompt_tokens"] > 100000:
        return "⚠️ 输入过长,可能触发API限制"
    
    return "🤔 无法确定,需要更多信息"

常见解决方案

原因解决方案
API超时增加timeout配置,或切换到更快的模型
上下文过长实施第13章的上下文裁剪策略
API配额耗尽检查账户余额和速率限制
死循环(自反馈)添加最大迭代次数限制

问题2:输出质量突然下降

症状:Agent以前工作正常,现在输出变得离谱或无关。

诊断检查清单

def diagnose_quality_drop(agent_name, recent_tasks):
    report = {}
    
    # 1. 模型是否被降级?
    models_used = [task["model"] for task in recent_tasks]
    if len(set(models_used)) > 1:
        report["model_change"] = f"检测到模型切换:{set(models_used)}"
    
    # 2. Prompt是否被修改?
    prompt_hashes = [hash(task["prompt"]) for task in recent_tasks]
    if len(set(prompt_hashes[-10:])) > 5:
        report["prompt_drift"] = "Prompt频繁变化,可能影响一致性"
    
    # 3. 输入质量是否下降?
    avg_input_length = sum(len(t["input"]) for t in recent_tasks) / len(recent_tasks)
    if avg_input_length < 100:
        report["input_quality"] = "输入过短,可能缺少必要上下文"
    
    # 4. 温度参数是否过高?
    temps = [task["temperature"] for task in recent_tasks if "temperature" in task]
    if temps and max(temps) > 0.9:
        report["high_temperature"] = f"温度过高:{max(temps)}"
    
    return report

真实案例:Morning Briefing Agent(第6章)突然开始输出乱码。

调查过程:
1. 检查日志 → 发现模型从claude-sonnet切换到了gpt-4o-mini(成本优化时改错了)
2. 对比输出 → gpt-4o-mini对特定prompt的理解偏差
3. 解决:回滚模型 or 调整prompt以适配新模型

🔧 遇到错误? 输出质量下降但找不到原因?试试A/B对比: “我的Agent之前输出正常,现在[具体问题]。帮我设计一个A/B测试,对比新旧版本的差异。”

问题3:上下文混乱

症状:Agent回答时引用了错误的信息,或混淆了不同任务的上下文。

根本原因:多任务并发或状态泄露。

诊断示例

# 错误实现:共享状态
class BuggyAgent:
    def __init__(self):
        self.context = ""  # ❌ 所有任务共享!
    
    def process(self, task):
        self.context += task.input  # 会累积所有任务
        return self.llm.call(self.context)

# 正确实现:每任务独立上下文
class CorrectAgent:
    def process(self, task):
        context = self.build_context(task)  # ✅ 每次构建新的
        return self.llm.call(context)

检测脚本

def detect_context_leak():
    """检测上下文是否在任务间泄露"""
    # 发送两个完全无关的任务
    task1 = "总结这篇关于AI的文章:[文章A]"
    task2 = "总结这篇关于烹饪的文章:[文章B]"
    
    response1 = agent.process(task1)
    response2 = agent.process(task2)
    
    # 检查task2的回答是否提到task1的内容
    if "AI" in response2 or "人工智能" in response2:
        print("⚠️ 检测到上下文泄露!task2提到了task1的内容")
        return True
    
    return False

问题4:多Agent协调失败

症状:Multi-Agent系统中某个Agent没有收到前一个Agent的输出,或输出格式不兼容。

诊断策略

def diagnose_coordination_failure(pipeline_run_id):
    # 1. 检查每个Agent的输出
    stages = ["strategy", "research", "writer", "editor"]
    outputs = {}
    
    for stage in stages:
        output = get_agent_output(pipeline_run_id, stage)
        outputs[stage] = output
        
        if not output:
            print(f"❌ {stage} Agent没有产生输出")
            return
        
        # 2. 检查输出格式
        if stage in ["strategy", "research"]:
            if not isinstance(output, dict):
                print(f"⚠️ {stage} 输出格式错误,期望dict,实际{type(output)}")
    
    # 3. 检查数据流
    for i in range(len(stages) - 1):
        current_stage = stages[i]
        next_stage = stages[i + 1]
        
        # 检查下一个Agent是否使用了上一个Agent的输出
        next_input = get_agent_input(pipeline_run_id, next_stage)
        if outputs[current_stage] not in str(next_input):
            print(f"⚠️ {next_stage} 未接收到 {current_stage} 的输出")

Autonomous PM案例(第5章、第10章深入):

失败场景:
- Strategy Agent输出JSON:{"tasks": [...]}
- Research Agent期望YAML格式
- 结果:Research Agent解析失败,整个Pipeline中断

解决方案:
1. 标准化输出格式(统一用JSON)
2. 添加格式转换层
3. 在每个Agent的输入端验证格式

📚 深入学习 想系统学习多Agent调试?问AI: “Multi-Agent系统中常见的协调失败模式有哪些?如何设计健壮的Agent间通信协议?”


14.3 n8n可视化调试

对于使用n8n编排的工作流(第7章、第11章深入讨论),调试变得直观得多。

n8n的内置可视化

案例引用:n8n Workflow Orchestration(第7章:凭证隔离架构 → 第11章:基础设施集成 → 本章:调试实战)

优势

  1. 每个节点的输入输出都可见

    [Webhook] → 接收到请求
       ↓
    [Extract Data] → 提取字段 {user_id: 123, action: "deploy"}
       ↓
    [Check Permissions] → 权限验证通过
       ↓
    [Execute Script] → ❌ 失败:SSH timeout
    
  2. 可以单独重跑失败的节点

    • 右键点击失败节点 → “Execute Node”
    • 无需重跑整个workflow
  3. 可以修改输入进行测试

    • 点击节点 → “Edit” → 修改输入数据
    • 立即看到输出变化

调试实战:Self-healing Server

场景:第11章的Self-healing Server用n8n编排了一个修复流程,但某次执行失败了。

n8n Workflow

[Cron: 每5分钟]
  ↓
[Health Check] ← SSH检查服务状态
  ↓
[Condition: 服务异常?]
  ├─ 是 → [Diagnose] ← Agent分析日志
  │         ↓
  │       [Generate Fix] ← Agent生成修复命令
  │         ↓
  │       [Execute Fix] ← SSH执行
  │         ↓
  │       [Verify] ← 再次检查
  │
  └─ 否 → [Log: All Good]

失败案例

  1. 打开n8n执行历史:看到“Execute Fix“节点失败
  2. 点击失败节点:看到输入和输出
    输入:{
      "command": "sudo systemctl restart nginx",
      "host": "prod-server-1"
    }
    输出:{
      "error": "Permission denied (publickey)"
    }
    
  3. 问题定位:SSH密钥失效
  4. 修复
    • 在n8n的Credentials中更新SSH密钥
    • 点击“Retry Execution“ → 成功

整个调试过程不到2分钟,无需查看任何日志文件。

n8n调试技巧

技巧1:使用“Pin Data“固定测试数据

在开发workflow时,不想每次都等真实数据:
1. 执行一次workflow,获取真实数据
2. 右键节点 → "Pin Data" → 保存这次的输出
3. 后续调试时直接用这个固定数据

技巧2:添加“Function“节点打印调试信息

// 在任意位置插入Function节点
const input = $input.all();
console.log("当前数据:", JSON.stringify(input, null, 2));
return input;  // 原样传递给下一个节点

在n8n的执行日志中可以看到打印内容。

技巧3:用“Error Trigger“捕获异常

[主Workflow] → [可能失败的节点]
                      ↓ (失败时)
               [Error Trigger]
                      ↓
               [Send Alert] → Telegram通知
                      ↓
               [Log to File] → 保存失败详情

这样所有失败都会自动通知你,并保存现场。

💡 AI辅助提示 n8n节点配置复杂?问AI: “我想在n8n中[具体功能],应该用哪个节点?如何配置?” AI可以给你详细的节点配置示例。

对比:n8n vs 纯代码调试

维度n8n纯代码
可视化✅ 每个步骤清晰可见❌ 需要自己加日志
失败定位✅ 立即知道哪个节点失败⚠️ 需要分析日志/堆栈
部分重试✅ 右键重跑失败节点❌ 需要重跑整个流程
实时修改✅ 直接在UI改⚠️ 改代码+重启
版本控制⚠️ JSON格式(可以Git管理)✅ 原生代码
复杂逻辑⚠️ 适合中等复杂度✅ 无限制

推荐策略

  • 简单到中等复杂度的工作流:优先用n8n(调试效率高)
  • 高度复杂或需要深度定制:用代码(灵活性高)
  • 混合:n8n做编排,复杂逻辑用Code节点或外部API

本章小结

关键要点

  1. 结构化日志是基础:记录输入、决策、输出、错误,形成完整追踪链
  2. Metrics让问题浮出水面:成功率、响应时间、Token消耗是健康度核心指标
  3. Traces串联多Agent调用:看清整个Pipeline的瓶颈
  4. 常见问题有套路:不响应、质量下降、上下文混乱、协调失败各有诊断方法
  5. n8n提供最佳调试体验:可视化、可重跑、可修改

调试清单

  • 为每个Agent添加结构化日志
  • 暴露Prometheus metrics端点
  • 在CI/CD中集成基础健康检查
  • 文档化常见错误和解决方案
  • 设置告警规则(成功率<95%、P95延迟>10s)

反模式警告

  • ❌ 只在出问题时才加日志(太晚了)
  • ❌ 日志用print()而不是结构化(无法分析)
  • ❌ 没有保留历史数据(无法对比趋势)
  • ❌ 完全依赖第三方平台(平台故障时你也瞎了)

下一章预告

你现在有了可观测性和调试工具,但如何避免常见陷阱?如何设计出健壮、可维护的Agent系统?第15章将分享最佳实践和反模式,帮你少走弯路。


参考资料

本章引用的案例均来自 awesome-openclaw-usecases 社区仓库: