大家好,我是何三,80后老猿,独立开发者
"五一假期要是哪台服务器抽风了怎么办?"虽然值班同事会处理,但要是能提前发现问题不是更好吗?
自动化巡检的念头
去年国庆假期,我们有个业务系统半夜崩溃,直到早上客户投诉才发现。事后分析发现,其实从凌晨两点就开始有小规模异常,只是没人盯着监控。这件事让我意识到,假期的人工巡检总会有盲区。
于是就写个脚本,让服务器自己检查自己,发现问题就立刻通知我们
巡检内容规划
这个自动化巡检需要覆盖几个关键点:
- 硬件资源:CPU、内存、硬盘使用率
- 网站可用性:主要网页是否能正常访问
- 服务状态:关键业务端口是否开放
- 深度检查:用DeepSeek API分析潜在问题
完整实现代码
下面是我实现的完整代码,你可以直接拿去用(记得替换关键配置):
import smtplib
import socket
import psutil
import requests
from email.mime.text import MIMEText
from email.header import Header
from datetime import datetime
# 配置区域
SMTP_SERVER = 'smtp.163.com' # SMTP服务器
SMTP_PORT = 465 # SMTP端口
EMAIL_USER = 'your_email@163.com' # 发件邮箱
EMAIL_PWD = 'your_password' # 邮箱密码/授权码
TO_EMAILS = ['admin1@example.com', 'admin2@example.com'] # 收件人列表
WEBSITES_TO_CHECK = ['https://www.example.com', 'https://blog.example.com'] # 要检查的网站
PORTS_TO_CHECK = [80, 443, 3306, 6379] # 要检查的端口
DEEPSEEK_API_KEY = 'your_deepseek_api_key' # DeepSeek API密钥
SERVER_NAME = '生产服务器1' # 服务器名称
def check_cpu():
"""检查CPU使用率"""
cpu_percent = psutil.cpu_percent(interval=1)
return {
'metric': 'CPU使用率',
'value': f'{cpu_percent}%',
'status': '正常' if cpu_percent < 80 else '警告'
}
def check_memory():
"""检查内存使用情况"""
mem = psutil.virtual_memory()
mem_percent = mem.percent
return {
'metric': '内存使用率',
'value': f'{mem_percent}% ({mem.used//1024//1024}MB/{mem.total//1024//1024}MB)',
'status': '正常' if mem_percent < 80 else '警告'
}
def check_disk():
"""检查磁盘使用情况"""
disk = psutil.disk_usage('/')
disk_percent = disk.percent
return {
'metric': '磁盘使用率',
'value': f'{disk_percent}% ({disk.used//1024//1024}GB/{disk.total//1024//1024}GB)',
'status': '正常' if disk_percent < 90 else '警告'
}
def check_website(url):
"""检查网站可访问性"""
try:
start = datetime.now()
response = requests.get(url, timeout=10)
latency = (datetime.now() - start).total_seconds()
return {
'metric': f'网站访问 {url}',
'value': f'状态码 {response.status_code}, 延迟 {latency:.2f}秒',
'status': '正常' if response.status_code == 200 else '异常'
}
except Exception as e:
return {
'metric': f'网站访问 {url}',
'value': f'访问失败: {str(e)}',
'status': '异常'
}
def check_port(port):
"""检查端口是否开放"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
result = sock.connect_ex(('127.0.0.1', port))
sock.close()
return {
'metric': f'端口检查 {port}',
'value': '开放' if result == 0 else '关闭',
'status': '正常' if result == 0 else '异常'
}
def analyze_with_deepseek(problem_description):
"""使用DeepSeek API分析问题"""
try:
headers = {
'Authorization': f'Bearer {DEEPSEEK_API_KEY}',
'Content-Type': 'application/json'
}
data = {
'prompt': f"作为服务器运维专家,请分析以下问题并提供解决建议:\n{problem_description}",
'max_tokens': 500
}
response = requests.post(
'https://api.deepseek.com/v1/chat/completions',
headers=headers,
json=data,
timeout=10
)
return response.json().get('choices', [{}])[0].get('message', {}).get('content', 'DeepSeek分析失败')
except Exception as e:
return f'调用DeepSeek API失败: {str(e)}'
def send_email(subject, content):
"""发送邮件通知"""
message = MIMEText(content, 'plain', 'utf-8')
message['From'] = Header(f'服务器巡检系统 <{EMAIL_USER}>')
message['To'] = Header(','.join(TO_EMAILS))
message['Subject'] = Header(subject, 'utf-8')
try:
smtp_obj = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT)
smtp_obj.login(EMAIL_USER, EMAIL_PWD)
smtp_obj.sendmail(EMAIL_USER, TO_EMAILS, message.as_string())
smtp_obj.quit()
return True
except Exception as e:
print(f"邮件发送失败: {str(e)}")
return False
def generate_report(check_results):
"""生成巡检报告"""
report = f"{SERVER_NAME} 巡检报告\n"
report += f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
report += "="*50 + "\n"
warning_count = 0
error_count = 0
problems = []
for result in check_results:
report += f"{result['metric']}: {result['value']} [{result['status']}]\n"
if result['status'] == '警告':
warning_count += 1
problems.append(f"{result['metric']} {result['value']}")
elif result['status'] == '异常':
error_count += 1
problems.append(f"{result['metric']} {result['value']}")
report += "="*50 + "\n"
report += f"总结: 共检查{len(check_results)}项, 警告{warning_count}项, 异常{error_count}项\n"
if problems:
report += "\n问题分析:\n"
problem_desc = "\n".join(problems)
deepseek_analysis = analyze_with_deepseek(problem_desc)
report += deepseek_analysis
return report
def main():
"""主巡检流程"""
print(f"开始执行服务器巡检... {datetime.now()}")
# 执行所有检查
check_results = []
check_results.append(check_cpu())
check_results.append(check_memory())
check_results.append(check_disk())
for website in WEBSITES_TO_CHECK:
check_results.append(check_website(website))
for port in PORTS_TO_CHECK:
check_results.append(check_port(port))
# 生成报告
report = generate_report(check_results)
print(report)
# 如果有问题就发邮件
if any(result['status'] in ('警告', '异常') for result in check_results):
subject = f"[紧急] {SERVER_NAME} 巡检发现问题!"
send_email(subject, report)
else:
subject = f"{SERVER_NAME} 巡检正常"
if datetime.now().hour == 9: # 只在早上9点发送正常报告
send_email(subject, report)
print(f"巡检完成. {datetime.now()}")
if __name__ == '__main__':
main()
邮件内容展示
主题:生产服务器1 巡检正常
正文内容:
生产服务器1 巡检报告
生成时间: 2023-04-28 09:00:05
==================================================
CPU使用率: 15% [正常]
内存使用率: 42% (3456MB/8192MB) [正常]
磁盘使用率: 65% (130GB/200GB) [正常]
网站访问 https://www.h3blog.com: 状态码 200, 延迟 0.32秒 [正常]
网站访问 https://blog.h3blog.com: 状态码 200, 延迟 0.28秒 [正常]
端口检查 80: 开放 [正常]
端口检查 443: 开放 [正常]
端口检查 3306: 内部 [正常]
端口检查 6379: 内部 [正常]
==================================================
总结: 共检查8项, 警告0项, 异常0项
所有系统指标正常,祝您工作愉快!
代码解析与使用说明
这个脚本做了以下几件事:
- 硬件检查:使用
psutil
库获取CPU、内存和磁盘使用情况 - 网站检查:通过
requests
尝试访问配置的网站,检查返回状态码和延迟 - 端口检查:用
socket
检查关键业务端口是否开放 - 深度分析:发现问题后调用DeepSeek API获取专业分析建议
- 邮件通知:通过SMTP发送详细的巡检报告
如何部署使用
- 安装依赖库:
pip install psutil requests
-
修改脚本开头的配置部分:
-
设置你的邮箱SMTP信息
- 配置要检查的网站和端口
-
申请DeepSeek API密钥(如果没有可以去掉相关代码)
-
设置定时任务(Linux crontab示例,每小时检查一次):
0 * * * * /usr/bin/python3 /path/to/server_check.py >> /var/log/server_check.log 2>&1
实际效果
自从部署了这个脚本,运维工作轻松了不少。有次凌晨三点,脚本检测到数据库端口无响应,自动发邮件并附上了DeepSeek的分析建议:"可能是连接数耗尽,建议检查max_connections配置和当前连接数"。值班同事按照建议很快解决了问题,避免了早高峰的业务中断。
更进一步
这个基础版本还可以扩展很多功能:
- 添加企业微信/钉钉机器人通知
- 记录历史数据用于趋势分析
- 对关键业务做更复杂的检查(如数据库查询测试)
- 实现自动修复简单问题
但核心思想很简单:让机器做机器擅长的事(24小时监控),让人做人擅长的事(分析解决复杂问题)。这样即使是假期,我们也能安心休息,知道有"数字同事"在帮我们盯着系统。