007-系统批量运维管理器 Fabric 详解

系统批量运维管理器 Fabric 详解

Fabric是一个基于Python的命令行工具和库,用于简化系统管理和应用部署任务,特别是在多台服务器上执行命令和传输文件。与前几章介绍的pexpect和paramiko相比,Fabric提供了更高层次的抽象,使得批量运维任务的编写更加简洁高效。本章将详细介绍Fabric的安装、配置和使用方法,以及在实际运维场景中的应用示例。

7.1 Fabric 的安装

Fabric是一个Python库,因此可以通过pip包管理器轻松安装。本节将介绍不同环境下Fabric的安装方法和注意事项。

通过pip安装

最简单的安装方式是使用pip:

bash

# 安装Fabric 2.x版本(推荐)
pip install fabric

# 或者指定版本安装
pip install fabric==2.6.0

如果需要安装旧版本的Fabric 1.x,可以使用:

bash

pip install "fabric<2.0"

系统包管理器安装

在Linux系统中,也可以使用系统包管理器安装Fabric:

Debian/Ubuntu:

bash

sudo apt-get install fabric

CentOS/RHEL:

bash

sudo yum install fabric

注意: 系统包管理器安装的版本可能比较旧,建议使用pip安装最新版本。

依赖关系

Fabric 2.x依赖于以下Python库:

paramiko: SSH协议实现(前一章详细介绍过)
invoke: 命令执行库
patchwork: 文件和路径操作工具集

安装Fabric时,pip会自动安装这些依赖库。

验证安装

安装完成后,可以通过以下命令验证安装是否成功:

bash

# 查看版本
fab --version

# 或者在Python中导入验证
python -c "import fabric; print(fabric.__version__)"

Fabric 1.x 与 2.x 的区别

Fabric有两个主要版本系列:1.x和2.x,它们之间有显著差异。本章主要介绍Fabric 2.x版本,但也会提及两个版本的主要区别:

API变化:Fabric 2.x重新设计了API,更加面向对象,而Fabric 1.x更加函数化。
命令行工具:Fabric 2.x使用fab命令,但调用方式与1.x不同。
配置文件:2.x使用fabfile.py或任何Python模块,而1.x更严格地依赖fabfile.py
依赖库:2.x基于invoke和patchwork,而1.x是独立的实现。

如果你有使用Fabric 1.x的代码,需要做一些调整才能在Fabric 2.x中运行。

7.2 fab 的常用参数

Fabric提供了fab命令行工具,用于执行定义在fabfile中的任务。本节将介绍fab命令的常用参数和使用方法。

基本语法

bash

fab [options] task1 task2 ...

常用选项

以下是fab命令的常用选项:

选项 描述
-l, --list 列出可用的任务
-f, --file 指定fabfile路径,默认为当前目录下的fabfile.py
-H, --hosts 指定要连接的主机(逗号分隔)
-i, --identity 指定SSH密钥文件
-p, --password 指定SSH密码(不推荐,安全风险)
-u, --user 指定SSH用户名
-P, --parallel 并行执行任务
-S, --skip-bad-hosts 遇到连接错误时跳过该主机继续执行
-t, --timeout 设置命令执行超时时间
-T, --command-timeout 设置命令执行超时时间
-r, --reject-unknown-hosts 拒绝未知的SSH主机
-D, --disable-known-hosts 不加载用户的known_hosts文件
-k, --no-agent 不使用SSH代理
-V, --version 显示Fabric版本
-h, --help 显示帮助信息

示例用法

bash

# 列出可用任务
fab -l

# 执行特定任务
fab deploy

# 在指定主机上执行任务
fab -H web1.example.com,web2.example.com deploy

# 使用特定用户和密钥文件执行任务
fab -H web.example.com -u admin -i ~/.ssh/id_rsa deploy

# 并行执行任务
fab -H web1,web2,web3,web4 -P deploy

# 指定自定义fabfile
fab -f /path/to/my_fabfile.py deploy

传递参数给任务

在Fabric 2.x中,可以通过以下方式将参数传递给任务:

bash

# 传递位置参数
fab task_name arg1 arg2

# 传递关键字参数
fab task_name:key1=value1,key2=value2

# 组合使用
fab task_name:arg1,key1=value1

环境变量

Fabric也支持通过环境变量配置:

bash

# 设置SSH用户名
export FABRIC_USER=admin

# 设置SSH密码(不推荐)
export FABRIC_PASSWORD=secret

# 设置默认主机
export FABRIC_HOSTS=web1.example.com,web2.example.com

7.3 fabfile 的编写

fabfile是Fabric的核心,包含了要执行的任务定义。本节将详细介绍如何编写有效的fabfile,包括配置、API使用和常见模式。

7.3.1 全局属性设定

在Fabric 2.x中,可以通过Connection对象和配置文件设置全局属性。

基本结构

一个典型的fabfile.py结构如下:

python

from fabric import Connection, Config
from fabric import task
from invoke import task as invoke_task
import os

# 全局配置
config = Config(
    overrides={
        'connect_kwargs': {
            'key_filename': ['/path/to/ssh/key'],
        },
    }
)

# 定义任务
@task
def hello(c):
    """打印Hello World"""
    print("Hello World!")

@task
def deploy(c):
    """部署应用"""
    c.run('git pull')
    c.run('pip install -r requirements.txt')
    c.run('service myapp restart')
配置选项

以下是常用的全局配置选项:

SSH连接配置

python

config = Config(
    overrides={
        'user': 'admin',  # SSH用户名
        'port': 22,       # SSH端口
        'connect_kwargs': {
            'key_filename': ['/path/to/ssh/key'],   # SSH密钥
            'password': 'password',                 # SSH密码(不推荐)
            'timeout': 10,                          # 连接超时时间
            'allow_agent': True,                    # 允许使用SSH代理
            'look_for_keys': True,                  # 查找SSH密钥
        },
    }
)

运行时配置

python

config = Config(
    overrides={
        'run': {
            'echo': True,           # 回显执行的命令
            'pty': True,            # 分配伪终端
            'warn': True,           # 命令失败时发出警告而不是抛出异常
            'hide': False,          # 隐藏命令输出
            'timeout': 30,          # 命令执行超时时间
            'shell': '/bin/bash',   # 指定shell
        },
    }
)

文件传输配置

python

config = Config(
    overrides={
        'transfer': {
            'preserve_mode': True,     # 保留文件权限
            'preserve_times': True,    # 保留文件时间戳
        },
    }
)
主机组管理

通过定义主机组,可以更方便地管理多台服务器:

python

# 定义主机组
web_servers = ['web1.example.com', 'web2.example.com', 'web3.example.com']
db_servers = ['db1.example.com', 'db2.example.com']
all_servers = web_servers + db_servers

# 使用主机组执行任务
@task
def deploy_web(c):
    for host in web_servers:
        conn = Connection(host, config=config)
        conn.run('git pull')
        conn.run('service nginx restart')

@task
def deploy_all(c):
    for host in all_servers:
        conn = Connection(host, config=config)
        conn.run('apt-get update')
        conn.run('apt-get upgrade -y')

7.3.2 常用 API

Fabric 2.x提供了丰富的API,以下是最常用的几个核心API。

Connection 类

Connection是Fabric 2.x的核心类,代表与单个主机的连接。

python

from fabric import Connection

# 创建连接
c = Connection('web.example.com', user='admin', port=22)

# 使用配置创建连接
c = Connection('web.example.com', config=config)

# 执行命令
result = c.run('ls -la')
print(result.stdout)

# 使用sudo执行命令
result = c.sudo('apt-get update')

# 上传文件
c.put('local_file.txt', '/remote/path/file.txt')

# 下载文件
c.get('/remote/path/file.txt', 'local_file.txt')
Group 类

Group类用于管理多台主机,实现并行操作:

python

from fabric import Group

# 创建主机组
servers = Group('web1.example.com', 'web2.example.com', 'web3.example.com')

# 在所有主机上执行命令
results = servers.run('uptime')
for connection, result in results.items():
    print(f"{connection}: {result.stdout}")

# 在所有主机上执行sudo命令
servers.sudo('service nginx restart')

# 上传文件到所有主机
servers.put('local_file.txt', '/remote/path/')
run() 和 sudo() 方法

run()sudo()是最常用的命令执行方法:

python

# 基本用法
result = c.run('ls -la')

# 设置工作目录
result = c.run('ls -la', cwd='/var/www')

# 处理命令失败
try:
    c.run('command_that_might_fail', warn=False)
except UnexpectedExit:
    print("命令执行失败")

# 捕获输出但不显示
result = c.run('grep "secret" /etc/config', hide=True)
print(f"命令输出: {result.stdout}")

