第11章:基础设施与DevOps自动化
“基础设施不应该需要人类24小时待命。Agent可以成为永不休息的运维工程师。”
凌晨3点,你的手机响了。生产环境的Kubernetes集群出现了Pod崩溃,监控系统发出了告警。你睡眼惺忪地爬起来,SSH登录服务器,检查日志,重启服务,然后祈祷不会再有问题。第二天一早,你疲惫不堪地回到办公室,发誓要“找时间“自动化这些重复的运维工作。
但时间永远不够。直到你遇到了Agent。
本章将展示如何构建自愈式基础设施系统(Self-healing Infrastructure),让AI Agent成为你的24/7运维助手。它不仅能监控系统健康状态,还能自主诊断问题、执行修复操作、管理基础设施变更,并在必要时唤醒你。
11.1 为什么基础设施需要Agent
传统运维的三大痛点
1. 24/7待命的疲惫
现代基础设施永不休息,但人类需要睡眠。传统的解决方案是轮班值班,但这带来了高昂的人力成本和生活质量下降。
传统运维流程:
告警触发 → 人类收到通知 → 登录系统
→ 查看日志 → 诊断问题 → 执行修复
→ 验证结果 → 记录文档
平均响应时间: 15-30分钟(如果人在睡觉可能更长)
2. 重复性工作的低效
根据调查,约70%的运维故障是“曾经见过的问题“。每次都需要人工执行相同的诊断和修复步骤,既浪费时间又容易出错。
常见的重复场景:
- Pod内存溢出崩溃 → 重启Pod,清理缓存
- 磁盘空间不足 → 清理日志,扩容存储
- 证书即将过期 → 续期证书,重启服务
- 部署失败 → 回滚到上一个稳定版本
- 数据库连接池耗尽 → 重启应用,调整配置
3. 知识散落与经验流失
运维知识往往存在于资深工程师的大脑中,或者散落在Confluence、Slack历史消息、私人笔记里。当关键人员离职时,这些宝贵的知识也随之流失。
💡 AI辅助提示
不熟悉Kubernetes、Docker或DevOps概念?没关系!遇到不懂的术语,随时问AI:
- “什么是Kubernetes Pod?为什么会崩溃?”
- “Docker容器和虚拟机有什么区别?”
- “DevOps的核心理念是什么?”
AI会用通俗的语言解释这些概念,帮你快速建立基础认知。
Agent如何改变游戏规则
1. 永不休息的监控与响应
Agent可以24/7运行,定期检查系统健康状态,并在发现问题时立即响应。它不需要睡眠,不会疲劳,不会因为假期而缺席。
# HEARTBEAT.md - Agent的定期检查清单
checks:
- name: kubernetes-health
interval: 5min
action: check_pod_status
- name: disk-usage
interval: 15min
action: monitor_disk_space
threshold: 80%
- name: certificate-expiry
interval: 1day
action: check_ssl_certificates
alert_days: 7
- name: backup-verification
interval: 1hour
action: verify_latest_backup
2. 自动化的知识积累
每次Agent处理问题时,它都会记录详细的诊断过程和解决方案。这些记录会存储在Git版本控制的知识库中,成为组织的运维资产。
# memory/incidents/2024-02-20-pod-restart.md
## 事件: api-gateway Pod频繁重启
**检测时间**: 2024-02-20 03:15:22
**严重级别**: Warning
### 诊断过程
1. 检查Pod状态: CrashLoopBackOff
2. 查看容器日志: OutOfMemoryError
3. 检查资源限制: memory limit 512Mi
4. 检查实际使用: 接近500Mi,触发OOM
### 采取行动
- 临时措施: 重启Pod(成功)
- 永久修复: 提交PR增加memory limit到1Gi
- PR链接: https://git.example.com/infra/k8s/pull/123
### 预防建议
- 设置内存使用告警阈值为70%
- 增加Horizontal Pod Autoscaler
- 优化应用内存使用
### 知识更新
更新到知识库: docs/troubleshooting/k8s-memory-issues.md
3. 渐进式自动化
Agent系统可以从简单的监控和告警开始,逐步演进到自主修复。你可以根据团队的舒适度和系统的风险级别,选择合适的自动化层次。
| 自动化层次 | Agent行为 | 适用场景 | 示例 |
|---|---|---|---|
| Level 1 | 只监控,发现问题立即告警 | 高风险操作,新部署的系统 | 数据库主从切换检测 |
| Level 2 | 提供诊断建议,等待人工确认 | 中等风险,需要人工判断 | 建议回滚部署 |
| Level 3 | 自动修复,事后通知 | 低风险,常见问题 | 重启崩溃的Pod |
| Level 4 | 完全自主,只在异常时告警 | 极低风险,成熟场景 | 证书自动续期 |
真实收益:某创业公司的案例
一家30人的SaaS创业公司实施了Self-healing Agent系统后的变化:
实施前(3个月数据):
- 平均每周夜间告警: 4.2次
- 平均响应时间: 23分钟
- 运维人员睡眠质量: 😫😫😫
- 重复性故障占比: 73%
实施后(3个月数据):
- Agent自动处理的事件: 87%
- 需要人工介入的事件: 13%
- 平均响应时间: 2分钟(Agent)
- 运维人员睡眠质量: 😊😊😊
- 减少的On-call压力: 显著
成本效益:
- Agent开发和维护成本: 约40工时/月
- 节省的运维响应时间: 约160工时/月
- ROI: 400%
- 附加价值: 团队士气提升,知识积累
📚 深入学习
想了解更多关于自动化层次和风险评估的内容?可以问AI:
- “DevOps中的自动化成熟度模型有哪些?”
- “如何评估运维操作的风险级别?”
- “Site Reliability Engineering(SRE)的核心原则是什么?”
11.2 Self-healing模式深度实践1
现在让我们构建一个完整的自愈式服务器系统。这个案例会展示从健康监控到自动修复的完整流程,涉及真实的DevOps工具栈。
系统架构概览
┌─────────────────────────────────────────────────────────┐
│ OpenClaw Agent │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Health Check │ │ Diagnosis │ │ Repair │ │
│ │ Scripts │→ │ Engine │→ │ Actions │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└────────────┬────────────────────────────────┬───────────┘
│ │
┌────▼─────┐ ┌────▼─────┐
│ Cron │ │ Git │
│ Scheduler│ │ Audit │
└────┬─────┘ └──────────┘
│
┌────────┼────────┐
│ │ │
┌───▼───┐ ┌─▼──┐ ┌───▼────┐
│ SSH │ │K8s │ │Terraform│
│Servers│ │API │ │ Ansible │
└───────┘ └────┘ └────────┘
核心组件设计
1. 健康检查层(Health Check Layer)
这是Agent的“感知器官“,定期收集系统状态信息。
#!/bin/bash
# scripts/health-check.sh
set -euo pipefail
# 检查磁盘使用率
check_disk_usage() {
local threshold=80
local usage=$(df -h / | awk 'NR==2 {print $5}' | sed 's/%//')
if [ "$usage" -gt "$threshold" ]; then
echo "CRITICAL: Disk usage at ${usage}%"
return 1
else
echo "OK: Disk usage at ${usage}%"
return 0
fi
}
# 检查关键服务
check_services() {
local services=("nginx" "postgresql" "redis")
for service in "${services[@]}"; do
if systemctl is-active --quiet "$service"; then
echo "OK: $service is running"
else
echo "CRITICAL: $service is not running"
return 1
fi
done
return 0
}
# 检查K8s Pod状态
check_k8s_pods() {
local namespace="production"
# 获取所有非Running状态的Pod
local failing_pods=$(kubectl get pods -n "$namespace" \
--field-selector=status.phase!=Running \
-o json | jq -r '.items[].metadata.name')
if [ -n "$failing_pods" ]; then
echo "CRITICAL: Failing pods in $namespace:"
echo "$failing_pods"
return 1
else
echo "OK: All pods running in $namespace"
return 0
fi
}
# 检查证书有效期
check_ssl_certificates() {
local domains=("api.example.com" "app.example.com")
local warn_days=7
for domain in "${domains[@]}"; do
local expiry_date=$(echo | openssl s_client -servername "$domain" \
-connect "$domain":443 2>/dev/null | openssl x509 -noout -enddate \
| cut -d= -f2)
local expiry_epoch=$(date -d "$expiry_date" +%s)
local now_epoch=$(date +%s)
local days_left=$(( ($expiry_epoch - $now_epoch) / 86400 ))
if [ "$days_left" -lt "$warn_days" ]; then
echo "WARNING: SSL certificate for $domain expires in $days_left days"
return 1
else
echo "OK: SSL certificate for $domain valid for $days_left days"
fi
done
return 0
}
# 主检查流程
main() {
echo "=== Health Check Report $(date) ==="
check_disk_usage
check_services
check_k8s_pods
check_ssl_certificates
echo "=== End of Report ==="
}
main "$@"
🔧 遇到错误?
运行健康检查脚本时遇到问题?把错误信息复制给AI:
- “我运行health-check.sh时报错: [粘贴错误], 是什么原因?”
- “kubectl命令找不到,如何安装和配置?”
- “如何配置SSH免密码登录到远程服务器?”
2. 诊断引擎(Diagnosis Engine)
当健康检查发现问题时,Agent需要深入分析根本原因。
# scripts/diagnosis.py
import subprocess
import json
from datetime import datetime, timedelta
class DiagnosisEngine:
def __init__(self):
self.findings = []
def diagnose_pod_crash(self, pod_name, namespace="production"):
"""诊断Pod崩溃的原因"""
# 1. 获取Pod状态
pod_status = self._get_pod_status(pod_name, namespace)
self.findings.append(f"Pod状态: {pod_status['phase']}")
# 2. 检查最近的容器日志
logs = self._get_container_logs(pod_name, namespace, tail=100)
# 3. 分析常见错误模式
if "OutOfMemoryError" in logs or "OOMKilled" in pod_status.get("reason", ""):
self.findings.append("诊断: 内存不足导致Pod被OOM Killer终止")
self.findings.append("建议: 增加memory limit或优化应用内存使用")
return "OOM"
elif "CrashLoopBackOff" in pod_status.get("status", ""):
restart_count = pod_status.get("restartCount", 0)
self.findings.append(f"诊断: Pod反复崩溃,已重启{restart_count}次")
# 检查启动探针
if "Liveness probe failed" in logs:
self.findings.append("原因: Liveness probe失败")
return "LIVENESS_FAILED"
elif "Readiness probe failed" in logs:
self.findings.append("原因: Readiness probe失败")
return "READINESS_FAILED"
else:
self.findings.append("原因: 应用启动失败,检查日志获取详细信息")
return "STARTUP_FAILED"
elif "ImagePullBackOff" in pod_status.get("status", ""):
self.findings.append("诊断: 无法拉取容器镜像")
self.findings.append("建议: 检查镜像名称、tag和仓库权限")
return "IMAGE_PULL_FAILED"
# 4. 检查资源配额
resource_usage = self._get_resource_usage(pod_name, namespace)
if resource_usage['cpu_percent'] > 90:
self.findings.append(f"警告: CPU使用率 {resource_usage['cpu_percent']}%")
if resource_usage['memory_percent'] > 90:
self.findings.append(f"警告: 内存使用率 {resource_usage['memory_percent']}%")
return "UNKNOWN"
def diagnose_disk_full(self, hostname):
"""诊断磁盘空间不足"""
# 1. 找出占用空间最多的目录
cmd = f"ssh {hostname} 'du -sh /var/* 2>/dev/null | sort -rh | head -10'"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
self.findings.append("磁盘空间占用TOP 10:")
self.findings.append(result.stdout)
# 2. 检查日志文件大小
log_dirs = ["/var/log", "/var/log/nginx", "/var/log/postgresql"]
for log_dir in log_dirs:
cmd = f"ssh {hostname} 'du -sh {log_dir} 2>/dev/null'"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
self.findings.append(f"{log_dir}: {result.stdout.strip()}")
# 3. 检查是否有大型core dumps
cmd = f"ssh {hostname} 'find /var -name core.* -size +100M 2>/dev/null'"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.stdout.strip():
self.findings.append("发现大型core dump文件:")
self.findings.append(result.stdout)
return "DISK_FULL"
def _get_pod_status(self, pod_name, namespace):
"""获取Pod状态"""
cmd = f"kubectl get pod {pod_name} -n {namespace} -o json"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
pod_data = json.loads(result.stdout)
status = pod_data['status']
return {
'phase': status.get('phase'),
'reason': status.get('reason', ''),
'status': status.get('containerStatuses', [{}])[0].get('state', {}),
'restartCount': status.get('containerStatuses', [{}])[0].get('restartCount', 0)
}
def _get_container_logs(self, pod_name, namespace, tail=100):
"""获取容器日志"""
cmd = f"kubectl logs {pod_name} -n {namespace} --tail={tail}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
return result.stdout
def _get_resource_usage(self, pod_name, namespace):
"""获取资源使用率"""
cmd = f"kubectl top pod {pod_name} -n {namespace}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
# 解析输出(示例: NAME CPU(cores) MEMORY(bytes))
lines = result.stdout.strip().split('\n')
if len(lines) > 1:
parts = lines[1].split()
return {
'cpu_percent': float(parts[1].replace('m', '')) / 10, # 简化计算
'memory_percent': 75 # 这里需要更复杂的计算,简化处理
}
return {'cpu_percent': 0, 'memory_percent': 0}
def get_report(self):
"""生成诊断报告"""
return "\n".join(self.findings)
# 使用示例
if __name__ == "__main__":
import sys
engine = DiagnosisEngine()
issue_type = sys.argv[1] if len(sys.argv) > 1 else "pod_crash"
if issue_type == "pod_crash":
pod_name = sys.argv[2]
result = engine.diagnose_pod_crash(pod_name)
print(f"诊断结果: {result}")
print("\n详细报告:")
print(engine.get_report())
elif issue_type == "disk_full":
hostname = sys.argv[2]
result = engine.diagnose_disk_full(hostname)
print(f"诊断结果: {result}")
print("\n详细报告:")
print(engine.get_report())
3. 修复执行层(Repair Action Layer)
基于诊断结果,Agent可以执行相应的修复操作。这是最需要谨慎设计的部分。
# scripts/repair.py
import subprocess
import time
from datetime import datetime
import os
class RepairEngine:
def __init__(self, dry_run=False):
self.dry_run = dry_run
self.actions_taken = []
self.git_repo = "/home/agent/infrastructure"
def repair_pod_oom(self, pod_name, namespace="production"):
"""修复OOM问题"""
# 1. 立即重启Pod(临时措施)
if not self.dry_run:
self._restart_pod(pod_name, namespace)
self.actions_taken.append(f"重启了Pod: {pod_name}")
else:
self.actions_taken.append(f"[DRY RUN] 将重启Pod: {pod_name}")
# 2. 创建PR增加memory limit(永久修复)
current_limit = self._get_memory_limit(pod_name, namespace)
new_limit = self._calculate_new_limit(current_limit)
pr_branch = f"fix/increase-memory-{pod_name}-{int(time.time())}"
pr_message = f"Increase memory limit for {pod_name} from {current_limit} to {new_limit}"
if not self.dry_run:
self._create_pr_for_resource_change(
pod_name, namespace, "memory", new_limit,
pr_branch, pr_message
)
self.actions_taken.append(f"创建PR: {pr_branch}")
else:
self.actions_taken.append(f"[DRY RUN] 将创建PR增加内存到 {new_limit}")
return True
def repair_disk_full(self, hostname):
"""修复磁盘空间不足"""
# 1. 清理旧日志
retention_days = 7
if not self.dry_run:
cmd = f"""ssh {hostname} 'find /var/log -name "*.log" -mtime +{retention_days} -delete'"""
subprocess.run(cmd, shell=True)
self.actions_taken.append(f"清理了 {hostname} 上超过 {retention_days} 天的日志")
else:
self.actions_taken.append(f"[DRY RUN] 将清理 {hostname} 上的旧日志")
# 2. 压缩未压缩的日志
if not self.dry_run:
cmd = f"""ssh {hostname} 'find /var/log -name "*.log" -size +100M -exec gzip {{}} \\;'"""
subprocess.run(cmd, shell=True)
self.actions_taken.append(f"压缩了大型日志文件")
# 3. 删除core dumps
if not self.dry_run:
cmd = f"""ssh {hostname} 'find /var -name core.* -delete'"""
subprocess.run(cmd, shell=True)
self.actions_taken.append(f"删除了core dump文件")
# 4. 如果还是不够,创建扩容ticket
remaining_space = self._check_disk_space(hostname)
if remaining_space < 20: # 少于20%
self._create_expansion_ticket(hostname, remaining_space)
self.actions_taken.append(f"创建了磁盘扩容工单")
return True
def repair_certificate_expiry(self, domain):
"""续期SSL证书"""
# 使用Let's Encrypt自动续期
if not self.dry_run:
cmd = f"certbot renew --cert-name {domain}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode == 0:
self.actions_taken.append(f"成功续期 {domain} 的证书")
# 重新加载nginx
subprocess.run("systemctl reload nginx", shell=True)
self.actions_taken.append("重新加载了nginx配置")
else:
self.actions_taken.append(f"证书续期失败: {result.stderr}")
return False
else:
self.actions_taken.append(f"[DRY RUN] 将续期 {domain} 的证书")
return True
def repair_failed_deployment(self, deployment_name, namespace="production"):
"""回滚失败的部署"""
# 1. 检查最近的部署历史
cmd = f"kubectl rollout history deployment/{deployment_name} -n {namespace}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
# 2. 回滚到上一个版本
if not self.dry_run:
cmd = f"kubectl rollout undo deployment/{deployment_name} -n {namespace}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode == 0:
self.actions_taken.append(f"成功回滚 {deployment_name}")
# 3. 等待rollout完成
cmd = f"kubectl rollout status deployment/{deployment_name} -n {namespace}"
subprocess.run(cmd, shell=True, timeout=300)
else:
self.actions_taken.append(f"回滚失败: {result.stderr}")
return False
else:
self.actions_taken.append(f"[DRY RUN] 将回滚 {deployment_name}")
return True
def _restart_pod(self, pod_name, namespace):
"""重启Pod"""
cmd = f"kubectl delete pod {pod_name} -n {namespace}"
subprocess.run(cmd, shell=True)
time.sleep(5) # 等待Pod重新创建
def _get_memory_limit(self, pod_name, namespace):
"""获取当前memory limit"""
cmd = f"""kubectl get pod {pod_name} -n {namespace} -o jsonpath='{{.spec.containers[0].resources.limits.memory}}'"""
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
return result.stdout.strip()
def _calculate_new_limit(self, current_limit):
"""计算新的memory limit(增加50%)"""
# 简化处理: 512Mi -> 768Mi, 1Gi -> 1.5Gi
if "Mi" in current_limit:
value = int(current_limit.replace("Mi", ""))
new_value = int(value * 1.5)
return f"{new_value}Mi"
elif "Gi" in current_limit:
value = float(current_limit.replace("Gi", ""))
new_value = value * 1.5
return f"{new_value}Gi"
return current_limit
def _create_pr_for_resource_change(self, pod_name, namespace, resource_type, new_value, branch_name, commit_message):
"""创建PR修改资源配置"""
os.chdir(self.git_repo)
# 1. 创建新分支
subprocess.run(f"git checkout -b {branch_name}", shell=True)
# 2. 修改配置文件(这里假设使用kustomize)
config_file = f"k8s/overlays/{namespace}/{pod_name}/kustomization.yaml"
# 实际修改逻辑会更复杂,这里简化处理
# 3. 提交变更
subprocess.run(f"git add {config_file}", shell=True)
subprocess.run(f"git commit -m '{commit_message}'", shell=True)
# 4. 推送并创建PR(使用GitHub CLI或API)
subprocess.run(f"git push origin {branch_name}", shell=True)
subprocess.run(
f"gh pr create --title '{commit_message}' --body 'Auto-generated by Self-healing Agent'",
shell=True
)
def _check_disk_space(self, hostname):
"""检查剩余磁盘空间百分比"""
cmd = f"ssh {hostname} \"df -h / | awk 'NR==2 {{print 100-$5}}' | sed 's/%//'\""
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
return int(result.stdout.strip())
def _create_expansion_ticket(self, hostname, remaining_space):
"""创建磁盘扩容工单"""
# 这里可以集成到Jira、Linear等项目管理工具
ticket_content = f"""
主机: {hostname}
剩余空间: {remaining_space}%
创建时间: {datetime.now()}
优先级: High
需要扩容根分区容量。
"""
# 实际实现会调用API创建工单
print(f"创建工单:\n{ticket_content}")
def get_report(self):
"""生成修复报告"""
report = f"\n=== 修复报告 {datetime.now()} ===\n"
report += "\n".join(self.actions_taken)
report += "\n=== 报告结束 ===\n"
return report
# 使用示例
if __name__ == "__main__":
import sys
# 默认是dry-run模式,需要显式传递--execute才会真正执行
dry_run = "--execute" not in sys.argv
engine = RepairEngine(dry_run=dry_run)
issue_type = sys.argv[1]
if issue_type == "pod_oom":
pod_name = sys.argv[2]
success = engine.repair_pod_oom(pod_name)
elif issue_type == "disk_full":
hostname = sys.argv[2]
success = engine.repair_disk_full(hostname)
elif issue_type == "cert_expiry":
domain = sys.argv[2]
success = engine.repair_certificate_expiry(domain)
elif issue_type == "failed_deployment":
deployment_name = sys.argv[2]
success = engine.repair_failed_deployment(deployment_name)
else:
print(f"未知的问题类型: {issue_type}")
sys.exit(1)
print(engine.get_report())
sys.exit(0 if success else 1)
💡 AI辅助提示
Python脚本看起来复杂?可以问AI帮你理解:
- “这段Python代码做了什么?能用简单的语言解释吗?”
- “subprocess.run是什么?如何使用?”
- “如何调试Python脚本中的错误?”
OpenClaw Agent集成
现在让我们把这些脚本集成到OpenClaw Agent中,构建一个真正智能的自愈系统。
# AGENTS.md - Self-healing Agent配置
你是一个DevOps自愈Agent,负责24/7监控和维护基础设施。
## 职责
1. **定期健康检查**: 每5-15分钟检查一次关键系统
2. **问题诊断**: 发现异常时深入分析根本原因
3. **自动修复**: 在安全范围内自主执行修复操作
4. **事件记录**: 详细记录所有诊断和修复过程
5. **人工升级**: 超出能力范围时立即通知人类
## 工作流程
当收到heartbeat时:
1. 运行 `/scripts/health-check.sh`
2. 如果发现问题:
- 立即运行诊断: `python /scripts/diagnosis.py <issue_type> <params>`
- 评估风险级别
- 如果风险可控,运行修复: `python /scripts/repair.py <issue_type> <params>`
- 记录完整过程到 `memory/incidents/YYYY-MM-DD-<issue>.md`
- 如果是重大问题,立即通知管理员
3. 如果一切正常,回复 `HEARTBEAT_OK`
## 决策规则
### 可以自动修复(无需确认)
- Pod内存OOM崩溃 → 重启Pod + 创建增加内存的PR
- 磁盘使用 > 80% → 清理旧日志
- SSL证书 < 7天过期 → 自动续期
- 单个Pod崩溃 → 重启
### 需要人工确认
- 数据库主从切换
- 大规模服务重启(> 10个实例)
- 磁盘扩容
- 安全相关变更
### 必须立即告警
- 数据备份失败
- 生产数据库不可用
- 关键API持续错误率 > 10%
- 异常的资源消耗(可能是攻击)
## 安全防护
- 所有变更通过Git PR,不直接修改生产环境
- 重要操作有dry-run模式,先模拟再执行
- 每日审计日志,检查Agent行为
- 凭证通过n8n隔离,Agent不直接持有
# HEARTBEAT.md - 定期任务配置
# Self-healing Agent的心跳检查清单
last_check: 2024-02-20T10:15:00Z
# 每5分钟执行一次
frequent_checks:
- name: k8s-pod-health
command: /scripts/health-check.sh check_k8s_pods
last_run: 2024-02-20T10:15:00Z
last_status: OK
- name: critical-services
command: /scripts/health-check.sh check_services
last_run: 2024-02-20T10:15:00Z
last_status: OK
# 每15分钟执行一次
moderate_checks:
- name: disk-usage
command: /scripts/health-check.sh check_disk_usage
last_run: 2024-02-20T10:00:00Z
last_status: WARNING
last_finding: "Disk usage at 82% on web-01"
- name: resource-usage
command: kubectl top nodes
last_run: 2024-02-20T10:00:00Z
last_status: OK
# 每天执行一次
daily_checks:
- name: ssl-certificates
command: /scripts/health-check.sh check_ssl_certificates
last_run: 2024-02-20T09:00:00Z
last_status: OK
- name: backup-verification
command: /scripts/verify-backups.sh
last_run: 2024-02-20T09:00:00Z
last_status: OK
- name: security-audit
command: /scripts/audit-logs.sh
last_run: 2024-02-20T09:00:00Z
last_status: OK
# 需要关注的问题
active_issues:
- issue: "web-01 disk usage high"
detected: 2024-02-20T10:00:00Z
status: "repair_in_progress"
actions_taken:
- "Cleaned old logs"
- "Compressed large files"
next_check: 2024-02-20T10:30:00Z
Cron任务配置
除了心跳检查,我们还可以设置精确的定时任务:
# crontab -e
# 每5分钟: 关键健康检查
*/5 * * * * /home/agent/openclaw cron health-check-critical
# 每15分钟: 完整健康检查
*/15 * * * * /home/agent/openclaw cron health-check-full
# 每小时: 资源使用趋势分析
0 * * * * /home/agent/openclaw cron analyze-resource-trends
# 每6小时: 容量规划检查
0 */6 * * * /home/agent/openclaw cron capacity-planning
# 每天凌晨1点: 完整系统审计
0 1 * * * /home/agent/openclaw cron daily-audit
# 每天凌晨2点: 备份验证
0 2 * * * /home/agent/openclaw cron verify-backups
# 每周日凌晨3点: 清理旧的事件记录
0 3 * * 0 /home/agent/openclaw cron cleanup-old-incidents
真实场景演练
让我们通过几个真实场景,看看Agent如何处理问题。
场景1: Kubernetes Pod内存溢出
[2024-02-20 03:15:22] Heartbeat触发健康检查
[检查] kubectl get pods -n production
发现: api-gateway-7d9f5b8c6-x7k2m 状态 CrashLoopBackOff
[诊断] python diagnosis.py pod_crash api-gateway-7d9f5b8c6-x7k2m
结果: OOM - 内存使用接近limit 512Mi
[决策] 风险级别: Low (可自动修复)
- 影响: 单个Pod,有其他副本在运行
- 操作: 重启Pod + 创建增加内存PR
[修复] python repair.py pod_oom api-gateway-7d9f5b8c6-x7k2m
✓ Pod已重启,恢复正常
✓ 创建PR: fix/increase-memory-api-gateway-1708398922
✓ PR链接: https://github.com/company/infra/pull/456
[记录] 事件已记录到 memory/incidents/2024-02-20-api-gateway-oom.md
[通知] Slack消息发送到 #devops:
"🤖 Self-healing Agent 已自动处理 api-gateway OOM问题
- 重启了崩溃的Pod
- 创建了增加内存的PR (#456)
- 详情: [链接到事件记录]"
总响应时间: 2分23秒
人工介入: 无需
场景2: 磁盘空间告警
[2024-02-20 14:30:15] 磁盘使用率检查
[检查] df -h on web-01
发现: 磁盘使用 87% (超过阈值 80%)
[诊断] python diagnosis.py disk_full web-01
结果: /var/log 占用 45GB, 大量未压缩的nginx日志
[决策] 风险级别: Low (可自动修复)
- 影响: 不影响服务,只是清理空间
- 操作: 清理7天前的日志,压缩大文件
[修复] python repair.py disk_full web-01 --execute
✓ 删除了 12GB 的旧日志
✓ 压缩了 18GB 的大型日志文件
✓ 删除了 3GB 的core dumps
当前使用率: 64%
[记录] 事件已记录到 memory/incidents/2024-02-20-web-01-disk-cleanup.md
[通知] 低优先级通知(不打断工作):
"💾 web-01磁盘清理完成: 87% → 64%"
总响应时间: 8分15秒(包括清理操作)
人工介入: 无需
场景3: 部署失败回滚
[2024-02-20 18:45:30] ArgoCD webhook告警: api-gateway部署失败
[检查] kubectl rollout status deployment/api-gateway -n production
状态: ProgressDeadlineExceeded - 新版本Pod无法启动
[诊断] python diagnosis.py pod_crash api-gateway-7d9f5b8c6-y8m3n
结果: STARTUP_FAILED - 新版本配置错误导致启动失败
[决策] 风险级别: High (需要快速决策)
- 影响: 生产环境,虽有旧版本Pod在运行但正在替换
- 操作: 立即回滚到上一个稳定版本
[通知] Slack实时消息到 #incidents:
"🚨 api-gateway部署失败,准备回滚
- 新版本Pod无法启动
- 即将回滚到上一个稳定版本
- 30秒后自动执行,回复 'ABORT' 取消"
[等待] 30秒... (无人取消)
[修复] python repair.py failed_deployment api-gateway --execute
✓ 回滚到 revision 23
✓ 等待rollout完成...
✓ 所有Pod运行正常
[记录] 事件已记录,包括失败的deployment配置
[通知] Slack消息:
"✅ api-gateway已成功回滚
- 当前版本: v1.2.3 (revision 23)
- 所有Pod健康
- 需要排查v1.2.4失败原因"
总响应时间: 3分45秒
人工介入: 监控但未取消(被动确认)
场景4: SSL证书即将过期
[2024-02-20 09:00:00] 每日证书检查
[检查] check_ssl_certificates
发现: api.example.com 证书将在5天后过期
[诊断] 确认是Let's Encrypt证书,可自动续期
[决策] 风险级别: Medium
- 影响: 证书过期会导致服务不可用
- 操作: 立即续期(提前而非最后一刻)
[修复] python repair.py cert_expiry api.example.com --execute
✓ 运行 certbot renew
✓ 新证书有效期至 2024-05-20
✓ 重新加载nginx配置
[验证] 检查新证书
✓ api.example.com 证书有效期: 90天
[记录] 续期成功记录
[通知] 低优先级通知:
"🔒 api.example.com SSL证书已自动续期
- 新有效期: 90天
- 下次检查: 85天后"
总响应时间: 1分30秒
人工介入: 无需
11.3 n8n集成与工作流自动化
在第7章我们讨论了n8n作为凭证隔离层的安全优势。在基础设施自动化场景中,n8n还能提供强大的工作流编排能力。
为什么需要n8n?
1. 凭证安全隔离
Agent不直接持有云服务的API密钥、数据库密码等敏感凭证。所有需要凭证的操作通过n8n的Webhook接口调用,凭证只存储在n8n中。
Agent → Webhook → n8n → AWS/GCP/Azure API
(无凭证) (凭证存储)
2. 可视化调试
n8n提供图形化的工作流界面,每次执行都有详细的日志,可以清楚地看到数据在各个节点间的流转。
3. 低代码集成
连接不同的服务和API不需要写代码,通过拖拽节点即可完成。这降低了维护成本,也让非程序员能够参与自动化流程设计。
架构设计
┌─────────────────────┐
│ OpenClaw Agent │
│ │
│ 监控、诊断、决策 │
└──────────┬──────────┘
│ Webhook
▼
┌─────────────────────┐
│ n8n │
│ │
│ ┌───────────────┐ │
│ │ Workflow 1: │ │
│ │ Slack Alert │ │
│ └───────────────┘ │
│ │
│ ┌───────────────┐ │
│ │ Workflow 2: │ │
│ │ Scale K8s │ │
│ └───────────────┘ │
│ │
│ ┌───────────────┐ │
│ │ Workflow 3: │ │
│ │ Backup Status │ │
│ └───────────────┘ │
└──────────┬──────────┘
│
┌────┼────┬────────┐
▼ ▼ ▼ ▼
Slack K8s AWS Database
实战案例: 告警通知工作流
n8n Workflow 1: 智能告警分发
{
"name": "Infrastructure Alert Router",
"nodes": [
{
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"parameters": {
"path": "infra-alert",
"responseMode": "responseNode",
"authentication": "headerAuth"
}
},
{
"name": "Parse Alert",
"type": "n8n-nodes-base.code",
"parameters": {
"jsCode": "const alert = $input.item.json;\n\n// 提取关键信息\nconst severity = alert.severity;\nconst service = alert.service;\nconst message = alert.message;\n\n// 判断严重程度\nlet emoji = '🔵';\nlet priority = 'low';\nlet channel = '#monitoring';\n\nif (severity === 'critical') {\n emoji = '🚨';\n priority = 'high';\n channel = '#incidents';\n} else if (severity === 'warning') {\n emoji = '⚠️';\n priority = 'medium';\n channel = '#alerts';\n}\n\nreturn {\n json: {\n severity,\n service,\n message,\n emoji,\n priority,\n channel,\n timestamp: new Date().toISOString()\n }\n};"
}
},
{
"name": "Route by Severity",
"type": "n8n-nodes-base.switch",
"parameters": {
"rules": {
"rules": [
{
"value": "critical",
"operation": "equal",
"field": "severity"
},
{
"value": "warning",
"operation": "equal",
"field": "severity"
}
]
}
}
},
{
"name": "Send to Slack",
"type": "n8n-nodes-base.slack",
"parameters": {
"channel": "={{$json.channel}}",
"text": "={{$json.emoji}} *{{$json.service}}* - {{$json.severity}}\n\n{{$json.message}}\n\n_Time: {{$json.timestamp}}_",
"attachments": []
},
"credentials": {
"slackApi": "slack-workspace"
}
},
{
"name": "Critical: Also Send SMS",
"type": "n8n-nodes-base.twilio",
"parameters": {
"to": "+1234567890",
"message": "CRITICAL ALERT: {{$json.service}} - {{$json.message}}"
},
"credentials": {
"twilioApi": "twilio-account"
}
},
{
"name": "Log to Database",
"type": "n8n-nodes-base.postgres",
"parameters": {
"operation": "insert",
"table": "infrastructure_alerts",
"columns": "severity,service,message,timestamp",
"values": "={{$json.severity}},={{$json.service}},={{$json.message}},={{$json.timestamp}}"
},
"credentials": {
"postgres": "alert-db"
}
},
{
"name": "Respond to Agent",
"type": "n8n-nodes-base.respondToWebhook",
"parameters": {
"respondWith": "json",
"responseBody": "{\"status\": \"alert_sent\", \"channels\": [\"{{$json.channel}}\"]}"
}
}
],
"connections": {
"Webhook": {"main": [[{"node": "Parse Alert"}]]},
"Parse Alert": {"main": [[{"node": "Route by Severity"}]]},
"Route by Severity": {
"main": [
[{"node": "Send to Slack"}, {"node": "Critical: Also Send SMS"}],
[{"node": "Send to Slack"}]
]
},
"Send to Slack": {"main": [[{"node": "Log to Database"}]]},
"Critical: Also Send SMS": {"main": [[{"node": "Log to Database"}]]},
"Log to Database": {"main": [[{"node": "Respond to Agent"}]]}
}
}
Agent端调用
# 从Agent发送告警到n8n
import requests
def send_alert(severity, service, message):
webhook_url = "https://n8n.example.com/webhook/infra-alert"
headers = {
"Authorization": "Bearer <webhook-token>",
"Content-Type": "application/json"
}
payload = {
"severity": severity,
"service": service,
"message": message,
"source": "self-healing-agent",
"hostname": os.uname().nodename
}
response = requests.post(webhook_url, json=payload, headers=headers)
return response.json()
# 使用示例
send_alert(
severity="critical",
service="api-gateway",
message="Pod崩溃,已自动重启。正在创建增加内存的PR。"
)
实战案例: Kubernetes自动扩容
n8n Workflow 2: 智能扩容决策
这个工作流接收Agent的资源使用数据,分析趋势,并在必要时自动扩容。
Workflow流程:
1. Webhook接收资源使用数据
2. 查询Prometheus获取历史趋势
3. 使用机器学习模型预测未来负载
4. 如果预测会超载,触发扩容
5. 调用Kubernetes API增加副本数
6. 发送通知到Slack
7. 记录扩容事件
关键节点配置
// 节点: 分析资源使用趋势
const current_cpu = $input.item.json.cpu_percent;
const current_memory = $input.item.json.memory_percent;
const pod_count = $input.item.json.pod_count;
// 获取Prometheus历史数据
const prometheus_url = "http://prometheus:9090/api/v1/query";
const query = `rate(container_cpu_usage_seconds_total{pod=~"api-gateway-.*"}[5m])`;
// ... 省略HTTP请求代码 ...
// 简单的趋势分析(实际应该用更复杂的模型)
const avg_cpu_last_hour = 75.3;
const trend = (current_cpu - avg_cpu_last_hour) / avg_cpu_last_hour;
let should_scale = false;
let reason = "";
if (current_cpu > 80 && trend > 0.1) {
should_scale = true;
reason = `CPU使用率${current_cpu}%且呈上升趋势`;
} else if (current_memory > 85) {
should_scale = true;
reason = `内存使用率${current_memory}%`;
}
return {
json: {
should_scale,
reason,
current_pods: pod_count,
recommended_pods: should_scale ? pod_count + 2 : pod_count
}
};
# 节点: 执行Kubernetes扩容
# 使用n8n的Kubernetes节点
Operation: Update
Resource: Deployment
Namespace: production
Name: api-gateway
Update:
spec:
replicas: {{$json.recommended_pods}}
🔧 遇到错误?
n8n工作流调试技巧:
- 点击每个节点查看输入/输出数据
- 使用“Execute Node“单独测试某个节点
- 检查“Executions“查看历史运行记录
遇到配置问题?问AI: “n8n的Kubernetes节点如何配置?需要什么权限?” “如何在n8n中安全存储API密钥?”
实战案例: 监控数据聚合
n8n Workflow 3: 多源监控数据推送
这个工作流定期从多个监控系统收集数据,聚合后推送给Agent进行分析。
数据源:
- Prometheus (基础设施指标)
- CloudWatch (AWS资源)
- Datadog (APM数据)
- PagerDuty (告警历史)
- GitHub (部署事件)
聚合逻辑:
1. 每15分钟触发一次
2. 并行查询所有数据源
3. 合并数据到统一格式
4. 计算关键指标(可用性、错误率、延迟)
5. 推送到Agent的webhook
6. Agent分析数据,决定是否需要采取行动
Agent接收聚合数据
# Agent webhook endpoint
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/webhook/monitoring-data', methods=['POST'])
def receive_monitoring_data():
data = request.json
# 提取关键指标
availability = data['metrics']['availability']
error_rate = data['metrics']['error_rate']
p99_latency = data['metrics']['p99_latency']
# 检查是否有异常
issues = []
if availability < 99.9:
issues.append(f"可用性下降到 {availability}%")
if error_rate > 1.0:
issues.append(f"错误率 {error_rate}% (正常<1%)")
if p99_latency > 1000:
issues.append(f"P99延迟 {p99_latency}ms (正常<500ms)")
if issues:
# 触发诊断流程
diagnosis = diagnose_performance_issue(data)
# 如果可以修复,执行修复
if diagnosis['can_auto_fix']:
repair_result = execute_repair(diagnosis)
return jsonify({"status": "repaired", "actions": repair_result})
else:
# 升级到人工处理
send_alert("warning", "Performance Issue", "\n".join(issues))
return jsonify({"status": "escalated"})
return jsonify({"status": "healthy"})
n8n的可观测性优势
n8n最大的优势之一是完整的可观测性。每次工作流执行都会保留详细记录:
Execution #12345
Status: Success
Duration: 2.3s
Triggered: 2024-02-20 15:30:00
Node Executions:
├─ Webhook ✓ 50ms
├─ Parse Alert ✓ 10ms
├─ Route by Severity ✓ 5ms
├─ Send to Slack ✓ 380ms
├─ Log to Database ✓ 120ms
└─ Respond to Webhook ✓ 5ms
Input Data:
{
"severity": "warning",
"service": "api-gateway",
"message": "High memory usage detected"
}
Output Data:
{
"status": "alert_sent",
"channels": ["#alerts"]
}
这种可观测性在调试自动化流程时极其有价值。你可以准确地看到:
- 哪一步失败了
- 输入输出数据是什么
- 执行花了多长时间
- 错误堆栈在哪里
📚 深入学习
想更深入了解n8n?可以问AI:
- “n8n和Zapier、Make(Integromat)有什么区别?”
- “如何设计容错的n8n工作流?”
- “n8n可以部署到Kubernetes吗?有什么最佳实践?”
11.4 Observability优先
自愈系统的可靠性取决于可观测性(Observability)。你需要知道Agent在做什么,为什么这样做,以及它做得对不对。
三大支柱: 日志、指标、追踪
1. 日志(Logs)
结构化日志是Agent行为的详细记录。
# 使用结构化日志
import structlog
logger = structlog.get_logger()
# 每个重要操作都记录
logger.info(
"health_check_completed",
check_type="k8s_pods",
namespace="production",
total_pods=15,
failing_pods=0,
duration_ms=234
)
logger.warning(
"issue_detected",
issue_type="pod_crash",
pod_name="api-gateway-x7k2m",
namespace="production",
restart_count=3,
last_error="OutOfMemoryError"
)
logger.info(
"repair_initiated",
repair_type="pod_restart",
pod_name="api-gateway-x7k2m",
risk_level="low",
auto_approved=True
)
logger.info(
"repair_completed",
repair_type="pod_restart",
pod_name="api-gateway-x7k2m",
success=True,
duration_ms=2340,
new_pod_name="api-gateway-9g3h7"
)
日志聚合: Loki + Grafana
使用Loki聚合Agent的日志,在Grafana中可视化查询。
# promtail配置: 收集Agent日志
scrape_configs:
- job_name: self-healing-agent
static_configs:
- targets:
- localhost
labels:
job: agent
environment: production
__path__: /var/log/agent/*.log
pipeline_stages:
- json:
expressions:
level: level
message: message
check_type: check_type
issue_type: issue_type
- labels:
level:
check_type:
issue_type:
在Grafana中查询
# 查询最近1小时内的所有修复操作
{job="agent"} |= "repair_completed" | json
# 查询失败的修复操作
{job="agent"} |= "repair_completed" | json | success="false"
# 统计每种问题类型的频率
sum by (issue_type) (
count_over_time({job="agent"} |= "issue_detected" [1h])
)
2. 指标(Metrics)
将Agent的关键指标导出到Prometheus。
# 使用Prometheus客户端库
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# 定义指标
health_checks_total = Counter(
'agent_health_checks_total',
'Total number of health checks performed',
['check_type', 'status']
)
issues_detected_total = Counter(
'agent_issues_detected_total',
'Total number of issues detected',
['issue_type', 'severity']
)
repairs_total = Counter(
'agent_repairs_total',
'Total number of repair actions taken',
['repair_type', 'success']
)
repair_duration_seconds = Histogram(
'agent_repair_duration_seconds',
'Time spent performing repairs',
['repair_type']
)
active_issues = Gauge(
'agent_active_issues',
'Number of currently active issues',
['severity']
)
# 在代码中更新指标
def perform_health_check(check_type):
start = time.time()
try:
result = run_check(check_type)
health_checks_total.labels(
check_type=check_type,
status='success'
).inc()
return result
except Exception as e:
health_checks_total.labels(
check_type=check_type,
status='failure'
).inc()
raise
finally:
duration = time.time() - start
logger.info("check_completed", check_type=check_type, duration=duration)
def execute_repair(repair_type, **params):
start = time.time()
try:
result = do_repair(repair_type, **params)
repairs_total.labels(
repair_type=repair_type,
success=True
).inc()
return result
except Exception as e:
repairs_total.labels(
repair_type=repair_type,
success=False
).inc()
raise
finally:
duration = time.time() - start
repair_duration_seconds.labels(repair_type=repair_type).observe(duration)
# 启动Prometheus metrics服务器
start_http_server(9090)
Prometheus查询示例
# 每小时检测到的问题数量
rate(agent_issues_detected_total[1h])
# 修复成功率
sum(rate(agent_repairs_total{success="true"}[5m]))
/
sum(rate(agent_repairs_total[5m]))
# P95修复时间
histogram_quantile(0.95,
rate(agent_repair_duration_seconds_bucket[5m])
)
# 当前活跃的严重问题数量
agent_active_issues{severity="critical"}
在Grafana中创建Dashboard
{
"dashboard": {
"title": "Self-healing Agent监控",
"panels": [
{
"title": "健康检查状态",
"targets": [{
"expr": "rate(agent_health_checks_total[5m])"
}],
"type": "graph"
},
{
"title": "问题检测量",
"targets": [{
"expr": "sum by (issue_type) (rate(agent_issues_detected_total[1h]))"
}],
"type": "graph"
},
{
"title": "修复成功率",
"targets": [{
"expr": "sum(rate(agent_repairs_total{success=\"true\"}[5m])) / sum(rate(agent_repairs_total[5m])) * 100"
}],
"type": "singlestat"
},
{
"title": "活跃问题",
"targets": [{
"expr": "agent_active_issues"
}],
"type": "table"
}
]
}
}
3. 追踪(Tracing)
使用OpenTelemetry追踪Agent处理事件的完整链路。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
# 配置追踪
tracer_provider = TracerProvider()
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer(__name__)
# 使用追踪
def handle_incident(incident_type, **params):
with tracer.start_as_current_span("handle_incident") as span:
span.set_attribute("incident.type", incident_type)
span.set_attribute("incident.severity", params.get("severity"))
# 诊断阶段
with tracer.start_as_current_span("diagnose"):
diagnosis = diagnose(incident_type, params)
span.set_attribute("diagnosis.result", diagnosis['type'])
# 决策阶段
with tracer.start_as_current_span("decision"):
decision = make_decision(diagnosis)
span.set_attribute("decision.action", decision['action'])
span.set_attribute("decision.risk_level", decision['risk'])
# 修复阶段(如果批准)
if decision['approved']:
with tracer.start_as_current_span("repair"):
result = execute_repair(decision['action'], diagnosis)
span.set_attribute("repair.success", result['success'])
return result
在Jaeger UI中,你可以看到完整的调用链:
handle_incident (2.5s)
├─ diagnose (0.8s)
│ ├─ get_pod_status (0.3s)
│ ├─ get_container_logs (0.4s)
│ └─ analyze_error_pattern (0.1s)
├─ decision (0.1s)
└─ repair (1.6s)
├─ restart_pod (1.2s)
└─ create_pr (0.4s)
主动 vs 被动监控
被动监控: 等待问题发生,然后响应
问题发生 → 告警触发 → Agent检测 → 诊断修复
这是传统的监控模式,Agent扮演“救火队员“角色。
主动监控: 预测问题,提前预防
Agent定期检查 → 发现潜在问题 → 主动修复 → 避免故障
Self-healing Agent应该以主动监控为主,被动响应为辅。
主动监控示例
def proactive_health_check():
"""主动发现潜在问题"""
issues_found = []
# 1. 检查证书有效期
certificates = get_all_certificates()
for cert in certificates:
days_left = cert['expiry_days']
if days_left < 30:
issues_found.append({
'type': 'certificate_expiring',
'severity': 'warning' if days_left > 7 else 'high',
'domain': cert['domain'],
'days_left': days_left,
'action': 'renew_certificate'
})
# 2. 检查磁盘增长趋势
disk_usage = get_disk_usage_trend(days=7)
growth_rate = calculate_growth_rate(disk_usage)
if growth_rate > 0:
days_until_full = calculate_days_until_full(disk_usage, growth_rate)
if days_until_full < 30:
issues_found.append({
'type': 'disk_filling_up',
'severity': 'warning',
'days_until_full': days_until_full,
'action': 'schedule_cleanup'
})
# 3. 检查即将过期的备份保留
backups = get_backup_retention_status()
for backup in backups:
if backup['retention_days_left'] < 3:
issues_found.append({
'type': 'backup_expiring',
'severity': 'high',
'backup_id': backup['id'],
'action': 'extend_retention'
})
# 4. 检查资源使用趋势
resource_trends = analyze_resource_trends(days=14)
if resource_trends['cpu']['trend'] == 'increasing':
weeks_until_limit = resource_trends['cpu']['weeks_until_80_percent']
if weeks_until_limit < 4:
issues_found.append({
'type': 'cpu_trending_high',
'severity': 'info',
'weeks_until_limit': weeks_until_limit,
'action': 'consider_scaling'
})
return issues_found
# 定期运行主动检查
def main_loop():
while True:
issues = proactive_health_check()
for issue in issues:
if issue['severity'] in ['high', 'critical']:
# 严重问题立即处理
handle_issue(issue)
else:
# 低优先级问题记录并计划处理
schedule_issue(issue)
time.sleep(3600) # 每小时一次
告警疲劳的防护
过多的告警会导致“告警疲劳“,最终让人忽略所有告警。Self-healing Agent应该智能地筛选告警。
告警分级策略
def should_alert_human(issue):
"""决定是否需要通知人类"""
# Level 1: Agent已自动修复,无需通知
if issue['auto_fixed'] and issue['severity'] == 'low':
return False
# Level 2: Agent已自动修复,但记录通知(不紧急)
if issue['auto_fixed'] and issue['severity'] in ['medium', 'high']:
return {
'notify': True,
'urgency': 'low',
'channel': 'slack',
'summary': True # 只发送摘要,不发送详情
}
# Level 3: Agent无法自动修复,需要人工介入
if not issue['can_auto_fix']:
return {
'notify': True,
'urgency': 'high' if issue['severity'] == 'critical' else 'medium',
'channel': 'slack+sms' if issue['severity'] == 'critical' else 'slack',
'summary': False # 发送完整详情
}
# Level 4: 重复出现的问题,提升优先级
if issue['occurrence_count'] > 3:
return {
'notify': True,
'urgency': 'high',
'channel': 'slack',
'message': f"这个问题已经出现{issue['occurrence_count']}次了,需要根本性修复"
}
return False
聚合告警
不要每个问题都发一条告警,而是聚合同类问题。
def aggregate_alerts(issues, window_minutes=15):
"""聚合时间窗口内的告警"""
aggregated = {}
for issue in issues:
key = (issue['type'], issue['service'])
if key not in aggregated:
aggregated[key] = {
'type': issue['type'],
'service': issue['service'],
'count': 0,
'first_seen': issue['timestamp'],
'last_seen': issue['timestamp'],
'examples': []
}
aggregated[key]['count'] += 1
aggregated[key]['last_seen'] = issue['timestamp']
if len(aggregated[key]['examples']) < 3:
aggregated[key]['examples'].append(issue)
# 生成摘要消息
alerts = []
for agg in aggregated.values():
if agg['count'] == 1:
alerts.append(format_single_alert(agg['examples'][0]))
else:
alerts.append(format_aggregated_alert(agg))
return alerts
def format_aggregated_alert(agg):
return f"""
🔔 {agg['count']}个 {agg['type']} 问题
服务: {agg['service']}
时间范围: {agg['first_seen']} - {agg['last_seen']}
示例:
{format_examples(agg['examples'])}
完整列表: [查看详情]
"""
💡 AI辅助提示
想了解更多可观测性实践?可以问AI:
- “什么是SLI、SLO、SLA?如何设定合理的目标?”
- “如何设计不会产生告警疲劳的告警策略?”
- “Prometheus和Grafana的最佳实践是什么?”
审计与合规
Self-healing Agent会自主执行基础设施变更,必须有完整的审计轨迹。
审计日志设计
# memory/audit/2024-02-20-actions.md
## 2024-02-20 审计日志
### 03:15:22 - Pod重启
- **操作**: 重启Pod
- **对象**: api-gateway-7d9f5b8c6-x7k2m
- **原因**: OutOfMemoryError
- **风险级别**: Low
- **批准方式**: 自动(符合预设规则)
- **执行结果**: 成功
- **新Pod**: api-gateway-9g3h7
- **Git提交**: [链接]
### 14:30:45 - 磁盘清理
- **操作**: 清理日志文件
- **对象**: web-01.example.com
- **原因**: 磁盘使用率87%
- **风险级别**: Low
- **批准方式**: 自动
- **执行结果**: 成功
- **释放空间**: 33GB
- **最终使用率**: 64%
### 18:45:50 - 部署回滚
- **操作**: 回滚Deployment
- **对象**: api-gateway
- **原因**: 新版本启动失败
- **风险级别**: High
- **批准方式**: 自动(带30秒人工取消窗口)
- **人工干预**: 无
- **执行结果**: 成功
- **回滚版本**: v1.2.3 (revision 23)
定期审计报告
def generate_weekly_audit_report():
"""生成每周审计报告"""
week_start = datetime.now() - timedelta(days=7)
actions = get_actions_since(week_start)
report = {
'period': f"{week_start.date()} to {datetime.now().date()}",
'total_actions': len(actions),
'by_type': count_by_field(actions, 'type'),
'by_risk_level': count_by_field(actions, 'risk_level'),
'by_approval': count_by_field(actions, 'approval_method'),
'success_rate': calculate_success_rate(actions),
'manual_interventions': count_manual_interventions(actions),
'time_saved': estimate_time_saved(actions),
'issues': []
}
# 识别需要关注的模式
for action_type, count in report['by_type'].items():
if count > 10: # 频繁发生
report['issues'].append({
'type': 'frequent_issue',
'action_type': action_type,
'count': count,
'recommendation': f"考虑根本性修复 {action_type} 问题"
})
# 识别失败率高的操作
for action_type in report['by_type'].keys():
type_actions = [a for a in actions if a['type'] == action_type]
failure_rate = 1 - calculate_success_rate(type_actions)
if failure_rate > 0.2: # 失败率>20%
report['issues'].append({
'type': 'high_failure_rate',
'action_type': action_type,
'failure_rate': f"{failure_rate*100:.1f}%",
'recommendation': f"检查 {action_type} 的自动修复逻辑"
})
# 生成markdown报告
return format_audit_report(report)
小结
本章展示了如何构建一个完整的自愈式基础设施系统:
核心要点:
-
Agent的价值: 24/7监控、自动修复、知识积累,将运维人员从重复工作中解放出来
-
Self-healing模式: 健康检查 → 诊断分析 → 自动修复 → 审计记录,形成闭环
-
工具集成: SSH、kubectl、Terraform、Ansible与OpenClaw Agent的无缝集成
-
安全第一: 凭证隔离(n8n)、防护栏(Git PR)、审计日志、人工确认窗口
-
可观测性优先: 日志、指标、追踪三大支柱,主动监控而非被动响应
-
渐进式自动化: 从Level 1(只监控)到Level 4(自主修复),根据风险和成熟度选择
实战建议:
- ✅ 从简单开始: 先自动化低风险、高频率的任务(如Pod重启)
- ✅ 建立信任: 通过dry-run模式和详细日志让团队信任Agent
- ✅ 持续优化: 定期审查Agent行为,优化决策逻辑
- ✅ 文档化知识: 每次事件都是学习机会,沉淀到知识库
- ✅ 监控Agent: Agent本身也需要监控,避免“谁来监控监控者“的问题
下一步:
- 第12章我们将讨论知识管理与学习系统,展示如何让Agent持续积累运维知识
- 第14章会深入Agent的可观测性与调试技巧
- 附录A提供了基础设施自动化的安全检查清单
记住: 最好的自愈系统不是处理问题最快的,而是让问题越来越少的。通过持续学习和优化,Agent会帮你构建一个越来越稳定、越来越不需要人工介入的基础设施。
“We can’t solve problems by using the same kind of thinking we used when we created them.” - Albert Einstein
自动化不是简单地把手动操作变成脚本,而是重新思考运维工作的本质。Agent系统让我们从“救火“模式转向“预防“模式,从“被动响应“转向“主动优化“。这才是真正的DevOps转型。
参考资料
本章引用的案例均来自 awesome-openclaw-usecases 社区仓库:
-
案例来源:Self-Healing Home Server,awesome-openclaw-usecases 社区贡献 ↩