大家好,我是何三,80后老猿,独立开发者。
继续我们的"5分钟Python自动化"系列。今天我们来解决一个文件安全的刚需——实时监控文件夹变化。
有这样的场景:你的团队共享文件夹里,有人误删了重要文件却不知道是谁;你的网站上传目录被恶意上传了可疑文件;你的项目代码库突然出现了异常修改;你想知道某个文件夹什么时候被修改过、谁修改的...
传统做法:手动检查文件变化,设置Windows资源管理器或Mac Finder的"修改日期"排序,但这样既不能实时监控,也无法记录变化历史。
Python做法:15行代码,创建实时的文件监控系统,自动记录所有文件变化,第一时间发现异常!
今天,我就带你用5分钟,掌握这个让你的文件系统拥有"上帝视角"的神奇技能!
环境准备:一个命令搞定
在开始之前,你只需要:
- 安装Python(这个已经装好了)
- 安装watchdog库:打开命令行,输入:
pip install watchdog
watchdog是一个专业的文件系统监控库,跨平台支持,功能强大且使用简单。
核心代码:15行实时监控系统
让我们先看看这个最简单的文件夹监控系统:
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class FileChangeHandler(FileSystemEventHandler):
"""处理文件变化事件的处理器"""
def on_created(self, event):
print(f"[创建] {event.src_path}")
def on_deleted(self, event):
print(f"[删除] {event.src_path}")
def on_modified(self, event):
print(f"[修改] {event.src_path}")
def on_moved(self, event):
print(f"[移动] {event.src_path} -> {event.dest_path}")
def monitor_folder(folder_path):
"""监控指定文件夹"""
event_handler = FileChangeHandler()
observer = Observer()
observer.schedule(event_handler, folder_path, recursive=True)
observer.start()
try:
print(f"开始监控文件夹: {folder_path}")
print("按 Ctrl+C 停止监控")
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
# 监控当前目录
monitor_folder(".")
逐行详解:理解文件监控的原理
第1-3行:导入监控工具箱
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
time:用于控制监控进程Observer:监控器的核心,负责观察文件系统变化FileSystemEventHandler:事件处理器基类,我们需要继承它来实现自定义逻辑
第5-20行:自定义事件处理器
class FileChangeHandler(FileSystemEventHandler):
"""处理文件变化事件的处理器"""
def on_created(self, event):
print(f"[创建] {event.src_path}")
def on_deleted(self, event):
print(f"[删除] {event.src_path}")
def on_modified(self, event):
print(f"[修改] {event.src_path}")
def on_moved(self, event):
print(f"[移动] {event.src_path} -> {event.dest_path}")
这是整个监控系统的"大脑",它定义了四种事件的处理方式:
文件创建事件:
def on_created(self, event):
print(f"[创建] {event.src_path}")
当有新文件或文件夹被创建时触发。
文件删除事件:
def on_deleted(self, event):
print(f"[删除] {event.src_path}")
当文件或文件夹被删除时触发。
文件修改事件:
def on_modified(self, event):
print(f"[修改] {event.src_path}")
当文件内容被修改时触发。
文件移动事件:
def on_moved(self, event):
print(f"[移动] {event.src_path} -> {event.dest_path}")
当文件被重命名或移动到其他位置时触发。
第22-36行:启动监控器
def monitor_folder(folder_path):
"""监控指定文件夹"""
event_handler = FileChangeHandler()
observer = Observer()
observer.schedule(event_handler, folder_path, recursive=True)
observer.start()
try:
print(f"开始监控文件夹: {folder_path}")
print("按 Ctrl+C 停止监控")
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
核心步骤:
1. 创建事件处理器实例
2. 创建监控器实例
3. 设置监控规则:recursive=True表示监控子文件夹
4. 启动监控器
5. 保持程序运行,直到用户按Ctrl+C中断
实战演示:实时监控文件夹变化
- 保存脚本:将上面的代码保存为
folder_monitor.py - 运行脚本:打开命令行,输入:
python folder_monitor.py
- 测试监控:在监控的文件夹中尝试:
- 创建一个新文件
- 修改一个已有文件
- 删除一个文件
- 重命名一个文件
- 观察输出:命令行会实时显示所有文件变化!
运行效果:
开始监控文件夹: .
按 Ctrl+C 停止监控
[创建] ./新建文本文档.txt
[修改] ./新建文本文档.txt
[移动] ./新建文本文档.txt -> ./重命名.txt
[删除] ./重命名.txt
举一反三:高级监控功能定制
案例1:只监控特定类型的文件
class ImageFileHandler(FileSystemEventHandler):
"""只监控图片文件"""
def on_created(self, event):
if event.src_path.lower().endswith(('.jpg', '.png', '.gif')):
print(f"新图片文件: {event.src_path}")
def on_modified(self, event):
if event.src_path.lower().endswith(('.jpg', '.png', '.gif')):
print(f"图片被修改: {event.src_path}")
# 可以在这里添加图片压缩、加水印等处理
案例2:记录监控日志到文件
import datetime
class LoggingFileHandler(FileSystemEventHandler):
"""记录文件变化到日志文件"""
def __init__(self, log_file="file_changes.log"):
self.log_file = log_file
def log_event(self, event_type, path, dest_path=None):
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
user = os.getlogin() if hasattr(os, 'getlogin') else "unknown"
if dest_path:
message = f"{timestamp} | {user} | {event_type} | {path} -> {dest_path}\n"
else:
message = f"{timestamp} | {user} | {event_type} | {path}\n"
with open(self.log_file, "a", encoding="utf-8") as f:
f.write(message)
print(message.strip())
def on_created(self, event):
self.log_event("创建", event.src_path)
def on_deleted(self, event):
self.log_event("删除", event.src_path)
def on_modified(self, event):
self.log_event("修改", event.src_path)
def on_moved(self, event):
self.log_event("移动", event.src_path, event.dest_path)
案例3:自动备份被修改的文件
import shutil
import os
class BackupFileHandler(FileSystemEventHandler):
"""自动备份被修改的文件"""
def __init__(self, backup_folder="backups"):
self.backup_folder = backup_folder
os.makedirs(backup_folder, exist_ok=True)
def on_modified(self, event):
if not event.is_directory: # 只处理文件,不处理文件夹
# 创建带时间戳的备份文件名
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.basename(event.src_path)
backup_name = f"{timestamp}_{filename}"
backup_path = os.path.join(self.backup_folder, backup_name)
# 备份文件
shutil.copy2(event.src_path, backup_path)
print(f"已备份: {event.src_path} -> {backup_path}")
案例4:监控网站上传目录的安全
class SecurityFileHandler(FileSystemEventHandler):
"""安全监控,检测可疑文件"""
DANGEROUS_EXTENSIONS = ['.php', '.jsp', '.asp', '.exe', '.bat', '.sh']
def on_created(self, event):
if not event.is_directory:
file_ext = os.path.splitext(event.src_path)[1].lower()
if file_ext in self.DANGEROUS_EXTENSIONS:
print(f"⚠️ 安全警报!发现可疑文件: {event.src_path}")
print(f" 文件类型: {file_ext}")
print(f" 创建时间: {datetime.datetime.now()}")
# 可以添加邮件通知、移动文件到隔离区等操作
# self.quarantine_file(event.src_path)
进阶技巧:企业级文件监控系统
监控特定用户的文件操作
import os
import pwd # Unix/Linux系统
import getpass # Windows系统
class UserAwareFileHandler(FileSystemEventHandler):
"""识别文件操作的用户"""
def get_current_user(self):
"""获取当前系统用户"""
try:
if os.name == 'posix': # Unix/Linux/Mac
return pwd.getpwuid(os.getuid()).pw_name
else: # Windows
return getpass.getuser()
except:
return "unknown"
def on_modified(self, event):
user = self.get_current_user()
print(f"[{user}] 修改了文件: {event.src_path}")
监控文件大小变化
class SizeMonitoringHandler(FileSystemEventHandler):
"""监控文件大小变化"""
def __init__(self):
self.file_sizes = {}
def on_modified(self, event):
if not event.is_directory and os.path.exists(event.src_path):
current_size = os.path.getsize(event.src_path)
if event.src_path in self.file_sizes:
old_size = self.file_sizes[event.src_path]
change = current_size - old_size
if change > 0:
print(f"📈 {event.src_path} 增大了 {change:,} 字节")
elif change < 0:
print(f"📉 {event.src_path} 减小了 {abs(change):,} 字节")
self.file_sizes[event.src_path] = current_size
使用配置文件的可配置监控
import json
class ConfigurableMonitor:
"""可配置的文件监控器"""
def __init__(self, config_file="monitor_config.json"):
with open(config_file, 'r', encoding='utf-8') as f:
self.config = json.load(f)
self.monitored_folders = self.config.get("folders", [])
self.file_patterns = self.config.get("patterns", [])
def start_monitoring(self):
"""启动多文件夹监控"""
observers = []
for folder in self.monitored_folders:
if os.path.exists(folder):
handler = self.create_handler_for_folder(folder)
observer = Observer()
observer.schedule(handler, folder, recursive=True)
observer.start()
observers.append(observer)
print(f"开始监控: {folder}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
for obs in observers:
obs.stop()
for obs in observers:
obs.join()
def create_handler_for_folder(self, folder):
"""为每个文件夹创建定制处理器"""
class FolderSpecificHandler(FileSystemEventHandler):
def on_created(self, event):
# 根据配置文件处理
pass
return FolderSpecificHandler()
实用场景:文件监控的多种应用
场景1:自动同步文件夹
class AutoSyncHandler(FileSystemEventHandler):
"""自动同步文件夹到备份位置"""
def __init__(self, source_folder, backup_folder):
self.source = source_folder
self.backup = backup_folder
os.makedirs(backup_folder, exist_ok=True)
def on_created(self, event):
if not event.is_directory:
relative_path = os.path.relpath(event.src_path, self.source)
backup_path = os.path.join(self.backup, relative_path)
# 确保备份目录结构存在
os.makedirs(os.path.dirname(backup_path), exist_ok=True)
# 同步文件
shutil.copy2(event.src_path, backup_path)
print(f"已同步: {relative_path}")
def on_modified(self, event):
self.on_created(event) # 修改也触发同步
场景2:监控项目代码变化
class CodeChangeHandler(FileSystemEventHandler):
"""监控代码仓库变化"""
CODE_EXTENSIONS = ['.py', '.js', '.java', '.cpp', '.html', '.css']
def on_modified(self, event):
if not event.is_directory:
ext = os.path.splitext(event.src_path)[1]
if ext in self.CODE_EXTENSIONS:
print(f"📝 代码文件被修改: {event.src_path}")
# 可以触发自动测试、代码格式化等
# self.run_tests(event.src_path)
def on_created(self, event):
if not event.is_directory:
ext = os.path.splitext(event.src_path)[1]
if ext in self.CODE_EXTENSIONS:
print(f"🆕 新增代码文件: {event.src_path}")
场景3:实时日志文件监控
class LogFileTailer(FileSystemEventHandler):
"""实时监控日志文件追加内容"""
def __init__(self, log_file):
self.log_file = log_file
self.last_position = 0
# 如果文件存在,记录当前位置
if os.path.exists(log_file):
self.last_position = os.path.getsize(log_file)
def on_modified(self, event):
if event.src_path == self.log_file:
current_size = os.path.getsize(self.log_file)
if current_size > self.last_position:
# 读取新增内容
with open(self.log_file, 'r', encoding='utf-8') as f:
f.seek(self.last_position)
new_content = f.read()
if new_content.strip():
print("📄 新的日志内容:")
print(new_content)
self.last_position = current_size
注意事项与最佳实践
1. 处理大量文件事件
class DebouncedHandler(FileSystemEventHandler):
"""防抖处理,避免短时间内大量事件"""
def __init__(self, delay=0.5):
self.delay = delay
self.last_event_time = {}
def should_process(self, event):
"""判断是否应该处理这个事件"""
current_time = time.time()
event_key = f"{event.event_type}_{event.src_path}"
if event_key in self.last_event_time:
if current_time - self.last_event_time[event_key] < self.delay:
return False
self.last_event_time[event_key] = current_time
return True
def on_modified(self, event):
if self.should_process(event):
print(f"处理修改事件: {event.src_path}")
2. 跨平台兼容性处理
class CrossPlatformHandler(FileSystemEventHandler):
"""处理不同操作系统的差异"""
def on_modified(self, event):
# 在某些系统上,保存文件可能触发多次修改事件
# 可以添加额外的判断逻辑
if not hasattr(self, 'last_modified'):
self.last_modified = {}
file_path = event.src_path
current_time = time.time()
# 如果同一个文件在1秒内被多次修改,只处理一次
if file_path in self.last_modified:
if current_time - self.last_modified[file_path] < 1:
return
self.last_modified[file_path] = current_time
print(f"文件修改: {file_path}")
3. 监控性能优化
def optimize_monitor(folder_path):
"""优化的监控器,减少资源占用"""
event_handler = FileSystemEventHandler()
# 只监控特定类型的事件
def on_important_events(event):
if event.event_type in ['created', 'deleted']:
print(f"重要事件: {event.event_type} - {event.src_path}")
event_handler.on_any_event = on_important_events
observer = Observer()
observer.schedule(event_handler, folder_path, recursive=False) # 不监控子文件夹
observer.start()
结语:让文件系统拥有"千里眼"
恭喜!在又一个5分钟内,你掌握了一个强大的文件安全监控技能:
✅ 使用watchdog库创建实时文件监控
✅ 掌握文件创建、修改、删除、移动的监控方法
✅ 学会定制化监控规则和事件处理
✅ 具备了构建企业级监控系统的能力
今天你学会的不仅仅是监控文件,而是掌握了保护数字资产的主动防御能力。
想象一下,你还可以: - 为团队共享文件夹添加操作审计 - 实时监控网站上传目录的安全性 - 自动备份重要文件的每次修改 - 监控项目代码的实时变化 - 构建自动化的文件同步系统
知道发生了什么,永远比不知道要好。监控不是为了限制,而是为了了解。
现在,就找一个你关心的文件夹,为它加上监控功能吧!在评论区分享你用它解决了什么问题,或者发现了什么有趣的文件操作模式!
记得关注我,下一篇我们挑战更有趣的Python自动化技巧!