002-Python运维业务服务监控理论与实例

Python运维业务服务监控理论与实例

在现代IT运维中,业务服务监控是保障系统稳定运行的关键环节。本章将详细介绍如何使用Python实现文件内容差异对比、文件与目录差异对比、邮件发送以及Web服务质量监控等核心功能。

2.1 文件内容差异对比方法

文件内容的差异对比在配置管理、版本控制、安全审计等场景中有着广泛应用。Python提供了多种实现文件对比的方法。

2.1.1 示例1:两个字符串的差异对比

使用difflib模块进行字符串对比

python

import difflib

def string_diff_basic():
    """Print a line-by-line ``difflib.Differ`` comparison of two nginx snippets.

    Each output line is prefixed with ``'- '`` (only in the first text),
    ``'+ '`` (only in the second text), ``'  '`` (common to both) or
    ``'? '`` (intraline change hint produced by ``Differ``).
    """
    text1 = """user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log;
pid /run/nginx.pid;

events {
    worker_connections 1024;
}"""

    text2 = """user nginx;
worker_processes 4;
error_log /var/log/nginx/error.log warn;
pid /run/nginx.pid;

events {
    worker_connections 2048;
    use epoll;
}"""

    # Differ works on sequences of lines, so split both texts first.
    d = difflib.Differ()
    diff = d.compare(text1.splitlines(), text2.splitlines())

    print("逐行对比结果:")
    # The lines carry no terminators (splitlines() dropped them), so join
    # them with an explicit newline.
    print('\n'.join(diff))

def unified_diff_example():
    """Print a unified diff (the format used by ``git diff``) of two texts."""
    text1 = """line 1
line 2
line 3
line 4
line 5"""

    text2 = """line 1
line 2 modified
line 3
line 4
line 5
line 6"""

    # The diff is emitted with lineterm='' and printed one line per print()
    # call, so the input lines must not carry their own terminators;
    # keeping them would print a blank line after every content line.
    lines1 = text1.splitlines()
    lines2 = text2.splitlines()

    # Build the unified diff lazily; header lines get no terminator either.
    diff = difflib.unified_diff(
        lines1,
        lines2,
        fromfile='original.txt',
        tofile='modified.txt',
        lineterm=''
    )

    print("统一差异格式:")
    for line in diff:
        print(line)

def context_diff_example():
    """Print a context-format diff of two short lists of lines."""
    # Every line already ends with a newline, which matches the default
    # lineterm='\n' of context_diff; that is why the output is printed
    # with end='' below.
    text1 = ["apple\n", "banana\n", "cherry\n", "date\n"]
    text2 = ["apple\n", "blueberry\n", "cherry\n", "date\n", "elderberry\n"]

    diff = difflib.context_diff(
        text1,
        text2,
        fromfile='fruits1.txt',
        tofile='fruits2.txt'
    )

    print("上下文差异格式:")
    for line in diff:
        print(line, end='')

def get_close_matches_example():
    """Print the closest matches for two misspelled words.

    ``difflib.get_close_matches`` returns at most ``n`` candidates whose
    similarity ratio is at least ``cutoff``, best match first.
    """
    words = ['python', 'java', 'javascript', 'ruby', 'rust', 'golang']

    # Candidates most similar to the typo 'pythno'.
    matches = difflib.get_close_matches('pythno', words, n=3, cutoff=0.6)
    print(f"\n与'pythno'相似的词:{matches}")

    # Candidates most similar to the prefix 'jav' (looser cutoff).
    matches = difflib.get_close_matches('jav', words, n=3, cutoff=0.5)
    print(f"与'jav'相似的词:{matches}")

# Run the examples, separated by a ruler line with blank lines around it.
string_diff_basic()
print("\n" + "="*50 + "\n")
unified_diff_example()
print("\n" + "="*50 + "\n")
context_diff_example()
print("\n" + "="*50 + "\n")
get_close_matches_example()
高级字符串对比功能

python

import difflib
from typing import List, Tuple

class AdvancedStringDiffer:
    """Convenience wrapper around ``difflib`` for comparing two texts."""

    def __init__(self):
        # A single Differ instance is reused by get_diff_summary().
        self.differ = difflib.Differ()

    def get_diff_summary(self, text1: str, text2: str) -> dict:
        """Return summary statistics about the differences between two texts.

        Returns a dict with the line counts of both texts, the number of
        ``'+ '`` and ``'- '`` lines in the ``Differ`` output, the number of
        ``'? '`` lines, and the character-level similarity ratio.

        Note: ``lines_changed`` counts ``Differ``'s ``'? '`` hint lines. A
        modified line may produce zero, one or two hints, so this value is
        an indicator of intraline changes, not an exact count of modified
        lines.
        """
        lines1 = text1.splitlines()
        lines2 = text2.splitlines()

        diff = list(self.differ.compare(lines1, lines2))

        added = sum(1 for line in diff if line.startswith('+ '))
        removed = sum(1 for line in diff if line.startswith('- '))
        changed = sum(1 for line in diff if line.startswith('? '))

        return {
            'total_lines_text1': len(lines1),
            'total_lines_text2': len(lines2),
            'lines_added': added,
            'lines_removed': removed,
            'lines_changed': changed,
            'similarity_ratio': difflib.SequenceMatcher(None, text1, text2).ratio()
        }

    def get_changed_lines(self, text1: str, text2: str) -> List[Tuple[int, str, str]]:
        """Return every changed line as ``(line_number, old_line, new_line)``.

        ``line_number`` is 1-based and refers to a position in ``text1``;
        for inserted lines it is the position at which the new line is
        inserted. ``old_line`` is ``''`` for pure insertions and
        ``new_line`` is ``''`` for pure deletions.
        """
        lines1 = text1.splitlines()
        lines2 = text2.splitlines()

        matcher = difflib.SequenceMatcher(None, lines1, lines2)
        changed_lines = []

        for tag, i1, i2, j1, j2 in matcher.get_opcodes():
            if tag == 'replace':
                # The old and new ranges may differ in length. Walk the
                # longer of the two so that surplus new lines are reported
                # too, instead of being silently dropped.
                for offset in range(max(i2 - i1, j2 - j1)):
                    old_idx = i1 + offset
                    new_idx = j1 + offset
                    old_line = lines1[old_idx] if old_idx < i2 else ''
                    new_line = lines2[new_idx] if new_idx < j2 else ''
                    # Surplus new lines are anchored right after the
                    # replaced old range, mirroring the 'insert' branch.
                    line_number = min(old_idx, i2) + 1
                    changed_lines.append((line_number, old_line, new_line))
            elif tag == 'delete':
                for i in range(i1, i2):
                    changed_lines.append((i + 1, lines1[i], ''))
            elif tag == 'insert':
                for j in range(j1, j2):
                    changed_lines.append((i1 + 1, '', lines2[j]))

        return changed_lines

    def generate_patch(self, text1: str, text2: str,
                      filename1: str = 'original',
                      filename2: str = 'modified') -> str:
        """Return a unified diff of the two texts as a single string.

        ``filename1`` and ``filename2`` are used in the ``---`` / ``+++``
        header lines. Three lines of context surround each change.
        """
        # Keep the line terminators so the pieces can simply be concatenated.
        lines1 = text1.splitlines(keepends=True)
        lines2 = text2.splitlines(keepends=True)

        diff = difflib.unified_diff(
            lines1, lines2,
            fromfile=filename1,
            tofile=filename2,
            n=3  # number of context lines
        )

        return ''.join(diff)

    def highlight_differences(self, text1: str, text2: str) -> str:
        """Return a complete HTML page with a side-by-side diff table.

        Only changed regions are shown, each with three lines of context.
        """
        d = difflib.HtmlDiff()
        html = d.make_file(
            text1.splitlines(),
            text2.splitlines(),
            fromdesc='Original',
            todesc='Modified',
            context=True,
            numlines=3
        )
        return html

# Usage example
differ = AdvancedStringDiffer()

text1 = """def hello_world():
    print("Hello, World!")
    return True

def main():
    hello_world()
"""

text2 = """def hello_world(name="World"):
    print(f"Hello, {name}!")
    return True

def main():
    hello_world("Python")
    print("Done!")
"""

# Summary statistics
summary = differ.get_diff_summary(text1, text2)
print("差异摘要:")
for key, value in summary.items():
    print(f"  {key}: {value}")

# Changed lines; an empty old/new side means a pure insertion/deletion
print("\n变更的行:")
changed_lines = differ.get_changed_lines(text1, text2)
for line_num, old, new in changed_lines:
    print(f"  行 {line_num}:")
    if old:
        print(f"    - {old}")
    if new:
        print(f"    + {new}")

# Unified-diff patch
print("\n生成的补丁:")
patch = differ.generate_patch(text1, text2, 'hello_v1.py', 'hello_v2.py')
print(patch)

2.1.2 生成美观的对比HTML格式文档

python

import difflib
import os
from datetime import datetime

class HTMLDiffGenerator:
    """Builds a styled, standalone HTML report comparing two texts."""

    def __init__(self):
        # Reused for every report; renders the side-by-side diff table.
        self.html_diff = difflib.HtmlDiff()

    def generate_diff_report(self, file1_content: str, file2_content: str,
                           file1_name: str = "File 1",
                           file2_name: str = "File 2",
                           title: str = "File Comparison Report") -> str:
        """Return a complete HTML document describing the differences.

        The report contains a header (generation time, line counts,
        line-level similarity ratio), a change summary (added / removed /
        modified line counts) and a side-by-side diff table showing the
        changed regions with three lines of context.

        ``title``, ``file1_name`` and ``file2_name`` are HTML-escaped before
        being embedded, so names containing ``<``, ``>`` or ``&`` cannot
        break the markup.
        """
        # Local import keeps this snippet self-contained.
        from html import escape

        # The names and the title are interpolated into markup below, and
        # HtmlDiff.make_table() inserts fromdesc/todesc verbatim, so escape
        # them once here.
        safe_title = escape(title)
        safe_name1 = escape(file1_name)
        safe_name2 = escape(file2_name)

        # Custom CSS (a plain string, so the braces need no escaping)
        custom_css = """
        <style>
            body {
                font-family: Arial, sans-serif;
                margin: 20px;
                background-color: #f5f5f5;
            }
            h1, h2 {
                color: #333;
            }
            .report-header {
                background-color: #fff;
                padding: 20px;
                border-radius: 5px;
                box-shadow: 0 2px 5px rgba(0,0,0,0.1);
                margin-bottom: 20px;
            }
            .metadata {
                color: #666;
                font-size: 14px;
            }
            table.diff {
                background-color: #fff;
                border-collapse: collapse;
                border: 1px solid #ddd;
                width: 100%;
            }
            .diff td {
                padding: 3px 7px;
                font-family: Consolas, monospace;
                font-size: 13px;
                border: 1px solid #ddd;
                white-space: pre-wrap;
                word-wrap: break-word;
            }
            .diff .diff_header {
                background-color: #f0f0f0;
                font-weight: bold;
            }
            .diff .diff_add {
                background-color: #dfd;
            }
            .diff .diff_chg {
                background-color: #ffd;
            }
            .diff .diff_sub {
                background-color: #fdd;
            }
            .line-number {
                background-color: #f7f7f7;
                color: #999;
                text-align: right;
                width: 50px;
            }
            .summary {
                background-color: #fff;
                padding: 15px;
                border-radius: 5px;
                margin-top: 20px;
                box-shadow: 0 2px 5px rgba(0,0,0,0.1);
            }
            .summary-item {
                display: inline-block;
                margin-right: 30px;
                padding: 10px 20px;
                border-radius: 3px;
                font-weight: bold;
            }
            .added { background-color: #dfd; color: #080; }
            .removed { background-color: #fdd; color: #800; }
            .modified { background-color: #ffd; color: #880; }
        </style>
        """

        # Line-level statistics derived from the SequenceMatcher opcodes
        lines1 = file1_content.splitlines()
        lines2 = file2_content.splitlines()

        matcher = difflib.SequenceMatcher(None, lines1, lines2)
        stats = {'added': 0, 'removed': 0, 'modified': 0}

        for tag, i1, i2, j1, j2 in matcher.get_opcodes():
            if tag == 'insert':
                stats['added'] += j2 - j1
            elif tag == 'delete':
                stats['removed'] += i2 - i1
            elif tag == 'replace':
                # A replace may cover ranges of different length; count the
                # longer side as the number of modified lines.
                stats['modified'] += max(i2 - i1, j2 - j1)

        # Page header and summary
        html_content = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <meta charset="utf-8">
            <title>{safe_title}</title>
            {custom_css}
        </head>
        <body>
            <div class="report-header">
                <h1>{safe_title}</h1>
                <div class="metadata">
                    <p>生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
                    <p>文件1: {safe_name1} ({len(lines1)} 行)</p>
                    <p>文件2: {safe_name2} ({len(lines2)} 行)</p>
                    <p>相似度: {matcher.ratio():.1%}</p>
                </div>
            </div>
            
            <div class="summary">
                <h2>变更摘要</h2>
                <span class="summary-item added">新增: {stats['added']} 行</span>
                <span class="summary-item removed">删除: {stats['removed']} 行</span>
                <span class="summary-item modified">修改: {stats['modified']} 行</span>
            </div>
            
            <h2>详细对比</h2>
        """

        # Side-by-side table, limited to changed regions plus context
        diff_table = self.html_diff.make_table(
            lines1, lines2,
            fromdesc=safe_name1,
            todesc=safe_name2,
            context=True,
            numlines=3
        )

        html_content += diff_table
        html_content += """
        </body>
        </html>
        """

        return html_content

    def save_diff_report(self, html_content: str, output_path: str):
        """Write ``html_content`` to ``output_path`` as UTF-8 and report it."""
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(html_content)
        print(f"差异报告已保存到: {output_path}")

    def compare_files(self, file1_path: str, file2_path: str,
                     output_path: str = None):
        """Compare two UTF-8 text files and save an HTML report.

        When ``output_path`` is ``None`` a timestamped file name in the
        current working directory is used. Returns the path of the report.
        """
        # Read both inputs
        with open(file1_path, 'r', encoding='utf-8') as f:
            content1 = f.read()

        with open(file2_path, 'r', encoding='utf-8') as f:
            content2 = f.read()

        # Only the base names are shown in the report
        html_content = self.generate_diff_report(
            content1, content2,
            os.path.basename(file1_path),
            os.path.basename(file2_path),
            f"文件对比: {os.path.basename(file1_path)} vs {os.path.basename(file2_path)}"
        )

        # Save the report
        if output_path is None:
            output_path = f"diff_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"

        self.save_diff_report(html_content, output_path)
        return output_path

# Usage example
generator = HTMLDiffGenerator()

# Two versions of an nginx server block used as sample input
content1 = """server {
    listen 80;
    server_name example.com;
    
    location / {
        proxy_pass http://localhost:8080;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}"""

content2 = """server {
    listen 80;
    listen 443 ssl;
    server_name example.com www.example.com;
    
    ssl_certificate /path/to/cert.pem;
    ssl_certificate_key /path/to/key.pem;
    
    location / {
        proxy_pass http://backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}"""

# Build the HTML report in memory
html = generator.generate_diff_report(
    file1_content=content1,
    file2_content=content2,
    file1_name="nginx_config_v1.conf",
    file2_name="nginx_config_v2.conf",
    title="Nginx配置文件对比报告",
)

# Write it next to the script
generator.save_diff_report(html_content=html, output_path="nginx_diff_report.html")

2.1.3 示例2:对比Nginx配置文件差异

python

import difflib
import re
from typing import Dict, List, Tuple

class NginxConfigDiffer:
    """Structure-aware comparison helper for nginx configuration text.

    The parser is intentionally simple and line based: it tracks a single
    "current block" rather than a stack. When a nested block header is
    recognized, the lines collected for the enclosing block are discarded,
    and the first closing brace ends the current block. Block headers that
    take arguments (e.g. ``location / {``) are not recognized as new blocks;
    their lines are attributed to the block that is currently open.
    """

    def __init__(self):
        # "name value;" directive: group 1 is the name, group 2 the value.
        self.directive_pattern = re.compile(r'^\s*(\w+)\s+(.+?);?\s*$')
        # Bare block name (the '{' is removed before matching).
        self.block_pattern = re.compile(r'^\s*(\w+)\s*\{?\s*$')

    def parse_nginx_config(self, content: str) -> Dict[str, dict]:
        """Parse nginx configuration text into directives and blocks.

        Returns ``{'directives': {...}, 'blocks': {...}}``. A top-level
        directive maps to its value string, or to a list of values when the
        directive occurs more than once. A block name maps to the list of
        stripped lines collected inside it. Blank lines and ``#`` comment
        lines are ignored.
        """
        lines = content.splitlines()
        config = {'directives': {}, 'blocks': {}}
        current_block = None
        block_content = []

        for line in lines:
            line = line.strip()
            if not line or line.startswith('#'):
                continue

            # Start of a block
            if '{' in line:
                match = self.block_pattern.match(line.replace('{', ''))
                if match:
                    current_block = match.group(1)
                    block_content = []
            # End of a block
            elif '}' in line:
                if current_block:
                    config['blocks'][current_block] = block_content
                    current_block = None
                    block_content = []
            # Line inside the current block
            elif current_block:
                block_content.append(line)
            # Top-level directive
            else:
                match = self.directive_pattern.match(line)
                if match:
                    directive = match.group(1)
                    value = match.group(2).rstrip(';')
                    if directive in config['directives']:
                        # Repeated directive: promote the value to a list.
                        if isinstance(config['directives'][directive], list):
                            config['directives'][directive].append(value)
                        else:
                            config['directives'][directive] = [
                                config['directives'][directive], value
                            ]
                    else:
                        config['directives'][directive] = value

        return config

    def compare_nginx_configs(self, config1: str, config2: str) -> Dict[str, dict]:
        """Compare two nginx configuration texts.

        Returns a dict with the keys ``added_directives``,
        ``removed_directives``, ``modified_directives``, ``added_blocks``,
        ``removed_blocks`` and ``modified_blocks``. The ``modified_*``
        entries map a name to ``{'old': ..., 'new': ...}``.
        """
        parsed1 = self.parse_nginx_config(config1)
        parsed2 = self.parse_nginx_config(config2)

        result = {
            'added_directives': {},
            'removed_directives': {},
            'modified_directives': {},
            'added_blocks': {},
            'removed_blocks': {},
            'modified_blocks': {}
        }

        # Directives
        all_directives = set(parsed1['directives'].keys()) | set(parsed2['directives'].keys())
        for directive in all_directives:
            if directive in parsed1['directives'] and directive not in parsed2['directives']:
                result['removed_directives'][directive] = parsed1['directives'][directive]
            elif directive not in parsed1['directives'] and directive in parsed2['directives']:
                result['added_directives'][directive] = parsed2['directives'][directive]
            elif parsed1['directives'][directive] != parsed2['directives'][directive]:
                result['modified_directives'][directive] = {
                    'old': parsed1['directives'][directive],
                    'new': parsed2['directives'][directive]
                }

        # Blocks
        all_blocks = set(parsed1['blocks'].keys()) | set(parsed2['blocks'].keys())
        for block in all_blocks:
            if block in parsed1['blocks'] and block not in parsed2['blocks']:
                result['removed_blocks'][block] = parsed1['blocks'][block]
            elif block not in parsed1['blocks'] and block in parsed2['blocks']:
                result['added_blocks'][block] = parsed2['blocks'][block]
            elif parsed1['blocks'][block] != parsed2['blocks'][block]:
                result['modified_blocks'][block] = {
                    'old': parsed1['blocks'][block],
                    'new': parsed2['blocks'][block]
                }

        return result

    def generate_nginx_diff_report(self, config1: str, config2: str) -> str:
        """Return a human-readable, newline-joined report of the differences."""
        diff_result = self.compare_nginx_configs(config1, config2)

        report = ["="*60, "Nginx配置文件差异报告", "="*60, ""]

        # Added directives
        if diff_result['added_directives']:
            report.append("【新增的指令】")
            for directive, value in diff_result['added_directives'].items():
                report.append(f"  + {directive} {value};")
            report.append("")

        # Removed directives
        if diff_result['removed_directives']:
            report.append("【删除的指令】")
            for directive, value in diff_result['removed_directives'].items():
                report.append(f"  - {directive} {value};")
            report.append("")

        # Modified directives
        if diff_result['modified_directives']:
            report.append("【修改的指令】")
            for directive, changes in diff_result['modified_directives'].items():
                report.append(f"  ~ {directive}:")
                report.append(f"    旧值: {changes['old']}")
                report.append(f"    新值: {changes['new']}")
            report.append("")

        # Block changes ("{{" is a literal opening brace inside an f-string)
        if diff_result['added_blocks']:
            report.append("【新增的配置块】")
            for block, content in diff_result['added_blocks'].items():
                report.append(f"  + {block} {{")
                for line in content:
                    report.append(f"      {line}")
                report.append("    }")
            report.append("")

        if diff_result['removed_blocks']:
            report.append("【删除的配置块】")
            for block, content in diff_result['removed_blocks'].items():
                report.append(f"  - {block} {{")
                for line in content:
                    report.append(f"      {line}")
                report.append("    }")
            report.append("")

        if diff_result['modified_blocks']:
            report.append("【修改的配置块】")
            for block, changes in diff_result['modified_blocks'].items():
                report.append(f"  ~ {block} {{")
                report.append("    变更内容:")
                # Show the concrete changes inside the block, without
                # context lines and without the "@@" hunk headers.
                block_diff = difflib.unified_diff(
                    changes['old'],
                    changes['new'],
                    lineterm='',
                    n=0
                )
                for line in block_diff:
                    if line.startswith('@@'):
                        continue
                    report.append(f"      {line}")
                report.append("    }")
            report.append("")

        return '\n'.join(report)

    def validate_config_changes(self, config1: str, config2: str) -> List[str]:
        """Return warning messages for potentially risky changes.

        Warns when a critical top-level directive was modified, when the
        ``ssl_certificate`` directive was removed, and when a modified block
        no longer mentions ``listen``.
        """
        warnings = []
        diff_result = self.compare_nginx_configs(config1, config2)

        # Critical top-level directives
        critical_directives = ['user', 'worker_processes', 'error_log', 'pid']
        for directive in critical_directives:
            if directive in diff_result['modified_directives']:
                warnings.append(f"警告: 关键指令 '{directive}' 被修改")

        # SSL configuration
        if 'ssl_certificate' in diff_result['removed_directives']:
            warnings.append("警告: SSL证书配置被删除")

        # Listen directives that disappeared from a modified block
        for changes in diff_result['modified_blocks'].values():
            old_content = ' '.join(changes['old'])
            new_content = ' '.join(changes['new'])
            if 'listen' in old_content and 'listen' not in new_content:
                warnings.append("警告: 监听端口配置可能被删除")

        return warnings

# Usage example
nginx_differ = NginxConfigDiffer()

# Sample configuration, version 1
nginx_config_v1 = """
user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log;
pid /run/nginx.pid;

events {
    worker_connections 1024;
}

http {
    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';
    
    access_log /var/log/nginx/access.log main;
    
    server {
        listen 80;
        server_name example.com;
        root /var/www/html;
        
        location / {
            index index.html index.htm;
        }
    }
}
"""

# Sample configuration, version 2
nginx_config_v2 = """
user www-data;
worker_processes 4;
error_log /var/log/nginx/error.log warn;
pid /run/nginx.pid;

events {
    worker_connections 2048;
    use epoll;
}

http {
    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';
    
    access_log /var/log/nginx/access.log main;
    
    gzip on;
    gzip_types text/plain text/css application/json;
    
    server {
        listen 80;
        listen 443 ssl;
        server_name example.com www.example.com;
        root /var/www/html;
        
        ssl_certificate /etc/nginx/ssl/cert.pem;
        ssl_certificate_key /etc/nginx/ssl/key.pem;
        
        location / {
            index index.html index.htm;
            try_files $uri $uri/ =404;
        }
        
        location /api {
            proxy_pass http://localhost:8080;
            proxy_set_header Host $host;
        }
    }
}
"""

# Print the difference report
report = nginx_differ.generate_nginx_diff_report(nginx_config_v1, nginx_config_v2)
print(report)

# Print any warnings about risky changes
warnings = nginx_differ.validate_config_changes(nginx_config_v1, nginx_config_v2)
if warnings:
    print("\n配置变更警告:")
    for warning in warnings:
        print(f"  {warning}")

2.2 文件与目录差异对比方法

文件和目录的差异对比在部署验证、备份恢复、同步检查等场景中非常重要。

2.2.1 模块常用方法说明

python

import filecmp
import os
import hashlib
from pathlib import Path
from typing import Dict, List, Tuple

class FileComparisonTools:
    """Static helpers for comparing files and directory trees."""

    @staticmethod
    def compare_files_basic(file1: str, file2: str) -> bool:
        """Return True when both files have identical content.

        ``shallow=False`` forces a byte-by-byte comparison instead of
        trusting the ``os.stat()`` signatures.
        """
        return filecmp.cmp(file1, file2, shallow=False)

    @staticmethod
    def compare_files_detailed(file1: str, file2: str) -> dict:
        """Compare two files and return metadata about both.

        The result contains ``identical``, ``size_match`` and
        ``content_match`` flags plus ``file1_info`` / ``file2_info`` dicts
        (path, size, mtime, mode, md5). If a file cannot be read, the
        message is stored under ``'error'`` and the fields gathered so far
        are returned.
        """
        result = {
            'identical': False,
            'size_match': False,
            'content_match': False,
            'file1_info': {},
            'file2_info': {}
        }

        try:
            # File metadata
            stat1 = os.stat(file1)
            stat2 = os.stat(file2)

            result['file1_info'] = {
                'path': file1,
                'size': stat1.st_size,
                'mtime': stat1.st_mtime,
                'mode': oct(stat1.st_mode)
            }

            result['file2_info'] = {
                'path': file2,
                'size': stat2.st_size,
                'mtime': stat2.st_mtime,
                'mode': oct(stat2.st_mode)
            }

            # Size comparison
            result['size_match'] = stat1.st_size == stat2.st_size

            # Content can only match when the sizes match
            if result['size_match']:
                result['content_match'] = filecmp.cmp(file1, file2, shallow=False)
                result['identical'] = result['content_match']

            # Content hashes
            result['file1_info']['md5'] = FileComparisonTools._calculate_md5(file1)
            result['file2_info']['md5'] = FileComparisonTools._calculate_md5(file2)

        except (OSError, ValueError) as e:
            # OSError: missing or unreadable file. ValueError: the hash
            # algorithm is unavailable in this Python build.
            result['error'] = str(e)

        return result

    @staticmethod
    def _calculate_md5(filepath: str) -> str:
        """Return the hexadecimal MD5 digest of a file, read in 4 KiB chunks."""
        hash_md5 = hashlib.md5()
        with open(filepath, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    @staticmethod
    def compare_directories_shallow(dir1: str, dir2: str) -> Dict[str, List[str]]:
        """Compare only the top level of two directories.

        Returns the name lists computed by ``filecmp.dircmp`` for the two
        directories themselves (subdirectories are not descended into).
        """
        dcmp = filecmp.dircmp(dir1, dir2)

        return {
            'left_only': dcmp.left_only,
            'right_only': dcmp.right_only,
            'common_files': dcmp.common_files,
            'common_dirs': dcmp.common_dirs,
            'diff_files': dcmp.diff_files,
            'same_files': dcmp.same_files,
            'funny_files': dcmp.funny_files
        }

    @staticmethod
    def compare_directories_recursive(dir1: str, dir2: str) -> dict:
        """Recursively compare two directory trees.

        Returns a dict with ``identical`` (bool) and lists of paths relative
        to the compared roots: ``left_only``, ``right_only``,
        ``diff_files`` and ``error_files``. ``error_files`` holds entries
        that could not be compared, including names that exist on both
        sides with different types (e.g. a file versus a directory). Any
        such entry makes ``identical`` False, because equality could not be
        established. ``differences`` is kept for compatibility and is
        always empty.
        """
        result = {
            'identical': True,
            'differences': [],
            'left_only': [],
            'right_only': [],
            'diff_files': [],
            'error_files': []
        }

        def _compare_recursive(dcmp, prefix=""):
            # Entries that exist only on the left side
            for name in dcmp.left_only:
                result['left_only'].append(os.path.join(prefix, name))
                result['identical'] = False

            # Entries that exist only on the right side
            for name in dcmp.right_only:
                result['right_only'].append(os.path.join(prefix, name))
                result['identical'] = False

            # Files present on both sides with different content
            for name in dcmp.diff_files:
                result['diff_files'].append(os.path.join(prefix, name))
                result['identical'] = False

            # Files that could not be compared, and common names whose
            # types differ between the two sides
            for name in list(dcmp.funny_files) + list(dcmp.common_funny):
                result['error_files'].append(os.path.join(prefix, name))
                result['identical'] = False

            # Descend into the common subdirectories
            for sub_dcmp in dcmp.subdirs.values():
                sub_prefix = os.path.join(prefix, os.path.basename(sub_dcmp.right))
                _compare_recursive(sub_dcmp, sub_prefix)

        dcmp = filecmp.dircmp(dir1, dir2)
        _compare_recursive(dcmp)

        return result

class DirectorySynchronizer:
    """Analyzes what would be needed to make a target directory mirror a source.

    This class only reports the required operations; none of the methods
    below copies or deletes anything.
    """

    def __init__(self, source_dir: str, target_dir: str):
        self.source_dir = Path(source_dir)
        self.target_dir = Path(target_dir)
        # Stored flag; not consulted by the analysis methods in this class.
        self.dry_run = True
        self.ignore_patterns = []

    def set_ignore_patterns(self, patterns: List[str]):
        """Set the substrings that mark a path as ignored."""
        self.ignore_patterns = patterns

    def should_ignore(self, path: Path) -> bool:
        """Return True when any ignore pattern occurs in ``str(path)``.

        Patterns are plain substrings, not globs.
        """
        for pattern in self.ignore_patterns:
            if pattern in str(path):
                return True
        return False

    def analyze_sync_requirements(self) -> Dict[str, List[str]]:
        """Work out which operations would bring the target in sync.

        Returns a dict of relative paths under the keys ``files_to_copy``,
        ``files_to_update``, ``files_to_delete``, ``dirs_to_create`` and
        ``dirs_to_delete``. Files are compared by content.
        """
        requirements = {
            'files_to_copy': [],
            'files_to_update': [],
            'files_to_delete': [],
            'dirs_to_create': [],
            'dirs_to_delete': []
        }

        # Walk the source tree: anything missing or different in the target
        for source_path in self.source_dir.rglob('*'):
            if self.should_ignore(source_path):
                continue

            relative_path = source_path.relative_to(self.source_dir)
            target_path = self.target_dir / relative_path

            if source_path.is_file():
                if not target_path.exists():
                    requirements['files_to_copy'].append(str(relative_path))
                elif not filecmp.cmp(str(source_path), str(target_path), shallow=False):
                    requirements['files_to_update'].append(str(relative_path))
            elif source_path.is_dir() and not target_path.exists():
                requirements['dirs_to_create'].append(str(relative_path))

        # Walk the target tree: anything that no longer exists in the source
        for target_path in self.target_dir.rglob('*'):
            relative_path = target_path.relative_to(self.target_dir)
            source_path = self.source_dir / relative_path

            if not source_path.exists() and not self.should_ignore(target_path):
                if target_path.is_file():
                    requirements['files_to_delete'].append(str(relative_path))
                elif target_path.is_dir():
                    requirements['dirs_to_delete'].append(str(relative_path))

        return requirements

    def generate_sync_report(self) -> str:
        """Return a newline-joined, human-readable report of the analysis."""
        requirements = self.analyze_sync_requirements()

        report = []
        report.append("="*60)
        report.append("目录同步分析报告")
        report.append(f"源目录: {self.source_dir}")
        report.append(f"目标目录: {self.target_dir}")
        report.append("="*60)

        total_operations = sum(len(v) for v in requirements.values())

        # Section headings start with "\n" so that each section is preceded
        # by a blank line once the report is joined.
        if total_operations == 0:
            report.append("\n目录已同步,无需操作。")
        else:
            report.append(f"\n需要执行 {total_operations} 个操作:")

            if requirements['dirs_to_create']:
                report.append(f"\n创建目录 ({len(requirements['dirs_to_create'])} 个):")
                for dir_path in requirements['dirs_to_create']:
                    report.append(f"  + {dir_path}/")

            if requirements['files_to_copy']:
                report.append(f"\n复制文件 ({len(requirements['files_to_copy'])} 个):")
                for file_path in requirements['files_to_copy']:
                    report.append(f"  + {file_path}")

            if requirements['files_to_update']:
                report.append(f"\n更新文件 ({len(requirements['files_to_update'])} 个):")
                for file_path in requirements['files_to_update']:
                    report.append(f"  ~ {file_path}")

            if requirements['files_to_delete']:
                report.append(f"\n删除文件 ({len(requirements['files_to_delete'])} 个):")
                for file_path in requirements['files_to_delete']:
                    report.append(f"  - {file_path}")

            if requirements['dirs_to_delete']:
                report.append(f"\n删除目录 ({len(requirements['dirs_to_delete'])} 个):")
                for dir_path in requirements['dirs_to_delete']:
                    report.append(f"  - {dir_path}/")

        return '\n'.join(report)

# 使用示例
# 创建测试目录结构
import tempfile
import shutil

def create_test_directories():
    """Create two small sample project trees inside a fresh temp directory.

    Returns ``(dir1, dir2, temp_dir)``. ``project_v1`` and ``project_v2``
    share ``src/main.py`` (different content) and ``config/settings.ini``
    (same content); ``project_v2`` additionally has ``docs/`` and
    ``README.md``. The caller is responsible for removing ``temp_dir``.
    """
    # Temporary root holding both trees
    temp_dir = tempfile.mkdtemp()
    dir1 = os.path.join(temp_dir, "project_v1")
    dir2 = os.path.join(temp_dir, "project_v2")

    # Directory layout
    os.makedirs(os.path.join(dir1, "src"))
    os.makedirs(os.path.join(dir1, "config"))
    os.makedirs(os.path.join(dir2, "src"))
    os.makedirs(os.path.join(dir2, "config"))
    os.makedirs(os.path.join(dir2, "docs"))

    # Files
    with open(os.path.join(dir1, "src", "main.py"), "w") as f:
        f.write("print('Hello, World!')")

    with open(os.path.join(dir2, "src", "main.py"), "w") as f:
        f.write("print('Hello, Python!')")

    with open(os.path.join(dir1, "config", "settings.ini"), "w") as f:
        f.write("[DEFAULT]\nversion=1.0")

    with open(os.path.join(dir2, "config", "settings.ini"), "w") as f:
        f.write("[DEFAULT]\nversion=1.0")

    with open(os.path.join(dir2, "README.md"), "w") as f:
        f.write("# Project v2")

    return dir1, dir2, temp_dir

# File comparison demo
print("文件比较示例:")
dir1, dir2, temp_dir = create_test_directories()

# try/finally guarantees the temporary tree is removed even if one of the
# comparison steps raises.
try:
    # Basic file comparison
    file1 = os.path.join(dir1, "config", "settings.ini")
    file2 = os.path.join(dir2, "config", "settings.ini")
    print(f"\n比较 settings.ini: {FileComparisonTools.compare_files_basic(file1, file2)}")

    # Detailed file comparison
    file1 = os.path.join(dir1, "src", "main.py")
    file2 = os.path.join(dir2, "src", "main.py")
    detailed_result = FileComparisonTools.compare_files_detailed(file1, file2)
    print("\n详细比较 main.py:")
    print(f"  文件相同: {detailed_result['identical']}")
    print(f"  大小匹配: {detailed_result['size_match']}")
    print(f"  内容匹配: {detailed_result['content_match']}")

    # Recursive directory comparison
    print("\n目录比较结果:")
    dir_comparison = FileComparisonTools.compare_directories_recursive(dir1, dir2)
    print(f"  目录相同: {dir_comparison['identical']}")
    print(f"  仅在 v1 中: {dir_comparison['left_only']}")
    print(f"  仅在 v2 中: {dir_comparison['right_only']}")
    print(f"  不同文件: {dir_comparison['diff_files']}")

    # Synchronization analysis: v2 is the source, v1 the target
    print("\n目录同步分析:")
    synchronizer = DirectorySynchronizer(dir2, dir1)
    print(synchronizer.generate_sync_report())
finally:
    # Clean up the temporary directory
    shutil.rmtree(temp_dir)

2.2.2 实践:校验源与备份目录差异

python

import os
import filecmp
import hashlib
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Set, Tuple
import concurrent.futures
import logging

class BackupVerifier:
    """Compare a source directory tree with its backup and report differences.

    Files are paired by their path relative to the respective root. Each pair
    is checked by size and then byte-by-byte; every outcome is accumulated in
    ``self.verification_result``, which can be rendered as a text report and
    dumped as JSON.
    """

    # Suffixes for which a checksum is recorded even when both copies match.
    CHECKSUM_SUFFIXES = ('.db', '.sql', '.zip', '.tar', '.gz')
    # Maximum number of issues listed per category in the text report.
    MAX_ISSUES_PER_TYPE = 10

    def __init__(self, source_dir: str, backup_dir: str):
        """Store both roots and initialise an empty verification result.

        Args:
            source_dir: Root of the original data.
            backup_dir: Root of the backup that should mirror ``source_dir``.
        """
        self.source_dir = Path(source_dir)
        self.backup_dir = Path(backup_dir)
        self.logger = logging.getLogger(__name__)

        # basicConfig does nothing when the root logger already has handlers,
        # so an application-level logging setup is left untouched.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        # Accumulated result; filled in by verify_backup().
        self.verification_result = {
            'timestamp': datetime.now().isoformat(),
            'source_dir': str(self.source_dir),
            'backup_dir': str(self.backup_dir),
            'status': 'pending',
            'statistics': {},
            'issues': [],
            'file_checksums': {}
        }

    def calculate_file_checksum(self, filepath: Path, algorithm='sha256') -> str:
        """Return the hex digest of ``filepath``, read in 8 KiB chunks.

        Args:
            filepath: File to hash.
            algorithm: Any algorithm name accepted by ``hashlib.new``.

        Returns:
            The hex digest, or ``None`` when the file could not be read
            (the failure is logged instead of raised).
        """
        digest = hashlib.new(algorithm)

        try:
            with open(filepath, 'rb') as f:
                for chunk in iter(lambda: f.read(8192), b''):
                    digest.update(chunk)
            return digest.hexdigest()
        except Exception as e:
            self.logger.error(f"计算文件校验和失败 {filepath}: {e}")
            return None

    def verify_file_integrity(self, source_file: Path, backup_file: Path) -> Dict[str, object]:
        """Compare one source file with its backup copy.

        Returns:
            A dict with ``source_file``, ``backup_file``, ``details`` and a
            ``status`` of ``missing``, ``size_mismatch``, ``identical`` or
            ``content_mismatch``.
        """
        result = {
            'source_file': str(source_file),
            'backup_file': str(backup_file),
            'status': 'unknown',
            'details': {}
        }
        details = result['details']

        if not backup_file.exists():
            result['status'] = 'missing'
            details['error'] = '备份文件不存在'
            return result

        # A size difference settles the question without reading any content.
        source_size = source_file.stat().st_size
        backup_size = backup_file.stat().st_size
        if source_size != backup_size:
            result['status'] = 'size_mismatch'
            details['source_size'] = source_size
            details['backup_size'] = backup_size
            return result

        # shallow=False forces a byte-by-byte comparison.
        if filecmp.cmp(str(source_file), str(backup_file), shallow=False):
            result['status'] = 'identical'
            details['size'] = source_size

            # Record a checksum for database dumps and archives.
            if source_file.suffix.lower() in self.CHECKSUM_SUFFIXES:
                checksum = self.calculate_file_checksum(source_file)
                details['checksum'] = checksum
                self.verification_result['file_checksums'][str(source_file)] = checksum
        else:
            result['status'] = 'content_mismatch'
            details['source_checksum'] = self.calculate_file_checksum(source_file)
            details['backup_checksum'] = self.calculate_file_checksum(backup_file)

        return result

    @staticmethod
    def _collect_relative_files(root: Path) -> Set[Path]:
        """Return every regular file below ``root`` as a path relative to it."""
        return {
            path.relative_to(root)
            for path in root.rglob('*')
            if path.is_file()
        }

    def verify_directory_structure(self) -> Dict[str, Set[Path]]:
        """Partition relative file paths into common, source-only and backup-only.

        Returns:
            A dict with the keys ``common_files``, ``source_only`` and
            ``backup_only``, each holding a set of relative paths.
        """
        source_files = self._collect_relative_files(self.source_dir)
        backup_files = self._collect_relative_files(self.backup_dir)

        return {
            'common_files': source_files & backup_files,
            'source_only': source_files - backup_files,
            'backup_only': backup_files - source_files
        }

    def parallel_verify_files(self, file_pairs: List[Tuple[Path, Path]],
                            max_workers: int = 4) -> List[Dict]:
        """Run ``verify_file_integrity`` over ``file_pairs`` in a thread pool.

        Every result whose status is not ``identical`` is also appended to
        ``self.verification_result['issues']``. That includes pairs whose check
        raised, so a failed check can no longer end in an overall ``success``.

        Returns:
            One result dict per pair, in completion order.
        """
        results = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_file = {
                executor.submit(self.verify_file_integrity, source, backup): (source, backup)
                for source, backup in file_pairs
            }

            for future in concurrent.futures.as_completed(future_to_file):
                try:
                    result = future.result()
                except Exception as e:
                    source, backup = future_to_file[future]
                    self.logger.error(f"验证文件失败 {source}: {e}")
                    result = {
                        'source_file': str(source),
                        'backup_file': str(backup),
                        'status': 'error',
                        'details': {'error': str(e)}
                    }

                results.append(result)
                if result['status'] != 'identical':
                    self.verification_result['issues'].append(result)

        return results

    def verify_backup(self, max_workers: int = 4) -> Dict[str, object]:
        """Run the full verification and return ``self.verification_result``.

        Args:
            max_workers: Size of the thread pool used for file comparison.
        """
        self.logger.info("开始备份验证...")
        start_time = datetime.now()

        # Start from a clean slate so a repeated run does not duplicate issues.
        self.verification_result['issues'] = []
        issues = self.verification_result['issues']

        self.logger.info("验证目录结构...")
        structure_result = self.verify_directory_structure()
        common = structure_result['common_files']
        source_only = structure_result['source_only']
        backup_only = structure_result['backup_only']

        self.verification_result['statistics'] = {
            'total_source_files': len(common) + len(source_only),
            'total_backup_files': len(common) + len(backup_only),
            'common_files': len(common),
            'missing_in_backup': len(source_only),
            'extra_in_backup': len(backup_only)
        }

        # Files that exist only in the source are missing from the backup.
        for missing_file in source_only:
            issues.append({
                'source_file': str(self.source_dir / missing_file),
                'backup_file': str(self.backup_dir / missing_file),
                'status': 'missing',
                'details': {'error': '备份中缺失该文件'}
            })

        # Files that exist only in the backup are reported as extra.
        for extra_file in backup_only:
            issues.append({
                'source_file': str(self.source_dir / extra_file),
                'backup_file': str(self.backup_dir / extra_file),
                'status': 'extra',
                'details': {'error': '备份中存在额外文件'}
            })

        file_pairs = [
            (self.source_dir / rel_path, self.backup_dir / rel_path)
            for rel_path in common
        ]

        self.logger.info(f"验证 {len(file_pairs)} 个文件的内容...")
        verification_results = self.parallel_verify_files(file_pairs, max_workers)

        # Count how many pairs ended in each status.
        status_counts = {}
        for result in verification_results:
            status = result['status']
            status_counts[status] = status_counts.get(status, 0) + 1
        self.verification_result['statistics']['verification_results'] = status_counts

        self.verification_result['status'] = 'failed' if issues else 'success'

        end_time = datetime.now()
        self.verification_result['duration'] = str(end_time - start_time)

        self.logger.info(f"备份验证完成,状态: {self.verification_result['status']}")

        return self.verification_result

    def generate_verification_report(self, output_file: str = None) -> str:
        """Render the current verification result as a text report.

        Args:
            output_file: Optional path for the text report. When given, the
                full result is also written as JSON next to it: same name with
                a ``.json`` suffix, or ``<output_file>.json`` if that would
                collide with the text report itself.

        Returns:
            The report text.
        """
        outcome = self.verification_result
        report = []
        report.append("="*70)
        report.append("备份验证报告")
        report.append("="*70)
        report.append(f"验证时间: {outcome['timestamp']}")
        report.append(f"源目录: {outcome['source_dir']}")
        report.append(f"备份目录: {outcome['backup_dir']}")
        report.append(f"验证状态: {outcome['status'].upper()}")
        report.append(f"耗时: {outcome.get('duration', 'N/A')}")

        report.append("\n统计信息:")
        stats = outcome['statistics']
        report.append(f"  源文件总数: {stats.get('total_source_files', 0)}")
        report.append(f"  备份文件总数: {stats.get('total_backup_files', 0)}")
        report.append(f"  共同文件数: {stats.get('common_files', 0)}")
        report.append(f"  缺失文件数: {stats.get('missing_in_backup', 0)}")
        report.append(f"  额外文件数: {stats.get('extra_in_backup', 0)}")

        if 'verification_results' in stats:
            report.append("\n文件验证结果:")
            for status, count in stats['verification_results'].items():
                report.append(f"  {status}: {count}")

        if outcome['issues']:
            report.append(f"\n发现的问题 (共 {len(outcome['issues'])} 个):")

            # Group the issues by status so each category is listed together.
            issues_by_type = {}
            for issue in outcome['issues']:
                issues_by_type.setdefault(issue['status'], []).append(issue)

            limit = self.MAX_ISSUES_PER_TYPE
            for issue_type, issues in issues_by_type.items():
                report.append(f"\n  {issue_type.upper()} ({len(issues)} 个):")
                for issue in issues[:limit]:
                    report.append(f"    - {issue['source_file']}")
                    if 'details' in issue and 'error' in issue['details']:
                        report.append(f"      {issue['details']['error']}")

                if len(issues) > limit:
                    report.append(f"    ... 还有 {len(issues) - limit} 个 {issue_type} 问题")

        report_text = '\n'.join(report)

        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report_text)

            # Derive the JSON path from the suffix only, and never let it
            # point at the text report that was just written.
            json_path = Path(output_file).with_suffix('.json')
            if json_path == Path(output_file):
                json_path = Path(str(output_file) + '.json')
            json_file = str(json_path)

            with open(json_file, 'w', encoding='utf-8') as f:
                json.dump(outcome, f, indent=2, ensure_ascii=False)

            self.logger.info(f"验证报告已保存到: {output_file}")
            self.logger.info(f"详细结果已保存到: {json_file}")

        return report_text

class IncrementalBackupVerifier(BackupVerifier):
    """Verify only the source files modified after a given backup time."""

    def __init__(self, source_dir: str, backup_dir: str, last_backup_time: datetime = None):
        """Initialise the verifier.

        Args:
            source_dir: Root of the original data.
            backup_dir: Root of the backup.
            last_backup_time: Cut-off time; ``None`` means "verify every file".
        """
        super().__init__(source_dir, backup_dir)
        self.last_backup_time = last_backup_time

    def get_modified_files(self) -> List[Path]:
        """Return the source files whose mtime is later than the last backup.

        When no last backup time was given, every regular file below the
        source directory is returned.
        """
        cutoff = self.last_backup_time.timestamp() if self.last_backup_time else None

        return [
            file_path
            for file_path in self.source_dir.rglob('*')
            if file_path.is_file()
            and (cutoff is None or file_path.stat().st_mtime > cutoff)
        ]

    def verify_incremental_backup(self) -> Dict[str, object]:
        """Verify the modified files against the backup.

        Updates ``statistics`` (number of files checked, cut-off time,
        per-status counts) and sets ``status`` to ``success`` only when every
        checked file is identical to its backup copy.

        Returns:
            ``self.verification_result``.
        """
        self.logger.info("开始增量备份验证...")

        modified_files = self.get_modified_files()
        self.logger.info(f"发现 {len(modified_files)} 个需要验证的文件")

        # Pair each modified source file with the same relative path in the backup.
        file_pairs = [
            (source_file, self.backup_dir / source_file.relative_to(self.source_dir))
            for source_file in modified_files
        ]

        verification_results = self.parallel_verify_files(file_pairs)

        status_counts = {}
        for result in verification_results:
            status = result['status']
            status_counts[status] = status_counts.get(status, 0) + 1

        stats = self.verification_result['statistics']
        stats['incremental_files'] = len(modified_files)
        stats['last_backup_time'] = (
            self.last_backup_time.isoformat() if self.last_backup_time else 'N/A'
        )
        stats['verification_results'] = status_counts

        # Without this the result would stay 'pending' after a completed run.
        all_identical = all(
            result['status'] == 'identical' for result in verification_results
        )
        self.verification_result['status'] = 'success' if all_identical else 'failed'

        return self.verification_result

# 使用示例
def demo_backup_verification():
    """Demonstrate full and incremental backup verification on a scratch tree.

    Builds a source and a backup directory inside a temporary directory (one
    identical file, one differing file, one source-only file, one backup-only
    file), prints the full verification report, then modifies the source and
    runs an incremental verification.
    """
    import tempfile
    import time

    with tempfile.TemporaryDirectory() as temp_dir:
        source_dir = Path(temp_dir) / "source"
        backup_dir = Path(temp_dir) / "backup"

        source_dir.mkdir()
        backup_dir.mkdir()

        (source_dir / "data").mkdir()
        (source_dir / "config").mkdir()
        (backup_dir / "data").mkdir()
        (backup_dir / "config").mkdir()

        # Identical in both trees.
        (source_dir / "config" / "app.conf").write_text("version=1.0\n")
        (backup_dir / "config" / "app.conf").write_text("version=1.0\n")

        # Same path, different content.
        (source_dir / "data" / "users.txt").write_text("user1\nuser2\n")
        (backup_dir / "data" / "users.txt").write_text("user1\n")

        # Present only in the source.
        (source_dir / "data" / "new_data.csv").write_text("id,name\n1,test\n")

        # Present only in the backup.
        (backup_dir / "data" / "old_data.csv").write_text("deprecated\n")

        # Full verification.
        verifier = BackupVerifier(str(source_dir), str(backup_dir))
        verifier.verify_backup(max_workers=2)

        report = verifier.generate_verification_report(
            str(Path(temp_dir) / "backup_verification_report.txt")
        )

        print(report)

        # Incremental verification.
        print("\n" + "="*70)
        print("增量备份验证示例:")
        print("="*70)

        # Take the cut-off first and sleep afterwards, so the files written
        # below get an mtime clearly later than the cut-off while everything
        # written above stays older.
        last_backup_time = datetime.now()
        time.sleep(1)

        (source_dir / "config" / "app.conf").write_text("version=1.1\n")
        (source_dir / "data" / "new_file.txt").write_text("new content\n")

        inc_verifier = IncrementalBackupVerifier(
            str(source_dir),
            str(backup_dir),
            last_backup_time
        )
        inc_result = inc_verifier.verify_incremental_backup()
        inc_report = inc_verifier.generate_verification_report()

        print("\n增量备份验证结果:")
        print(f"需要验证的文件数: {inc_result['statistics']['incremental_files']}")
        print(inc_report)


if __name__ == "__main__":
    demo_backup_verification()

2.3 发送电子邮件模块 smtplib

邮件通知是运维监控系统的重要组成部分,用于及时向管理员通报系统状态和告警信息。

2.3.1 smtplib模块的常用类与方法

python

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email import encoders
from email.header import Header
from email.utils import formataddr, formatdate
import ssl
import os
from typing import List, Dict, Union

class EmailSender:
    """Send plain-text, HTML and attachment e-mails through one SMTP account.

    Every ``send_*`` method opens a fresh authenticated connection, sends one
    message and returns ``True`` on success. Any failure is printed and
    reported as ``False`` instead of being raised.
    """

    # File extensions that are attached as image parts.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')

    def __init__(self, smtp_server: str, smtp_port: int,
                 username: str, password: str, use_tls: bool = True):
        """Store the connection settings; no network activity happens here.

        Args:
            smtp_server: SMTP host name.
            smtp_port: SMTP port. With ``use_tls`` set, port 465 selects an
                implicit SSL connection and any other port uses STARTTLS.
            username: Login name, also used as the sender address.
            password: Password or authorisation code.
            use_tls: Whether to encrypt the connection.
        """
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.use_tls = use_tls

    @staticmethod
    def _as_address_list(addresses: Union[str, List[str], None]) -> List[str]:
        """Normalise ``None``, a single address or a list to a new list."""
        if not addresses:
            return []
        if isinstance(addresses, str):
            return [addresses]
        return list(addresses)

    def _create_connection(self) -> smtplib.SMTP:
        """Open an authenticated SMTP connection.

        The socket is closed again if STARTTLS or login fails, so a failed
        attempt does not leak a connection.

        Raises:
            smtplib.SMTPException, OSError: On connection or login problems.
        """
        implicit_ssl = self.use_tls and self.smtp_port == 465

        if implicit_ssl:
            context = ssl.create_default_context()
            smtp = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port, context=context)
        else:
            smtp = smtplib.SMTP(self.smtp_server, self.smtp_port)

        try:
            if self.use_tls and not implicit_ssl:
                smtp.starttls(context=ssl.create_default_context())
            smtp.login(self.username, self.password)
        except Exception:
            smtp.close()
            raise
        return smtp

    def _deliver(self, msg, recipients: List[str]) -> None:
        """Send ``msg`` to ``recipients`` over a fresh connection."""
        with self._create_connection() as smtp:
            smtp.send_message(msg, from_addr=self.username, to_addrs=recipients)

    def _set_common_headers(self, msg, subject: str, to_list: List[str]) -> None:
        """Fill in the From, To, Subject and Date headers on ``msg``."""
        msg['From'] = formataddr(("自动化系统", self.username))
        msg['To'] = ', '.join(to_list)
        msg['Subject'] = Header(subject, 'utf-8')
        msg['Date'] = formatdate(localtime=True)

    def send_text_email(self, to_addr: Union[str, List[str]],
                       subject: str, content: str,
                       cc_addr: Union[str, List[str], None] = None,
                       bcc_addr: Union[str, List[str], None] = None) -> bool:
        """Send a plain-text e-mail.

        Args:
            to_addr: One address or a list of addresses.
            subject: Subject line.
            content: Message body.
            cc_addr: Optional carbon-copy recipients (listed in the header).
            bcc_addr: Optional blind-copy recipients (envelope only).

        Returns:
            ``True`` when the message was handed to the server, else ``False``.
        """
        try:
            to_list = self._as_address_list(to_addr)
            cc_list = self._as_address_list(cc_addr)
            bcc_list = self._as_address_list(bcc_addr)

            msg = MIMEText(content, 'plain', 'utf-8')
            self._set_common_headers(msg, subject, to_list)
            if cc_list:
                msg['Cc'] = ', '.join(cc_list)

            # Bcc recipients are added to the envelope but never to a header.
            self._deliver(msg, to_list + cc_list + bcc_list)
            return True

        except Exception as e:
            print(f"发送邮件失败: {e}")
            return False

    def send_html_email(self, to_addr: Union[str, List[str]],
                       subject: str, html_content: str,
                       text_content: str = None) -> bool:
        """Send an HTML e-mail, optionally with a plain-text alternative.

        Returns:
            ``True`` when the message was handed to the server, else ``False``.
        """
        try:
            to_list = self._as_address_list(to_addr)

            msg = MIMEMultipart('alternative')
            self._set_common_headers(msg, subject, to_list)

            # The plain-text part goes first; the HTML part is attached last.
            if text_content:
                msg.attach(MIMEText(text_content, 'plain', 'utf-8'))
            msg.attach(MIMEText(html_content, 'html', 'utf-8'))

            self._deliver(msg, to_list)
            return True

        except Exception as e:
            print(f"发送HTML邮件失败: {e}")
            return False

    def send_email_with_attachments(self, to_addr: Union[str, List[str]],
                                   subject: str, content: str,
                                   attachments: List[str],
                                   html: bool = False) -> bool:
        """Send an e-mail with file attachments.

        Paths that are not existing files are skipped, with a printed notice.

        Args:
            to_addr: One address or a list of addresses.
            subject: Subject line.
            content: Message body.
            attachments: Paths of the files to attach.
            html: Treat ``content`` as HTML instead of plain text.

        Returns:
            ``True`` when the message was handed to the server, else ``False``.
        """
        try:
            to_list = self._as_address_list(to_addr)

            msg = MIMEMultipart()
            self._set_common_headers(msg, subject, to_list)

            content_type = 'html' if html else 'plain'
            msg.attach(MIMEText(content, content_type, 'utf-8'))

            for attachment_path in attachments or []:
                if os.path.isfile(attachment_path):
                    self._attach_file(msg, attachment_path)
                else:
                    print(f"附件不存在,已跳过: {attachment_path}")

            self._deliver(msg, to_list)
            return True

        except Exception as e:
            print(f"发送带附件邮件失败: {e}")
            return False

    def _attach_file(self, msg: MIMEMultipart, filepath: str):
        """Attach ``filepath`` to ``msg``.

        Files with an image extension are attached as image parts. If the
        image subtype cannot be determined from the content, the file falls
        back to a generic ``application/octet-stream`` attachment.
        """
        filename = os.path.basename(filepath)
        with open(filepath, 'rb') as f:
            data = f.read()

        if filepath.lower().endswith(self.IMAGE_EXTENSIONS):
            try:
                img = MIMEImage(data)
            except TypeError:
                # MIMEImage could not guess the subtype; attach it generically.
                pass
            else:
                img.add_header('Content-Disposition', 'attachment', filename=filename)
                msg.attach(img)
                return

        part = MIMEBase('application', 'octet-stream')
        part.set_payload(data)
        encoders.encode_base64(part)
        part.add_header('Content-Disposition', 'attachment',
                      filename=Header(filename, 'utf-8').encode())
        msg.attach(part)

# SMTP settings for common mail providers, keyed by provider name.
# Each entry is built from (provider, host, port); all of them enable TLS.
EMAIL_CONFIGS = {
    provider: {'smtp_server': host, 'smtp_port': port, 'use_tls': True}
    for provider, host, port in (
        ('gmail', 'smtp.gmail.com', 587),
        ('outlook', 'smtp-mail.outlook.com', 587),
        ('qq', 'smtp.qq.com', 465),
        ('163', 'smtp.163.com', 465),
        ('aliyun', 'smtp.aliyun.com', 465),
    )
}

# 使用示例
def demo_email_sending():
    """Demonstrate sending a plain-text and an HTML e-mail.

    Uses the 'qq' entry of EMAIL_CONFIGS with placeholder credentials;
    replace the account, password and recipient before running.
    """
    qq_config = EMAIL_CONFIGS['qq']
    sender = EmailSender(
        smtp_server=qq_config['smtp_server'],
        smtp_port=qq_config['smtp_port'],
        username='your_email@qq.com',
        password='your_password_or_auth_code',
        use_tls=qq_config['use_tls']
    )

    # Plain-text message.
    print("发送纯文本邮件...")
    result = sender.send_text_email(
        to_addr='recipient@example.com',
        subject='系统监控告警',
        content='服务器CPU使用率超过90%,请及时处理!'
    )
    print(f"发送结果: {result}")

    # HTML message with a plain-text alternative.
    print("\n发送HTML邮件...")
    html_content = """
    <html>
    <body>
        <h2>系统监控报告</h2>
        <p>监控时间: 2024-01-01 10:00:00</p>
        <table border="1">
            <tr>
                <th>指标</th>
                <th>当前值</th>
                <th>状态</th>
            </tr>
            <tr>
                <td>CPU使用率</td>
                <td>85%</td>
                <td>告警</td>
            </tr>
            <tr>
                <td>内存使用率</td>
                <td>60%</td>
                <td>正常</td>
            </tr>
        </table>
    </body>
    </html>
    """

    result = sender.send_html_email(
        to_addr='recipient@example.com',
        subject='系统监控HTML报告',
        html_content=html_content,
        text_content='这是一份系统监控报告,请使用支持HTML的邮件客户端查看。'
    )
    print(f"发送结果: {result}")

# 注意:实际使用时请配置真实的邮箱账号和密码
# demo_email_sending()

2.3.2 定制个性化的邮件格式方法

python

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
from email.header import Header
from email.utils import formataddr, formatdate
import base64
from string import Template
from datetime import datetime
from typing import Dict, List, Any

class EmailTemplate:
    """Builders for HTML e-mail bodies that share one base page template."""
    
    # Base HTML page in string.Template syntax. It has two placeholders:
    #   $header_color - background colour of the .header block
    #   $content      - HTML fragment placed inside the .container div
    # The stylesheet defines the alert box variants alert-danger,
    # alert-warning and alert-success, plus table and metric-card styles.
    BASE_TEMPLATE = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <style>
            body {
                font-family: 'Microsoft YaHei', Arial, sans-serif;
                margin: 0;
                padding: 0;
                background-color: #f5f5f5;
            }
            .container {
                max-width: 800px;
                margin: 20px auto;
                background-color: #ffffff;
                box-shadow: 0 0 10px rgba(0,0,0,0.1);
            }
            .header {
                background-color: $header_color;
                color: white;
                padding: 20px;
                text-align: center;
            }
            .content {
                padding: 30px;
            }
            .footer {
                background-color: #f8f8f8;
                padding: 20px;
                text-align: center;
                font-size: 12px;
                color: #666;
            }
            .alert-box {
                padding: 15px;
                margin: 20px 0;
                border-radius: 5px;
            }
            .alert-danger {
                background-color: #f8d7da;
                border: 1px solid #f5c6cb;
                color: #721c24;
            }
            .alert-warning {
                background-color: #fff3cd;
                border: 1px solid #ffeeba;
                color: #856404;
            }
            .alert-success {
                background-color: #d4edda;
                border: 1px solid #c3e6cb;
                color: #155724;
            }
            table {
                width: 100%;
                border-collapse: collapse;
                margin: 20px 0;
            }
            th, td {
                padding: 10px;
                text-align: left;
                border-bottom: 1px solid #ddd;
            }
            th {
                background-color: #f2f2f2;
                font-weight: bold;
            }
            .metric-card {
                display: inline-block;
                width: 30%;
                margin: 10px 1.5%;
                padding: 20px;
                text-align: center;
                border-radius: 5px;
                background-color: #f8f9fa;
            }
            .metric-value {
                font-size: 36px;
                font-weight: bold;
                margin: 10px 0;
            }
            .metric-label {
                color: #666;
                font-size: 14px;
            }
        </style>
    </head>
    <body>
        <div class="container">
            $content
        </div>
    </body>
    </html>
    """
    
    @staticmethod
    def create_alert_email(title: str, alert_level: str, 
                          alert_message: str, details: Dict[str, Any]) -> str:
        """创建告警邮件"""
        # 根据告警级别设置颜色
        header_colors = {
            'critical': '#dc3545',
            'warning': '#ffc107',
            'info': '#17a2b8',
            'success': '#28a745'
        }
        
        header_color = header_colors.get(alert_level, '#007bff')
        
        # 构建内容
        content = f"""
        <div class="header">
            <h1>{title}</h1>
            <p>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
        </div>
        <div class="content">
            <div class="alert-box alert-{alert_level}">
                <h3>告警信息</h3>
                <p>{alert_message}</p>
            </div>
            
            <h3>详细信息</h3>
            <table>
        """
        
        for key, value in details.items():
            content += f"""
                <tr>
                    <td><strong>{key}</strong></td>
                    <td>{value}</td>
                </tr>
            """
        
        content += """
            </table>
        </div>
        <div class="footer">
            <p>此邮件由自动化监控系统发送,请勿回复</p>
        </div>
        """
        
        template = Template(EmailTemplate.BASE_TEMPLATE)
        return template.substitute(header_color=header_color, content=content)
    
    @staticmethod
    def create_report_email(title: str, metrics: List[Dict[str, Any]], 
                          table_data: List[Dict[str, Any]] = None,
                          chart_image_path: str = None) -> str:
        """创建报告邮件"""
        content = f"""
        <div class="header">
            <h1>{title}</h1>
            <p>报告生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
        </div>
        <div class="content">
            <h2>核心指标</h2>
            <div>
        """
        
        # 添加指标卡片
        for metric in metrics:
            color = metric.get('color', '#007bff')
            content += f"""
                <div class="metric-card">
                    <div class="metric-label">{metric['label']}</div>
                    <div class="metric-value">{metric['value']}</div>
                    <div class="metric-label">{metric.get('unit', '')}</div>
                </div>
            """
        
        content += "</div>"
        
        # 添加表格数据
        if table_data:
            content += "<h2>详细数据</h2><table>"
            
            # 表头
            if table_data:
                headers = table_data[0].keys()
                content += "<tr>"
                for header in headers:
                    content += f"<th>{header}</th>"
                content += "</tr>"
                
                # 数据行
                for row in table_data:
                    content += "<tr>"
                    for value in row.values():
                        content += f"<td>{value}</td>"
                    content += "</tr>"
            
            content += "</table>"
        
        # 添加图表
        if chart_image_path:
            content += """
                <h2>趋势图表</h2>
                <div>
                    <img src="cid:chart">

2.3.3 定制常用邮件格式示例详解

python

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
from email.header import Header
import matplotlib.pyplot as plt
import io
import base64
from datetime import datetime, timedelta
import pandas as pd
from typing import Dict, List, Any

class MonitoringEmailGenerator:
    """监控邮件生成器"""
    
    def __init__(self) -> None:
        """Store the inline CSS shared by the monitoring e-mail.

        ``self.style`` holds one complete ``<style>`` element as a string.
        ``create_system_monitoring_email`` inserts it verbatim into the
        ``<head>`` of the generated document.
        """
        # The class names defined here (.header-section, .summary-card,
        # .alert-item, .performance-bar, ...) are the ones emitted by the
        # _create_*_section helpers of this class.
        self.style = """
        <style>
            .monitoring-container {
                font-family: Arial, sans-serif;
                max-width: 900px;
                margin: 0 auto;
            }
            .header-section {
                background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
                color: white;
                padding: 30px;
                border-radius: 10px 10px 0 0;
            }
            .status-badge {
                display: inline-block;
                padding: 5px 15px;
                border-radius: 20px;
                font-size: 14px;
                font-weight: bold;
                margin-left: 10px;
            }
            .status-ok { background-color: #48bb78; }
            .status-warning { background-color: #ed8936; }
            .status-critical { background-color: #e53e3e; }
            
            .summary-grid {
                display: grid;
                grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
                gap: 20px;
                padding: 20px;
                background-color: #f7fafc;
            }
            .summary-card {
                background: white;
                padding: 20px;
                border-radius: 8px;
                box-shadow: 0 2px 4px rgba(0,0,0,0.1);
                text-align: center;
            }
            .summary-value {
                font-size: 32px;
                font-weight: bold;
                color: #2d3748;
                margin: 10px 0;
            }
            .summary-label {
                color: #718096;
                font-size: 14px;
            }
            
            .alert-section {
                margin: 20px;
                padding: 20px;
                background-color: #fff5f5;
                border-left: 4px solid #e53e3e;
                border-radius: 4px;
            }
            .alert-item {
                margin: 10px 0;
                padding: 10px;
                background-color: white;
                border-radius: 4px;
            }
            
            .performance-table {
                width: 100%;
                margin: 20px 0;
                border-collapse: collapse;
                background-color: white;
            }
            .performance-table th {
                background-color: #edf2f7;
                padding: 12px;
                text-align: left;
                font-weight: bold;
                border-bottom: 2px solid #e2e8f0;
            }
            .performance-table td {
                padding: 12px;
                border-bottom: 1px solid #e2e8f0;
            }
            .performance-bar {
                background-color: #e2e8f0;
                height: 20px;
                border-radius: 10px;
                overflow: hidden;
                position: relative;
            }
            .performance-fill {
                height: 100%;
                background-color: #48bb78;
                transition: width 0.3s ease;
            }
            .performance-text {
                position: absolute;
                top: 50%;
                left: 50%;
                transform: translate(-50%, -50%);
                font-size: 12px;
                font-weight: bold;
            }
        </style>
        """
    
    def create_system_monitoring_email(self, monitoring_data: Dict[str, Any]) -> str:
        """创建系统监控邮件"""
        html = f"""
        <html>
        <head>
            <meta charset="UTF-8">
            {self.style}
        </head>
        <body>
            <div class="monitoring-container">
                {self._create_header_section(monitoring_data)}
                {self._create_summary_section(monitoring_data)}
                {self._create_alerts_section(monitoring_data.get('alerts', []))}
                {self._create_performance_section(monitoring_data.get('servers', []))}
                {self._create_footer_section()}
            </div>
        </body>
        </html>
        """
        return html
    
    def _create_header_section(self, data: Dict[str, Any]) -> str:
        """创建邮件头部"""
        overall_status = data.get('overall_status', 'unknown')
        status_class = {
            'healthy': 'status-ok',
            'warning': 'status-warning',
            'critical': 'status-critical'
        }.get(overall_status, 'status-warning')
        
        return f"""
        <div class="header-section">
            <h1>系统监控报告
                <span class="status-badge {status_class}">
                    {overall_status.upper()}
                </span>
            </h1>
            <p>监控周期: {data.get('start_time')} - {data.get('end_time')}</p>
            <p>报告生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
        </div>
        """
    
    def _create_summary_section(self, data: Dict[str, Any]) -> str:
        """创建摘要部分"""
        summary = data.get('summary', {})
        
        cards = [
            ('服务器总数', summary.get('total_servers', 0), '#4299e1'),
            ('正常运行', summary.get('healthy_servers', 0), '#48bb78'),
            ('告警数量', summary.get('alert_count', 0), '#ed8936'),
            ('平均负载', f"{summary.get('avg_load', 0):.2f}", '#805ad5')
        ]
        
        cards_html = ''
        for label, value, color in cards:
            cards_html += f"""
            <div class="summary-card">
                <div class="summary-label">{label}</div>
                <div class="summary-value">{value}</div>
            </div>
            """
        
        return f"""
        <div class="summary-grid">
            {cards_html}
        </div>
        """
    
    def _create_alerts_section(self, alerts: List[Dict[str, Any]]) -> str:
        """创建告警部分"""
        if not alerts:
            return ""
        
        alerts_html = '<div class="alert-section"><h2>⚠️ 活动告警</h2>'
        
        for alert in alerts[:5]:  # 只显示前5个告警
            severity_icon = {
                'critical': '🔴',
                'warning': '🟡',
                'info': '🔵'
            }.get(alert.get('severity', 'info'), '⚪')
            
            alerts_html += f"""
            <div class="alert-item">
                <strong>{severity_icon} {alert.get('title', '未知告警')}</strong><br>
                服务器: {alert.get('server', 'N/A')} | 
                时间: {alert.get('time', 'N/A')}<br>
                描述: {alert.get('description', '无描述')}
            </div>
            """
        
        if len(alerts) > 5:
            alerts_html += f'<p>... 还有 {len(alerts) - 5} 个告警</p>'
        
        alerts_html += '</div>'
        return alerts_html
    
    def _create_performance_section(self, servers: List[Dict[str, Any]]) -> str:
        """创建性能部分"""
        if not servers:
            return ""
        
        table_html = """
        <div>
            <h2>服务器性能详情</h2>
            <table class="performance-table">
                <thead>
                    <tr>
                        <th>服务器</th>
                        <th>CPU使用率</th>
                        <th>内存使用率</th>
                        <th>磁盘使用率</th>
                        <th>状态</th>
                    </tr>
                </thead>
                <tbody>
        """
        
        for server in servers:
            cpu = server.get('cpu_usage', 0)
            memory = server.get('memory_usage', 0)
            disk = server.get('disk_usage', 0)
            status = server.get('status', 'unknown')
            
            status_style = {
                'healthy': 'color: #48bb78;',
                'warning': 'color: #ed8936;',
                'critical': 'color: #e53e3e;'
            }.get(status, '')
            
            table_html += f"""
                <tr>
                    <td><strong>{server.get('name', 'Unknown')}</strong></td>
                    <td>{self._create_progress_bar(cpu, 'CPU')}</td>
                    <td>{self._create_progress_bar(memory, 'Memory')}</td>
                    <td>{self._create_progress_bar(disk, 'Disk')}</td>
                    <td>● {status.upper()}</td>
                </tr>
            """
        
        table_html += """
                </tbody>
            </table>
        </div>
        """
        
        return table_html
    
    def _create_progress_bar(self, value: float, label: str) -> str:
        """创建进度条"""
        color = '#48bb78'  # 绿色
        if value > 80:
            color = '#e53e3e'  # 红色
        elif value > 60:
            color = '#ed8936'  # 橙色
        
        return f"""
        <div class="performance-bar">
            <div class="performance-fill"></div>
            <div class="performance-text">{value}%</div>
        </div>
        """
    
    def _create_footer_section(self) -> str:
        """创建邮件尾部"""
        return """
        <div>
            <p>此报告由自动化监控系统生成</p>
            <p>如需详细信息,请登录监控平台查看</p>
            <p>
                <a href="#">查看在线报告</a> |
                <a href="#">配置告警规则</a> |
                <a href="#">联系支持团队</a>
            </p>
        </div>
        """
    
    def create_chart_image(self, data: List[Dict[str, Any]], 
                          metric: str = 'cpu_usage') -> bytes:
        """Render the most recent samples of ``metric`` as a PNG line chart.

        Args:
            data: Samples ordered oldest to newest; at most the last 24 are
                plotted. Each sample is a mapping and ``metric`` is read from
                it (0 when absent).
            metric: Key of the value to plot.

        Returns:
            The PNG image as bytes.

        The x axis is synthesised: the newest sample is placed one hour
        before now and each earlier sample one further hour back.
        """
        values = [sample.get(metric, 0) for sample in data[-24:]]

        # One timestamp per plotted value. A fixed 24-point axis makes
        # plt.plot raise as soon as fewer than 24 samples are supplied.
        now = datetime.now()
        times = [now - timedelta(hours=offset) for offset in range(len(values), 0, -1)]

        # The bare 'seaborn' style name was deprecated and later removed from
        # Matplotlib; newer releases ship it as 'seaborn-v0_8'. Use whichever
        # is installed and fall back to the default look.
        style_name = next(
            (name for name in ('seaborn-v0_8', 'seaborn') if name in plt.style.available),
            'default',
        )
        title = metric.replace('_', ' ').title()

        # style.context keeps the styling local to this chart instead of
        # changing the process-wide rcParams.
        with plt.style.context(style_name):
            fig = plt.figure(figsize=(10, 6))
            try:
                # Data series
                plt.plot(times, values, 'b-', linewidth=2, label=title)
                plt.fill_between(times, values, alpha=0.3)

                # Threshold lines
                plt.axhline(y=80, color='r', linestyle='--', label='Critical Threshold')
                plt.axhline(y=60, color='orange', linestyle='--', label='Warning Threshold')

                # Labels and layout
                plt.title(f'{title} - Last 24 Hours', fontsize=16)
                plt.xlabel('Time', fontsize=12)
                plt.ylabel('Usage (%)', fontsize=12)
                plt.legend(loc='upper right')
                plt.grid(True, alpha=0.3)
                plt.xticks(rotation=45)
                plt.tight_layout()

                # Serialise to memory rather than to a file
                buf = io.BytesIO()
                fig.savefig(buf, format='png', dpi=150)
            finally:
                # Always release the figure, even when plotting fails.
                plt.close(fig)

        return buf.getvalue()

class OperationalReportEmail:
    """Builds HTML bodies for operational e-mails (backup reports, security alerts).

    All values taken from the input records are HTML-escaped before they are
    placed in the markup, because backup messages and security-event
    descriptions can contain arbitrary text.
    """

    @staticmethod
    def _escape(value: Any) -> str:
        """Return ``value`` as text that is safe to embed in HTML."""
        # Local import: the class then needs no extra file-level import.
        from html import escape
        return escape(str(value))

    def create_backup_report(self, backup_results: List[Dict[str, Any]]) -> str:
        """Build the HTML report for a batch of backup tasks.

        Args:
            backup_results: One mapping per task with optional ``task_name``,
                ``source``, ``destination``, ``size``, ``duration``,
                ``status`` and ``message`` entries. A task counts as
                successful only when ``status`` equals ``'success'``; a
                missing status is shown as ``N/A`` and counted as failed.

        Returns:
            A complete HTML document. The header is green when no task
            failed and red otherwise.
        """
        success_count = sum(1 for r in backup_results if r.get('status') == 'success')
        failed_count = len(backup_results) - success_count
        header_color = '#28a745' if failed_count == 0 else '#dc3545'
        generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # Literal CSS braces must be doubled inside an f-string.
        head = f"""
        <html>
        <head>
            <style>
                body {{ font-family: Arial, sans-serif; }}
                .backup-header {{
                    background-color: {header_color};
                    color: white;
                    padding: 20px;
                    border-radius: 5px;
                }}
                .backup-table {{
                    width: 100%;
                    border-collapse: collapse;
                    margin-top: 20px;
                }}
                .backup-table th, .backup-table td {{
                    border: 1px solid #ddd;
                    padding: 12px;
                    text-align: left;
                }}
                .backup-table th {{
                    background-color: #f2f2f2;
                }}
                .success {{ color: #28a745; }}
                .failed {{ color: #dc3545; }}
            </style>
        </head>
        <body>
            <div class="backup-header">
                <h1>备份任务执行报告</h1>
                <p>执行时间: {generated_at}</p>
                <p>成功: {success_count} | 失败: {failed_count}</p>
            </div>
            
            <table class="backup-table">
                <thead>
                    <tr>
                        <th>备份任务</th>
                        <th>源路径</th>
                        <th>目标路径</th>
                        <th>大小</th>
                        <th>耗时</th>
                        <th>状态</th>
                        <th>备注</th>
                    </tr>
                </thead>
                <tbody>
        """

        esc = self._escape
        rows = []
        for result in backup_results:
            status = str(result.get('status', 'N/A'))
            status_class = 'success' if status == 'success' else 'failed'
            rows.append(f"""
                    <tr>
                        <td>{esc(result.get('task_name', 'N/A'))}</td>
                        <td>{esc(result.get('source', 'N/A'))}</td>
                        <td>{esc(result.get('destination', 'N/A'))}</td>
                        <td>{esc(result.get('size', 'N/A'))}</td>
                        <td>{esc(result.get('duration', 'N/A'))}</td>
                        <td class="{status_class}">{esc(status.upper())}</td>
                        <td>{esc(result.get('message', ''))}</td>
                    </tr>
            """)

        tail = """
                </tbody>
            </table>
            
            <div>
                <h3>备份策略提醒</h3>
                <ul>
                    <li>请定期检查备份完整性</li>
                    <li>建议每月进行一次备份恢复演练</li>
                    <li>重要数据建议采用3-2-1备份策略</li>
                </ul>
            </div>
        </body>
        </html>
        """

        return head + ''.join(rows) + tail

    def create_security_alert_email(self, security_events: List[Dict[str, Any]]) -> str:
        """Build the HTML notification for a list of security events.

        Args:
            security_events: One mapping per event with optional
                ``severity``, ``event_type``, ``timestamp``, ``source``,
                ``description`` and ``impact`` entries.

        Returns:
            A complete HTML document. The urgent banner is used when at
            least one event has severity ``critical``; at most the first ten
            events are listed, followed by a fixed list of recommendations.
        """
        has_critical = any(e.get('severity') == 'critical' for e in security_events)

        html = """
        <html>
        <head>
            <style>
                body { font-family: Arial, sans-serif; }
                .security-alert {
                    background-color: #fff3cd;
                    border: 1px solid #ffeaa7;
                    color: #856404;
                    padding: 20px;
                    border-radius: 5px;
                    margin-bottom: 20px;
                }
                .critical {
                    background-color: #f8d7da;
                    border-color: #f5c6cb;
                    color: #721c24;
                }
                .event-details {
                    background-color: white;
                    padding: 15px;
                    margin: 10px 0;
                    border-left: 4px solid #dc3545;
                    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
                }
                .recommendations {
                    background-color: #d1ecf1;
                    border: 1px solid #bee5eb;
                    color: #0c5460;
                    padding: 15px;
                    border-radius: 5px;
                    margin-top: 20px;
                }
            </style>
        </head>
        <body>
        """

        if has_critical:
            html += """
            <div class="security-alert critical">
                <h1>🚨 紧急安全告警</h1>
                <p>检测到关键安全事件,请立即处理!</p>
            </div>
            """
        else:
            html += """
            <div class="security-alert">
                <h1>⚠️ 安全事件通知</h1>
                <p>检测到安全相关事件,请及时已关注。</p>
            </div>
            """

        severity_icons = {
            'critical': '🔴',
            'high': '🟠',
            'medium': '🟡',
            'low': '🟢',
        }
        esc = self._escape

        # Event details (first ten only). Every field is escaped: event text
        # may echo attacker-controlled input such as request payloads.
        event_blocks = []
        for event in security_events[:10]:
            severity_emoji = severity_icons.get(event.get('severity', 'low'), '⚪')
            event_blocks.append(f"""
            <div class="event-details">
                <h3>{severity_emoji} {esc(event.get('event_type', '未知事件'))}</h3>
                <p><strong>时间:</strong> {esc(event.get('timestamp', 'N/A'))}</p>
                <p><strong>来源:</strong> {esc(event.get('source', 'N/A'))}</p>
                <p><strong>描述:</strong> {esc(event.get('description', '无描述'))}</p>
                <p><strong>影响:</strong> {esc(event.get('impact', '未评估'))}</p>
            </div>
            """)
        html += ''.join(event_blocks)

        # Recommended actions
        html += """
            <div class="recommendations">
                <h3>建议采取的措施</h3>
                <ol>
                    <li>立即检查受影响的系统和服务</li>
                    <li>收集相关日志进行详细分析</li>
                    <li>评估潜在的安全风险</li>
                    <li>根据事件严重程度制定响应计划</li>
                    <li>更新安全策略和防护措施</li>
                </ol>
            </div>
        </body>
        </html>
        """

        return html

# 综合邮件发送示例
class ComprehensiveEmailSystem:
    """Facade that renders the sample reports and mails them to recipients."""

    def __init__(self, smtp_config: Dict[str, Any]):
        """Create the sender and the two report generators.

        Args:
            smtp_config: Keyword arguments forwarded unchanged to
                ``PersonalizedEmailSender``.
        """
        self.email_sender = PersonalizedEmailSender(**smtp_config)
        self.monitoring_generator = MonitoringEmailGenerator()
        self.report_generator = OperationalReportEmail()

    def send_daily_monitoring_report(self, recipients: List[str]):
        """Render the daily monitoring report from sample data and mail it.

        One HTML e-mail is sent per entry of ``recipients``.
        """
        # Sample fleet: (name, cpu %, memory %, disk %, status)
        server_rows = [
            ('web-server-01', 45, 62, 38, 'healthy'),
            ('web-server-02', 52, 71, 45, 'healthy'),
            ('web-server-03', 85, 78, 55, 'warning'),
            ('db-server-01', 68, 82, 95, 'critical'),
            ('app-server-01', 35, 48, 42, 'healthy'),
        ]
        servers = [
            {'name': name, 'cpu_usage': cpu, 'memory_usage': memory,
             'disk_usage': disk, 'status': status}
            for name, cpu, memory, disk, status in server_rows
        ]

        alerts = [
            dict(severity='warning',
                 title='CPU使用率告警',
                 server='web-server-03',
                 time='14:30:25',
                 description='CPU使用率达到85%,持续10分钟'),
            dict(severity='critical',
                 title='磁盘空间不足',
                 server='db-server-01',
                 time='15:45:12',
                 description='/data分区剩余空间仅5%'),
        ]

        # Sample monitoring payload covering the previous day up to now.
        period_start = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d 00:00')
        period_end = datetime.now().strftime('%Y-%m-%d %H:%M')
        monitoring_data = {
            'overall_status': 'warning',
            'start_time': period_start,
            'end_time': period_end,
            'summary': dict(total_servers=25, healthy_servers=22,
                            alert_count=3, avg_load=2.45),
            'alerts': alerts,
            'servers': servers,
        }

        body = self.monitoring_generator.create_system_monitoring_email(monitoring_data)

        # NOTE(review): the chart bytes are rendered here but nothing in this
        # method attaches them to the outgoing message.
        cpu_series = [{'cpu_usage': load} for load in range(40, 88, 2)]
        chart_image = self.monitoring_generator.create_chart_image(cpu_series)

        for address in recipients:
            subject = f"系统监控日报 - {datetime.now().strftime('%Y-%m-%d')}"
            self.email_sender.send_html_email(address, subject, body)

    def send_backup_report(self, recipients: List[str]):
        """Render the backup report from sample task results and mail it.

        One HTML e-mail is sent per entry of ``recipients``.
        """
        field_names = ('task_name', 'source', 'destination', 'size',
                       'duration', 'status', 'message')
        # Sample task outcomes, in the column order of ``field_names``.
        task_rows = [
            ('数据库备份', '/var/lib/mysql', '/backup/mysql/2024-01-01',
             '15.3GB', '25分钟', 'success', '备份完成'),
            ('应用文件备份', '/opt/application', '/backup/app/2024-01-01',
             '3.2GB', '8分钟', 'success', '备份完成'),
            ('日志文件备份', '/var/log', '/backup/logs/2024-01-01',
             '850MB', '3分钟', 'failed', '磁盘空间不足'),
        ]
        backup_results = [dict(zip(field_names, row)) for row in task_rows]

        body = self.report_generator.create_backup_report(backup_results)

        for address in recipients:
            subject = f"备份任务报告 - {datetime.now().strftime('%Y-%m-%d')}"
            self.email_sender.send_html_email(address, subject, body)

# 使用示例
def demo_comprehensive_email():
    """Walk through the mail system: send the daily report, then the backup report.

    The SMTP settings and recipient addresses are placeholders.
    """
    mail_system = ComprehensiveEmailSystem(dict(
        smtp_server='smtp.example.com',
        smtp_port=587,
        username='monitor@example.com',
        password='password',
        use_tls=True,
        sender_name='运维监控系统',
    ))

    daily_recipients = ['admin@example.com', 'ops@example.com']
    backup_recipients = ['backup-admin@example.com']

    # Daily monitoring report first, backup report second.
    mail_system.send_daily_monitoring_report(daily_recipients)
    mail_system.send_backup_report(backup_recipients)

# demo_comprehensive_email()

2.4 探测Web服务质量方法

Web服务质量监控是确保在线服务稳定性和用户体验的关键环节。

2.4.1 模块常用方法说明

python

import requests
import time
from urllib.parse import urlparse
import socket
import ssl
import certifi
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
import concurrent.futures
import json

class WebServiceProbe:
    """Web服务探测基础类"""
    
    def __init__(self, timeout: int = 10, retry_count: int = 3):
        """Remember the probe settings and open a reusable HTTP session.

        Args:
            timeout: Seconds, stored as ``self.timeout``.
            retry_count: Stored as ``self.retry_count``.
        """
        session = requests.Session()
        # Every request sent through this session identifies itself as the monitor.
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) WebServiceMonitor/1.0'
        })

        self.timeout, self.retry_count = timeout, retry_count
        self.session = session
    
    def check_http_status(self, url: str) -> Dict[str, any]:
        """检查HTTP状态"""
        result = {
            'url': url,
            'status': 'unknown',
            'status_code': None,
            'response_time': None,
            'error': None,
            'timestamp': datetime.now().isoformat()
        }
        
        try:
            start_time = time.time()
            response = self.session.get(url, timeout=self.timeout, 
                                      allow_redirects=True, verify=True)
            response_time = (time.time() - start_time) * 1000  # 毫秒
            
            result.update({
                'status': 'success',
                'status_code': response.status_code,
                'response_time': round(response_time, 2),
                'content_length': len(response.content),
                'headers': dict(response.headers)
            })
            
        except requests.exceptions.Timeout:
            result.update({
                'status': 'timeout',
                'error': f'请求超时 (>{self.timeout}s)'
            })
        except requests.exceptions.ConnectionError as e:
            result.update({
                'status': 'connection_error',
                'error': f'连接错误: {str(e)}'
            })
        except requests.exceptions.SSLError as e:
            result.update({
                'status': 'ssl_error',
                'error': f'SSL错误: {str(e)}'
            })
        except Exception as e:
            result.update({
                'status': 'error',
                'error': f'未知错误: {str(e)}'
            })
        
        return result
    
    def check_ssl_certificate(self, hostname: str, port: int = 443) -> Dict[str, any]:
        """检查SSL证书"""
        result = {
            'hostname': hostname,
            'port': port,
            'status': 'unknown',
            'valid': False,
            'days_remaining': None,
            'issuer': None,
            'error': None
        }
        
        try:
            # 创建SSL上下文
            context = ssl.create_default_context(cafile=certifi.where())
            
            # 连接并获取证书
            with socket.create_connection((hostname, port), timeout=self.timeout) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    cert = ssock.getpeercert()
                    
                    # 解析证书信息
                    not_after = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
                    days_remaining = (not_after - datetime.now()).days
                    
                    result.update({
                        'status': 'success',
                        'valid': True,
                        'days_remaining': days_remaining,
                        'issuer': dict(cert['issuer']),
                        'subject': dict(cert['subject']),
                        'not_before': cert['notBefore'],
                        'not_after': cert['notAfter'],
                        'version': cert['version']
                    })
                    
                    # 检查证书是否即将过期
                    if days_remaining < 30:
                        result['warning'] = f'证书将在{days_remaining}天后过期'
                    
        except socket.timeout:
            result.update({
                'status': 'timeout',
                'error': '连接超时'
            })
        except ssl.SSLError as e:
            result.update({
                'status': 'ssl_error',
                'error': f'SSL错误: {str(e)}'
            })
        except Exception as e:
            result.update({
                'status': 'error',
                'error': f'检查失败: {str(e)}'
            })
        
        return result
    
    def measure_dns_resolution(self, hostname: str) -> Dict[str, any]:
        """Resolve ``hostname`` and time how long the lookup takes.

        Returns:
            A dict with ``hostname``, ``resolution_time`` (milliseconds,
            two decimals), ``ip_addresses``, ``status`` and ``error``.
            ``status`` is ``success``, ``dns_error`` (resolver failure) or
            ``error`` (anything else); the method never raises.
        """
        outcome = {
            'hostname': hostname,
            'resolution_time': None,
            'ip_addresses': [],
            'status': 'unknown',
            'error': None
        }

        try:
            began = time.time()
            _, _, addresses = socket.gethostbyname_ex(hostname)
            elapsed_ms = (time.time() - began) * 1000  # milliseconds
        except socket.gaierror as exc:
            outcome['status'] = 'dns_error'
            outcome['error'] = f'DNS解析失败: {str(exc)}'
        except Exception as exc:
            outcome['status'] = 'error'
            outcome['error'] = f'未知错误: {str(exc)}'
        else:
            outcome['status'] = 'success'
            outcome['resolution_time'] = round(elapsed_ms, 2)
            outcome['ip_addresses'] = addresses

        return outcome
    
    def trace_route(self, hostname: str, max_hops: int = 30) -> List[Dict[str, any]]:
        """Return a placeholder route listing (simplified, no real probing).

        A real traceroute needs raw-socket privileges; this version only
        resolves ``hostname`` and then returns one unfilled entry per hop
        number from 1 to ``max_hops``. Use a dedicated traceroute library
        for actual measurements.

        Returns:
            A list of hop dicts (``hop``, ``ip``, ``hostname``,
            ``response_time``, ``status``), or a single ``{'error': ...}``
            entry when anything fails.
        """
        try:
            # Resolution doubles as the reachability pre-check; the address
            # itself is not used by this simplified version.
            socket.gethostbyname(hostname)

            return [
                dict(hop=ttl, ip=None, hostname=None,
                     response_time=None, status='unknown')
                for ttl in range(1, max_hops + 1)
            ]
        except Exception as exc:
            return [{'error': str(exc)}]

