大家好,我是何三,80后老猿,独立开发者。

继续我们的"5分钟Python自动化"系列。今天我们来解决一个文件安全的刚需——实时监控文件夹变化

有这样的场景:你的团队共享文件夹里,有人误删了重要文件却不知道是谁;你的网站上传目录被恶意上传了可疑文件;你的项目代码库突然出现了异常修改;你想知道某个文件夹什么时候被修改过、谁修改的...

传统做法:手动检查文件变化,设置Windows资源管理器或Mac Finder的"修改日期"排序,但这样既不能实时监控,也无法记录变化历史。

Python做法:15行代码,创建实时的文件监控系统,自动记录所有文件变化,第一时间发现异常!

今天,我就带你用5分钟,掌握这个让你的文件系统拥有"上帝视角"的神奇技能!

环境准备:一个命令搞定

在开始之前,你只需要:

  1. 安装Python(这个已经装好了)
  2. 安装watchdog库:打开命令行,输入:
pip install watchdog

watchdog是一个专业的文件系统监控库,跨平台支持,功能强大且使用简单。

核心代码:15行实时监控系统

让我们先看看这个最简单的文件夹监控系统:

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class FileChangeHandler(FileSystemEventHandler):
    """处理文件变化事件的处理器"""

    def on_created(self, event):
        print(f"[创建] {event.src_path}")

    def on_deleted(self, event):
        print(f"[删除] {event.src_path}")

    def on_modified(self, event):
        print(f"[修改] {event.src_path}")

    def on_moved(self, event):
        print(f"[移动] {event.src_path} -> {event.dest_path}")

def monitor_folder(folder_path):
    """监控指定文件夹"""
    event_handler = FileChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, folder_path, recursive=True)
    observer.start()

    try:
        print(f"开始监控文件夹: {folder_path}")
        print("按 Ctrl+C 停止监控")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

# 监控当前目录
monitor_folder(".")

逐行详解:理解文件监控的原理

第1-3行:导入监控工具箱

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
  • time:用于控制监控进程
  • Observer:监控器的核心,负责观察文件系统变化
  • FileSystemEventHandler:事件处理器基类,我们需要继承它来实现自定义逻辑

第5-20行:自定义事件处理器

class FileChangeHandler(FileSystemEventHandler):
    """处理文件变化事件的处理器"""

    def on_created(self, event):
        print(f"[创建] {event.src_path}")

    def on_deleted(self, event):
        print(f"[删除] {event.src_path}")

    def on_modified(self, event):
        print(f"[修改] {event.src_path}")

    def on_moved(self, event):
        print(f"[移动] {event.src_path} -> {event.dest_path}")

这是整个监控系统的"大脑",它定义了四种事件的处理方式:

文件创建事件

def on_created(self, event):
    print(f"[创建] {event.src_path}")

当有新文件或文件夹被创建时触发。

文件删除事件

def on_deleted(self, event):
    print(f"[删除] {event.src_path}")

当文件或文件夹被删除时触发。

文件修改事件

def on_modified(self, event):
    print(f"[修改] {event.src_path}")

当文件内容被修改时触发。

文件移动事件

def on_moved(self, event):
    print(f"[移动] {event.src_path} -> {event.dest_path}")

当文件被重命名或移动到其他位置时触发。

第22-36行:启动监控器

def monitor_folder(folder_path):
    """监控指定文件夹"""
    event_handler = FileChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, folder_path, recursive=True)
    observer.start()

    try:
        print(f"开始监控文件夹: {folder_path}")
        print("按 Ctrl+C 停止监控")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

核心步骤: 1. 创建事件处理器实例 2. 创建监控器实例 3. 设置监控规则:recursive=True表示监控子文件夹 4. 启动监控器 5. 保持程序运行,直到用户按Ctrl+C中断

实战演示:实时监控文件夹变化

  1. 保存脚本:将上面的代码保存为folder_monitor.py
  2. 运行脚本:打开命令行,输入:
python folder_monitor.py
  1. 测试监控:在监控的文件夹中尝试:
    • 创建一个新文件
    • 修改一个已有文件
    • 删除一个文件
    • 重命名一个文件
  2. 观察输出:命令行会实时显示所有文件变化!

运行效果

开始监控文件夹: .
 Ctrl+C 停止监控
[创建] ./新建文本文档.txt
[修改] ./新建文本文档.txt
[移动] ./新建文本文档.txt -> ./重命名.txt
[删除] ./重命名.txt

举一反三:高级监控功能定制

案例1:只监控特定类型的文件

class ImageFileHandler(FileSystemEventHandler):
    """只监控图片文件"""

    def on_created(self, event):
        if event.src_path.lower().endswith(('.jpg', '.png', '.gif')):
            print(f"新图片文件: {event.src_path}")

    def on_modified(self, event):
        if event.src_path.lower().endswith(('.jpg', '.png', '.gif')):
            print(f"图片被修改: {event.src_path}")
            # 可以在这里添加图片压缩、加水印等处理

