如果你的团队不到5个人,负责公司全部IT基础设施,每天告警来自:云监控短信、Zabbix推送群消息、交换机Syslog塞邮箱、机房环控App自己弹通知、安全设备另一个管理后台——那这篇文章就是写给你的。
告警治理在大型团队是一个专门岗位(NOC),但在中小团队,告警治理就是"N+1件事"里的那件事。没人会专门花两周选型告警管理平台,但每个人都希望"告警别在四个地方蹦"。这套方案的核心思路是用最轻量的方式——一个Flask脚本+企业微信机器人——把归集、分级、收敛、派单四步跑通。不需要额外服务器、不需要买新系统、也不需要一个人专职盯告警。
我们不买东西,先把噪音干掉。
一、告警多不是问题,告警散才是问题
先问一个问题:你的IT团队一天收到多少条告警?
50条?100条?200条?
再问一个:这些告警来自几个渠道?
大部分中小企业的回答是:至少4个。Zabbix在企业微信群里发、交换机的Syslog转到邮箱、UPS和空调有专用App推送、安全设备在自己的管理界面弹窗。
告警多不是最大的问题——你可以通过调整阈值减少告警量。真正的问题是告警散落在不同渠道。IT团队需要同时盯着企业微信、邮箱、2个App和1个Web管理界面,才能确保不漏掉任何一条告警。
结果就是两种极端:
- 要么全都不看。 告警太多太散,逐渐麻木,当成背景噪音。
- 要么严重的被淹掉。 一台核心交换机DOWN了,发了1条Critical告警,但它和30条"端口Up/Down抖动"的告警混在一起,直到用户打电话来报障才被发现。
解决方案是一条完整的告警治理链路:归集 → 分级 → 收敛 → 派单。
二、告警归集:把所有告警收到一个地方
2.1 归集架构
告警归集架构:
Zabbix ──── Webhook ────┐
交换机Syslog ──── Syslog转发 ──┤
UPS/空调 ──── SNMP Trap ───┼──→ 告警中心(统一接口)──→ 分级 → 派单
安全设备 ──── API对接 ────┤
云服务(阿里云等) ── 回调Webhook ──┘
告警中心可以是:
├── 开源方案:Alertmanager + 自定义Webhook
├── 轻量方案:一个Python Flask服务,接收各渠道推送
└── 平台方案:使用统一运维平台的告警归集模块
2.2 Zabbix告警接入(Webhook方式)
Zabbix是最常见的告警源。通过Media Type配置Webhook,把告警推送到统一接口:
#!/usr/bin/env python3
"""
告警归集服务 - 接收多系统告警,统一格式化
依赖:pip install flask
运行:python3 alert_collector.py
"""
from flask import Flask, request, jsonify
from datetime import datetime
import json
import hashlib
app = Flask(__name__)
# 告警存储(生产环境用数据库)
alerts = []
# ============ 统一告警格式 ============
def normalize_alert(source, raw_data):
"""将不同来源的告警统一为标准格式"""
alert = {
"id": hashlib.md5(f"{source}{raw_data}{datetime.now().isoformat()}".encode()).hexdigest()[:12],
"source": source,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"host": "",
"severity": "P3", # 默认P3-信息级
"title": "",
"detail": "",
"status": "firing", # firing / resolved
"assigned_to": None,
"work_order_id": None
}
if source == "zabbix":
alert["host"] = raw_data.get("host", "")
alert["title"] = raw_data.get("trigger_name", "")
alert["detail"] = raw_data.get("trigger_description", "")
alert["severity"] = map_zabbix_severity(raw_data.get("trigger_severity", ""))
alert["status"] = "resolved" if raw_data.get("trigger_status") == "OK" else "firing"
elif source == "syslog":
alert["host"] = raw_data.get("host", "")
alert["title"] = raw_data.get("message", "")[:100]
alert["detail"] = raw_data.get("message", "")
alert["severity"] = map_syslog_severity(raw_data.get("severity", 6))
elif source == "snmptrap":
alert["host"] = raw_data.get("agent_addr", "")
alert["title"] = raw_data.get("trap_oid", "")
alert["detail"] = json.dumps(raw_data.get("varbinds", {
}))
alert["severity"] = "P2" # SNMP Trap默认预警级
return alert
def map_zabbix_severity(zabbix_sev):
"""Zabbix严重级别 → 统一分级"""
mapping = {
"Disaster": "P1",
"High": "P1",
"Average": "P2",
"Warning": "P2",
"Information": "P3",
"Not classified": "P3"
}
return mapping.get(zabbix_sev, "P3")
def map_syslog_severity(syslog_sev):
"""Syslog严重级别(0-7) → 统一分级"""
if syslog_sev <= 2: # Emergency/Alert/Critical
return "P1"
elif syslog_sev <= 4: # Error/Warning
return "P2"
else: # Notice/Info/Debug
return "P3"
# ============ 告警接收接口 ============
@app.route("/alert/zabbix", methods=["POST"])
def receive_zabbix():
"""接收Zabbix Webhook告警"""
data = request.json
alert = normalize_alert("zabbix", data)
alerts.append(alert)
process_alert(alert)
return jsonify({
"status": "ok", "alert_id": alert["id"]})
@app.route("/alert/syslog", methods=["POST"])
def receive_syslog():
"""接收Syslog转发告警"""
data = request.json
alert = normalize_alert("syslog", data)
alerts.append(alert)
process_alert(alert)
return jsonify({
"status": "ok", "alert_id": alert["id"]})
@app.route("/alert/snmptrap", methods=["POST"])
def receive_snmptrap():
"""接收SNMP Trap告警"""
data = request.json
alert = normalize_alert("snmptrap", data)
alerts.append(alert)
process_alert(alert)
return jsonify({
"status": "ok", "alert_id": alert["id"]})
def process_alert(alert):
"""告警处理:分级 → 收敛 → 派单"""
if should_suppress(alert):
alert["status"] = "suppressed"
return
if alert["severity"] == "P1":
dispatch_alert(alert, channel="phone")
elif alert["severity"] == "P2":
dispatch_alert(alert, channel="wechat")
else:
dispatch_alert(alert, channel="log_only")
def should_suppress(alert):
"""告警收敛:同一设备+同一告警标题,5分钟内不重复"""
for existing in reversed(alerts[:-1]):
if (existing["host"] == alert["host"] and
existing["title"] == alert["title"] and
existing["status"] == "firing"):
existing_time = datetime.strptime(existing["timestamp"], "%Y-%m-%d %H:%M:%S")
current_time = datetime.strptime(alert["timestamp"], "%Y-%m-%d %H:%M:%S")
if (current_time - existing_time).total_seconds() < 300:
return True
return False
def dispatch_alert(alert, channel):
"""派单通知"""
print(f"[派单] {alert['severity']} | {alert['host']} | {alert['title']} → {channel}")
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
2.3 Syslog转发配置
# 在交换机上配置Syslog转发到归集服务器
# 华为交换机
info-center loghost 192.168.1.200 facility local7
# Cisco交换机
logging host 192.168.1.200
logging trap warnings
# 归集服务器上用rsyslog接收并转HTTP
# /etc/rsyslog.d/alert-forward.conf
template(name="AlertJSON" type="string"
string="{\"host\":\"%HOSTNAME%\",\"severity\":%syslogseverity%,\"message\":\"%msg%\"}")
if $syslogseverity <= 4 then {
action(type="omhttp"
server="127.0.0.1"
serverport="5000"
restpath="alert/syslog"
template="AlertJSON"
)
}
三、告警分级:P1/P2/P3三级就够了
3.1 分级规则
中小企业不需要5级分级。3级足够,规则越简单越能执行:
| 级别 | 名称 | 定义 | 响应要求 | 通知方式 | 例子 |
|---|---|---|---|---|---|
| P1 | 严重 | 业务中断或即将中断 | 15分钟内响应 | 电话+企业微信 | 核心交换机DOWN、互联网出口断、服务器宕机 |
| P2 | 预警 | 业务未中断但有风险 | 2小时内响应 | 企业微信 | CPU>90%持续10分钟、磁盘>85%、链路丢包>5% |
| P3 | 信息 | 无业务影响的变更/通知 | 工作时间内查看 | 仅记录 | 端口Up/Down、配置变更、备份完成通知 |
3.2 分级判断逻辑
分级判断决策树:
这条告警影响业务了吗?
├── 是 → P1(业务中断)
│ ├── 核心设备DOWN
│ ├── 互联网出口不通
│ ├── 关键业务服务器宕机
│ └── 全公司无法上网/打电话
│
└── 没有 → 可能影响业务吗?
├── 可能(指标异常、趋势恶化)→ P2(预警)
│ ├── CPU/内存/磁盘预警
│ ├── 链路质量下降
│ ├── 冗余链路断了一条(业务还通但无冗余)
│ └── 证书/域名即将到期
│
└── 不会(纯信息型)→ P3(信息)
├── 端口抖动(物理口Up/Down)
├── 备份完成通知
├── 配置保存通知
└── 设备重启完成
3.3 分级的关键是"从业务影响出发"而不是"从设备状态出发"
很多人做告警分级的思路是"核心交换机的告警就是P1、接入交换机的就是P2"——这是错的。
正确的思路是:同一台设备的不同告警,级别可以不同。
同一台核心交换机的不同告警分级:
├── 设备不可达(Unreachable) → P1 全网受影响
├── CPU>95%持续10分钟 → P2 业务暂未受影响但有风险
├── 某物理端口Down → P3 可能只是用户拔了网线
└── 配置被保存 → P3 信息型
四、告警收敛:解决"一台设备挂了发50条告警"的问题
4.1 收敛规则
告警收敛是告警治理中最容易被忽略但最有价值的一环。一台交换机DOWN了,可能触发:
- 设备不可达告警 × 1
- 该设备上的20个端口告警 × 20
- 连接到该设备的下游设备不可达 × 10
- 相关联的业务探测失败 × 5
- Syslog断流告警 × 1
一共37条告警。但根本原因只有1个:那台交换机DOWN了。
收敛规则:
规则1:重复收敛
├── 同一设备 + 同一告警 + 5分钟内 → 只保留第1条,后续合并
├── 显示"重复 N 次"代替重复通知
└── 例:核心交换机CPU告警每分钟发1次 → 5分钟内只通知1次
规则2:根因收敛(最关键)
├── 设备不可达 → 抑制该设备上的所有其他告警
├── 上游设备DOWN → 抑制所有下游设备的"不可达"告警
└── 例:核心交换机DOWN → 抑制20个端口告警 + 10个下游设备告警
规则3:抖动收敛
├── 端口Up/Down在10分钟内切换超过3次 → 合并为1条"端口抖动"告警
└── 避免物理链路不稳定导致的告警轰炸
规则4:恢复自动关闭
├── 告警恢复(OK/Resolved) → 自动关闭对应的告警
└── 显示"已恢复"+ 持续时长
4.2 收敛效果
收敛前:一台核心交换机DOWN → IT收到37条告警通知
收敛后:一台核心交换机DOWN → IT收到1条告警:
"P1 | Core-SW-HQ 不可达 | 影响设备: 30台 | 已自动抑制关联告警36条"
收敛前:每天80条告警 → 全部推送到企业微信 → 没人看
收敛后:每天80条告警 → 收敛+分级后 → P1: 0-2条 / P2: 5-8条 / P3: 仅记录
→ 真正需要IT处理的只有5-10条
五、自动派单:告警到人而不是告警到群
5.1 告警到群 vs 告警到人的区别
大部分中小企业的告警通知是这样的:把所有告警发到一个企业微信群里,然后——没人管。
因为"发到群里"等于"发给所有人",等于"不是发给任何具体的人"。每个人都觉得"应该有别人在看吧"。
正确的做法是:告警到具体的人,并生成工单跟踪。
派单规则示例(2人IT团队):
张工(网络+服务器):
├── 所有网络设备告警 → @张工
├── 服务器/虚拟化告警 → @张工
└── P1告警 → @张工 + 电话
李工(终端+安全):
├── 终端/WiFi告警 → @李工
├── 安全设备告警 → @李工
└── P1告警 → @李工 + 电话(备份)
所有告警:
├── P1 → 指定负责人 + 电话 + 生成紧急工单
├── P2 → 指定负责人 + 企业微信 + 生成普通工单
├── P3 → 仅记录到告警日志
└── 超时未处理 → P2超2小时/P1超15分钟 → 升级通知IT主管
5.2 企业微信告警通知对接
import requests
def send_wechat_alert(alert, webhook_url, mentioned_user=None):
"""
发送告警到企业微信群机器人
webhook_url: 企业微信群机器人Webhook地址
mentioned_user: 要@的用户ID(企业微信通讯录中的userid)
"""
severity_emoji = {
"P1": "🔴", "P2": "🟡", "P3": "🔵"}
emoji = severity_emoji.get(alert["severity"], "⚪")
content = (
f"{emoji} **{alert['severity']}级告警**\n"
f"> 设备:{alert['host']}\n"
f"> 告警:{alert['title']}\n"
f"> 时间:{alert['timestamp']}\n"
f"> 来源:{alert['source']}\n"
f"> 详情:{alert['detail'][:200]}\n"
)
if alert.get("work_order_id"):
content += f"> 工单:#{alert['work_order_id']}\n"
payload = {
"msgtype": "markdown",
"markdown": {
"content": content}
}
if mentioned_user:
payload["markdown"]["content"] += f"\n<@{mentioned_user}> 请及时处理"
resp = requests.post(webhook_url, json=payload, timeout=10)
return resp.status_code == 200
# 派单规则配置
DISPATCH_RULES = {
"network": {
"owner": "zhangsan",
"webhook": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx-network",
"keywords": ["交换机", "路由器", "防火墙", "链路", "端口", "SNMP"]
},
"server": {
"owner": "zhangsan",
"webhook": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx-server",
"keywords": ["服务器", "CPU", "内存", "磁盘", "VMware", "进程"]
},
"terminal": {
"owner": "lisi",
"webhook": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx-terminal",
"keywords": ["WiFi", "AP", "终端", "打印机"]
}
}
def auto_dispatch(alert):
"""根据告警内容自动匹配负责人"""
for category, rule in DISPATCH_RULES.items():
for keyword in rule["keywords"]:
if keyword in alert["title"] or keyword in alert["detail"]:
alert["assigned_to"] = rule["owner"]
send_wechat_alert(alert, rule["webhook"], mentioned_user=rule["owner"])
return
# 兜底:发到通用告警群
send_wechat_alert(alert, "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx-general")
5.3 告警→工单自动关联
告警与工单的关联逻辑:
P1告警 → 自动创建「紧急工单」
├── 工单标题 = 告警标题
├── 优先级 = 紧急
├── 处理人 = 派单规则匹配的负责人
├── SLA要求 = 15分钟响应 / 4小时解决
└── 超时升级 = 15分钟未响应 → 通知IT主管
P2告警 → 自动创建「普通工单」
├── 优先级 = 普通
├── SLA要求 = 2小时响应 / 24小时解决
└── 超时升级 = 2小时未响应 → 升级为P1
P3告警 → 不创建工单
├── 仅记录到告警日志
└── 可在月度回顾时批量查看
告警恢复 → 自动更新工单
├── 追加备注:告警已自动恢复
├── 记录持续时长
└── 待处理人确认后关闭(不自动关闭)
六、告警治理落地的5个建议
第一,先归集再分级。 不要一上来就搞分级规则。先把所有告警收到一个地方——哪怕是一个企业微信群+一个Excel记录表。能看到"全部告警"之后,你才知道哪些是高频的、哪些是噪音、哪些需要分级。
第二,分级规则从粗开始,逐步细化。 先按"设备是否不可达"分P1/非P1,跑一周看数据。再根据实际情况加P2规则。别一开始就搞50条规则——太细了没人能记住。
第三,告警收敛必须做。 收敛是投入产出比最高的环节。做了收敛,告警数量直接从每天80条降到10条以内,IT团队的体感完全不同。
第四,派单到人不到群。 如果只能改一个东西,改这个。告警发到群里就是废了。发到具体的人、@具体的人、附工单号,处理率翻倍。
第五,定期复盘告警数据。 每月拉一次告警统计:哪类告警最多?哪些反复出现?哪些P2应该降成P3(噪音太多)?哪些P3应该升成P2(曾经引发过事故)?告警规则不是写一次就完了,需要持续调优。
我们团队实际跑下来,最大的感受是——归集和收敛这两步做完,80%的问题就解决了。之前IT团队看到告警就麻木的状态,变成了"收到P1立刻响应、P2工作时间内处理完、P3月底统一看"。不是告警变少了,而是噪音被过滤掉了,该关注的浮出来了。我们后来把这套告警治理流程整体迁移到了冠服云EMS平台上,归集、分级、收敛、工单联动都变成了平台内置的能力,不再需要单独维护Flask脚本和企业微信机器人的配置——不过这套轻量方案本身完全够用,重要的是先把"告警从消息变工单、从通知到群变指派到人"这个逻辑跑通。
本文给出的Flask告警归集脚本已在Python 3.9+环境验证通过。企业微信Webhook地址需替换为实际的群机器人Key。