class AdvancedWebMonitor(WebServiceProbe):
    """Probe that combines HTTP, DNS, TLS and sampling checks into one scored report."""

    def __init__(self, timeout: int = 10, retry_count: int = 3):
        """Initialise the base probe and the scoring thresholds."""
        super().__init__(timeout, retry_count)
        self.performance_thresholds = {
            'response_time': 3000,  # milliseconds (3 s)
            'availability': 0.95,   # fraction of successful samples
            'ssl_days': 30          # days left before certificate expiry
        }

    def comprehensive_check(self, url: str) -> Dict[str, any]:
        """Run every applicable check against ``url`` and score the outcome.

        Returns:
            A dict with ``url``, ``timestamp``, ``checks`` (keys ``http``,
            ``performance`` and, when applicable, ``dns`` and ``ssl``),
            ``overall_score`` and ``overall_status``.
        """
        parsed_url = urlparse(url)
        hostname = parsed_url.hostname

        checks = {}
        result = {
            'url': url,
            'timestamp': datetime.now().isoformat(),
            'checks': checks
        }

        # HTTP check
        checks['http'] = self.check_http_status(url)

        # DNS check (needs a host name in the URL)
        if hostname:
            checks['dns'] = self.measure_dns_resolution(hostname)

        # TLS certificate check (HTTPS only)
        if parsed_url.scheme == 'https' and hostname:
            checks['ssl'] = self.check_ssl_certificate(hostname)

        # Availability / latency sampling
        checks['performance'] = self.check_performance(url)

        # Aggregate verdict
        result['overall_score'] = self._calculate_overall_score(checks)
        result['overall_status'] = self._determine_overall_status(checks)

        return result

    def check_performance(self, url: str, samples: int = 3) -> Dict[str, any]:
        """Request ``url`` ``samples`` times and summarise availability and latency.

        Consecutive requests are spaced one second apart. With
        ``samples <= 0`` no request is made and availability is reported
        as 0.

        Returns:
            A dict with ``samples``, ``successful_requests``,
            ``availability`` and ``response_times``; average, minimum and
            maximum response time are added when at least one request
            succeeded.
        """
        response_times = []
        successful_requests = 0

        for i in range(samples):
            result = self.check_http_status(url)
            if result['status'] == 'success':
                successful_requests += 1
                # Compare with None so a legitimate 0.0 ms reading is kept.
                if result['response_time'] is not None:
                    response_times.append(result['response_time'])

            if i < samples - 1:
                time.sleep(1)  # do not hammer the target

        # Guard against division by zero when no samples were requested.
        availability = successful_requests / samples if samples > 0 else 0.0

        performance_result = {
            'samples': samples,
            'successful_requests': successful_requests,
            'availability': round(availability, 3),
            'response_times': response_times
        }

        if response_times:
            performance_result.update({
                'avg_response_time': round(sum(response_times) / len(response_times), 2),
                'min_response_time': round(min(response_times), 2),
                'max_response_time': round(max(response_times), 2)
            })

        return performance_result

    def _calculate_overall_score(self, checks: Dict[str, any]) -> float:
        """Turn the individual check results into a score from 0 to 100.

        Starting from 100, points are deducted as follows:
        HTTP request failed -50, else status code >= 400 -30, else response
        slower than the threshold -20; certificate invalid -30, else
        expiring within the threshold (or expiry unknown) -15; availability
        below the threshold -25. The result never drops below 0.
        """
        limits = self.performance_thresholds
        score = 100.0

        # HTTP
        if 'http' in checks:
            http = checks['http']
            status_code = http.get('status_code')
            response_time = http.get('response_time')
            if http.get('status') != 'success':
                score -= 50
            elif status_code is not None and status_code >= 400:
                score -= 30
            elif response_time and response_time > limits['response_time']:
                score -= 20

        # TLS certificate
        if 'ssl' in checks:
            ssl_check = checks['ssl']
            days_remaining = ssl_check.get('days_remaining')
            if not ssl_check.get('valid', False):
                score -= 30
            elif days_remaining is None or days_remaining < limits['ssl_days']:
                # None would break the comparison; treat unknown expiry like
                # a missing value, i.e. as expiring soon.
                score -= 15

        # Availability
        if 'performance' in checks:
            perf = checks['performance']
            if perf.get('availability', 0) < limits['availability']:
                score -= 25

        return max(0, score)

    def _determine_overall_status(self, checks: Dict[str, any]) -> str:
        """Map the overall score to ``healthy`` (>= 90), ``warning`` (>= 70) or ``critical``."""
        score = self._calculate_overall_score(checks)

        if score >= 90:
            return 'healthy'
        if score >= 70:
            return 'warning'
        return 'critical'