案例2:记录监控日志到文件

import datetime

class LoggingFileHandler(FileSystemEventHandler):
    """记录文件变化到日志文件"""

    def __init__(self, log_file="file_changes.log"):
        self.log_file = log_file

    def log_event(self, event_type, path, dest_path=None):
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        user = os.getlogin() if hasattr(os, 'getlogin') else "unknown"

        if dest_path:
            message = f"{timestamp} | {user} | {event_type} | {path} -> {dest_path}\n"
        else:
            message = f"{timestamp} | {user} | {event_type} | {path}\n"

        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(message)
        print(message.strip())

    def on_created(self, event):
        self.log_event("创建", event.src_path)

    def on_deleted(self, event):
        self.log_event("删除", event.src_path)

    def on_modified(self, event):
        self.log_event("修改", event.src_path)

    def on_moved(self, event):
        self.log_event("移动", event.src_path, event.dest_path)

案例3:自动备份被修改的文件

import shutil
import os

class BackupFileHandler(FileSystemEventHandler):
    """自动备份被修改的文件"""

    def __init__(self, backup_folder="backups"):
        self.backup_folder = backup_folder
        os.makedirs(backup_folder, exist_ok=True)

    def on_modified(self, event):
        if not event.is_directory:  # 只处理文件,不处理文件夹
            # 创建带时间戳的备份文件名
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = os.path.basename(event.src_path)
            backup_name = f"{timestamp}_{filename}"
            backup_path = os.path.join(self.backup_folder, backup_name)

            # 备份文件
            shutil.copy2(event.src_path, backup_path)
            print(f"已备份: {event.src_path} -> {backup_path}")

案例4:监控网站上传目录的安全

class SecurityFileHandler(FileSystemEventHandler):
    """安全监控,检测可疑文件"""

    DANGEROUS_EXTENSIONS = ['.php', '.jsp', '.asp', '.exe', '.bat', '.sh']

    def on_created(self, event):
        if not event.is_directory:
            file_ext = os.path.splitext(event.src_path)[1].lower()

            if file_ext in self.DANGEROUS_EXTENSIONS:
                print(f"⚠️  安全警报!发现可疑文件: {event.src_path}")
                print(f"   文件类型: {file_ext}")
                print(f"   创建时间: {datetime.datetime.now()}")

                # 可以添加邮件通知、移动文件到隔离区等操作
                # self.quarantine_file(event.src_path)

进阶技巧:企业级文件监控系统

监控特定用户的文件操作

import os
import pwd  # Unix/Linux系统
import getpass  # Windows系统

class UserAwareFileHandler(FileSystemEventHandler):
    """识别文件操作的用户"""

    def get_current_user(self):
        """获取当前系统用户"""
        try:
            if os.name == 'posix':  # Unix/Linux/Mac
                return pwd.getpwuid(os.getuid()).pw_name
            else:  # Windows
                return getpass.getuser()
        except:
            return "unknown"

    def on_modified(self, event):
        user = self.get_current_user()
        print(f"[{user}] 修改了文件: {event.src_path}")

监控文件大小变化

class SizeMonitoringHandler(FileSystemEventHandler):
    """监控文件大小变化"""

    def __init__(self):
        self.file_sizes = {}

    def on_modified(self, event):
        if not event.is_directory and os.path.exists(event.src_path):
            current_size = os.path.getsize(event.src_path)

            if event.src_path in self.file_sizes:
                old_size = self.file_sizes[event.src_path]
                change = current_size - old_size

                if change > 0:
                    print(f"📈 {event.src_path} 增大了 {change:,} 字节")
                elif change < 0:
                    print(f"📉 {event.src_path} 减小了 {abs(change):,} 字节")

            self.file_sizes[event.src_path] = current_size

使用配置文件的可配置监控

import json

class ConfigurableMonitor:
    """可配置的文件监控器"""

    def __init__(self, config_file="monitor_config.json"):
        with open(config_file, 'r', encoding='utf-8') as f:
            self.config = json.load(f)

        self.monitored_folders = self.config.get("folders", [])
        self.file_patterns = self.config.get("patterns", [])

    def start_monitoring(self):
        """启动多文件夹监控"""
        observers = []

        for folder in self.monitored_folders:
            if os.path.exists(folder):
                handler = self.create_handler_for_folder(folder)
                observer = Observer()
                observer.schedule(handler, folder, recursive=True)
                observer.start()
                observers.append(observer)
                print(f"开始监控: {folder}")

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            for obs in observers:
                obs.stop()
            for obs in observers:
                obs.join()

    def create_handler_for_folder(self, folder):
        """为每个文件夹创建定制处理器"""
        class FolderSpecificHandler(FileSystemEventHandler):
            def on_created(self, event):
                # 根据配置文件处理
                pass

        return FolderSpecificHandler()

实用场景:文件监控的多种应用

场景1:自动同步文件夹