# 以root身份执行命令
c.sudo('apt-get update')

# 以特定用户身份执行
c.sudo('restart app', user='appuser')

# 提供sudo密码
c.sudo('apt-get upgrade -y', password='sudopassword')
put() 和 get() 方法

用于文件传输的方法:

python

# 上传文件
c.put('local_file.txt', '/remote/path/file.txt')

# 上传目录
c.put('local_dir/', '/remote/path/')

# 保留文件权限
c.put('script.sh', '/remote/script.sh', preserve_mode=True)

# 下载文件
c.get('/remote/path/file.txt', 'local_file.txt')

# 下载目录
c.get('/remote/dir/', 'local_dir/')
local() 方法

执行本地命令:

python

from invoke import run as local

# 执行本地命令
result = local('ls -la')
print(result.stdout)

# 捕获输出
git_hash = local('git rev-parse HEAD', hide=True).stdout.strip()
print(f"当前Git提交: {git_hash}")

7.3.3 示例1: 查看本地与远程主机信息

下面是一个实际的fabfile示例,用于收集本地和远程主机的系统信息:

python

from fabric import Connection, Config, task
from invoke import run as local
import os
import platform
import json
from datetime import datetime

# 全局配置
config = Config(
    overrides={
        'run': {
            'echo': True,
            'pty': True,
        },
    }
)

@task
def system_info(c):
    """收集本地系统信息"""
    print("正在收集本地系统信息...")
    
    info = {}
    
    # 基本系统信息
    info['hostname'] = platform.node()
    info['platform'] = platform.platform()
    info['system'] = platform.system()
    info['processor'] = platform.processor()
    info['python_version'] = platform.python_version()
    
    # 详细系统信息
    if platform.system() == 'Linux':
        # 内核版本
        info['kernel'] = local('uname -r', hide=True).stdout.strip()
        
        # CPU信息
        cpu_info = local('cat /proc/cpuinfo | grep "model name" | head -1', hide=True).stdout
        if cpu_info:
            info['cpu_model'] = cpu_info.split(':')[1].strip()
        
        # 内存信息
        mem_info = local('free -h', hide=True).stdout
        info['memory'] = mem_info
        
        # 磁盘使用情况
        disk_info = local('df -h', hide=True).stdout
        info['disk_usage'] = disk_info
        
        # 网络接口
        net_info = local('ip -br addr show', hide=True).stdout
        info['network'] = net_info
        
        # 已安装包数量
        if os.path.exists('/usr/bin/apt'):
            pkg_count = local('dpkg -l | grep "^ii" | wc -l', hide=True).stdout.strip()
            info['package_count'] = pkg_count
            info['package_manager'] = 'apt'
        elif os.path.exists('/usr/bin/yum'):
            pkg_count = local('rpm -qa | wc -l', hide=True).stdout.strip()
            info['package_count'] = pkg_count
            info['package_manager'] = 'yum'
    
    # 保存信息到文件
    with open(f'local_system_info_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json', 'w') as f:
        json.dump(info, f, indent=2)
    
    # 打印信息摘要
    print("
系统信息摘要:")
    print(f"主机名: {info['hostname']}")
    print(f"平台: {info['platform']}")
    print(f"处理器: {info['processor']}")
    print(f"Python版本: {info['python_version']}")
    
    return info

@task
def remote_system_info(c, hosts):
    """收集远程主机系统信息
    
    参数:
        hosts: 逗号分隔的主机列表
    """
    host_list = hosts.split(',')
    results = {}
    
    for host in host_list:
        print(f"
正在收集远程主机 {host} 的系统信息...")
        
        try:
            conn = Connection(host, config=config)
            info = {}
            
            # 基本系统信息
            info['hostname'] = conn.run('hostname', hide=True).stdout.strip()
            info['platform'] = conn.run('uname -a', hide=True).stdout.strip()
            
            # CPU信息
            cpu_info = conn.run('cat /proc/cpuinfo | grep "model name" | head -1', hide=True).stdout
            if cpu_info:
                info['cpu_model'] = cpu_info.split(':')[1].strip()
            
            # 内核版本
            info['kernel'] = conn.run('uname -r', hide=True).stdout.strip()
            
            # 操作系统信息
            if conn.run('test -f /etc/os-release', warn=True, hide=True).ok:
                os_info = conn.run('cat /etc/os-release | grep "PRETTY_NAME"', hide=True).stdout
                if os_info:
                    info['os'] = os_info.split('=')[1].strip('"'')
            
            # 内存信息
            mem_info = conn.run('free -h', hide=True).stdout
            info['memory'] = mem_info
            
            # 磁盘使用情况
            disk_info = conn.run('df -h', hide=True).stdout
            info['disk_usage'] = disk_info
            
            # 网络接口
            try:
                net_info = conn.run('ip -br addr show', hide=True).stdout
                info['network'] = net_info
            except:
                # 某些老系统可能没有ip命令
                net_info = conn.run('ifconfig', hide=True, warn=True).stdout
                info['network'] = net_info
            
            # 已安装包数量
            if conn.run('which dpkg', warn=True, hide=True).ok:
                pkg_count = conn.run('dpkg -l | grep "^ii" | wc -l', hide=True).stdout.strip()
                info['package_count'] = pkg_count
                info['package_manager'] = 'apt'
            elif conn.run('which rpm', warn=True, hide=True).ok:
                pkg_count = conn.run('rpm -qa | wc -l', hide=True).stdout.strip()
                info['package_count'] = pkg_count
                info['package_manager'] = 'yum'
            
            # 运行时间
            uptime = conn.run('uptime', hide=True).stdout.strip()
            info['uptime'] = uptime
            
            # 登录用户
            who = conn.run('who', hide=True).stdout.strip()
            info['logged_users'] = who
            
            # 最后10个登录用户
            last = conn.run('last -n 10', hide=True).stdout.strip()
            info['last_logins'] = last
            
            results[host] = info
            
            # 打印信息摘要
            print(f"
主机 {host} 信息摘要:")
            print(f"主机名: {info['hostname']}")
            print(f"平台: {info['platform']}")
            if 'os' in info:
                print(f"操作系统: {info['os']}")
            if 'cpu_model' in info:
                print(f"CPU: {info['cpu_model']}")
            print(f"内核: {info['kernel']}")
            print(f"运行时间: {info['uptime']}")
            
        except Exception as e:
            print(f"收集 {host} 信息时出错: {str(e)}")
            results[host] = {"error": str(e)}
    
    # 保存信息到文件
    with open(f'remote_system_info_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json', 'w') as f:
        json.dump(results, f, indent=2)
    
    return results

@task
def compare_hosts(c, hosts):
    """比较多台主机的系统配置
    
    参数:
        hosts: 逗号分隔的主机列表
    """
    # 收集远程主机信息
    host_info = remote_system_info(c, hosts)
    
    # 比较核心配置
    host_list = hosts.split(',')
    
    print("
========== 主机配置比较 ==========")
    
    # 比较内核版本
    print("
== 内核版本 ==")
    for host in host_list:
        if host in host_info and 'error' not in host_info[host]:
            print(f"{host}: {host_info[host].get('kernel', 'N/A')}")
    
    # 比较CPU
    print("
== CPU型号 ==")
    for host in host_list:
        if host in host_info and 'error' not in host_info[host]:
            print(f"{host}: {host_info[host].get('cpu_model', 'N/A')}")
    
    # 比较操作系统
    print("
== 操作系统 ==")
    for host in host_list:
        if host in host_info and 'error' not in host_info[host]:
            print(f"{host}: {host_info[host].get('os', 'N/A')}")
    
    # 比较包管理器和包数量
    print("
== 软件包 ==")
    for host in host_list:
        if host in host_info and 'error' not in host_info[host]:
            pkg_mgr = host_info[host].get('package_manager', 'unknown')
            pkg_count = host_info[host].get('package_count', 'N/A')
            print(f"{host}: {pkg_mgr} - {pkg_count} 个包")
    
    print("
================================")

使用方法:

bash

# 查看本地系统信息
fab system-info

# 查看远程主机信息
fab remote-system-info --hosts web1.example.com,web2.example.com

# 比较多台主机配置
fab compare-hosts --hosts web1.example.com,web2.example.com,db1.example.com

7.3.4 示例2: 动态获取远程目录列表

下面是一个示例,展示如何动态获取远程服务器的目录列表,并支持交互式选择:

python

from fabric import Connection, Config, task
import os
from pprint import pprint

# 全局配置
config = Config(
    overrides={
        'run': {
            'echo': False,  # 不回显命令
            'hide': True,   # 隐藏输出
        },
    }
)

@task
def list_directories(c, host, path="/var/www"):
    """列出远程主机指定路径下的目录
    
    参数:
        host: 远程主机地址
        path: 要列出目录的路径,默认为/var/www
    """
    print(f"正在连接到 {host}...")
    conn = Connection(host, config=config)
    
    print(f"列出 {host}:{path} 中的目录...")
    result = conn.run(f'find {path} -maxdepth 1 -type d | sort')
    
    # 解析目录列表
    directories = result.stdout.strip().split('
')
    
    # 过滤掉路径本身
    directories = [d for d in directories if d != path]
    
    if not directories:
        print(f"在 {path} 中没有找到目录")
        return []
    
    print(f"在 {path} 中找到 {len(directories)} 个目录:")
    for i, directory in enumerate(directories, 1):
        print(f"{i}. {directory}")
    
    return directories

@task
def explore_directories(c, host, path="/var/www"):
    """交互式浏览远程主机目录
    
    参数:
        host: 远程主机地址
        path: 起始路径,默认为/var/www
    """
    conn = Connection(host, config=config)
    current_path = path
    
    while True:
        print(f"
当前路径: {current_path}")
        
        # 获取当前路径下的目录和文件
        dirs_result = conn.run(f'find {current_path} -maxdepth 1 -type d | sort', hide=True)
        files_result = conn.run(f'find {current_path} -maxdepth 1 -type f | sort', hide=True)
        
        directories = dirs_result.stdout.strip().split('
')
        files = files_result.stdout.strip().split('
')
        
        # 过滤掉空行和当前目录
        directories = [d for d in directories if d and d != current_path]
        files = [f for f in files if f]
        
        # 显示目录
        print("
目录:")
        if directories:
            for i, directory in enumerate(directories, 1):
                dir_name = os.path.basename(directory)
                print(f"{i}. {dir_name}/")
        else:
            print("  (无子目录)")
        
        # 显示文件
        print("
文件:")
        if files:
            for i, file in enumerate(files, 1):
                file_name = os.path.basename(file)
                # 获取文件大小
                size_result = conn.run(f'stat -c "%s" "{file}"', hide=True)
                size = int(size_result.stdout.strip())
                size_human = get_human_size(size)
                print(f"{i}. {file_name} ({size_human})")
        else:
            print("  (无文件)")
        
        # 用户操作菜单
        print("
操作:")
        print("cd [number] - 进入选择的目录")
        print("cat [number] - 查看文件内容")
        print("stat [number] - 查看文件详细信息")
        print("cd .. - 返回上级目录")
        print("q - 退出")
        
        choice = input("
请输入命令: ").strip()
        
        if choice.lower() == 'q':
            break
            
        elif choice.lower() == 'cd ..':
            current_path = os.path.dirname(current_path)
            
        elif choice.startswith('cd '):
            try:
                index = int(choice.split()[1]) - 1
                if 0 <= index < len(directories):
                    current_path = directories[index]
                else:
                    print("无效的目录索引")
            except (ValueError, IndexError):
                print("无效的命令格式")
                
        elif choice.startswith('cat '):
            try:
                index = int(choice.split()[1]) - 1
                if 0 <= index < len(files):
                    file_path = files[index]
                    file_size_result = conn.run(f'stat -c "%s" "{file_path}"', hide=True)
                    file_size = int(file_size_result.stdout.strip())
                    
                    if file_size > 1024 * 1024:  # 大于1MB
                        confirm = input(f"文件大小为 {get_human_size(file_size)},确定要查看? (y/n): ")
                        if confirm.lower() != 'y':
                            continue
                    
                    result = conn.run(f'cat "{file_path}"', hide=False)
                    
                    # 如果文件内容很长,可以使用分页显示
                    if len(result.stdout.split('
')) > 24:
                        print("
(按 q 退出查看)")
                        import subprocess
                        try:
                            subprocess.run(['less'], input=result.stdout.encode())
                        except:
                            print(result.stdout)
                    else:
                        print("
" + result.stdout)
                else:
                    print("无效的文件索引")
            except (ValueError, IndexError):
                print("无效的命令格式")
                
        elif choice.startswith('stat '):
            try:
                index = int(choice.split()[1]) - 1
                if 0 <= index < len(files):
                    file_path = files[index]
                    result = conn.run(f'stat "{file_path}"', hide=False)
                    print("
" + result.stdout)
                else:
                    print("无效的文件索引")
            except (ValueError, IndexError):
                print("无效的命令格式")
                
        else:
            print("未知命令")

def get_human_size(size_bytes):
    """将字节大小转换为人类可读的格式"""
    if size_bytes < 1024:
        return f"{size_bytes} B"
    elif size_bytes < 1024 * 1024:
        return f"{size_bytes/1024:.1f} KB"
    elif size_bytes < 1024 * 1024 * 1024:
        return f"{size_bytes/(1024*1024):.1f} MB"
    else:
        return f"{size_bytes/(1024*1024*1024):.1f} GB"

@task
def check_disk_usage(c, hosts, threshold=80):
    """检查多台主机的磁盘使用情况
    
    参数:
        hosts: 逗号分隔的主机列表
        threshold: 使用率警告阈值,默认80%
    """
    host_list = hosts.split(',')
    issues_found = False
    
    for host in host_list:
        print(f"
检查主机 {host} 的磁盘使用情况...")
        
        try:
            conn = Connection(host, config=config)
            result = conn.run('df -h', hide=True)
            
            # 解析df输出
            lines = result.stdout.strip().split('
')
            headers = lines[0]
            filesystems = lines[1:]
            
            print(f"
{host} 的磁盘使用情况:")
            print(headers)
            
            for fs in filesystems:
                print(fs)
                
                # 检查使用率是否超过阈值
                parts = fs.split()
                if len(parts) >= 5:
                    usage = parts[4].strip('%')
                    try:
                        usage_int = int(usage)
                        if usage_int >= threshold:
                            print(f"警告: {parts[0]} 使用率 {usage}% 超过阈值 {threshold}%")
                            issues_found = True
                    except ValueError:
                        # 忽略无法解析的行
                        pass
                        
        except Exception as e:
            print(f"检查 {host} 时出错: {str(e)}")
    
    return not issues_found  # 如果没有发现问题,返回True

使用方法:

bash

# 列出远程主机上的目录
fab list-directories --host web.example.com --path /var/www/html

# 交互式浏览远程主机目录
fab explore-directories --host web.example.com

# 检查多台主机的磁盘使用情况
fab check-disk-usage --hosts web1.example.com,web2.example.com,db1.example.com --threshold 90

7.3.5 示例3: 网关模式文件上传与执行

下面是一个示例,演示如何通过网关(堡垒机)连接到内部服务器,并上传文件和执行命令:

python

from fabric import Connection, Config, task
import os
import io
import time
from tempfile import NamedTemporaryFile

# 全局配置
config = Config(
    overrides={
        'run': {
            'pty': True,
        },
    }
)

@task
def gateway_command(c, gateway, target, command, gateway_user=None, target_user=None):
    """通过网关服务器在目标服务器上执行命令
    
    参数:
        gateway: 网关服务器地址
        target: 目标服务器地址
        command: 要执行的命令
        gateway_user: 网关服务器用户名(可选)
        target_user: 目标服务器用户名(可选)
    """
    # 构建连接参数
    gateway_kwargs = {}
    target_kwargs = {}
    
    if gateway_user:
        gateway_kwargs['user'] = gateway_user
    
    if target_user:
        target_kwargs['user'] = target_user
    
    print(f"连接到网关服务器 {gateway}...")
    with Connection(gateway, config=config, **gateway_kwargs) as gateway_conn:
        print(f"通过网关连接到目标服务器 {target}...")
        
        # 创建从网关到目标服务器的ProxyJump连接
        # 这需要在网关服务器上配置了到目标服务器的SSH访问权限
        with gateway_conn.forward_local(
            local_port=0,  # 自动选择一个本地端口
            remote_port=22,
            remote_host=target
        ) as tunnel:
            target_conn = Connection(
                "localhost", 
                port=tunnel.local_port,
                config=config,
                **target_kwargs
            )
            
            print(f"在目标服务器 {target} 上执行命令: {command}")
            result = target_conn.run(command, hide=False)
            
            print(f"命令执行完成,退出状态码: {result.return_code}")
            return result

@task
def gateway_file_upload(c, gateway, target, local_path, remote_path, 
                        gateway_user=None, target_user=None):
    """通过网关服务器上传文件到目标服务器
    
    参数:
        gateway: 网关服务器地址
        target: 目标服务器地址
        local_path: 本地文件路径
        remote_path: 目标服务器上的文件路径
        gateway_user: 网关服务器用户名(可选)
        target_user: 目标服务器用户名(可选)
    """
    # 检查本地文件是否存在
    if not os.path.exists(local_path):
        print(f"错误: 本地文件 {local_path} 不存在")
        return False
    
    # 构建连接参数
    gateway_kwargs = {}
    target_kwargs = {}
    
    if gateway_user:
        gateway_kwargs['user'] = gateway_user
    
    if target_user:
        target_kwargs['user'] = target_user
    
    print(f"连接到网关服务器 {gateway}...")
    with Connection(gateway, config=config, **gateway_kwargs) as gateway_conn:
        # 先将文件上传到网关服务器的临时目录
        temp_path = f"/tmp/fabric_upload_{os.path.basename(local_path)}_{int(time.time())}"
        print(f"上传文件到网关服务器: {local_path} -> {temp_path}")
        
        gateway_conn.put(local_path, temp_path)
        
        print(f"通过网关连接到目标服务器 {target}...")
        
        # 创建从网关到目标服务器的ProxyJump连接
        with gateway_conn.forward_local(
            local_port=0,
            remote_port=22,
            remote_host=target
        ) as tunnel:
            target_conn = Connection(
                "localhost", 
                port=tunnel.local_port,
                config=config,
                **target_kwargs
            )
            
            # 在网关上创建一个加密的压缩包
            print("创建临时压缩文件...")
            temp_tar = f"{temp_path}.tar.gz"
            gateway_conn.run(f"tar -czf {temp_tar} -C {os.path.dirname(temp_path)} {os.path.basename(temp_path)}")
            
            # 获取压缩包内容
            get_result = io.BytesIO()
            gateway_conn.get(temp_tar, get_result)
            get_result.seek(0)
            
            # 将压缩包通过SSH直接传输到目标服务器
            print(f"将文件传输到目标服务器: {remote_path}")
            
            # 确保目标目录存在
            target_dir = os.path.dirname(remote_path)
            if target_dir:
                target_conn.run(f"mkdir -p {target_dir}", warn=True)
            
            # 使用临时文件
            with NamedTemporaryFile() as temp_file:
                temp_file.write(get_result.read())
                temp_file.flush()
                
                # 上传到目标服务器
                target_conn.put(temp_file.name, f"{remote_path}.tar.gz")
            
            # 在目标服务器上解压
            target_conn.run(f"tar -xzf {remote_path}.tar.gz -C {os.path.dirname(remote_path)}")
            target_conn.run(f"mv {os.path.dirname(remote_path)}/{os.path.basename(temp_path)} {remote_path}")
            target_conn.run(f"rm {remote_path}.tar.gz")
            
            # 清理网关上的临时文件
            gateway_conn.run(f"rm {temp_path} {temp_tar}")
            
            print(f"文件成功上传到 {target}:{remote_path}")
            
            # 验证文件大小
            local_size = os.path.getsize(local_path)
            remote_size_result = target_conn.run(f"stat -c %s {remote_path}", hide=True)
            remote_size = int(remote_size_result.stdout.strip())
            
            if local_size == remote_size:
                print(f"文件大小验证成功: {local_size} 字节")
                return True
            else:
                print(f"警告: 文件大小不匹配! 本地: {local_size} 字节, 远程: {remote_size} 字节")
                return False

@task
def deploy_via_gateway(c, gateway, targets, app_path, deploy_path, restart=False):
    """通过网关部署应用到多个目标服务器
    
    参数:
        gateway: 网关服务器地址
        targets: 逗号分隔的目标服务器列表
        app_path: 本地应用路径
        deploy_path: 部署路径
        restart: 是否重启应用
    """
    target_list = targets.split(',')
    success_count = 0
    error_count = 0
    
    # 打包应用
    app_name = os.path.basename(app_path.rstrip('/'))
    local_tar = f"/tmp/{app_name}.tar.gz"
    
    print(f"打包应用 {app_path} -> {local_tar}")
    os.system(f"tar -czf {local_tar} -C {os.path.dirname(app_path)} {app_name}")
    
    # 连接到网关
    print(f"连接到网关服务器 {gateway}...")
    with Connection(gateway, config=config) as gateway_conn:
        # 将应用包上传到网关
        gateway_temp = f"/tmp/{app_name}.tar.gz"
        print(f"上传应用包到网关: {local_tar} -> {gateway_temp}")
        gateway_conn.put(local_tar, gateway_temp)
        
        # 部署到每个目标服务器
        for target in target_list:
            print(f"
开始部署到 {target}...")
            
            try:
                # 建立到目标服务器的连接
                with gateway_conn.forward_local(
                    local_port=0,
                    remote_port=22,
                    remote_host=target
                ) as tunnel:
                    target_conn = Connection(
                        "localhost", 
                        port=tunnel.local_port,
                        config=config
                    )
                    
                    # 确保部署目录存在
                    target_conn.run(f"mkdir -p {os.path.dirname(deploy_path)}", warn=True)
                    
                    # 备份现有应用
                    backup_path = f"{deploy_path}_backup_{time.strftime('%Y%m%d_%H%M%S')}"
                    if target_conn.run(f"test -d {deploy_path}", warn=True).ok:
                        print(f"备份现有应用: {deploy_path} -> {backup_path}")
                        target_conn.run(f"cp -a {deploy_path} {backup_path}")
                    
                    # 部署新版本
                    target_temp = f"/tmp/{app_name}.tar.gz"
                    print(f"传输应用包到目标服务器: {target_temp}")
                    
                    # 直接使用SCP通过网关传输
                    print("使用SCP通过网关传输应用包...")
                    gateway_conn.run(f"scp {gateway_temp} {target}:{target_temp}")
                    
                    print("解压应用包...")
                    target_conn.run(f"rm -rf {deploy_path}")
                    target_conn.run(f"mkdir -p {deploy_path}")
                    target_conn.run(f"tar -xzf {target_temp} -C {os.path.dirname(deploy_path)}")
                    
                    # 设置权限
                    print("设置应用权限...")
                    target_conn.run(f"chmod -R 755 {deploy_path}")
                    
                    # 清理临时文件
                    target_conn.run(f"rm {target_temp}")
                    
                    # 如果需要,重启应用
                    if restart:
                        print("重启应用...")
                        # 这里可以根据实际应用的重启方式调整命令
                        if target_conn.run(f"test -f {deploy_path}/restart.sh", warn=True).ok:
                            target_conn.run(f"cd {deploy_path} && ./restart.sh")
                        else:
                            print("未找到restart.sh脚本,跳过重启")
                    
                    print(f"部署到 {target} 成功")
                    success_count += 1
                    
            except Exception as e:
                print(f"部署到 {target} 失败: {str(e)}")
                error_count += 1
        
        # 清理网关上的临时文件
        gateway_conn.run(f"rm {gateway_temp}")
    
    # 清理本地临时文件
    os.remove(local_tar)
    
    print(f"
部署完成: {success_count} 成功, {error_count} 失败")
    return success_count, error_count

使用方法:

bash

# 通过网关执行命令
fab gateway-command --gateway bastion.example.com --target internal.example.com 
                   --command "df -h" --gateway-user admin --target-user user

# 通过网关上传文件
fab gateway-file-upload --gateway bastion.example.com --target internal.example.com 
                       --local-path /path/to/local/file.txt 
                       --remote-path /path/on/target/file.txt

# 通过网关部署应用到多个服务器
fab deploy-via-gateway --gateway bastion.example.com 
                      --targets web1.internal,web2.internal,web3.internal 
                      --app-path /path/to/myapp 
                      --deploy-path /var/www/myapp 
                      --restart

7.4 Fabric 应用示例

以下示例展示Fabric在实际运维场景中的应用,包括文件操作、服务器环境配置和代码发布管理。

7.4.1 示例1: 文件打包、上传与校验

以下示例实现了文件打包、上传和完整性校验功能:

python

from fabric import Connection, Config, task
import os
import hashlib
import tempfile
import time
from invoke import run as local

# 全局配置
config = Config(
    overrides={
        'run': {
            'pty': True,
        },
    }
)

def calculate_md5(filepath):
    """计算文件的MD5哈希值"""
    hash_md5 = hashlib.md5()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

@task
def package_files(c, source_dir, output_name=None):
    """打包文件或目录
    
    参数:
        source_dir: 源文件或目录路径
        output_name: 输出文件名,默认为源目录名加时间戳
    """
    if not os.path.exists(source_dir):
        print(f"错误: 源路径 {source_dir} 不存在")
        return None
    
    # 确定输出文件名
    if not output_name:
        base_name = os.path.basename(source_dir.rstrip('/'))
        output_name = f"{base_name}_{int(time.time())}.tar.gz"
    
    if not output_name.endswith('.tar.gz'):
        output_name += '.tar.gz'
    
    # 创建临时目录
    with tempfile.TemporaryDirectory() as temp_dir:
        output_path = os.path.join(temp_dir, output_name)
        
        # 打包文件
        print(f"打包 {source_dir} -> {output_path}")
        result = local(f"tar -czf {output_path} -C {os.path.dirname(source_dir)} {os.path.basename(source_dir.rstrip('/'))}", hide=True)
        
        if result.ok:
            # 计算MD5
            md5 = calculate_md5(output_path)
            print(f"打包完成,MD5: {md5}")
            
            # 移动到当前目录
            local(f"cp {output_path} ./")
            local_path = f"./{output_name}"
            
            print(f"打包文件已保存到: {local_path}")
            return local_path, md5
        else:
            print(f"打包失败: {result.stderr}")
            return None, None

@task
def upload_with_verify(c, host, package_path, remote_dir, md5=None):
    """上传文件并验证完整性
    
    参数:
        host: 目标主机
        package_path: 本地包路径
        remote_dir: 远程目录
        md5: 预期的MD5值,如果为None则在上传前计算
    """
    if not os.path.exists(package_path):
        print(f"错误: 本地文件 {package_path} 不存在")
        return False
    
    # 如果未提供MD5,计算一个
    if not md5:
        print("计算本地文件MD5...")
        md5 = calculate_md5(package_path)
    
    print(f"本地文件MD5: {md5}")
    
    # 连接到目标服务器
    print(f"连接到 {host}...")
    conn = Connection(host, config=config)
    
    # 确保远程目录存在
    conn.run(f"mkdir -p {remote_dir}", warn=True)
    
    # 上传文件
    remote_path = f"{remote_dir}/{os.path.basename(package_path)}"
    print(f"上传文件: {package_path} -> {remote_path}")
    
    start_time = time.time()
    conn.put(package_path, remote_path)
    upload_time = time.time() - start_time
    
    # 计算上传速度
    file_size = os.path.getsize(package_path)
    speed_mbps = (file_size / 1024 / 1024) / upload_time
    
    print(f"上传完成,耗时: {upload_time:.2f}秒,速度: {speed_mbps:.2f} MB/s")
    
    # 在远程服务器上计算MD5
    print("验证远程文件MD5...")
    result = conn.run(f"md5sum {remote_path} | cut -d' ' -f1", hide=True)
    remote_md5 = result.stdout.strip()
    
    print(f"远程文件MD5: {remote_md5}")
    
    # 比较MD5
    if md5 == remote_md5:
        print("验证成功: MD5匹配")
        return True
    else:
        print("验证失败: MD5不匹配!")
        return False

@task
def unpack_and_verify(c, host, package_path, extract_dir, md5=None):
    """上传、解压并验证包
    
    参数:
        host: 目标主机
        package_path: 本地包路径
        extract_dir: 解压目录
        md5: 预期的MD5值
    """
    # 上传并验证
    if not upload_with_verify(c, host, package_path, "/tmp", md5):
        print("上传验证失败,中止操作")
        return False
    
    # 连接到目标服务器
    conn = Connection(host, config=config)
    
    # 备份现有目录
    remote_package = f"/tmp/{os.path.basename(package_path)}"
    if conn.run(f"test -d {extract_dir}", warn=True).ok:
        backup_dir = f"{extract_dir}_backup_{time.strftime('%Y%m%d_%H%M%S')}"
        print(f"备份现有目录: {extract_dir} -> {backup_dir}")
        conn.run(f"cp -a {extract_dir} {backup_dir}")
    
    # 确保解压目录存在
    conn.run(f"mkdir -p {extract_dir}", warn=True)
    
    # 解压文件
    print(f"解压文件到 {extract_dir}")
    conn.run(f"tar -xzf {remote_package} -C {os.path.dirname(extract_dir)}")
    
    # 验证解压后的文件
    print("验证解压结果...")
    original_filename = os.path.basename(package_path).replace('.tar.gz', '')
    expected_path = f"{os.path.dirname(extract_dir)}/{original_filename}"
    
    if conn.run(f"test -e {expected_path}", warn=True).ok:
        # 如果解压目录与期望目录不同,需要移动
        if expected_path != extract_dir:
            print(f"移动文件: {expected_path} -> {extract_dir}")
            conn.run(f"rm -rf {extract_dir}")
            conn.run(f"mv {expected_path} {extract_dir}")
        
        print(f"文件成功解压到 {extract_dir}")
        
        # 清理临时文件
        conn.run(f"rm {remote_package}")
        
        return True
    else:
        print(f"解压失败: 找不到预期的路径 {expected_path}")
        return False

@task
def distribute_package(c, hosts, source_dir, remote_dir, extract=True):
    """将文件打包并分发到多台服务器
    
    参数:
        hosts: 逗号分隔的主机列表
        source_dir: 源文件或目录
        remote_dir: 远程目录
        extract: 是否解压文件
    """
    host_list = hosts.split(',')
    
    # 打包文件
    package_path, md5 = package_files(c, source_dir)
    if not package_path:
        return False
    
    success_count = 0
    fail_count = 0
    
    # 分发到每台主机
    for host in host_list:
        print(f"
处理主机: {host}")
        
        try:
            if extract:
                # 上传、解压并验证
                if unpack_and_verify(c, host, package_path, remote_dir, md5):
                    success_count += 1
                else:
                    fail_count += 1
            else:
                # 仅上传并验证
                if upload_with_verify(c, host, package_path, remote_dir, md5):
                    success_count += 1
                else:
                    fail_count += 1
        except Exception as e:
            print(f"处理 {host} 时出错: {str(e)}")
            fail_count += 1
    
    print(f"
分发完成: {success_count} 成功, {fail_count} 失败")
    return success_count, fail_count

使用方法:

bash

# 打包文件
fab package-files --source-dir /path/to/myapp

# 上传并验证文件
fab upload-with-verify --host web.example.com --package-path myapp.tar.gz --remote-dir /opt/apps

# 上传、解压并验证
fab unpack-and-verify --host web.example.com --package-path myapp.tar.gz --extract-dir /var/www/myapp

# 将文件分发到多台服务器
fab distribute-package --hosts web1.example.com,web2.example.com,web3.example.com 
                     --source-dir /path/to/myapp --remote-dir /var/www/myapp

7.4.2 示例2: 部署LNMP业务服务环境

以下示例展示如何使用Fabric部署LNMP(Linux + Nginx + MySQL + PHP)环境:

python

from fabric import Connection, Config, task
import os
import time

# 全局配置
config = Config(
    overrides={
        'run': {
            'pty': True,
        },
    }
)

@task
def install_lnmp(c, host, mysql_root_password, web_root="/var/www/html"):
    """在服务器上安装LNMP环境
    
    参数:
        host: 目标主机
        mysql_root_password: MySQL root密码
        web_root: Web根目录,默认为/var/www/html
    """
    print(f"连接到 {host}...")
    conn = Connection(host, config=config)
    
    # 检查系统类型
    if conn.run("which apt", warn=True, hide=True).ok:
        # Debian/Ubuntu系统
        install_lnmp_debian(conn, mysql_root_password, web_root)
    elif conn.run("which yum", warn=True, hide=True).ok:
        # CentOS/RHEL系统
        install_lnmp_centos(conn, mysql_root_password, web_root)
    else:
        print("不支持的系统类型")
        return False
    
    return True

def install_lnmp_debian(conn, mysql_root_password, web_root):
    """在Debian/Ubuntu系统上安装LNMP环境"""
    print("更新系统包...")
    conn.sudo("apt-get update")
    
    print("安装LNMP基础包...")
    conn.sudo("DEBIAN_FRONTEND=noninteractive apt-get install -y nginx mysql-server php-fpm php-mysql php-cli php-curl php-gd php-mbstring php-xml php-zip unzip")
    
    # 配置MySQL
    print("配置MySQL...")
    mysql_secure_installation = f"""
    sudo mysql -u root <<EOF
    ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '{mysql_root_password}';
    DELETE FROM mysql.user WHERE User='';
    DELETE FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');
    DROP DATABASE IF EXISTS test;
    DELETE FROM mysql.db WHERE Db='test' OR Db='test\_%';
    FLUSH PRIVILEGES;
EOF
    """
    conn.run(mysql_secure_installation)
    
    # 配置PHP-FPM
    print("配置PHP-FPM...")
    conn.sudo("sed -i 's/;cgi.fix_pathinfo=1/cgi.fix_pathinfo=0/g' /etc/php/*/fpm/php.ini")
    conn.sudo("systemctl restart php*-fpm.service")
    
    # 配置Nginx
    print("配置Nginx...")
    nginx_conf = f"""
server {
           {
    listen 80 default_server;
    listen [::]:80 default_server;
    
    root {web_root};
    index index.php index.html index.htm;
    
    server_name _;
    
    location / {
           {
        try_files $uri $uri/ =404;
    }}
    
    location ~ \.php$ {
           {
        include snippets/fastcgi-php.conf;
        fastcgi_pass unix:/var/run/php/php7.4-fpm.sock;
    }}
    
    location ~ /\.ht {
           {
        deny all;
    }}
}}
    """
    
    # 创建配置文件
    conn.sudo(f"mkdir -p {web_root}")
    conn.run(f"echo '{nginx_conf}' > /tmp/default")
    conn.sudo("mv /tmp/default /etc/nginx/sites-available/default")
    
    # 创建测试页面
    php_info = "<?php phpinfo(); ?>"
    conn.run(f"echo '{php_info}' > /tmp/info.php")
    conn.sudo(f"mv /tmp/info.php {web_root}/info.php")
    conn.sudo(f"chown -R www-data:www-data {web_root}")
    
    # 重启服务
    print("重启服务...")
    conn.sudo("systemctl restart nginx")
    
    print("LNMP环境安装完成!")
    print(f"MySQL root密码: {mysql_root_password}")
    print(f"测试PHP: http://<server_ip>/info.php")

def install_lnmp_centos(conn, mysql_root_password, web_root):
    """在CentOS/RHEL系统上安装LNMP环境"""
    print("更新系统包...")
    conn.sudo("yum update -y")
    
    # 安装EPEL仓库
    conn.sudo("yum install -y epel-release")
    
    # 安装Nginx
    print("安装Nginx...")
    conn.sudo("yum install -y nginx")
    conn.sudo("systemctl enable nginx")
    conn.sudo("systemctl start nginx")
    
    # 安装MariaDB (MySQL)
    print("安装MariaDB...")
    conn.sudo("yum install -y mariadb-server")
    conn.sudo("systemctl enable mariadb")
    conn.sudo("systemctl start mariadb")
    
    # 配置MySQL
    mysql_secure_installation = f"""
sudo mysql -u root <<EOF
UPDATE mysql.user SET Password=PASSWORD('{mysql_root_password}') WHERE User='root';
DELETE FROM mysql.user WHERE User='';
DELETE FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');
DROP DATABASE IF EXISTS test;
DELETE FROM mysql.db WHERE Db='test' OR Db='test\_%';
FLUSH PRIVILEGES;
EOF
    """
    conn.run(mysql_secure_installation)
    
    # 安装PHP
    print("安装PHP...")
    conn.sudo("yum install -y php php-fpm php-mysqlnd php-cli php-curl php-gd php-mbstring php-xml php-zip unzip")
    conn.sudo("systemctl enable php-fpm")
    conn.sudo("systemctl start php-fpm")
    
    # 配置PHP-FPM
    conn.sudo("sed -i 's/;cgi.fix_pathinfo=1/cgi.fix_pathinfo=0/g' /etc/php.ini")
    conn.sudo("sed -i 's/user = apache/user = nginx/g' /etc/php-fpm.d/www.conf")
    conn.sudo("sed -i 's/group = apache/group = nginx/g' /etc/php-fpm.d/www.conf")
    conn.sudo("systemctl restart php-fpm")
    
    # 配置Nginx
    nginx_conf = f"""
server {
           {
    listen 80 default_server;
    listen [::]:80 default_server;
    
    root {web_root};
    index index.php index.html index.htm;
    
    server_name _;
    
    location / {
           {
        try_files $uri $uri/ =404;
    }}
    
    location ~ \.php$ {
           {
        try_files $uri =404;
        fastcgi_pass 127.0.0.1:9000;
        fastcgi_index index.php;
        fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
        include fastcgi_params;
    }}
    
    location ~ /\.ht {
           {
        deny all;
    }}
}}
    """
    
    # 创建配置文件
    conn.sudo(f"mkdir -p {web_root}")
    conn.run(f"echo '{nginx_conf}' > /tmp/default.conf")
    conn.sudo("mv /tmp/default.conf /etc/nginx/conf.d/default.conf")
    
    # 创建测试页面
    php_info = "<?php phpinfo(); ?>"
    conn.run(f"echo '{php_info}' > /tmp/info.php")
    conn.sudo(f"mv /tmp/info.php {web_root}/info.php")
    conn.sudo(f"chown -R nginx:nginx {web_root}")
    
    # 配置SELinux
    if conn.run("which sestatus", warn=True, hide=True).ok:
        status = conn.run("sestatus | grep 'SELinux status'", hide=True).stdout
        if "enabled" in status:
            print("配置SELinux...")
            conn.sudo("setsebool -P httpd_can_network_connect 1")
    
    # 配置防火墙
    if conn.run("which firewall-cmd", warn=True, hide=True).ok:
        print("配置防火墙...")
        conn.sudo("firewall-cmd --permanent --zone=public --add-service=http")
        conn.sudo("firewall-cmd --permanent --zone=public --add-service=https")
        conn.sudo("firewall-cmd --reload")
    
    # 重启服务
    print("重启服务...")
    conn.sudo("systemctl restart nginx")
    
    print("LNMP环境安装完成!")
    print(f"MySQL root密码: {mysql_root_password}")
    print(f"测试PHP: http://<server_ip>/info.php")

@task
def deploy_website(c, host, source_dir, domain, web_root=None, db_name=None, db_user=None, db_password=None):
    """部署网站到LNMP环境
    
    参数:
        host: 目标主机
        source_dir: 本地源代码目录
        domain: 网站域名
        web_root: Web根目录,如未指定则使用/var/www/{domain}
        db_name: 数据库名称(可选)
        db_user: 数据库用户名(可选)
        db_password: 数据库密码(可选)
    """
    if not web_root:
        web_root = f"/var/www/{domain}"
    
    # 检查源目录是否存在
    if not os.path.isdir(source_dir):
        print(f"错误: 源目录 {source_dir} 不存在")
        return False
    
    print(f"连接到 {host}...")
    conn = Connection(host, config=config)
    
    # 创建Web目录
    print(f"创建Web目录: {web_root}")
    conn.sudo(f"mkdir -p {web_root}")
    
    # 上传网站文件
    print("上传网站文件...")
    local_tar = f"/tmp/{domain}.tar.gz"
    remote_tar = f"/tmp/{domain}.tar.gz"
    
    # 打包源目录
    print(f"打包源目录: {source_dir} -> {local_tar}")
    os.system(f"tar -czf {local_tar} -C {os.path.dirname(source_dir)} {os.path.basename(source_dir)}")
    
    # 上传压缩包
    print(f"上传网站文件: {local_tar} -> {remote_tar}")
    conn.put(local_tar, remote_tar)
    
    # 解压文件
    print(f"解压文件到 {web_root}")
    conn.sudo(f"tar -xzf {remote_tar} -C {os.path.dirname(web_root)}")
    
    # 如果解压的目录名与目标目录名不同,则进行移动
    source_name = os.path.basename(source_dir.rstrip('/'))
    if source_name != os.path.basename(web_root.rstrip('/')):
        extracted_path = f"{os.path.dirname(web_root)}/{source_name}"
        conn.sudo(f"rm -rf {web_root}")
        conn.sudo(f"mv {extracted_path} {web_root}")
    
    # 设置权限
    print("设置文件权限...")
    if conn.run("which apt", warn=True, hide=True).ok:
        # Debian/Ubuntu系统
        conn.sudo(f"chown -R www-data:www-data {web_root}")
    else:
        # CentOS/RHEL系统
        conn.sudo(f"chown -R nginx:nginx {web_root}")
    
    conn.sudo(f"chmod -R 755 {web_root}")
    
    # 如果有需要写入权限的目录,单独设置
    for write_dir in ['uploads', 'cache', 'logs', 'tmp']:
        if conn.run(f"test -d {web_root}/{write_dir}", warn=True, hide=True).ok:
            conn.sudo(f"chmod -R 775 {web_root}/{write_dir}")
    
    # 创建Nginx配置
    print(f"配置Nginx虚拟主机: {domain}")
    nginx_conf = f"""
server {
           {
    listen 80;
    server_name {domain} www.{domain};
    
    root {web_root};
    index index.php index.html index.htm;
    
    location / {
           {
        try_files $uri $uri/ /index.php?$args;
    }}
    
    location ~ \.php$ {
           {
"""
    
    if conn.run("which apt", warn=True, hide=True).ok:
        # Debian/Ubuntu系统
        nginx_conf += """
        include snippets/fastcgi-php.conf;
        fastcgi_pass unix:/var/run/php/php7.4-fpm.sock;
"""
    else:
        # CentOS/RHEL系统
        nginx_conf += """
        try_files $uri =404;
        fastcgi_pass 127.0.0.1:9000;
        fastcgi_index index.php;
        fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
        include fastcgi_params;
"""
    
    nginx_conf += """
    }
    
    location ~ /\.ht {
        deny all;
    }
}
"""
    
    # 写入配置文件
    conn.run(f"echo '{nginx_conf}' > /tmp/{domain}.conf")
    
    if conn.run("which apt", warn=True, hide=True).ok:
        # Debian/Ubuntu系统
        conn.sudo(f"mv /tmp/{domain}.conf /etc/nginx/sites-available/{domain}.conf")
        conn.sudo(f"ln -sf /etc/nginx/sites-available/{domain}.conf /etc/nginx/sites-enabled/{domain}.conf")
    else:
        # CentOS/RHEL系统
        conn.sudo(f"mv /tmp/{domain}.conf /etc/nginx/conf.d/{domain}.conf")
    
    # 如果提供了数据库信息,创建数据库
    if db_name and db_user and db_password:
        print(f"创建数据库: {db_name} 和用户: {db_user}")
        create_db_sql = f"""
sudo mysql -e "CREATE DATABASE IF NOT EXISTS {db_name};"
sudo mysql -e "CREATE USER IF NOT EXISTS '{db_user}'@'localhost' IDENTIFIED BY '{db_password}';"
sudo mysql -e "GRANT ALL PRIVILEGES ON {db_name}.* TO '{db_user}'@'localhost';"
sudo mysql -e "FLUSH PRIVILEGES;"
        """
        conn.run(create_db_sql)
        
        # 如果源目录中有SQL文件,导入数据库
        if conn.run(f"find {web_root} -name '*.sql' | wc -l", hide=True).stdout.strip() != '0':
            print("导入SQL文件...")
            sql_files = conn.run(f"find {web_root} -name '*.sql'", hide=True).stdout.strip().split('
')
            for sql_file in sql_files:
                conn.run(f"sudo mysql {db_name} < {sql_file}")
    
    # 重启Nginx
    print("重启Nginx...")
    conn.sudo("systemctl restart nginx")
    
    # 清理临时文件
    conn.run(f"rm {remote_tar}")
    os.remove(local_tar)
    
    print(f"网站部署完成: http://{domain}")
    if db_name:
        print(f"数据库信息: DB={db_name}, User={db_user}, Password={db_password}")
    
    return True

使用方法:

bash

# 安装LNMP环境
fab install-lnmp --host web.example.com --mysql-root-password SecurePass123

# 部署网站
fab deploy-website --host web.example.com --source-dir /path/to/mysite 
                  --domain example.com 
                  --db-name mysite_db --db-user mysite_user --db-password DbPass123

7.4.3 示例3: 生产环境代码包发布管理

以下示例实现了完整的生产环境代码发布流程,包括构建、版本控制、部署和回滚:

python

from fabric import Connection, Config, task
import os
import time
import json
import tempfile
import hashlib
import shutil
from datetime import datetime
from invoke import run as local

# 全局配置
config = Config(
    overrides={
        'run': {
            'pty': True,
        },
    }
)

# 发布配置
class DeployConfig:
    VERSION_FILE = 'version.json'  # 版本信息文件
    KEEP_RELEASES = 5              # 保留的最近发布版本数
    SHARED_DIRS = ['uploads', 'logs', 'cache']  # 共享目录

@task
def build(c, app_dir, version=None):
    """构建应用包
    
    参数:
        app_dir: 应用目录
        version: 版本号,默认使用当前时间
    """
    if not os.path.isdir(app_dir):
        print(f"错误: 应用目录 {app_dir} 不存在")
        return None
    
    # 如果未指定版本号,使用时间戳
    if not version:
        version = datetime.now().strftime("%Y%m%d%H%M%S")
    
    app_name = os.path.basename(app_dir.rstrip('/'))
    build_dir = f"/tmp/build_{app_name}_{version}"
    output_file = f"{app_name}_{version}.tar.gz"
    
    # 清理之前的构建目录
    if os.path.exists(build_dir):
        shutil.rmtree(build_dir)
    
    # 复制应用文件到构建目录
    print(f"复制应用文件到构建目录: {app_dir} -> {build_dir}")
    shutil.copytree(app_dir, build_dir, symlinks=True, ignore=shutil.ignore_patterns(
        '*.git*', '*.svn*', '*.idea*', '*.vscode*', 'node_modules', '*.log', '*.tmp'
    ))
    
    # 创建版本信息文件
    version_info = {
        'version': version,
        'timestamp': int(time.time()),
        'date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'commit': None
    }
    
    # 如果是Git仓库,获取提交信息
    git_dir = os.path.join(app_dir, '.git')
    if os.path.exists(git_dir):
        try:
            # 获取当前分支
            branch = local("cd " + app_dir + " && git rev-parse --abbrev-ref HEAD", hide=True).stdout.strip()
            version_info['branch'] = branch
            
            # 获取最近提交
            commit = local("cd " + app_dir + " && git rev-parse HEAD", hide=True).stdout.strip()
            version_info['commit'] = commit
            
            # 获取提交作者
            author = local("cd " + app_dir + " && git log -1 --pretty=format:'%an'", hide=True).stdout.strip()
            version_info['author'] = author
            
            # 获取提交信息
            message = local("cd " + app_dir + " && git log -1 --pretty=format:'%s'", hide=True).stdout.strip()
            version_info['message'] = message
            
            # 检查是否有未提交的更改
            status = local("cd " + app_dir + " && git status --porcelain", hide=True).stdout
            version_info['clean'] = len(status.strip()) == 0
            
        except Exception as e:
            print(f"获取Git信息时出错: {str(e)}")
    
    # 写入版本信息文件
    with open(os.path.join(build_dir, DeployConfig.VERSION_FILE), 'w') as f:
        json.dump(version_info, f, indent=2)
    
    # 创建共享目录的占位符
    for shared_dir in DeployConfig.SHARED_DIRS:
        shared_path = os.path.join(build_dir, shared_dir)
        if os.path.exists(shared_path):
            shutil.rmtree(shared_path)
        os.makedirs(shared_path, exist_ok=True)
        with open(os.path.join(shared_path, '.gitkeep'), 'w') as f:
            f.write('')
    
    # 打包构建目录
    print(f"打包应用: {build_dir} -> {output_file}")
    local(f"tar -czf {output_file} -C {os.path.dirname(build_dir)} {os.path.basename(build_dir)}")
    
    # 计算包的MD5
    md5 = calculate_md5(output_file)
    print(f"构建完成: {output_file} (MD5: {md5})")
    
    # 清理构建目录
    shutil.rmtree(build_dir)
    
    return {
        'file': output_file,
        'version': version,
        'md5': md5,
        'info': version_info
    }

def calculate_md5(filepath):
    """计算文件的MD5哈希值"""
    hash_md5 = hashlib.md5()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

@task
def deploy(c, host, package_path, deploy_dir, shared_dir=None):
    """部署应用到服务器
    
    参数:
        host: 目标主机
        package_path: 本地包路径
        deploy_dir: 部署目录
        shared_dir: 共享目录,默认为{deploy_dir}/shared
    """
    if not os.path.exists(package_path):
        print(f"错误: 本地文件 {package_path} 不存在")
        return False
    
    # 如果未指定共享目录,使用默认路径
    if not shared_dir:
        shared_dir = f"{deploy_dir}/shared"
    
    # 提取版本号
    filename = os.path.basename(package_path)
    version = filename.split('_')[-1].replace('.tar.gz', '')
    
    print(f"部署版本 {version} 到 {host}:{deploy_dir}")
    
    # 连接到目标服务器
    conn = Connection(host, config=config)
    
    # 创建必要的目录
    print("创建部署目录结构...")
    conn.sudo(f"mkdir -p {deploy_dir}/releases/{version}")
    conn.sudo(f"mkdir -p {shared_dir}")
    
    # 上传应用包
    print(f"上传应用包: {package_path}")
    remote_package = f"/tmp/{filename}"
    conn.put(package_path, remote_package)
    
    # 解压应用包
    print(f"解压应用包到 {deploy_dir}/releases/{version}")
    conn.sudo(f"tar -xzf {remote_package} -C {deploy_dir}/releases/{version} --strip-components=1")
    
    # 创建共享目录链接
    print("创建共享目录链接...")
    for shared_folder in DeployConfig.SHARED_DIRS:
        # 确保共享目录存在
        conn.sudo(f"mkdir -p {shared_dir}/{shared_folder}")
        
        # 删除发布目录中的对应目录
        conn.sudo(f"rm -rf {deploy_dir}/releases/{version}/{shared_folder}")
        
        # 创建符号链接
        conn.sudo(f"ln -s {shared_dir}/{shared_folder} {deploy_dir}/releases/{version}/{shared_folder}")
    
    # 更新当前版本链接
    print("更新当前版本链接...")
    conn.sudo(f"ln -sfn {deploy_dir}/releases/{version} {deploy_dir}/current")
    
    # 更新版本历史
    print("更新版本历史...")
    version_history = f"{deploy_dir}/version_history.json"
    
    # 读取版本信息
    version_file = f"{deploy_dir}/releases/{version}/{DeployConfig.VERSION_FILE}"
    version_data = conn.sudo(f"cat {version_file}", hide=True).stdout
    version_info = json.loads(version_data)
    
    # 检查版本历史文件是否存在
    history_exists = conn.run(f"test -f {version_history}", warn=True, hide=True).ok
    
    if history_exists:
        # 读取现有历史
        history_data = conn.sudo(f"cat {version_history}", hide=True).stdout
        history = json.loads(history_data)
    else:
        # 创建新的历史记录
        history = {"versions": []}
    
    # 添加当前版本
    deploy_info = {
        "version": version,
        "timestamp": int(time.time()),
        "date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "info": version_info
    }
    
    history["versions"].insert(0, deploy_info)
    
    # 只保留最近N个版本
    if len(history["versions"]) > DeployConfig.KEEP_RELEASES:
        old_versions = history["versions"][DeployConfig.KEEP_RELEASES:]
        history["versions"] = history["versions"][:DeployConfig.KEEP_RELEASES]
        
        # 清理旧版本
        print(f"清理旧版本...")
        for old_ver in old_versions:
            old_version = old_ver["version"]
            old_path = f"{deploy_dir}/releases/{old_version}"
            if conn.run(f"test -d {old_path}", warn=True, hide=True).ok:
                conn.sudo(f"rm -rf {old_path}")
                print(f"已删除旧版本: {old_version}")
    
    # 保存更新后的历史
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file:
        json.dump(history, temp_file, indent=2)
        temp_file_path = temp_file.name
    
    conn.put(temp_file_path, "/tmp/version_history.json")
    conn.sudo(f"mv /tmp/version_history.json {version_history}")
    os.unlink(temp_file_path)
    
    # 清理临时文件
    conn.run(f"rm {remote_package}")
    
    print(f"部署完成: {host}:{deploy_dir}/current -> releases/{version}")
    return True

@task
def rollback(c, host, deploy_dir, steps=1):
    """回滚到之前的版本
    
    参数:
        host: 目标主机
        deploy_dir: 部署目录
        steps: 回滚步数,默认为1
    """
    print(f"回滚部署: {host}:{deploy_dir}, 步数: {steps}")
    
    # 连接到目标服务器
    conn = Connection(host, config=config)
    
    # 读取版本历史
    version_history = f"{deploy_dir}/version_history.json"
    
    if not conn.run(f"test -f {version_history}", warn=True, hide=True).ok:
        print(f"错误: 版本历史文件不存在: {version_history}")
        return False
    
    # 读取历史记录
    history_data = conn.sudo(f"cat {version_history}", hide=True).stdout
    history = json.loads(history_data)
    
    if len(history["versions"]) <= steps:
        print(f"错误: 没有足够的历史版本可供回滚")
        return False
    
    # 获取当前版本和目标版本
    current_version = history["versions"][0]["version"]
    target_version = history["versions"][steps]["version"]
    
    print(f"回滚版本: {current_version} -> {target_version}")
    
    # 检查目标版本目录是否存在
    target_path = f"{deploy_dir}/releases/{target_version}"
    if not conn.run(f"test -d {target_path}", warn=True, hide=True).ok:
        print(f"错误: 目标版本目录不存在: {target_path}")
        return False
    
    # 更新当前版本链接
    print("更新当前版本链接...")
    conn.sudo(f"ln -sfn {target_path} {deploy_dir}/current")
    
    # 更新版本历史
    print("更新版本历史...")
    
    # 将回滚的版本移动到历史记录的顶部
    rollback_version = history["versions"].pop(steps)
    rollback_version["rollback"] = True
    rollback_version["rollback_from"] = current_version
    rollback_version["timestamp"] = int(time.time())
    rollback_version["date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    history["versions"].insert(0, rollback_version)
    
    # 保存更新后的历史
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file:
        json.dump(history, temp_file, indent=2)
        temp_file_path = temp_file.name
    
    conn.put(temp_file_path, "/tmp/version_history.json")
    conn.sudo(f"mv /tmp/version_history.json {version_history}")
    os.unlink(temp_file_path)
    
    print(f"回滚完成: {host}:{deploy_dir}/current -> releases/{target_version}")
    return True

@task
def status(c, host, deploy_dir):
    """查看部署状态
    
    参数:
        host: 目标主机
        deploy_dir: 部署目录
    """
    print(f"查看部署状态: {host}:{deploy_dir}")
    
    # 连接到目标服务器
    conn = Connection(host, config=config)
    
    # 检查部署目录是否存在
    if not conn.run(f"test -d {deploy_dir}", warn=True, hide=True).ok:
        print(f"错误: 部署目录不存在: {deploy_dir}")
        return False
    
    # 检查当前版本链接
    current_link = f"{deploy_dir}/current"
    if conn.run(f"test -L {current_link}", warn=True, hide=True).ok:
        current_path = conn.run(f"readlink {current_link}", hide=True).stdout.strip()
        current_version = os.path.basename(current_path)
        print(f"当前版本: {current_version}")
        
        # 读取版本信息
        version_file = f"{current_link}/{DeployConfig.VERSION_FILE}"
        if conn.run(f"test -f {version_file}", warn=True, hide=True).ok:
            version_data = conn.run(f"cat {version_file}", hide=True).stdout
            version_info = json.loads(version_data)
            
            print("
版本信息:")
            print(f"  版本号: {version_info['version']}")
            print(f"  部署时间: {version_info['date']}")
            
            if 'branch' in version_info:
                print(f"  分支: {version_info['branch']}")
            
            if 'commit' in version_info:
                print(f"  提交: {version_info['commit']}")
            
            if 'author' in version_info:
                print(f"  作者: {version_info['author']}")
            
            if 'message' in version_info:
                print(f"  提交信息: {version_info['message']}")
    else:
        print("警告: 当前版本链接不存在")
    
    # 列出所有可用版本
    releases_dir = f"{deploy_dir}/releases"
    if conn.run(f"test -d {releases_dir}", warn=True, hide=True).ok:
        versions = conn.run(f"ls -1 {releases_dir}", hide=True).stdout.strip().split('
')
        
        print("
可用版本:")
        for version in versions:
            if version:
                if conn.run(f"test -d {releases_dir}/{version}", warn=True, hide=True).ok:
                    version_file = f"{releases_dir}/{version}/{DeployConfig.VERSION_FILE}"
                    if conn.run(f"test -f {version_file}", warn=True, hide=True).ok:
                        data = conn.run(f"cat {version_file}", hide=True).stdout
                        info = json.loads(data)
                        print(f"  {version} - {info.get('date', 'Unknown date')}")
    
    # 读取版本历史
    version_history = f"{deploy_dir}/version_history.json"
    if conn.run(f"test -f {version_history}", warn=True, hide=True).ok:
        history_data = conn.run(f"cat {version_history}", hide=True).stdout
        history = json.loads(history_data)
        
        print("
部署历史:")
        for i, version in enumerate(history["versions"]):
            rollback = " (回滚)" if version.get("rollback", False) else ""
            print(f"  {i+1}. {version['version']} - {version['date']}{rollback}")
            if version.get("rollback", False) and "rollback_from" in version:
                print(f"     从版本 {version['rollback_from']} 回滚")
    
    return True

使用方法:

bash

# 构建应用包
fab build --app-dir /path/to/myapp --version v1.2.3

# 部署应用
fab deploy --host prod.example.com --package-path myapp_v1.2.3.tar.gz 
           --deploy-dir /var/www/myapp

# 查看部署状态
fab status --host prod.example.com --deploy-dir /var/www/myapp

# 回滚到上一个版本
fab rollback --host prod.example.com --deploy-dir /var/www/myapp

通过本章的详细介绍,应该已经掌握了Fabric的安装配置、核心API使用和实际应用示例。Fabric作为一个功能强大的系统批量运维管理工具,在服务器管理、应用部署和自动化运维中发挥着重要作用。结合前几章介绍的pexpect和paramiko,已经了解了Python在系统运维领域的强大能力,能够根据实际需求选择合适的工具,高效完成各种运维任务。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容