class WebServiceMonitor:
    """Run comprehensive checks on URLs, keep per-URL history and fire alerts.

    Instance state:
        monitor: ``AdvancedWebMonitor`` used to perform each check.
        history: dict mapping a URL to the list of its retained results.
        alert_callbacks: callables invoked with every alert dict.
    """

    def __init__(self):
        self.monitor = AdvancedWebMonitor()
        self.history = {}
        self.alert_callbacks = []

    def add_alert_callback(self, callback):
        """Register *callback*; it is called with one alert dict per alert."""
        self.alert_callbacks.append(callback)

    def monitor_urls(self, urls: List[str], interval: int = 300):
        """Check *urls* in an endless loop, sleeping *interval* seconds per round.

        Every round runs ``batch_check``, appends each result to the history
        of its URL, drops history entries older than 24 hours and evaluates
        the alert rules for the new result. This method never returns.
        """
        while True:
            results = self.batch_check(urls)
            # One cutoff per round is enough for pruning the history.
            cutoff_time = datetime.now() - timedelta(hours=24)

            for result in results:
                url = result['url']

                # Record the result, then keep only the last 24 hours.
                entries = self.history.setdefault(url, [])
                entries.append(result)
                self.history[url] = [
                    r for r in entries
                    if datetime.fromisoformat(r['timestamp']) > cutoff_time
                ]

                self._check_alerts(result)

            time.sleep(interval)

    def batch_check(self, urls: List[str], max_workers: int = 10) -> List[Dict[str, any]]:
        """Check all *urls* concurrently and return one result dict per URL.

        Results are returned in completion order, not input order. A check
        that raises is reported as a dict with ``overall_status`` set to
        ``'error'``, the exception text under ``error`` and an ISO-format
        ``timestamp``, so it can be stored in the history like any other
        result.
        """
        results = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_url = {
                executor.submit(self.monitor.comprehensive_check, url): url
                for url in urls
            }

            for future in concurrent.futures.as_completed(future_to_url):
                try:
                    results.append(future.result())
                except Exception as e:
                    results.append({
                        'url': future_to_url[future],
                        # History pruning and statistics read 'timestamp' from
                        # every stored result, so failures need one as well.
                        'timestamp': datetime.now().isoformat(),
                        'error': str(e),
                        'overall_status': 'error'
                    })

        return results

    def _check_alerts(self, result: Dict[str, any]):
        """Build the alerts that apply to *result* and pass them to the callbacks.

        Three rules are evaluated: a non-successful HTTP check (critical), an
        SSL certificate with fewer than 30 days remaining (critical at 7 days
        or fewer, warning otherwise) and an average response time above
        3000 ms (warning). A callback that raises is reported on stdout and
        does not stop the remaining callbacks.
        """
        checks = result.get('checks', {})
        alerts = []

        # HTTP status alert
        http_check = checks.get('http', {})
        if http_check.get('status') != 'success':
            # Results of failed checks carry their error at the top level.
            error_text = http_check.get('error') or result.get('error') or 'Unknown error'
            alerts.append({
                'type': 'http_error',
                'url': result['url'],
                'message': f"HTTP请求失败: {error_text}",
                'severity': 'critical'
            })

        # SSL certificate alert
        days_remaining = checks.get('ssl', {}).get('days_remaining')
        if days_remaining is not None and days_remaining < 30:
            alerts.append({
                'type': 'ssl_expiry',
                'url': result['url'],
                'message': f"SSL证书将在{days_remaining}天后过期",
                'severity': 'warning' if days_remaining > 7 else 'critical'
            })

        # Performance alert
        perf_check = checks.get('performance', {})
        if perf_check.get('avg_response_time', 0) > 3000:
            alerts.append({
                'type': 'slow_response',
                'url': result['url'],
                'message': f"响应时间过慢: {perf_check['avg_response_time']}ms",
                'severity': 'warning'
            })

        # Dispatch: every alert goes to every callback.
        for alert in alerts:
            for callback in self.alert_callbacks:
                try:
                    callback(alert)
                except Exception as e:
                    print(f"告警回调执行失败: {e}")

    def get_statistics(self, url: str) -> Dict[str, any]:
        """Summarize the retained history of *url*.

        Returns an empty dict when nothing is recorded for *url*. Otherwise
        the dict holds ``url``, ``total_checks``, ``successful_checks``,
        ``availability`` (percent) and ``last_check``, plus average, minimum
        and maximum HTTP response time when at least one entry has one.
        Entries whose ``overall_status`` is ``'critical'`` or ``'error'``
        count as unsuccessful.
        """
        history = self.history.get(url)
        if not history:
            return {}

        failed_states = ('critical', 'error')
        total_checks = len(history)
        successful_checks = sum(
            1 for h in history if h.get('overall_status') not in failed_states
        )

        # Only entries that carry a truthy HTTP response time contribute.
        response_times = []
        for h in history:
            response_time = h.get('checks', {}).get('http', {}).get('response_time')
            if response_time:
                response_times.append(response_time)

        stats = {
            'url': url,
            'total_checks': total_checks,
            'successful_checks': successful_checks,
            'availability': round(successful_checks / total_checks * 100, 2),
            'last_check': history[-1]['timestamp']
        }

        if response_times:
            stats.update({
                'avg_response_time': round(sum(response_times) / len(response_times), 2),
                'min_response_time': round(min(response_times), 2),
                'max_response_time': round(max(response_times), 2)
            })

        return stats

2.4.2 实践:实现探测Web服务质量

python

import requests
import time
import json
import sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import threading
import queue
from urllib.parse import urlparse
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from io import BytesIO
import base64

class WebQualityProbe:
    """Probe URLs over HTTP and record every result in a SQLite database."""

    def __init__(self, db_path: str = 'web_monitor.db'):
        """Create the tables in *db_path* if needed and set up the HTTP session."""
        self.db_path = db_path
        self.init_database()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'WebQualityProbe/1.0'
        })

    @staticmethod
    def _db_timestamp(value):
        """Render a ``datetime`` as ``'YYYY-MM-DD HH:MM:SS[.ffffff]'`` text.

        Binding ``datetime`` objects directly relies on sqlite3's default
        adapter, which is deprecated since Python 3.12; this produces the
        same text explicitly. Values that are not datetimes are returned
        unchanged.
        """
        if isinstance(value, datetime):
            return value.isoformat(sep=' ')
        return value

    def init_database(self):
        """Create the ``monitoring_results`` and ``alerts`` tables if missing."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Table holding one row per probe.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS monitoring_results (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    url TEXT NOT NULL,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    status_code INTEGER,
                    response_time REAL,
                    content_size INTEGER,
                    dns_time REAL,
                    connect_time REAL,
                    ssl_time REAL,
                    first_byte_time REAL,
                    total_time REAL,
                    error_message TEXT,
                    status TEXT
                )
            ''')

            # Table holding raised alerts.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS alerts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    url TEXT NOT NULL,
                    alert_type TEXT NOT NULL,
                    severity TEXT NOT NULL,
                    message TEXT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    resolved BOOLEAN DEFAULT 0
                )
            ''')

            conn.commit()
        finally:
            # Release the connection even when a statement fails.
            conn.close()

    def probe_url_detailed(self, url: str) -> Dict[str, any]:
        """Fetch *url* once, store the outcome in the database and return it.

        The returned dict always contains ``url``, ``timestamp`` (a
        ``datetime``), ``status`` and ``timing``. On success it also carries
        ``status_code``, ``response_time`` (milliseconds), ``content_size``,
        ``headers``, ``redirect_count`` and ``final_url``, plus
        ``content_analysis`` when the content type starts with ``text/html``.
        On failure ``status`` is ``'timeout'``, ``'connection_error'`` or
        ``'error'`` and ``error_message`` describes the problem.
        """
        result = {
            'url': url,
            'timestamp': datetime.now(),
            'status': 'unknown',
            'timing': {}
        }

        try:
            timing_info = {}

            def record_timing(r, *args, **kwargs):
                # Response hook: keep requests' elapsed time, in milliseconds.
                timing_info['total_time'] = r.elapsed.total_seconds() * 1000

            response = self.session.get(
                url,
                timeout=30,
                allow_redirects=True,
                stream=True,
                hooks={'response': record_timing}
            )

            total_ms = timing_info.get('total_time', 0)

            # Simplified timing: the connection phases are not measured here
            # and the processing/transfer split is a fixed 70/30 estimate.
            result['timing'] = {
                'dns_lookup': 0,
                'tcp_connect': 0,
                'ssl_handshake': 0,
                'server_processing': total_ms * 0.7,
                'content_transfer': total_ms * 0.3,
                'total': total_ms
            }

            # With stream=True the body is downloaded here.
            content = response.content

            result.update({
                'status': 'success',
                'status_code': response.status_code,
                'response_time': total_ms,
                'content_size': len(content),
                'headers': dict(response.headers),
                'redirect_count': len(response.history),
                'final_url': response.url
            })

            if response.headers.get('content-type', '').startswith('text/html'):
                result['content_analysis'] = self._analyze_html_content(content)

        except requests.exceptions.Timeout:
            result.update({
                'status': 'timeout',
                'error_message': '请求超时'
            })
        except requests.exceptions.ConnectionError as e:
            result.update({
                'status': 'connection_error',
                'error_message': f'连接错误: {str(e)}'
            })
        except Exception as e:
            result.update({
                'status': 'error',
                'error_message': f'未知错误: {str(e)}'
            })

        # Persist the outcome whether the request succeeded or not.
        self._save_result(result)

        return result

    def _analyze_html_content(self, content: bytes) -> Dict[str, any]:
        """Return simple substring-based metrics for an HTML body.

        The analysis is best effort: if it fails for any reason an empty
        dict is returned instead of raising.
        """
        try:
            text = content.decode('utf-8', errors='ignore')
            lowered = text.lower()

            return {
                'size_kb': len(content) / 1024,
                'has_title': '<title>' in lowered,
                'has_errors': any(marker in lowered for marker in ['error', 'exception', '错误']),
                'external_resources': text.count('http://') + text.count('https://'),
                'inline_scripts': text.count('<script'),
                'inline_styles': text.count('<style')
            }
        except Exception:
            return {}

    def _save_result(self, result: Dict[str, any]):
        """Insert one probe result into ``monitoring_results``.

        ``url``, ``timestamp`` and ``status`` are required keys of *result*;
        every other column is stored as NULL when its key is absent.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                INSERT INTO monitoring_results
                (url, timestamp, status_code, response_time, content_size,
                 total_time, error_message, status)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                result['url'],
                self._db_timestamp(result['timestamp']),
                result.get('status_code'),
                result.get('response_time'),
                result.get('content_size'),
                result.get('timing', {}).get('total'),
                result.get('error_message'),
                result['status']
            ))
            conn.commit()
        finally:
            conn.close()

    def get_statistics(self, url: str, hours: int = 24) -> Dict[str, any]:
        """Aggregate the probes of *url* stored during the last *hours* hours.

        Returns ``{'error': '没有数据'}`` when no row matches. Otherwise the
        dict holds the number of checks, the number of successful checks, the
        availability in percent, average/min/max response time and average
        content size of the successful probes, and ``error_distribution``, a
        dict mapping each non-success status to its count.
        """
        since = self._db_timestamp(datetime.now() - timedelta(hours=hours))

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            cursor.execute('''
                SELECT
                    COUNT(*) as total_checks,
                    SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_checks,
                    AVG(CASE WHEN status = 'success' THEN response_time ELSE NULL END) as avg_response_time,
                    MIN(CASE WHEN status = 'success' THEN response_time ELSE NULL END) as min_response_time,
                    MAX(CASE WHEN status = 'success' THEN response_time ELSE NULL END) as max_response_time,
                    AVG(CASE WHEN status = 'success' THEN content_size ELSE NULL END) as avg_content_size
                FROM monitoring_results
                WHERE url = ? AND timestamp > ?
            ''', (url, since))

            total, successful, avg_rt, min_rt, max_rt, avg_size = cursor.fetchone()

            if total == 0:
                return {'error': '没有数据'}

            # Aggregates are NULL when no probe succeeded; report them as 0.
            stats = {
                'url': url,
                'period_hours': hours,
                'total_checks': total,
                'successful_checks': successful or 0,
                'availability': round((successful or 0) / total * 100, 2),
                'avg_response_time': round(avg_rt or 0, 2),
                'min_response_time': round(min_rt or 0, 2),
                'max_response_time': round(max_rt or 0, 2),
                'avg_content_size_kb': round((avg_size or 0) / 1024, 2)
            }

            # Count of each non-success status in the same window.
            cursor.execute('''
                SELECT status, COUNT(*) as count
                FROM monitoring_results
                WHERE url = ? AND timestamp > ? AND status != 'success'
                GROUP BY status
            ''', (url, since))

            stats['error_distribution'] = dict(cursor.fetchall())

            return stats
        finally:
            # Also covers the early "no data" return above.
            conn.close()

