第14章:可观测性与调试
“能观测的系统才能调试,能调试的系统才能信任。”
凌晨3点,你的Self-healing Server Agent突然停止响应。日志里只有一条:“Task failed”。没有堆栈跟踪,没有上下文,没有任何线索。你只能祈祷白天它自己恢复,或者手动登录服务器逐个检查。
这就是没有可观测性的Agent系统的真实写照。
与传统软件不同,Agent系统的失败模式更加微妙:
- 不报错但答非所问(模型理解偏差)
- 突然变慢(上下文膨胀)
- 输出质量下降(模型降级或prompt漂移)
- 多Agent协调失败(状态不同步)
本章将帮你建立一套实用的可观测性体系,让Agent的行为可见、可追踪、可调试。
14.1 Agent行为的可观测性
传统软件的“三大支柱“(Logs、Metrics、Traces)在Agent系统中依然适用,但需要针对性扩展。
结构化日志:记录Agent的“思考过程“
传统日志(不够用):
# ❌ 信息不足
logging.info("Agent started")
logging.info("Task completed")
这样的日志在Agent失败时毫无用处。你不知道它收到了什么输入,做了什么推理,输出了什么。
Agent专用结构化日志:
import json
import logging
from datetime import datetime
class AgentLogger:
def __init__(self, agent_name, log_file="agent.jsonl"):
self.agent_name = agent_name
self.log_file = log_file
def log_invocation(self, task_id, input_data, model, prompt_tokens):
"""记录Agent调用"""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"agent": self.agent_name,
"task_id": task_id,
"event": "invocation",
"input": input_data,
"model": model,
"prompt_tokens": prompt_tokens
}
self._write(entry)
def log_decision(self, task_id, reasoning, decision):
"""记录决策过程"""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"agent": self.agent_name,
"task_id": task_id,
"event": "decision",
"reasoning": reasoning,
"decision": decision
}
self._write(entry)
def log_completion(self, task_id, output, duration_ms, success):
"""记录完成状态"""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"agent": self.agent_name,
"task_id": task_id,
"event": "completion",
"output": output,
"duration_ms": duration_ms,
"success": success
}
self._write(entry)
def log_error(self, task_id, error_type, error_message, context):
"""记录错误"""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"agent": self.agent_name,
"task_id": task_id,
"event": "error",
"error_type": error_type,
"error_message": error_message,
"context": context
}
self._write(entry)
def _write(self, entry):
with open(self.log_file, "a") as f:
f.write(json.dumps(entry) + "\n")
使用示例:
logger = AgentLogger("research_agent")
def research_task(query):
task_id = generate_task_id()
try:
# 记录开始
logger.log_invocation(
task_id=task_id,
input_data={"query": query},
model="gemini-flash",
prompt_tokens=len(query.split()) * 1.3
)
# 执行检索
results = search_web(query)
# 记录决策
logger.log_decision(
task_id=task_id,
reasoning=f"找到{len(results)}条结果,选择前3条",
decision={"selected": results[:3]}
)
# 生成摘要
summary = summarize(results[:3])
# 记录完成
logger.log_completion(
task_id=task_id,
output=summary,
duration_ms=1250,
success=True
)
return summary
except Exception as e:
logger.log_error(
task_id=task_id,
error_type=type(e).__name__,
error_message=str(e),
context={"query": query, "stage": "search"}
)
raise
现在当出错时,你可以精确追踪:
# 查看某个任务的完整生命周期
$ cat agent.jsonl | jq 'select(.task_id == "task_12345")'
# 查看所有错误
$ cat agent.jsonl | jq 'select(.event == "error")'
# 统计每个Agent的成功率
$ cat agent.jsonl | jq -s 'group_by(.agent) | map({agent: .[0].agent, total: length, success: map(select(.success == true)) | length})'
💡 AI辅助提示 不熟悉jq命令行工具?问AI: “jq是什么?给我10个常用的jq命令示例,用于分析JSON日志文件。” jq是处理JSONL日志的瑞士军刀。
Metrics:Agent健康度指标
关键指标:
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# 调用次数
agent_invocations = Counter(
'agent_invocations_total',
'Total agent invocations',
['agent_name', 'model']
)
# 响应时间分布
agent_duration = Histogram(
'agent_duration_seconds',
'Agent execution duration',
['agent_name']
)
# Token消耗
agent_tokens = Counter(
'agent_tokens_total',
'Total tokens consumed',
['agent_name', 'model', 'token_type']
)
# 成功率
agent_success = Counter(
'agent_success_total',
'Successful completions',
['agent_name']
)
agent_failure = Counter(
'agent_failure_total',
'Failed completions',
['agent_name', 'error_type']
)
# 当前运行的任务数
agent_active_tasks = Gauge(
'agent_active_tasks',
'Currently running tasks',
['agent_name']
)
集成到Agent:
def execute_with_metrics(agent_name, task_func, *args):
agent_invocations.labels(agent_name=agent_name, model="claude-sonnet").inc()
agent_active_tasks.labels(agent_name=agent_name).inc()
start_time = time.time()
try:
result = task_func(*args)
agent_success.labels(agent_name=agent_name).inc()
return result
except Exception as e:
agent_failure.labels(
agent_name=agent_name,
error_type=type(e).__name__
).inc()
raise
finally:
duration = time.time() - start_time
agent_duration.labels(agent_name=agent_name).observe(duration)
agent_active_tasks.labels(agent_name=agent_name).dec()
启动Metrics服务:
# 在Agent启动时
start_http_server(8000) # Prometheus可以从http://localhost:8000/metrics抓取
用Grafana可视化:
# grafana-dashboard.json(简化版)
{
"panels": [
{
"title": "Agent调用QPS",
"targets": [{
"expr": "rate(agent_invocations_total[5m])"
}]
},
{
"title": "成功率",
"targets": [{
"expr": "rate(agent_success_total[5m]) / rate(agent_invocations_total[5m])"
}]
},
{
"title": "P95响应时间",
"targets": [{
"expr": "histogram_quantile(0.95, agent_duration_seconds)"
}]
}
]
}
🔧 遇到错误? Prometheus/Grafana配置复杂?试试这个快捷方式: “给我一个最简单的Prometheus + Grafana Docker Compose配置,用于监控Python应用。” 5分钟内就能跑起来一个基础监控系统。
Traces:多Agent调用链追踪
当Multi-Agent系统涉及多个Agent协作时,单一日志不够用,你需要分布式追踪。
OpenTelemetry集成:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
# 初始化
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# 添加Exporter(可以换成Jaeger、Zipkin等)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(ConsoleSpanExporter())
)
def agent_with_trace(agent_name, task_func):
with tracer.start_as_current_span(f"{agent_name}.execute") as span:
span.set_attribute("agent.name", agent_name)
span.set_attribute("agent.version", "1.0")
try:
result = task_func()
span.set_attribute("success", True)
return result
except Exception as e:
span.set_attribute("success", False)
span.set_attribute("error.type", type(e).__name__)
span.set_attribute("error.message", str(e))
raise
Multi-Agent场景:
def content_pipeline(topic):
with tracer.start_as_current_span("content_pipeline") as parent_span:
parent_span.set_attribute("topic", topic)
# 策略Agent
with tracer.start_as_current_span("strategy_agent"):
strategy = strategy_agent.plan(topic)
# 研究Agent
with tracer.start_as_current_span("research_agent"):
research = research_agent.gather(strategy)
# 写作Agent
with tracer.start_as_current_span("writer_agent"):
draft = writer_agent.write(research)
# 编辑Agent
with tracer.start_as_current_span("editor_agent"):
final = editor_agent.review(draft)
return final
Trace可视化(Jaeger UI):
content_pipeline [总耗时: 45s]
├─ strategy_agent [2s]
├─ research_agent [15s] ← 瓶颈!
├─ writer_agent [20s]
└─ editor_agent [8s]
一眼就能看出研究阶段是瓶颈。
📚 深入学习 想了解更多分布式追踪?问AI: “OpenTelemetry和Jaeger是什么关系?如何在Python中快速集成?”
14.2 常见问题诊断
即使有完善的可观测性,问题还是会发生。以下是Agent系统最常见的故障模式和诊断方法。
问题1:Agent不响应
症状:Agent调用后长时间无返回,或直接超时。
可能原因与诊断:
# 诊断脚本
def diagnose_no_response(agent_name, task_id):
# 1. 检查日志
logs = get_logs(agent_name, task_id)
if not logs:
return "❌ 完全没有日志 → Agent可能未启动或配置错误"
if logs[-1]["event"] == "invocation":
# 有调用记录但没有后续
return "⚠️ Agent收到请求但未响应 → 可能API超时或死循环"
if logs[-1]["event"] == "error":
return f"❌ 出现错误:{logs[-1]['error_message']}"
# 2. 检查API状态
api_status = check_api_health(logs[-1]["model"])
if api_status != "ok":
return f"⚠️ API服务异常:{api_status}"
# 3. 检查Token限制
if logs[-1]["prompt_tokens"] > 100000:
return "⚠️ 输入过长,可能触发API限制"
return "🤔 无法确定,需要更多信息"
常见解决方案:
| 原因 | 解决方案 |
|---|---|
| API超时 | 增加timeout配置,或切换到更快的模型 |
| 上下文过长 | 实施第13章的上下文裁剪策略 |
| API配额耗尽 | 检查账户余额和速率限制 |
| 死循环(自反馈) | 添加最大迭代次数限制 |
问题2:输出质量突然下降
症状:Agent以前工作正常,现在输出变得离谱或无关。
诊断检查清单:
def diagnose_quality_drop(agent_name, recent_tasks):
report = {}
# 1. 模型是否被降级?
models_used = [task["model"] for task in recent_tasks]
if len(set(models_used)) > 1:
report["model_change"] = f"检测到模型切换:{set(models_used)}"
# 2. Prompt是否被修改?
prompt_hashes = [hash(task["prompt"]) for task in recent_tasks]
if len(set(prompt_hashes[-10:])) > 5:
report["prompt_drift"] = "Prompt频繁变化,可能影响一致性"
# 3. 输入质量是否下降?
avg_input_length = sum(len(t["input"]) for t in recent_tasks) / len(recent_tasks)
if avg_input_length < 100:
report["input_quality"] = "输入过短,可能缺少必要上下文"
# 4. 温度参数是否过高?
temps = [task["temperature"] for task in recent_tasks if "temperature" in task]
if temps and max(temps) > 0.9:
report["high_temperature"] = f"温度过高:{max(temps)}"
return report
真实案例:Morning Briefing Agent(第6章)突然开始输出乱码。
调查过程:
1. 检查日志 → 发现模型从claude-sonnet切换到了gpt-4o-mini(成本优化时改错了)
2. 对比输出 → gpt-4o-mini对特定prompt的理解偏差
3. 解决:回滚模型 or 调整prompt以适配新模型
🔧 遇到错误? 输出质量下降但找不到原因?试试A/B对比: “我的Agent之前输出正常,现在[具体问题]。帮我设计一个A/B测试,对比新旧版本的差异。”
问题3:上下文混乱
症状:Agent回答时引用了错误的信息,或混淆了不同任务的上下文。
根本原因:多任务并发或状态泄露。
诊断示例:
# 错误实现:共享状态
class BuggyAgent:
def __init__(self):
self.context = "" # ❌ 所有任务共享!
def process(self, task):
self.context += task.input # 会累积所有任务
return self.llm.call(self.context)
# 正确实现:每任务独立上下文
class CorrectAgent:
def process(self, task):
context = self.build_context(task) # ✅ 每次构建新的
return self.llm.call(context)
检测脚本:
def detect_context_leak():
"""检测上下文是否在任务间泄露"""
# 发送两个完全无关的任务
task1 = "总结这篇关于AI的文章:[文章A]"
task2 = "总结这篇关于烹饪的文章:[文章B]"
response1 = agent.process(task1)
response2 = agent.process(task2)
# 检查task2的回答是否提到task1的内容
if "AI" in response2 or "人工智能" in response2:
print("⚠️ 检测到上下文泄露!task2提到了task1的内容")
return True
return False
问题4:多Agent协调失败
症状:Multi-Agent系统中某个Agent没有收到前一个Agent的输出,或输出格式不兼容。
诊断策略:
def diagnose_coordination_failure(pipeline_run_id):
# 1. 检查每个Agent的输出
stages = ["strategy", "research", "writer", "editor"]
outputs = {}
for stage in stages:
output = get_agent_output(pipeline_run_id, stage)
outputs[stage] = output
if not output:
print(f"❌ {stage} Agent没有产生输出")
return
# 2. 检查输出格式
if stage in ["strategy", "research"]:
if not isinstance(output, dict):
print(f"⚠️ {stage} 输出格式错误,期望dict,实际{type(output)}")
# 3. 检查数据流
for i in range(len(stages) - 1):
current_stage = stages[i]
next_stage = stages[i + 1]
# 检查下一个Agent是否使用了上一个Agent的输出
next_input = get_agent_input(pipeline_run_id, next_stage)
if outputs[current_stage] not in str(next_input):
print(f"⚠️ {next_stage} 未接收到 {current_stage} 的输出")
Autonomous PM案例(第5章、第10章深入):
失败场景:
- Strategy Agent输出JSON:{"tasks": [...]}
- Research Agent期望YAML格式
- 结果:Research Agent解析失败,整个Pipeline中断
解决方案:
1. 标准化输出格式(统一用JSON)
2. 添加格式转换层
3. 在每个Agent的输入端验证格式
📚 深入学习 想系统学习多Agent调试?问AI: “Multi-Agent系统中常见的协调失败模式有哪些?如何设计健壮的Agent间通信协议?”
14.3 n8n可视化调试
对于使用n8n编排的工作流(第7章、第11章深入讨论),调试变得直观得多。
n8n的内置可视化
案例引用:n8n Workflow Orchestration(第7章:凭证隔离架构 → 第11章:基础设施集成 → 本章:调试实战)
优势:
-
每个节点的输入输出都可见
[Webhook] → 接收到请求 ↓ [Extract Data] → 提取字段 {user_id: 123, action: "deploy"} ↓ [Check Permissions] → 权限验证通过 ↓ [Execute Script] → ❌ 失败:SSH timeout -
可以单独重跑失败的节点
- 右键点击失败节点 → “Execute Node”
- 无需重跑整个workflow
-
可以修改输入进行测试
- 点击节点 → “Edit” → 修改输入数据
- 立即看到输出变化
调试实战:Self-healing Server
场景:第11章的Self-healing Server用n8n编排了一个修复流程,但某次执行失败了。
n8n Workflow:
[Cron: 每5分钟]
↓
[Health Check] ← SSH检查服务状态
↓
[Condition: 服务异常?]
├─ 是 → [Diagnose] ← Agent分析日志
│ ↓
│ [Generate Fix] ← Agent生成修复命令
│ ↓
│ [Execute Fix] ← SSH执行
│ ↓
│ [Verify] ← 再次检查
│
└─ 否 → [Log: All Good]
失败案例:
- 打开n8n执行历史:看到“Execute Fix“节点失败
- 点击失败节点:看到输入和输出
输入:{ "command": "sudo systemctl restart nginx", "host": "prod-server-1" } 输出:{ "error": "Permission denied (publickey)" } - 问题定位:SSH密钥失效
- 修复:
- 在n8n的Credentials中更新SSH密钥
- 点击“Retry Execution“ → 成功
整个调试过程不到2分钟,无需查看任何日志文件。
n8n调试技巧
技巧1:使用“Pin Data“固定测试数据
在开发workflow时,不想每次都等真实数据:
1. 执行一次workflow,获取真实数据
2. 右键节点 → "Pin Data" → 保存这次的输出
3. 后续调试时直接用这个固定数据
技巧2:添加“Function“节点打印调试信息
// 在任意位置插入Function节点
const input = $input.all();
console.log("当前数据:", JSON.stringify(input, null, 2));
return input; // 原样传递给下一个节点
在n8n的执行日志中可以看到打印内容。
技巧3:用“Error Trigger“捕获异常
[主Workflow] → [可能失败的节点]
↓ (失败时)
[Error Trigger]
↓
[Send Alert] → Telegram通知
↓
[Log to File] → 保存失败详情
这样所有失败都会自动通知你,并保存现场。
💡 AI辅助提示 n8n节点配置复杂?问AI: “我想在n8n中[具体功能],应该用哪个节点?如何配置?” AI可以给你详细的节点配置示例。
对比:n8n vs 纯代码调试
| 维度 | n8n | 纯代码 |
|---|---|---|
| 可视化 | ✅ 每个步骤清晰可见 | ❌ 需要自己加日志 |
| 失败定位 | ✅ 立即知道哪个节点失败 | ⚠️ 需要分析日志/堆栈 |
| 部分重试 | ✅ 右键重跑失败节点 | ❌ 需要重跑整个流程 |
| 实时修改 | ✅ 直接在UI改 | ⚠️ 改代码+重启 |
| 版本控制 | ⚠️ JSON格式(可以Git管理) | ✅ 原生代码 |
| 复杂逻辑 | ⚠️ 适合中等复杂度 | ✅ 无限制 |
推荐策略:
- 简单到中等复杂度的工作流:优先用n8n(调试效率高)
- 高度复杂或需要深度定制:用代码(灵活性高)
- 混合:n8n做编排,复杂逻辑用Code节点或外部API
本章小结
关键要点:
- 结构化日志是基础:记录输入、决策、输出、错误,形成完整追踪链
- Metrics让问题浮出水面:成功率、响应时间、Token消耗是健康度核心指标
- Traces串联多Agent调用:看清整个Pipeline的瓶颈
- 常见问题有套路:不响应、质量下降、上下文混乱、协调失败各有诊断方法
- n8n提供最佳调试体验:可视化、可重跑、可修改
调试清单:
- 为每个Agent添加结构化日志
- 暴露Prometheus metrics端点
- 在CI/CD中集成基础健康检查
- 文档化常见错误和解决方案
- 设置告警规则(成功率<95%、P95延迟>10s)
反模式警告:
- ❌ 只在出问题时才加日志(太晚了)
- ❌ 日志用print()而不是结构化(无法分析)
- ❌ 没有保留历史数据(无法对比趋势)
- ❌ 完全依赖第三方平台(平台故障时你也瞎了)
下一章预告:
你现在有了可观测性和调试工具,但如何避免常见陷阱?如何设计出健壮、可维护的Agent系统?第15章将分享最佳实践和反模式,帮你少走弯路。
参考资料
本章引用的案例均来自 awesome-openclaw-usecases 社区仓库: