大家好,我是何三,80后老猿,独立开发者

"五一假期要是哪台服务器抽风了怎么办?"虽然值班同事会处理,但要是能提前发现问题不是更好吗?

自动化巡检的念头

去年国庆假期,我们有个业务系统半夜崩溃,直到早上客户投诉才发现。事后分析发现,其实从凌晨两点就开始有小规模异常,只是没人盯着监控。这件事让我意识到,假期的人工巡检总会有盲区。

于是就写个脚本,让服务器自己检查自己,发现问题就立刻通知我们

巡检内容规划

这个自动化巡检需要覆盖几个关键点:

  1. 硬件资源:CPU、内存、硬盘使用率
  2. 网站可用性:主要网页是否能正常访问
  3. 服务状态:关键业务端口是否开放
  4. 深度检查:用DeepSeek API分析潜在问题

完整实现代码

下面是我实现的完整代码,你可以直接拿去用(记得替换关键配置):

import smtplib
import socket
import psutil
import requests
from email.mime.text import MIMEText
from email.header import Header
from datetime import datetime

# 配置区域
SMTP_SERVER = 'smtp.163.com'  # SMTP服务器
SMTP_PORT = 465               # SMTP端口
EMAIL_USER = 'your_email@163.com'  # 发件邮箱
EMAIL_PWD = 'your_password'    # 邮箱密码/授权码
TO_EMAILS = ['admin1@example.com', 'admin2@example.com']  # 收件人列表
WEBSITES_TO_CHECK = ['https://www.example.com', 'https://blog.example.com']  # 要检查的网站
PORTS_TO_CHECK = [80, 443, 3306, 6379]  # 要检查的端口
DEEPSEEK_API_KEY = 'your_deepseek_api_key'  # DeepSeek API密钥
SERVER_NAME = '生产服务器1'  # 服务器名称

def check_cpu():
    """检查CPU使用率"""
    cpu_percent = psutil.cpu_percent(interval=1)
    return {
        'metric': 'CPU使用率',
        'value': f'{cpu_percent}%',
        'status': '正常' if cpu_percent < 80 else '警告'
    }

def check_memory():
    """检查内存使用情况"""
    mem = psutil.virtual_memory()
    mem_percent = mem.percent
    return {
        'metric': '内存使用率',
        'value': f'{mem_percent}% ({mem.used//1024//1024}MB/{mem.total//1024//1024}MB)',
        'status': '正常' if mem_percent < 80 else '警告'
    }

def check_disk():
    """检查磁盘使用情况"""
    disk = psutil.disk_usage('/')
    disk_percent = disk.percent
    return {
        'metric': '磁盘使用率',
        'value': f'{disk_percent}% ({disk.used//1024//1024}GB/{disk.total//1024//1024}GB)',
        'status': '正常' if disk_percent < 90 else '警告'
    }

def check_website(url):
    """检查网站可访问性"""
    try:
        start = datetime.now()
        response = requests.get(url, timeout=10)
        latency = (datetime.now() - start).total_seconds()
        return {
            'metric': f'网站访问 {url}',
            'value': f'状态码 {response.status_code}, 延迟 {latency:.2f}秒',
            'status': '正常' if response.status_code == 200 else '异常'
        }
    except Exception as e:
        return {
            'metric': f'网站访问 {url}',
            'value': f'访问失败: {str(e)}',
            'status': '异常'
        }