class WebQualityMonitor:
    """Web质量监控系统"""
    
    def __init__(self, probe: WebQualityProbe):
        """Bind the monitor to *probe*; nothing runs until monitoring is started."""
        # Lifecycle state: stopped, with no worker threads yet.
        self.running = False
        self.threads = []
        # Targets keyed by URL, and the queue that hands probe results from
        # the monitoring loop to the alert-check loop.
        self.monitoring_configs = {}
        self.monitoring_queue = queue.Queue()
        self.probe = probe
    
    def add_monitoring_target(self, url: str, interval: int = 300,
                            alert_rules: Optional[Dict[str, any]] = None):
        """Register *url* for periodic probing.

        Args:
            url: Address to probe; also the key in ``monitoring_configs``.
                Registering the same URL again replaces its configuration.
            interval: Minimum number of seconds between two probes.
            alert_rules: Optional overrides for the default alert rules.
                Keys that are not supplied keep their default value, so the
                alert loop always finds every threshold it indexes.
        """
        rules = self._default_alert_rules()
        if alert_rules:
            rules.update(alert_rules)

        self.monitoring_configs[url] = {
            'url': url,
            'interval': interval,
            'alert_rules': rules,
            # None means "never probed yet".
            'last_check': None
        }
    
    def _default_alert_rules(self) -> Dict[str, any]:
        """默认告警规则"""
        return {
            'response_time_threshold': 3000,  # 3秒
            'availability_threshold': 0.95,   # 95%
            'consecutive_failures': 3,        # 连续失败次数
            'content_change_detection': True  # 内容变化检测
        }
    
    def start_monitoring(self):
        """Set the running flag and launch the probe and alert-check threads."""
        self.running = True

        # The monitoring loop is started first, then the alert-check loop;
        # both threads are remembered so they can be joined on stop.
        for loop in (self._monitoring_loop, self._alert_check_loop):
            worker = threading.Thread(target=loop)
            worker.start()
            self.threads.append(worker)

        print("Web质量监控已启动")
    
    def stop_monitoring(self):
        """Clear the running flag and wait for every worker thread to finish."""
        self.running = False
        # Both loops test the flag on each pass, so the joins return once
        # the current pass of each loop is over.
        for worker in self.threads:
            worker.join()
        print("Web质量监控已停止")
    
    def _monitoring_loop(self):
        """监控循环"""
        while self.running:
            current_time = time.time()
            
            for url, config in self.monitoring_configs.items():
                last_check = config.get('last_check', 0)
                
                if current_time - last_check >= config['interval']:
                    # 执行探测
                    result = self.probe.probe_url_detailed(url)
                    config['last_check'] = current_time
                    
                    # 放入队列供告警检查
                    self.monitoring_queue.put(result)
            
            time.sleep(1)
    
    def _alert_check_loop(self):
        """告警检查循环"""
        consecutive_failures = {}
        
        while self.running:
            try:
                result = self.monitoring_queue.get(timeout=1)
                url = result['url']
                config = self.monitoring_configs.get(url, {})
                rules = config.get('alert_rules', {})
                
                # 检查响应时间
                if (result['status'] == 'success' and 
                    result.get('response_time', 0) > rules['response_time_threshold']):
                    self._create_alert(url, 'slow_response', 'warning',
                                     f"响应时间过慢: {result['response_time']}ms")
                
                # 检查失败
                if result['status'] != 'success':
                    consecutive_failures[url] = consecutive_failures.get(url, 0) + 1
                    
                    if consecutive_failures[url] >= rules['consecutive_failures']:
                        self._create_alert(url, 'service_down', 'critical',
                                         f"服务连续失败{consecutive_failures[url]}次")
                else:
                    consecutive_failures[url] = 0
                
                # 检查可用性
                stats = self.probe.get_statistics(url, hours=1)
                if stats.get('availability', 100) < rules['availability_threshold'] * 100:
                    self._create_alert(url, 'low_availability', 'warning',
                                     f"可用性低于阈值: {stats['availability']}%")
                
            except queue.Empty:
                continue
            except Exception as e:
                print(f"告警检查错误: {e}")
    
    def _create_alert(self, url: str, alert_type: str, severity: str, message: str):
        """创建告警"""
        conn = sqlite3.connect(self.probe.db_path)
        cursor = conn.cursor()
        
        # 检查是否已有未解决的同类告警
        cursor.execute('''
            SELECT id FROM alerts
            WHERE url = ? AND alert_type = ? AND resolved = 0
            ORDER BY timestamp DESC LIMIT 1
        ''', (url, alert_type))
        
        existing = cursor.fetchone()
        
        if not existing:
            cursor.execute('''
                INSERT INTO alerts (url, alert_type, severity, message)
                VALUES (?, ?, ?, ?)
            ''', (url, alert_type, severity, message))
            
            print(f"[{severity.upper()}] {url}: {message}")
        
        conn.commit()
        conn.close()
    
    def generate_report(self, url: str, hours: int = 24) -> str:
        """生成监控报告"""
        stats = self.probe.get_statistics(url, hours)
        
        # 获取历史数据用于绘图
        conn = sqlite3.connect(self.probe.db_path)
        cursor = conn.cursor()
        
        since = datetime.now() - timedelta(hours=hours)
        cursor.execute('''
            SELECT timestamp, response_time, status
            FROM monitoring_results
            WHERE url = ? AND timestamp > ?
            ORDER BY timestamp
        ''', (url, since))
        
        data = cursor.fetchall()
        conn.close()
        
        # 生成图表
        if data:
            timestamps = [datetime.fromisoformat(row[0]) for row in data]
            response_times = [row[1] if row[2] == 'success' else None for row in data]
            
            plt.figure(figsize=(12, 6))
            
            # 响应时间图
            plt.subplot(2, 1, 1)
            plt.plot(timestamps, response_times, 'b-', alpha=0.7)
            plt.scatter([t for t, r in zip(timestamps, response_times) if r is None],
                       [0 for r in response_times if r is None], 
                       color='red', marker='x', s=50, label='失败')
            plt.ylabel('响应时间 (ms)')
            plt.title(f'{url} - 监控报告 ({hours}小时)')
            plt.legend()
            plt.grid(True, alpha=0.3)
            
            # 可用性图
            plt.subplot(2, 1, 2)
            # 计算每小时的可用性
            hourly_availability = []
            hourly_labels = []
            
            for i in range(hours):
                hour_start = datetime.now() - timedelta(hours=hours-i)
                hour_end = hour_start + timedelta(hours=1)
                
                hour_data = [row for row in data 
                           if hour_start <= datetime.fromisoformat(row[0]) < hour_end]
                
                if hour_data:
                    success_count = sum(1 for row in hour_data if row[2] == 'success')
                    availability = success_count / len(hour_data) * 100
                    hourly_availability.append(availability)
                    hourly_labels.append(hour_start.strftime('%H:%M'))
            
            plt.bar(range(len(hourly_availability)), hourly_availability, alpha=0.7)
            plt.axhline(y=95, color='r', linestyle='--', label='95%阈值')
            plt.ylabel('可用性 (%)')
            plt.xlabel('时间')
            plt.xticks(range(len(hourly_labels)), hourly_labels, rotation=45)
            plt.legend()
            plt.grid(True, alpha=0.3)
            
            plt.tight_layout()
            
            # 保存图表
            buf = BytesIO()
            plt.savefig(buf, format='png', dpi=100)
            buf.seek(0)
            chart_base64 = base64.b64encode(buf.getvalue()).decode()
            plt.close()
        else:
            chart_base64 = None
        
        # 生成HTML报告
        html_report = f"""
        <html>
        <head>
            <title>Web服务质量报告 - {url}</title>
            <style>
                body {
           { font-family: Arial, sans-serif; margin: 20px; }}
                .header {
           { background-color: #f0f0f0; padding: 20px; border-radius: 5px; }}
                .metric {
           { display: inline-block; margin: 10px 20px; }}
                .metric-value {
           { font-size: 24px; font-weight: bold; color: #333; }}
                .metric-label {
           { color: #666; }}
                .status-good {
           { color: #28a745; }}
                .status-warning {
           { color: #ffc107; }}
                .status-bad {
           { color: #dc3545; }}
                table {
           { border-collapse: collapse; width: 100%; margin-top: 20px; }}
                th, td {
           { border: 1px solid #ddd; padding: 8px; text-align: left; }}
                th {
           { background-color: #f2f2f2; }}
            </style>
        </head>
        <body>
            <div class="header">
                <h1>Web服务质量报告</h1>
                <p>URL: {url}</p>
                <p>监控周期: {hours}小时</p>
                <p>报告生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
            </div>
            
            <h2>关键指标</h2>
            <div>
                <div class="metric">
                    <div class="metric-label">可用性</div>
                    <div class="metric-value {self._get_status_class(stats['availability'], 95, 90)}">
                        {stats['availability']}%
                    </div>
                </div>
                <div class="metric">
                    <div class="metric-label">平均响应时间</div>
                    <div class="metric-value {self._get_status_class(stats['avg_response_time'], 1000, 3000, reverse=True)}">
                        {stats['avg_response_time']}ms
                    </div>
                </div>
                <div class="metric">
                    <div class="metric-label">检查次数</div>
                    <div class="metric-value">{stats['total_checks']}</div>
                </div>
            </div>
            
            <h2>详细统计</h2>
            <table>
                <tr>
                    <th>指标</th>
                    <th>数值</th>
                </tr>
                <tr>
                    <td>成功检查次数</td>
                    <td>{stats['successful_checks']}</td>
                </tr>
                <tr>
                    <td>最小响应时间</td>
                    <td>{stats['min_response_time']}ms</td>
                </tr>
                <tr>
                    <td>最大响应时间</td>
                    <td>{stats['max_response_time']}ms</td>
                </tr>
                <tr>
                    <td>平均内容大小</td>
                    <td>{stats['avg_content_size_kb']}KB</td>
                </tr>
            </table>
        """
        
        if stats.get('error_distribution'):
            html_report += """
            <h2>错误分布</h2>
            <table>
                <tr>
                    <th>错误类型</th>
                    <th>次数</th>
                </tr>
            """
            for error_type, count in stats['error_distribution'].items():
                html_report += f"""
                <tr>
                    <td>{error_type}</td>
                    <td>{count}</td>
                </tr>
                """
            html_report += "</table>"
        
        if chart_base64:
            html_report += f"""
            <h2>性能趋势</h2>
            <img src="data:image/png;base64,{chart_base64}">

总结

本章详细介绍了业务服务监控的四个核心方面:

文件内容差异对比

使用difflib模块进行文本和文件对比
生成多种格式的差异报告
专门的配置文件对比工具

文件与目录差异对比

使用filecmp模块进行文件和目录比较
实现备份验证和目录同步功能
增量备份验证

邮件发送

使用smtplib发送各类邮件
创建个性化的邮件模板
实现监控告警和报告邮件

Web服务质量监控

HTTP状态和性能监控
SSL证书检查
综合质量评估和趋势分析

这些工具和技术为构建完整的业务服务监控系统提供了坚实的基础,可以帮助运维人员及时发现和解决问题,确保服务的稳定运行。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容