class AutoSyncHandler(FileSystemEventHandler):
    """自动同步文件夹到备份位置"""

    def __init__(self, source_folder, backup_folder):
        self.source = source_folder
        self.backup = backup_folder
        os.makedirs(backup_folder, exist_ok=True)

    def on_created(self, event):
        if not event.is_directory:
            relative_path = os.path.relpath(event.src_path, self.source)
            backup_path = os.path.join(self.backup, relative_path)

            # 确保备份目录结构存在
            os.makedirs(os.path.dirname(backup_path), exist_ok=True)

            # 同步文件
            shutil.copy2(event.src_path, backup_path)
            print(f"已同步: {relative_path}")

    def on_modified(self, event):
        self.on_created(event)  # 修改也触发同步

场景2:监控项目代码变化

class CodeChangeHandler(FileSystemEventHandler):
    """监控代码仓库变化"""

    CODE_EXTENSIONS = ['.py', '.js', '.java', '.cpp', '.html', '.css']

    def on_modified(self, event):
        if not event.is_directory:
            ext = os.path.splitext(event.src_path)[1]
            if ext in self.CODE_EXTENSIONS:
                print(f"📝 代码文件被修改: {event.src_path}")

                # 可以触发自动测试、代码格式化等
                # self.run_tests(event.src_path)

    def on_created(self, event):
        if not event.is_directory:
            ext = os.path.splitext(event.src_path)[1]
            if ext in self.CODE_EXTENSIONS:
                print(f"🆕 新增代码文件: {event.src_path}")

场景3:实时日志文件监控

class LogFileTailer(FileSystemEventHandler):
    """实时监控日志文件追加内容"""

    def __init__(self, log_file):
        self.log_file = log_file
        self.last_position = 0

        # 如果文件存在,记录当前位置
        if os.path.exists(log_file):
            self.last_position = os.path.getsize(log_file)

    def on_modified(self, event):
        if event.src_path == self.log_file:
            current_size = os.path.getsize(self.log_file)

            if current_size > self.last_position:
                # 读取新增内容
                with open(self.log_file, 'r', encoding='utf-8') as f:
                    f.seek(self.last_position)
                    new_content = f.read()

                    if new_content.strip():
                        print("📄 新的日志内容:")
                        print(new_content)

                self.last_position = current_size

注意事项与最佳实践

1. 处理大量文件事件

class DebouncedHandler(FileSystemEventHandler):
    """防抖处理,避免短时间内大量事件"""

    def __init__(self, delay=0.5):
        self.delay = delay
        self.last_event_time = {}

    def should_process(self, event):
        """判断是否应该处理这个事件"""
        current_time = time.time()
        event_key = f"{event.event_type}_{event.src_path}"

        if event_key in self.last_event_time:
            if current_time - self.last_event_time[event_key] < self.delay:
                return False

        self.last_event_time[event_key] = current_time
        return True

    def on_modified(self, event):
        if self.should_process(event):
            print(f"处理修改事件: {event.src_path}")

2. 跨平台兼容性处理

class CrossPlatformHandler(FileSystemEventHandler):
    """处理不同操作系统的差异"""

    def on_modified(self, event):
        # 在某些系统上,保存文件可能触发多次修改事件
        # 可以添加额外的判断逻辑
        if not hasattr(self, 'last_modified'):
            self.last_modified = {}

        file_path = event.src_path
        current_time = time.time()

        # 如果同一个文件在1秒内被多次修改,只处理一次
        if file_path in self.last_modified:
            if current_time - self.last_modified[file_path] < 1:
                return

        self.last_modified[file_path] = current_time
        print(f"文件修改: {file_path}")

3. 监控性能优化

def optimize_monitor(folder_path):
    """优化的监控器,减少资源占用"""
    event_handler = FileSystemEventHandler()

    # 只监控特定类型的事件
    def on_important_events(event):
        if event.event_type in ['created', 'deleted']:
            print(f"重要事件: {event.event_type} - {event.src_path}")

    event_handler.on_any_event = on_important_events

    observer = Observer()
    observer.schedule(event_handler, folder_path, recursive=False)  # 不监控子文件夹
    observer.start()

结语:让文件系统拥有"千里眼"

恭喜!在又一个5分钟内,你掌握了一个强大的文件安全监控技能:

✅ 使用watchdog库创建实时文件监控
✅ 掌握文件创建、修改、删除、移动的监控方法
✅ 学会定制化监控规则和事件处理
✅ 具备了构建企业级监控系统的能力

今天你学会的不仅仅是监控文件,而是掌握了保护数字资产的主动防御能力。

想象一下,你还可以: - 为团队共享文件夹添加操作审计 - 实时监控网站上传目录的安全性 - 自动备份重要文件的每次修改 - 监控项目代码的实时变化 - 构建自动化的文件同步系统

知道发生了什么,永远比不知道要好。监控不是为了限制,而是为了了解。

现在,就找一个你关心的文件夹,为它加上监控功能吧!在评论区分享你用它解决了什么问题,或者发现了什么有趣的文件操作模式!

记得关注我,下一篇我们挑战更有趣的Python自动化技巧!