def check_port(port):
    """检查端口是否开放"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(3)
    result = sock.connect_ex(('127.0.0.1', port))
    sock.close()
    return {
        'metric': f'端口检查 {port}',
        'value': '开放' if result == 0 else '关闭',
        'status': '正常' if result == 0 else '异常'
    }

def analyze_with_deepseek(problem_description):
    """使用DeepSeek API分析问题"""
    try:
        headers = {
            'Authorization': f'Bearer {DEEPSEEK_API_KEY}',
            'Content-Type': 'application/json'
        }
        data = {
            'prompt': f"作为服务器运维专家,请分析以下问题并提供解决建议:\n{problem_description}",
            'max_tokens': 500
        }
        response = requests.post(
            'https://api.deepseek.com/v1/chat/completions',
            headers=headers,
            json=data,
            timeout=10
        )
        return response.json().get('choices', [{}])[0].get('message', {}).get('content', 'DeepSeek分析失败')
    except Exception as e:
        return f'调用DeepSeek API失败: {str(e)}'

def send_email(subject, content):
    """发送邮件通知"""
    message = MIMEText(content, 'plain', 'utf-8')
    message['From'] = Header(f'服务器巡检系统 <{EMAIL_USER}>')
    message['To'] = Header(','.join(TO_EMAILS))
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtp_obj = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT)
        smtp_obj.login(EMAIL_USER, EMAIL_PWD)
        smtp_obj.sendmail(EMAIL_USER, TO_EMAILS, message.as_string())
        smtp_obj.quit()
        return True
    except Exception as e:
        print(f"邮件发送失败: {str(e)}")
        return False

def generate_report(check_results):
    """生成巡检报告"""
    report = f"{SERVER_NAME} 巡检报告\n"
    report += f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
    report += "="*50 + "\n"

    warning_count = 0
    error_count = 0
    problems = []

    for result in check_results:
        report += f"{result['metric']}: {result['value']} [{result['status']}]\n"
        if result['status'] == '警告':
            warning_count += 1
            problems.append(f"{result['metric']} {result['value']}")
        elif result['status'] == '异常':
            error_count += 1
            problems.append(f"{result['metric']} {result['value']}")

    report += "="*50 + "\n"
    report += f"总结: 共检查{len(check_results)}项, 警告{warning_count}项, 异常{error_count}\n"

    if problems:
        report += "\n问题分析:\n"
        problem_desc = "\n".join(problems)
        deepseek_analysis = analyze_with_deepseek(problem_desc)
        report += deepseek_analysis

    return report

def main():
    """主巡检流程"""
    print(f"开始执行服务器巡检... {datetime.now()}")

    # 执行所有检查
    check_results = []
    check_results.append(check_cpu())
    check_results.append(check_memory())
    check_results.append(check_disk())

    for website in WEBSITES_TO_CHECK:
        check_results.append(check_website(website))

    for port in PORTS_TO_CHECK:
        check_results.append(check_port(port))

    # 生成报告
    report = generate_report(check_results)
    print(report)

    # 如果有问题就发邮件
    if any(result['status'] in ('警告', '异常') for result in check_results):
        subject = f"[紧急] {SERVER_NAME} 巡检发现问题!"
        send_email(subject, report)
    else:
        subject = f"{SERVER_NAME} 巡检正常"
        if datetime.now().hour == 9:  # 只在早上9点发送正常报告
            send_email(subject, report)

    print(f"巡检完成. {datetime.now()}")

if __name__ == '__main__':
    main()

邮件内容展示

主题:生产服务器1 巡检正常

正文内容

生产服务器1 巡检报告
生成时间: 2023-04-28 09:00:05
==================================================
CPU使用率: 15% [正常]
内存使用率: 42% (3456MB/8192MB) [正常]
磁盘使用率: 65% (130GB/200GB) [正常]
网站访问 https://www.h3blog.com: 状态码 200, 延迟 0.32秒 [正常]
网站访问 https://blog.h3blog.com: 状态码 200, 延迟 0.28秒 [正常]
端口检查 80: 开放 [正常]
端口检查 443: 开放 [正常]
端口检查 3306: 内部 [正常]
端口检查 6379: 内部 [正常]
==================================================
总结: 共检查8项, 警告0项, 异常0项

所有系统指标正常,祝您工作愉快!

代码解析与使用说明

这个脚本做了以下几件事:

  1. 硬件检查:使用psutil库获取CPU、内存和磁盘使用情况
  2. 网站检查:通过requests尝试访问配置的网站,检查返回状态码和延迟
  3. 端口检查:用socket检查关键业务端口是否开放
  4. 深度分析:发现问题后调用DeepSeek API获取专业分析建议
  5. 邮件通知:通过SMTP发送详细的巡检报告

如何部署使用

  1. 安装依赖库:
pip install psutil requests
  1. 修改脚本开头的配置部分:

  2. 设置你的邮箱SMTP信息

  3. 配置要检查的网站和端口
  4. 申请DeepSeek API密钥(如果没有可以去掉相关代码)

  5. 设置定时任务(Linux crontab示例,每小时检查一次):

0 * * * * /usr/bin/python3 /path/to/server_check.py >> /var/log/server_check.log 2>&1

实际效果

自从部署了这个脚本,运维工作轻松了不少。有次凌晨三点,脚本检测到数据库端口无响应,自动发邮件并附上了DeepSeek的分析建议:"可能是连接数耗尽,建议检查max_connections配置和当前连接数"。值班同事按照建议很快解决了问题,避免了早高峰的业务中断。

更进一步

这个基础版本还可以扩展很多功能:

  • 添加企业微信/钉钉机器人通知
  • 记录历史数据用于趋势分析
  • 对关键业务做更复杂的检查(如数据库查询测试)
  • 实现自动修复简单问题

但核心思想很简单:让机器做机器擅长的事(24小时监控),让人做人擅长的事(分析解决复杂问题)。这样即使是假期,我们也能安心休息,知道有"数字同事"在帮我们盯着系统。