系统批量运维管理器 Fabric 详解
Fabric是一个基于Python的命令行工具和库,用于简化系统管理和应用部署任务,特别是在多台服务器上执行命令和传输文件。与前几章介绍的pexpect和paramiko相比,Fabric提供了更高层次的抽象,使得批量运维任务的编写更加简洁高效。本章将详细介绍Fabric的安装、配置和使用方法,以及在实际运维场景中的应用示例。
7.1 Fabric 的安装
Fabric是一个Python库,因此可以通过pip包管理器轻松安装。本节将介绍不同环境下Fabric的安装方法和注意事项。
通过pip安装
最简单的安装方式是使用pip:
bash
# 安装Fabric 2.x版本(推荐)
pip install fabric
# 或者指定版本安装
pip install fabric==2.6.0
如果需要安装旧版本的Fabric 1.x,可以使用:
bash
pip install "fabric<2.0"
系统包管理器安装
在Linux系统中,也可以使用系统包管理器安装Fabric:
Debian/Ubuntu:
bash
sudo apt-get install fabric
CentOS/RHEL:
bash
sudo yum install fabric
注意: 系统包管理器安装的版本可能比较旧,建议使用pip安装最新版本。
依赖关系
Fabric 2.x依赖于以下Python库:
paramiko: SSH协议实现(前一章详细介绍过)
invoke: 命令执行库
patchwork: 文件和路径操作工具集
安装Fabric时,pip会自动安装这些依赖库。
验证安装
安装完成后,可以通过以下命令验证安装是否成功:
bash
# 查看版本
fab --version
# 或者在Python中导入验证
python -c "import fabric; print(fabric.__version__)"
Fabric 1.x 与 2.x 的区别
Fabric有两个主要版本系列:1.x和2.x,它们之间有显著差异。本章主要介绍Fabric 2.x版本,但也会提及两个版本的主要区别:
API变化:Fabric 2.x重新设计了API,更加面向对象,而Fabric 1.x更加函数化。
命令行工具:Fabric 2.x使用fab
命令,但调用方式与1.x不同。
配置文件:2.x使用fabfile.py
或任何Python模块,而1.x更严格地依赖fabfile.py
。
依赖库:2.x基于invoke和patchwork,而1.x是独立的实现。
如果你有使用Fabric 1.x的代码,需要做一些调整才能在Fabric 2.x中运行。
7.2 fab 的常用参数
Fabric提供了fab
命令行工具,用于执行定义在fabfile中的任务。本节将介绍fab
命令的常用参数和使用方法。
基本语法
bash
fab [options] task1 task2 ...
常用选项
以下是fab
命令的常用选项:
选项 | 描述 |
---|---|
-l, --list |
列出可用的任务 |
-f, --file |
指定fabfile路径,默认为当前目录下的fabfile.py |
-H, --hosts |
指定要连接的主机(逗号分隔) |
-i, --identity |
指定SSH密钥文件 |
-p, --password |
指定SSH密码(不推荐,安全风险) |
-u, --user |
指定SSH用户名 |
-P, --parallel |
并行执行任务 |
-S, --skip-bad-hosts |
遇到连接错误时跳过该主机继续执行 |
-t, --timeout |
设置命令执行超时时间 |
-T, --command-timeout |
设置命令执行超时时间 |
-r, --reject-unknown-hosts |
拒绝未知的SSH主机 |
-D, --disable-known-hosts |
不加载用户的known_hosts文件 |
-k, --no-agent |
不使用SSH代理 |
-V, --version |
显示Fabric版本 |
-h, --help |
显示帮助信息 |
示例用法
bash
# 列出可用任务
fab -l
# 执行特定任务
fab deploy
# 在指定主机上执行任务
fab -H web1.example.com,web2.example.com deploy
# 使用特定用户和密钥文件执行任务
fab -H web.example.com -u admin -i ~/.ssh/id_rsa deploy
# 并行执行任务
fab -H web1,web2,web3,web4 -P deploy
# 指定自定义fabfile
fab -f /path/to/my_fabfile.py deploy
传递参数给任务
在Fabric 2.x中,可以通过以下方式将参数传递给任务:
bash
# 传递位置参数
fab task_name arg1 arg2
# 传递关键字参数
fab task_name:key1=value1,key2=value2
# 组合使用
fab task_name:arg1,key1=value1
环境变量
Fabric也支持通过环境变量配置:
bash
# 设置SSH用户名
export FABRIC_USER=admin
# 设置SSH密码(不推荐)
export FABRIC_PASSWORD=secret
# 设置默认主机
export FABRIC_HOSTS=web1.example.com,web2.example.com
7.3 fabfile 的编写
fabfile是Fabric的核心,包含了要执行的任务定义。本节将详细介绍如何编写有效的fabfile,包括配置、API使用和常见模式。
7.3.1 全局属性设定
在Fabric 2.x中,可以通过Connection
对象和配置文件设置全局属性。
基本结构
一个典型的fabfile.py结构如下:
python
from fabric import Connection, Config
from fabric import task
from invoke import task as invoke_task
import os
# 全局配置
config = Config(
overrides={
'connect_kwargs': {
'key_filename': ['/path/to/ssh/key'],
},
}
)
# 定义任务
@task
def hello(c):
"""打印Hello World"""
print("Hello World!")
@task
def deploy(c):
"""部署应用"""
c.run('git pull')
c.run('pip install -r requirements.txt')
c.run('service myapp restart')
配置选项
以下是常用的全局配置选项:
SSH连接配置
python
config = Config(
overrides={
'user': 'admin', # SSH用户名
'port': 22, # SSH端口
'connect_kwargs': {
'key_filename': ['/path/to/ssh/key'], # SSH密钥
'password': 'password', # SSH密码(不推荐)
'timeout': 10, # 连接超时时间
'allow_agent': True, # 允许使用SSH代理
'look_for_keys': True, # 查找SSH密钥
},
}
)
运行时配置
python
config = Config(
overrides={
'run': {
'echo': True, # 回显执行的命令
'pty': True, # 分配伪终端
'warn': True, # 命令失败时发出警告而不是抛出异常
'hide': False, # 隐藏命令输出
'timeout': 30, # 命令执行超时时间
'shell': '/bin/bash', # 指定shell
},
}
)
文件传输配置
python
config = Config(
overrides={
'transfer': {
'preserve_mode': True, # 保留文件权限
'preserve_times': True, # 保留文件时间戳
},
}
)
主机组管理
通过定义主机组,可以更方便地管理多台服务器:
python
# 定义主机组
web_servers = ['web1.example.com', 'web2.example.com', 'web3.example.com']
db_servers = ['db1.example.com', 'db2.example.com']
all_servers = web_servers + db_servers
# 使用主机组执行任务
@task
def deploy_web(c):
for host in web_servers:
conn = Connection(host, config=config)
conn.run('git pull')
conn.run('service nginx restart')
@task
def deploy_all(c):
for host in all_servers:
conn = Connection(host, config=config)
conn.run('apt-get update')
conn.run('apt-get upgrade -y')
7.3.2 常用 API
Fabric 2.x提供了丰富的API,以下是最常用的几个核心API。
Connection 类
Connection
是Fabric 2.x的核心类,代表与单个主机的连接。
python
from fabric import Connection
# 创建连接
c = Connection('web.example.com', user='admin', port=22)
# 使用配置创建连接
c = Connection('web.example.com', config=config)
# 执行命令
result = c.run('ls -la')
print(result.stdout)
# 使用sudo执行命令
result = c.sudo('apt-get update')
# 上传文件
c.put('local_file.txt', '/remote/path/file.txt')
# 下载文件
c.get('/remote/path/file.txt', 'local_file.txt')
Group 类
Group
类用于管理多台主机,实现并行操作:
python
from fabric import Group
# 创建主机组
servers = Group('web1.example.com', 'web2.example.com', 'web3.example.com')
# 在所有主机上执行命令
results = servers.run('uptime')
for connection, result in results.items():
print(f"{connection}: {result.stdout}")
# 在所有主机上执行sudo命令
servers.sudo('service nginx restart')
# 上传文件到所有主机
servers.put('local_file.txt', '/remote/path/')
run() 和 sudo() 方法
run()
和sudo()
是最常用的命令执行方法:
python
# 基本用法
result = c.run('ls -la')
# 设置工作目录
result = c.run('ls -la', cwd='/var/www')
# 处理命令失败
try:
c.run('command_that_might_fail', warn=False)
except UnexpectedExit:
print("命令执行失败")
# 捕获输出但不显示
result = c.run('grep "secret" /etc/config', hide=True)
print(f"命令输出: {result.stdout}")
# 以root身份执行命令
c.sudo('apt-get update')
# 以特定用户身份执行
c.sudo('restart app', user='appuser')
# 提供sudo密码
c.sudo('apt-get upgrade -y', password='sudopassword')
put() 和 get() 方法
用于文件传输的方法:
python
# 上传文件
c.put('local_file.txt', '/remote/path/file.txt')
# 上传目录
c.put('local_dir/', '/remote/path/')
# 保留文件权限
c.put('script.sh', '/remote/script.sh', preserve_mode=True)
# 下载文件
c.get('/remote/path/file.txt', 'local_file.txt')
# 下载目录
c.get('/remote/dir/', 'local_dir/')
local() 方法
执行本地命令:
python
from invoke import run as local
# 执行本地命令
result = local('ls -la')
print(result.stdout)
# 捕获输出
git_hash = local('git rev-parse HEAD', hide=True).stdout.strip()
print(f"当前Git提交: {git_hash}")
7.3.3 示例1: 查看本地与远程主机信息
下面是一个实际的fabfile示例,用于收集本地和远程主机的系统信息:
python
from fabric import Connection, Config, task
from invoke import run as local
import os
import platform
import json
from datetime import datetime
# 全局配置
config = Config(
overrides={
'run': {
'echo': True,
'pty': True,
},
}
)
@task
def system_info(c):
"""收集本地系统信息"""
print("正在收集本地系统信息...")
info = {}
# 基本系统信息
info['hostname'] = platform.node()
info['platform'] = platform.platform()
info['system'] = platform.system()
info['processor'] = platform.processor()
info['python_version'] = platform.python_version()
# 详细系统信息
if platform.system() == 'Linux':
# 内核版本
info['kernel'] = local('uname -r', hide=True).stdout.strip()
# CPU信息
cpu_info = local('cat /proc/cpuinfo | grep "model name" | head -1', hide=True).stdout
if cpu_info:
info['cpu_model'] = cpu_info.split(':')[1].strip()
# 内存信息
mem_info = local('free -h', hide=True).stdout
info['memory'] = mem_info
# 磁盘使用情况
disk_info = local('df -h', hide=True).stdout
info['disk_usage'] = disk_info
# 网络接口
net_info = local('ip -br addr show', hide=True).stdout
info['network'] = net_info
# 已安装包数量
if os.path.exists('/usr/bin/apt'):
pkg_count = local('dpkg -l | grep "^ii" | wc -l', hide=True).stdout.strip()
info['package_count'] = pkg_count
info['package_manager'] = 'apt'
elif os.path.exists('/usr/bin/yum'):
pkg_count = local('rpm -qa | wc -l', hide=True).stdout.strip()
info['package_count'] = pkg_count
info['package_manager'] = 'yum'
# 保存信息到文件
with open(f'local_system_info_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json', 'w') as f:
json.dump(info, f, indent=2)
# 打印信息摘要
print("
系统信息摘要:")
print(f"主机名: {info['hostname']}")
print(f"平台: {info['platform']}")
print(f"处理器: {info['processor']}")
print(f"Python版本: {info['python_version']}")
return info
@task
def remote_system_info(c, hosts):
"""收集远程主机系统信息
参数:
hosts: 逗号分隔的主机列表
"""
host_list = hosts.split(',')
results = {}
for host in host_list:
print(f"
正在收集远程主机 {host} 的系统信息...")
try:
conn = Connection(host, config=config)
info = {}
# 基本系统信息
info['hostname'] = conn.run('hostname', hide=True).stdout.strip()
info['platform'] = conn.run('uname -a', hide=True).stdout.strip()
# CPU信息
cpu_info = conn.run('cat /proc/cpuinfo | grep "model name" | head -1', hide=True).stdout
if cpu_info:
info['cpu_model'] = cpu_info.split(':')[1].strip()
# 内核版本
info['kernel'] = conn.run('uname -r', hide=True).stdout.strip()
# 操作系统信息
if conn.run('test -f /etc/os-release', warn=True, hide=True).ok:
os_info = conn.run('cat /etc/os-release | grep "PRETTY_NAME"', hide=True).stdout
if os_info:
info['os'] = os_info.split('=')[1].strip('"'')
# 内存信息
mem_info = conn.run('free -h', hide=True).stdout
info['memory'] = mem_info
# 磁盘使用情况
disk_info = conn.run('df -h', hide=True).stdout
info['disk_usage'] = disk_info
# 网络接口
try:
net_info = conn.run('ip -br addr show', hide=True).stdout
info['network'] = net_info
except:
# 某些老系统可能没有ip命令
net_info = conn.run('ifconfig', hide=True, warn=True).stdout
info['network'] = net_info
# 已安装包数量
if conn.run('which dpkg', warn=True, hide=True).ok:
pkg_count = conn.run('dpkg -l | grep "^ii" | wc -l', hide=True).stdout.strip()
info['package_count'] = pkg_count
info['package_manager'] = 'apt'
elif conn.run('which rpm', warn=True, hide=True).ok:
pkg_count = conn.run('rpm -qa | wc -l', hide=True).stdout.strip()
info['package_count'] = pkg_count
info['package_manager'] = 'yum'
# 运行时间
uptime = conn.run('uptime', hide=True).stdout.strip()
info['uptime'] = uptime
# 登录用户
who = conn.run('who', hide=True).stdout.strip()
info['logged_users'] = who
# 最后10个登录用户
last = conn.run('last -n 10', hide=True).stdout.strip()
info['last_logins'] = last
results[host] = info
# 打印信息摘要
print(f"
主机 {host} 信息摘要:")
print(f"主机名: {info['hostname']}")
print(f"平台: {info['platform']}")
if 'os' in info:
print(f"操作系统: {info['os']}")
if 'cpu_model' in info:
print(f"CPU: {info['cpu_model']}")
print(f"内核: {info['kernel']}")
print(f"运行时间: {info['uptime']}")
except Exception as e:
print(f"收集 {host} 信息时出错: {str(e)}")
results[host] = {"error": str(e)}
# 保存信息到文件
with open(f'remote_system_info_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json', 'w') as f:
json.dump(results, f, indent=2)
return results
@task
def compare_hosts(c, hosts):
"""比较多台主机的系统配置
参数:
hosts: 逗号分隔的主机列表
"""
# 收集远程主机信息
host_info = remote_system_info(c, hosts)
# 比较核心配置
host_list = hosts.split(',')
print("
========== 主机配置比较 ==========")
# 比较内核版本
print("
== 内核版本 ==")
for host in host_list:
if host in host_info and 'error' not in host_info[host]:
print(f"{host}: {host_info[host].get('kernel', 'N/A')}")
# 比较CPU
print("
== CPU型号 ==")
for host in host_list:
if host in host_info and 'error' not in host_info[host]:
print(f"{host}: {host_info[host].get('cpu_model', 'N/A')}")
# 比较操作系统
print("
== 操作系统 ==")
for host in host_list:
if host in host_info and 'error' not in host_info[host]:
print(f"{host}: {host_info[host].get('os', 'N/A')}")
# 比较包管理器和包数量
print("
== 软件包 ==")
for host in host_list:
if host in host_info and 'error' not in host_info[host]:
pkg_mgr = host_info[host].get('package_manager', 'unknown')
pkg_count = host_info[host].get('package_count', 'N/A')
print(f"{host}: {pkg_mgr} - {pkg_count} 个包")
print("
================================")
使用方法:
bash
# 查看本地系统信息
fab system-info
# 查看远程主机信息
fab remote-system-info --hosts web1.example.com,web2.example.com
# 比较多台主机配置
fab compare-hosts --hosts web1.example.com,web2.example.com,db1.example.com
7.3.4 示例2: 动态获取远程目录列表
下面是一个示例,展示如何动态获取远程服务器的目录列表,并支持交互式选择:
python
from fabric import Connection, Config, task
import os
from pprint import pprint
# 全局配置
config = Config(
overrides={
'run': {
'echo': False, # 不回显命令
'hide': True, # 隐藏输出
},
}
)
@task
def list_directories(c, host, path="/var/www"):
"""列出远程主机指定路径下的目录
参数:
host: 远程主机地址
path: 要列出目录的路径,默认为/var/www
"""
print(f"正在连接到 {host}...")
conn = Connection(host, config=config)
print(f"列出 {host}:{path} 中的目录...")
result = conn.run(f'find {path} -maxdepth 1 -type d | sort')
# 解析目录列表
directories = result.stdout.strip().split('
')
# 过滤掉路径本身
directories = [d for d in directories if d != path]
if not directories:
print(f"在 {path} 中没有找到目录")
return []
print(f"在 {path} 中找到 {len(directories)} 个目录:")
for i, directory in enumerate(directories, 1):
print(f"{i}. {directory}")
return directories
@task
def explore_directories(c, host, path="/var/www"):
"""交互式浏览远程主机目录
参数:
host: 远程主机地址
path: 起始路径,默认为/var/www
"""
conn = Connection(host, config=config)
current_path = path
while True:
print(f"
当前路径: {current_path}")
# 获取当前路径下的目录和文件
dirs_result = conn.run(f'find {current_path} -maxdepth 1 -type d | sort', hide=True)
files_result = conn.run(f'find {current_path} -maxdepth 1 -type f | sort', hide=True)
directories = dirs_result.stdout.strip().split('
')
files = files_result.stdout.strip().split('
')
# 过滤掉空行和当前目录
directories = [d for d in directories if d and d != current_path]
files = [f for f in files if f]
# 显示目录
print("
目录:")
if directories:
for i, directory in enumerate(directories, 1):
dir_name = os.path.basename(directory)
print(f"{i}. {dir_name}/")
else:
print(" (无子目录)")
# 显示文件
print("
文件:")
if files:
for i, file in enumerate(files, 1):
file_name = os.path.basename(file)
# 获取文件大小
size_result = conn.run(f'stat -c "%s" "{file}"', hide=True)
size = int(size_result.stdout.strip())
size_human = get_human_size(size)
print(f"{i}. {file_name} ({size_human})")
else:
print(" (无文件)")
# 用户操作菜单
print("
操作:")
print("cd [number] - 进入选择的目录")
print("cat [number] - 查看文件内容")
print("stat [number] - 查看文件详细信息")
print("cd .. - 返回上级目录")
print("q - 退出")
choice = input("
请输入命令: ").strip()
if choice.lower() == 'q':
break
elif choice.lower() == 'cd ..':
current_path = os.path.dirname(current_path)
elif choice.startswith('cd '):
try:
index = int(choice.split()[1]) - 1
if 0 <= index < len(directories):
current_path = directories[index]
else:
print("无效的目录索引")
except (ValueError, IndexError):
print("无效的命令格式")
elif choice.startswith('cat '):
try:
index = int(choice.split()[1]) - 1
if 0 <= index < len(files):
file_path = files[index]
file_size_result = conn.run(f'stat -c "%s" "{file_path}"', hide=True)
file_size = int(file_size_result.stdout.strip())
if file_size > 1024 * 1024: # 大于1MB
confirm = input(f"文件大小为 {get_human_size(file_size)},确定要查看? (y/n): ")
if confirm.lower() != 'y':
continue
result = conn.run(f'cat "{file_path}"', hide=False)
# 如果文件内容很长,可以使用分页显示
if len(result.stdout.split('
')) > 24:
print("
(按 q 退出查看)")
import subprocess
try:
subprocess.run(['less'], input=result.stdout.encode())
except:
print(result.stdout)
else:
print("
" + result.stdout)
else:
print("无效的文件索引")
except (ValueError, IndexError):
print("无效的命令格式")
elif choice.startswith('stat '):
try:
index = int(choice.split()[1]) - 1
if 0 <= index < len(files):
file_path = files[index]
result = conn.run(f'stat "{file_path}"', hide=False)
print("
" + result.stdout)
else:
print("无效的文件索引")
except (ValueError, IndexError):
print("无效的命令格式")
else:
print("未知命令")
def get_human_size(size_bytes):
"""将字节大小转换为人类可读的格式"""
if size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 * 1024:
return f"{size_bytes/1024:.1f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes/(1024*1024):.1f} MB"
else:
return f"{size_bytes/(1024*1024*1024):.1f} GB"
@task
def check_disk_usage(c, hosts, threshold=80):
"""检查多台主机的磁盘使用情况
参数:
hosts: 逗号分隔的主机列表
threshold: 使用率警告阈值,默认80%
"""
host_list = hosts.split(',')
issues_found = False
for host in host_list:
print(f"
检查主机 {host} 的磁盘使用情况...")
try:
conn = Connection(host, config=config)
result = conn.run('df -h', hide=True)
# 解析df输出
lines = result.stdout.strip().split('
')
headers = lines[0]
filesystems = lines[1:]
print(f"
{host} 的磁盘使用情况:")
print(headers)
for fs in filesystems:
print(fs)
# 检查使用率是否超过阈值
parts = fs.split()
if len(parts) >= 5:
usage = parts[4].strip('%')
try:
usage_int = int(usage)
if usage_int >= threshold:
print(f"警告: {parts[0]} 使用率 {usage}% 超过阈值 {threshold}%")
issues_found = True
except ValueError:
# 忽略无法解析的行
pass
except Exception as e:
print(f"检查 {host} 时出错: {str(e)}")
return not issues_found # 如果没有发现问题,返回True
使用方法:
bash
# 列出远程主机上的目录
fab list-directories --host web.example.com --path /var/www/html
# 交互式浏览远程主机目录
fab explore-directories --host web.example.com
# 检查多台主机的磁盘使用情况
fab check-disk-usage --hosts web1.example.com,web2.example.com,db1.example.com --threshold 90
7.3.5 示例3: 网关模式文件上传与执行
下面是一个示例,演示如何通过网关(堡垒机)连接到内部服务器,并上传文件和执行命令:
python
from fabric import Connection, Config, task
import os
import io
import time
from tempfile import NamedTemporaryFile
# 全局配置
config = Config(
overrides={
'run': {
'pty': True,
},
}
)
@task
def gateway_command(c, gateway, target, command, gateway_user=None, target_user=None):
"""通过网关服务器在目标服务器上执行命令
参数:
gateway: 网关服务器地址
target: 目标服务器地址
command: 要执行的命令
gateway_user: 网关服务器用户名(可选)
target_user: 目标服务器用户名(可选)
"""
# 构建连接参数
gateway_kwargs = {}
target_kwargs = {}
if gateway_user:
gateway_kwargs['user'] = gateway_user
if target_user:
target_kwargs['user'] = target_user
print(f"连接到网关服务器 {gateway}...")
with Connection(gateway, config=config, **gateway_kwargs) as gateway_conn:
print(f"通过网关连接到目标服务器 {target}...")
# 创建从网关到目标服务器的ProxyJump连接
# 这需要在网关服务器上配置了到目标服务器的SSH访问权限
with gateway_conn.forward_local(
local_port=0, # 自动选择一个本地端口
remote_port=22,
remote_host=target
) as tunnel:
target_conn = Connection(
"localhost",
port=tunnel.local_port,
config=config,
**target_kwargs
)
print(f"在目标服务器 {target} 上执行命令: {command}")
result = target_conn.run(command, hide=False)
print(f"命令执行完成,退出状态码: {result.return_code}")
return result
@task
def gateway_file_upload(c, gateway, target, local_path, remote_path,
gateway_user=None, target_user=None):
"""通过网关服务器上传文件到目标服务器
参数:
gateway: 网关服务器地址
target: 目标服务器地址
local_path: 本地文件路径
remote_path: 目标服务器上的文件路径
gateway_user: 网关服务器用户名(可选)
target_user: 目标服务器用户名(可选)
"""
# 检查本地文件是否存在
if not os.path.exists(local_path):
print(f"错误: 本地文件 {local_path} 不存在")
return False
# 构建连接参数
gateway_kwargs = {}
target_kwargs = {}
if gateway_user:
gateway_kwargs['user'] = gateway_user
if target_user:
target_kwargs['user'] = target_user
print(f"连接到网关服务器 {gateway}...")
with Connection(gateway, config=config, **gateway_kwargs) as gateway_conn:
# 先将文件上传到网关服务器的临时目录
temp_path = f"/tmp/fabric_upload_{os.path.basename(local_path)}_{int(time.time())}"
print(f"上传文件到网关服务器: {local_path} -> {temp_path}")
gateway_conn.put(local_path, temp_path)
print(f"通过网关连接到目标服务器 {target}...")
# 创建从网关到目标服务器的ProxyJump连接
with gateway_conn.forward_local(
local_port=0,
remote_port=22,
remote_host=target
) as tunnel:
target_conn = Connection(
"localhost",
port=tunnel.local_port,
config=config,
**target_kwargs
)
# 在网关上创建一个加密的压缩包
print("创建临时压缩文件...")
temp_tar = f"{temp_path}.tar.gz"
gateway_conn.run(f"tar -czf {temp_tar} -C {os.path.dirname(temp_path)} {os.path.basename(temp_path)}")
# 获取压缩包内容
get_result = io.BytesIO()
gateway_conn.get(temp_tar, get_result)
get_result.seek(0)
# 将压缩包通过SSH直接传输到目标服务器
print(f"将文件传输到目标服务器: {remote_path}")
# 确保目标目录存在
target_dir = os.path.dirname(remote_path)
if target_dir:
target_conn.run(f"mkdir -p {target_dir}", warn=True)
# 使用临时文件
with NamedTemporaryFile() as temp_file:
temp_file.write(get_result.read())
temp_file.flush()
# 上传到目标服务器
target_conn.put(temp_file.name, f"{remote_path}.tar.gz")
# 在目标服务器上解压
target_conn.run(f"tar -xzf {remote_path}.tar.gz -C {os.path.dirname(remote_path)}")
target_conn.run(f"mv {os.path.dirname(remote_path)}/{os.path.basename(temp_path)} {remote_path}")
target_conn.run(f"rm {remote_path}.tar.gz")
# 清理网关上的临时文件
gateway_conn.run(f"rm {temp_path} {temp_tar}")
print(f"文件成功上传到 {target}:{remote_path}")
# 验证文件大小
local_size = os.path.getsize(local_path)
remote_size_result = target_conn.run(f"stat -c %s {remote_path}", hide=True)
remote_size = int(remote_size_result.stdout.strip())
if local_size == remote_size:
print(f"文件大小验证成功: {local_size} 字节")
return True
else:
print(f"警告: 文件大小不匹配! 本地: {local_size} 字节, 远程: {remote_size} 字节")
return False
@task
def deploy_via_gateway(c, gateway, targets, app_path, deploy_path, restart=False):
"""通过网关部署应用到多个目标服务器
参数:
gateway: 网关服务器地址
targets: 逗号分隔的目标服务器列表
app_path: 本地应用路径
deploy_path: 部署路径
restart: 是否重启应用
"""
target_list = targets.split(',')
success_count = 0
error_count = 0
# 打包应用
app_name = os.path.basename(app_path.rstrip('/'))
local_tar = f"/tmp/{app_name}.tar.gz"
print(f"打包应用 {app_path} -> {local_tar}")
os.system(f"tar -czf {local_tar} -C {os.path.dirname(app_path)} {app_name}")
# 连接到网关
print(f"连接到网关服务器 {gateway}...")
with Connection(gateway, config=config) as gateway_conn:
# 将应用包上传到网关
gateway_temp = f"/tmp/{app_name}.tar.gz"
print(f"上传应用包到网关: {local_tar} -> {gateway_temp}")
gateway_conn.put(local_tar, gateway_temp)
# 部署到每个目标服务器
for target in target_list:
print(f"
开始部署到 {target}...")
try:
# 建立到目标服务器的连接
with gateway_conn.forward_local(
local_port=0,
remote_port=22,
remote_host=target
) as tunnel:
target_conn = Connection(
"localhost",
port=tunnel.local_port,
config=config
)
# 确保部署目录存在
target_conn.run(f"mkdir -p {os.path.dirname(deploy_path)}", warn=True)
# 备份现有应用
backup_path = f"{deploy_path}_backup_{time.strftime('%Y%m%d_%H%M%S')}"
if target_conn.run(f"test -d {deploy_path}", warn=True).ok:
print(f"备份现有应用: {deploy_path} -> {backup_path}")
target_conn.run(f"cp -a {deploy_path} {backup_path}")
# 部署新版本
target_temp = f"/tmp/{app_name}.tar.gz"
print(f"传输应用包到目标服务器: {target_temp}")
# 直接使用SCP通过网关传输
print("使用SCP通过网关传输应用包...")
gateway_conn.run(f"scp {gateway_temp} {target}:{target_temp}")
print("解压应用包...")
target_conn.run(f"rm -rf {deploy_path}")
target_conn.run(f"mkdir -p {deploy_path}")
target_conn.run(f"tar -xzf {target_temp} -C {os.path.dirname(deploy_path)}")
# 设置权限
print("设置应用权限...")
target_conn.run(f"chmod -R 755 {deploy_path}")
# 清理临时文件
target_conn.run(f"rm {target_temp}")
# 如果需要,重启应用
if restart:
print("重启应用...")
# 这里可以根据实际应用的重启方式调整命令
if target_conn.run(f"test -f {deploy_path}/restart.sh", warn=True).ok:
target_conn.run(f"cd {deploy_path} && ./restart.sh")
else:
print("未找到restart.sh脚本,跳过重启")
print(f"部署到 {target} 成功")
success_count += 1
except Exception as e:
print(f"部署到 {target} 失败: {str(e)}")
error_count += 1
# 清理网关上的临时文件
gateway_conn.run(f"rm {gateway_temp}")
# 清理本地临时文件
os.remove(local_tar)
print(f"
部署完成: {success_count} 成功, {error_count} 失败")
return success_count, error_count
使用方法:
bash
# 通过网关执行命令
fab gateway-command --gateway bastion.example.com --target internal.example.com
--command "df -h" --gateway-user admin --target-user user
# 通过网关上传文件
fab gateway-file-upload --gateway bastion.example.com --target internal.example.com
--local-path /path/to/local/file.txt
--remote-path /path/on/target/file.txt
# 通过网关部署应用到多个服务器
fab deploy-via-gateway --gateway bastion.example.com
--targets web1.internal,web2.internal,web3.internal
--app-path /path/to/myapp
--deploy-path /var/www/myapp
--restart
7.4 Fabric 应用示例
以下示例展示Fabric在实际运维场景中的应用,包括文件操作、服务器环境配置和代码发布管理。
7.4.1 示例1: 文件打包、上传与校验
以下示例实现了文件打包、上传和完整性校验功能:
python
from fabric import Connection, Config, task
import os
import hashlib
import tempfile
import time
from invoke import run as local
# 全局配置
config = Config(
overrides={
'run': {
'pty': True,
},
}
)
def calculate_md5(filepath):
"""计算文件的MD5哈希值"""
hash_md5 = hashlib.md5()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
@task
def package_files(c, source_dir, output_name=None):
"""打包文件或目录
参数:
source_dir: 源文件或目录路径
output_name: 输出文件名,默认为源目录名加时间戳
"""
if not os.path.exists(source_dir):
print(f"错误: 源路径 {source_dir} 不存在")
return None
# 确定输出文件名
if not output_name:
base_name = os.path.basename(source_dir.rstrip('/'))
output_name = f"{base_name}_{int(time.time())}.tar.gz"
if not output_name.endswith('.tar.gz'):
output_name += '.tar.gz'
# 创建临时目录
with tempfile.TemporaryDirectory() as temp_dir:
output_path = os.path.join(temp_dir, output_name)
# 打包文件
print(f"打包 {source_dir} -> {output_path}")
result = local(f"tar -czf {output_path} -C {os.path.dirname(source_dir)} {os.path.basename(source_dir.rstrip('/'))}", hide=True)
if result.ok:
# 计算MD5
md5 = calculate_md5(output_path)
print(f"打包完成,MD5: {md5}")
# 移动到当前目录
local(f"cp {output_path} ./")
local_path = f"./{output_name}"
print(f"打包文件已保存到: {local_path}")
return local_path, md5
else:
print(f"打包失败: {result.stderr}")
return None, None
@task
def upload_with_verify(c, host, package_path, remote_dir, md5=None):
"""上传文件并验证完整性
参数:
host: 目标主机
package_path: 本地包路径
remote_dir: 远程目录
md5: 预期的MD5值,如果为None则在上传前计算
"""
if not os.path.exists(package_path):
print(f"错误: 本地文件 {package_path} 不存在")
return False
# 如果未提供MD5,计算一个
if not md5:
print("计算本地文件MD5...")
md5 = calculate_md5(package_path)
print(f"本地文件MD5: {md5}")
# 连接到目标服务器
print(f"连接到 {host}...")
conn = Connection(host, config=config)
# 确保远程目录存在
conn.run(f"mkdir -p {remote_dir}", warn=True)
# 上传文件
remote_path = f"{remote_dir}/{os.path.basename(package_path)}"
print(f"上传文件: {package_path} -> {remote_path}")
start_time = time.time()
conn.put(package_path, remote_path)
upload_time = time.time() - start_time
# 计算上传速度
file_size = os.path.getsize(package_path)
speed_mbps = (file_size / 1024 / 1024) / upload_time
print(f"上传完成,耗时: {upload_time:.2f}秒,速度: {speed_mbps:.2f} MB/s")
# 在远程服务器上计算MD5
print("验证远程文件MD5...")
result = conn.run(f"md5sum {remote_path} | cut -d' ' -f1", hide=True)
remote_md5 = result.stdout.strip()
print(f"远程文件MD5: {remote_md5}")
# 比较MD5
if md5 == remote_md5:
print("验证成功: MD5匹配")
return True
else:
print("验证失败: MD5不匹配!")
return False
@task
def unpack_and_verify(c, host, package_path, extract_dir, md5=None):
"""上传、解压并验证包
参数:
host: 目标主机
package_path: 本地包路径
extract_dir: 解压目录
md5: 预期的MD5值
"""
# 上传并验证
if not upload_with_verify(c, host, package_path, "/tmp", md5):
print("上传验证失败,中止操作")
return False
# 连接到目标服务器
conn = Connection(host, config=config)
# 备份现有目录
remote_package = f"/tmp/{os.path.basename(package_path)}"
if conn.run(f"test -d {extract_dir}", warn=True).ok:
backup_dir = f"{extract_dir}_backup_{time.strftime('%Y%m%d_%H%M%S')}"
print(f"备份现有目录: {extract_dir} -> {backup_dir}")
conn.run(f"cp -a {extract_dir} {backup_dir}")
# 确保解压目录存在
conn.run(f"mkdir -p {extract_dir}", warn=True)
# 解压文件
print(f"解压文件到 {extract_dir}")
conn.run(f"tar -xzf {remote_package} -C {os.path.dirname(extract_dir)}")
# 验证解压后的文件
print("验证解压结果...")
original_filename = os.path.basename(package_path).replace('.tar.gz', '')
expected_path = f"{os.path.dirname(extract_dir)}/{original_filename}"
if conn.run(f"test -e {expected_path}", warn=True).ok:
# 如果解压目录与期望目录不同,需要移动
if expected_path != extract_dir:
print(f"移动文件: {expected_path} -> {extract_dir}")
conn.run(f"rm -rf {extract_dir}")
conn.run(f"mv {expected_path} {extract_dir}")
print(f"文件成功解压到 {extract_dir}")
# 清理临时文件
conn.run(f"rm {remote_package}")
return True
else:
print(f"解压失败: 找不到预期的路径 {expected_path}")
return False
@task
def distribute_package(c, hosts, source_dir, remote_dir, extract=True):
"""将文件打包并分发到多台服务器
参数:
hosts: 逗号分隔的主机列表
source_dir: 源文件或目录
remote_dir: 远程目录
extract: 是否解压文件
"""
host_list = hosts.split(',')
# 打包文件
package_path, md5 = package_files(c, source_dir)
if not package_path:
return False
success_count = 0
fail_count = 0
# 分发到每台主机
for host in host_list:
print(f"
处理主机: {host}")
try:
if extract:
# 上传、解压并验证
if unpack_and_verify(c, host, package_path, remote_dir, md5):
success_count += 1
else:
fail_count += 1
else:
# 仅上传并验证
if upload_with_verify(c, host, package_path, remote_dir, md5):
success_count += 1
else:
fail_count += 1
except Exception as e:
print(f"处理 {host} 时出错: {str(e)}")
fail_count += 1
print(f"
分发完成: {success_count} 成功, {fail_count} 失败")
return success_count, fail_count
使用方法:
bash
# 打包文件
fab package-files --source-dir /path/to/myapp
# 上传并验证文件
fab upload-with-verify --host web.example.com --package-path myapp.tar.gz --remote-dir /opt/apps
# 上传、解压并验证
fab unpack-and-verify --host web.example.com --package-path myapp.tar.gz --extract-dir /var/www/myapp
# 将文件分发到多台服务器
fab distribute-package --hosts web1.example.com,web2.example.com,web3.example.com
--source-dir /path/to/myapp --remote-dir /var/www/myapp
7.4.2 示例2: 部署LNMP业务服务环境
以下示例展示如何使用Fabric部署LNMP(Linux + Nginx + MySQL + PHP)环境:
python
from fabric import Connection, Config, task
import os
import time
# 全局配置
config = Config(
overrides={
'run': {
'pty': True,
},
}
)
@task
def install_lnmp(c, host, mysql_root_password, web_root="/var/www/html"):
"""在服务器上安装LNMP环境
参数:
host: 目标主机
mysql_root_password: MySQL root密码
web_root: Web根目录,默认为/var/www/html
"""
print(f"连接到 {host}...")
conn = Connection(host, config=config)
# 检查系统类型
if conn.run("which apt", warn=True, hide=True).ok:
# Debian/Ubuntu系统
install_lnmp_debian(conn, mysql_root_password, web_root)
elif conn.run("which yum", warn=True, hide=True).ok:
# CentOS/RHEL系统
install_lnmp_centos(conn, mysql_root_password, web_root)
else:
print("不支持的系统类型")
return False
return True
def install_lnmp_debian(conn, mysql_root_password, web_root):
"""在Debian/Ubuntu系统上安装LNMP环境"""
print("更新系统包...")
conn.sudo("apt-get update")
print("安装LNMP基础包...")
conn.sudo("DEBIAN_FRONTEND=noninteractive apt-get install -y nginx mysql-server php-fpm php-mysql php-cli php-curl php-gd php-mbstring php-xml php-zip unzip")
# 配置MySQL
print("配置MySQL...")
mysql_secure_installation = f"""
sudo mysql -u root <<EOF
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '{mysql_root_password}';
DELETE FROM mysql.user WHERE User='';
DELETE FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');
DROP DATABASE IF EXISTS test;
DELETE FROM mysql.db WHERE Db='test' OR Db='test\_%';
FLUSH PRIVILEGES;
EOF
"""
conn.run(mysql_secure_installation)
# 配置PHP-FPM
print("配置PHP-FPM...")
conn.sudo("sed -i 's/;cgi.fix_pathinfo=1/cgi.fix_pathinfo=0/g' /etc/php/*/fpm/php.ini")
conn.sudo("systemctl restart php*-fpm.service")
# 配置Nginx
print("配置Nginx...")
nginx_conf = f"""
server {
{
listen 80 default_server;
listen [::]:80 default_server;
root {web_root};
index index.php index.html index.htm;
server_name _;
location / {
{
try_files $uri $uri/ =404;
}}
location ~ \.php$ {
{
include snippets/fastcgi-php.conf;
fastcgi_pass unix:/var/run/php/php7.4-fpm.sock;
}}
location ~ /\.ht {
{
deny all;
}}
}}
"""
# 创建配置文件
conn.sudo(f"mkdir -p {web_root}")
conn.run(f"echo '{nginx_conf}' > /tmp/default")
conn.sudo("mv /tmp/default /etc/nginx/sites-available/default")
# 创建测试页面
php_info = "<?php phpinfo(); ?>"
conn.run(f"echo '{php_info}' > /tmp/info.php")
conn.sudo(f"mv /tmp/info.php {web_root}/info.php")
conn.sudo(f"chown -R www-data:www-data {web_root}")
# 重启服务
print("重启服务...")
conn.sudo("systemctl restart nginx")
print("LNMP环境安装完成!")
print(f"MySQL root密码: {mysql_root_password}")
print(f"测试PHP: http://<server_ip>/info.php")
def install_lnmp_centos(conn, mysql_root_password, web_root):
"""在CentOS/RHEL系统上安装LNMP环境"""
print("更新系统包...")
conn.sudo("yum update -y")
# 安装EPEL仓库
conn.sudo("yum install -y epel-release")
# 安装Nginx
print("安装Nginx...")
conn.sudo("yum install -y nginx")
conn.sudo("systemctl enable nginx")
conn.sudo("systemctl start nginx")
# 安装MariaDB (MySQL)
print("安装MariaDB...")
conn.sudo("yum install -y mariadb-server")
conn.sudo("systemctl enable mariadb")
conn.sudo("systemctl start mariadb")
# 配置MySQL
mysql_secure_installation = f"""
sudo mysql -u root <<EOF
UPDATE mysql.user SET Password=PASSWORD('{mysql_root_password}') WHERE User='root';
DELETE FROM mysql.user WHERE User='';
DELETE FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');
DROP DATABASE IF EXISTS test;
DELETE FROM mysql.db WHERE Db='test' OR Db='test\_%';
FLUSH PRIVILEGES;
EOF
"""
conn.run(mysql_secure_installation)
# 安装PHP
print("安装PHP...")
conn.sudo("yum install -y php php-fpm php-mysqlnd php-cli php-curl php-gd php-mbstring php-xml php-zip unzip")
conn.sudo("systemctl enable php-fpm")
conn.sudo("systemctl start php-fpm")
# 配置PHP-FPM
conn.sudo("sed -i 's/;cgi.fix_pathinfo=1/cgi.fix_pathinfo=0/g' /etc/php.ini")
conn.sudo("sed -i 's/user = apache/user = nginx/g' /etc/php-fpm.d/www.conf")
conn.sudo("sed -i 's/group = apache/group = nginx/g' /etc/php-fpm.d/www.conf")
conn.sudo("systemctl restart php-fpm")
# 配置Nginx
nginx_conf = f"""
server {
{
listen 80 default_server;
listen [::]:80 default_server;
root {web_root};
index index.php index.html index.htm;
server_name _;
location / {
{
try_files $uri $uri/ =404;
}}
location ~ \.php$ {
{
try_files $uri =404;
fastcgi_pass 127.0.0.1:9000;
fastcgi_index index.php;
fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
include fastcgi_params;
}}
location ~ /\.ht {
{
deny all;
}}
}}
"""
# 创建配置文件
conn.sudo(f"mkdir -p {web_root}")
conn.run(f"echo '{nginx_conf}' > /tmp/default.conf")
conn.sudo("mv /tmp/default.conf /etc/nginx/conf.d/default.conf")
# 创建测试页面
php_info = "<?php phpinfo(); ?>"
conn.run(f"echo '{php_info}' > /tmp/info.php")
conn.sudo(f"mv /tmp/info.php {web_root}/info.php")
conn.sudo(f"chown -R nginx:nginx {web_root}")
# 配置SELinux
if conn.run("which sestatus", warn=True, hide=True).ok:
status = conn.run("sestatus | grep 'SELinux status'", hide=True).stdout
if "enabled" in status:
print("配置SELinux...")
conn.sudo("setsebool -P httpd_can_network_connect 1")
# 配置防火墙
if conn.run("which firewall-cmd", warn=True, hide=True).ok:
print("配置防火墙...")
conn.sudo("firewall-cmd --permanent --zone=public --add-service=http")
conn.sudo("firewall-cmd --permanent --zone=public --add-service=https")
conn.sudo("firewall-cmd --reload")
# 重启服务
print("重启服务...")
conn.sudo("systemctl restart nginx")
print("LNMP环境安装完成!")
print(f"MySQL root密码: {mysql_root_password}")
print(f"测试PHP: http://<server_ip>/info.php")
@task
def deploy_website(c, host, source_dir, domain, web_root=None, db_name=None, db_user=None, db_password=None):
"""部署网站到LNMP环境
参数:
host: 目标主机
source_dir: 本地源代码目录
domain: 网站域名
web_root: Web根目录,如未指定则使用/var/www/{domain}
db_name: 数据库名称(可选)
db_user: 数据库用户名(可选)
db_password: 数据库密码(可选)
"""
if not web_root:
web_root = f"/var/www/{domain}"
# 检查源目录是否存在
if not os.path.isdir(source_dir):
print(f"错误: 源目录 {source_dir} 不存在")
return False
print(f"连接到 {host}...")
conn = Connection(host, config=config)
# 创建Web目录
print(f"创建Web目录: {web_root}")
conn.sudo(f"mkdir -p {web_root}")
# 上传网站文件
print("上传网站文件...")
local_tar = f"/tmp/{domain}.tar.gz"
remote_tar = f"/tmp/{domain}.tar.gz"
# 打包源目录
print(f"打包源目录: {source_dir} -> {local_tar}")
os.system(f"tar -czf {local_tar} -C {os.path.dirname(source_dir)} {os.path.basename(source_dir)}")
# 上传压缩包
print(f"上传网站文件: {local_tar} -> {remote_tar}")
conn.put(local_tar, remote_tar)
# 解压文件
print(f"解压文件到 {web_root}")
conn.sudo(f"tar -xzf {remote_tar} -C {os.path.dirname(web_root)}")
# 如果解压的目录名与目标目录名不同,则进行移动
source_name = os.path.basename(source_dir.rstrip('/'))
if source_name != os.path.basename(web_root.rstrip('/')):
extracted_path = f"{os.path.dirname(web_root)}/{source_name}"
conn.sudo(f"rm -rf {web_root}")
conn.sudo(f"mv {extracted_path} {web_root}")
# 设置权限
print("设置文件权限...")
if conn.run("which apt", warn=True, hide=True).ok:
# Debian/Ubuntu系统
conn.sudo(f"chown -R www-data:www-data {web_root}")
else:
# CentOS/RHEL系统
conn.sudo(f"chown -R nginx:nginx {web_root}")
conn.sudo(f"chmod -R 755 {web_root}")
# 如果有需要写入权限的目录,单独设置
for write_dir in ['uploads', 'cache', 'logs', 'tmp']:
if conn.run(f"test -d {web_root}/{write_dir}", warn=True, hide=True).ok:
conn.sudo(f"chmod -R 775 {web_root}/{write_dir}")
# 创建Nginx配置
print(f"配置Nginx虚拟主机: {domain}")
nginx_conf = f"""
server {
{
listen 80;
server_name {domain} www.{domain};
root {web_root};
index index.php index.html index.htm;
location / {
{
try_files $uri $uri/ /index.php?$args;
}}
location ~ \.php$ {
{
"""
if conn.run("which apt", warn=True, hide=True).ok:
# Debian/Ubuntu系统
nginx_conf += """
include snippets/fastcgi-php.conf;
fastcgi_pass unix:/var/run/php/php7.4-fpm.sock;
"""
else:
# CentOS/RHEL系统
nginx_conf += """
try_files $uri =404;
fastcgi_pass 127.0.0.1:9000;
fastcgi_index index.php;
fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
include fastcgi_params;
"""
nginx_conf += """
}
location ~ /\.ht {
deny all;
}
}
"""
# 写入配置文件
conn.run(f"echo '{nginx_conf}' > /tmp/{domain}.conf")
if conn.run("which apt", warn=True, hide=True).ok:
# Debian/Ubuntu系统
conn.sudo(f"mv /tmp/{domain}.conf /etc/nginx/sites-available/{domain}.conf")
conn.sudo(f"ln -sf /etc/nginx/sites-available/{domain}.conf /etc/nginx/sites-enabled/{domain}.conf")
else:
# CentOS/RHEL系统
conn.sudo(f"mv /tmp/{domain}.conf /etc/nginx/conf.d/{domain}.conf")
# 如果提供了数据库信息,创建数据库
if db_name and db_user and db_password:
print(f"创建数据库: {db_name} 和用户: {db_user}")
create_db_sql = f"""
sudo mysql -e "CREATE DATABASE IF NOT EXISTS {db_name};"
sudo mysql -e "CREATE USER IF NOT EXISTS '{db_user}'@'localhost' IDENTIFIED BY '{db_password}';"
sudo mysql -e "GRANT ALL PRIVILEGES ON {db_name}.* TO '{db_user}'@'localhost';"
sudo mysql -e "FLUSH PRIVILEGES;"
"""
conn.run(create_db_sql)
# 如果源目录中有SQL文件,导入数据库
if conn.run(f"find {web_root} -name '*.sql' | wc -l", hide=True).stdout.strip() != '0':
print("导入SQL文件...")
sql_files = conn.run(f"find {web_root} -name '*.sql'", hide=True).stdout.strip().split('
')
for sql_file in sql_files:
conn.run(f"sudo mysql {db_name} < {sql_file}")
# 重启Nginx
print("重启Nginx...")
conn.sudo("systemctl restart nginx")
# 清理临时文件
conn.run(f"rm {remote_tar}")
os.remove(local_tar)
print(f"网站部署完成: http://{domain}")
if db_name:
print(f"数据库信息: DB={db_name}, User={db_user}, Password={db_password}")
return True
使用方法:
bash
# 安装LNMP环境
fab install-lnmp --host web.example.com --mysql-root-password SecurePass123
# 部署网站
fab deploy-website --host web.example.com --source-dir /path/to/mysite
--domain example.com
--db-name mysite_db --db-user mysite_user --db-password DbPass123
7.4.3 示例3: 生产环境代码包发布管理
以下示例实现了完整的生产环境代码发布流程,包括构建、版本控制、部署和回滚:
python
from fabric import Connection, Config, task
import os
import time
import json
import tempfile
import hashlib
import shutil
from datetime import datetime
from invoke import run as local
# 全局配置
config = Config(
overrides={
'run': {
'pty': True,
},
}
)
# 发布配置
class DeployConfig:
VERSION_FILE = 'version.json' # 版本信息文件
KEEP_RELEASES = 5 # 保留的最近发布版本数
SHARED_DIRS = ['uploads', 'logs', 'cache'] # 共享目录
@task
def build(c, app_dir, version=None):
"""构建应用包
参数:
app_dir: 应用目录
version: 版本号,默认使用当前时间
"""
if not os.path.isdir(app_dir):
print(f"错误: 应用目录 {app_dir} 不存在")
return None
# 如果未指定版本号,使用时间戳
if not version:
version = datetime.now().strftime("%Y%m%d%H%M%S")
app_name = os.path.basename(app_dir.rstrip('/'))
build_dir = f"/tmp/build_{app_name}_{version}"
output_file = f"{app_name}_{version}.tar.gz"
# 清理之前的构建目录
if os.path.exists(build_dir):
shutil.rmtree(build_dir)
# 复制应用文件到构建目录
print(f"复制应用文件到构建目录: {app_dir} -> {build_dir}")
shutil.copytree(app_dir, build_dir, symlinks=True, ignore=shutil.ignore_patterns(
'*.git*', '*.svn*', '*.idea*', '*.vscode*', 'node_modules', '*.log', '*.tmp'
))
# 创建版本信息文件
version_info = {
'version': version,
'timestamp': int(time.time()),
'date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
'commit': None
}
# 如果是Git仓库,获取提交信息
git_dir = os.path.join(app_dir, '.git')
if os.path.exists(git_dir):
try:
# 获取当前分支
branch = local("cd " + app_dir + " && git rev-parse --abbrev-ref HEAD", hide=True).stdout.strip()
version_info['branch'] = branch
# 获取最近提交
commit = local("cd " + app_dir + " && git rev-parse HEAD", hide=True).stdout.strip()
version_info['commit'] = commit
# 获取提交作者
author = local("cd " + app_dir + " && git log -1 --pretty=format:'%an'", hide=True).stdout.strip()
version_info['author'] = author
# 获取提交信息
message = local("cd " + app_dir + " && git log -1 --pretty=format:'%s'", hide=True).stdout.strip()
version_info['message'] = message
# 检查是否有未提交的更改
status = local("cd " + app_dir + " && git status --porcelain", hide=True).stdout
version_info['clean'] = len(status.strip()) == 0
except Exception as e:
print(f"获取Git信息时出错: {str(e)}")
# 写入版本信息文件
with open(os.path.join(build_dir, DeployConfig.VERSION_FILE), 'w') as f:
json.dump(version_info, f, indent=2)
# 创建共享目录的占位符
for shared_dir in DeployConfig.SHARED_DIRS:
shared_path = os.path.join(build_dir, shared_dir)
if os.path.exists(shared_path):
shutil.rmtree(shared_path)
os.makedirs(shared_path, exist_ok=True)
with open(os.path.join(shared_path, '.gitkeep'), 'w') as f:
f.write('')
# 打包构建目录
print(f"打包应用: {build_dir} -> {output_file}")
local(f"tar -czf {output_file} -C {os.path.dirname(build_dir)} {os.path.basename(build_dir)}")
# 计算包的MD5
md5 = calculate_md5(output_file)
print(f"构建完成: {output_file} (MD5: {md5})")
# 清理构建目录
shutil.rmtree(build_dir)
return {
'file': output_file,
'version': version,
'md5': md5,
'info': version_info
}
def calculate_md5(filepath):
"""计算文件的MD5哈希值"""
hash_md5 = hashlib.md5()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
@task
def deploy(c, host, package_path, deploy_dir, shared_dir=None):
"""部署应用到服务器
参数:
host: 目标主机
package_path: 本地包路径
deploy_dir: 部署目录
shared_dir: 共享目录,默认为{deploy_dir}/shared
"""
if not os.path.exists(package_path):
print(f"错误: 本地文件 {package_path} 不存在")
return False
# 如果未指定共享目录,使用默认路径
if not shared_dir:
shared_dir = f"{deploy_dir}/shared"
# 提取版本号
filename = os.path.basename(package_path)
version = filename.split('_')[-1].replace('.tar.gz', '')
print(f"部署版本 {version} 到 {host}:{deploy_dir}")
# 连接到目标服务器
conn = Connection(host, config=config)
# 创建必要的目录
print("创建部署目录结构...")
conn.sudo(f"mkdir -p {deploy_dir}/releases/{version}")
conn.sudo(f"mkdir -p {shared_dir}")
# 上传应用包
print(f"上传应用包: {package_path}")
remote_package = f"/tmp/{filename}"
conn.put(package_path, remote_package)
# 解压应用包
print(f"解压应用包到 {deploy_dir}/releases/{version}")
conn.sudo(f"tar -xzf {remote_package} -C {deploy_dir}/releases/{version} --strip-components=1")
# 创建共享目录链接
print("创建共享目录链接...")
for shared_folder in DeployConfig.SHARED_DIRS:
# 确保共享目录存在
conn.sudo(f"mkdir -p {shared_dir}/{shared_folder}")
# 删除发布目录中的对应目录
conn.sudo(f"rm -rf {deploy_dir}/releases/{version}/{shared_folder}")
# 创建符号链接
conn.sudo(f"ln -s {shared_dir}/{shared_folder} {deploy_dir}/releases/{version}/{shared_folder}")
# 更新当前版本链接
print("更新当前版本链接...")
conn.sudo(f"ln -sfn {deploy_dir}/releases/{version} {deploy_dir}/current")
# 更新版本历史
print("更新版本历史...")
version_history = f"{deploy_dir}/version_history.json"
# 读取版本信息
version_file = f"{deploy_dir}/releases/{version}/{DeployConfig.VERSION_FILE}"
version_data = conn.sudo(f"cat {version_file}", hide=True).stdout
version_info = json.loads(version_data)
# 检查版本历史文件是否存在
history_exists = conn.run(f"test -f {version_history}", warn=True, hide=True).ok
if history_exists:
# 读取现有历史
history_data = conn.sudo(f"cat {version_history}", hide=True).stdout
history = json.loads(history_data)
else:
# 创建新的历史记录
history = {"versions": []}
# 添加当前版本
deploy_info = {
"version": version,
"timestamp": int(time.time()),
"date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"info": version_info
}
history["versions"].insert(0, deploy_info)
# 只保留最近N个版本
if len(history["versions"]) > DeployConfig.KEEP_RELEASES:
old_versions = history["versions"][DeployConfig.KEEP_RELEASES:]
history["versions"] = history["versions"][:DeployConfig.KEEP_RELEASES]
# 清理旧版本
print(f"清理旧版本...")
for old_ver in old_versions:
old_version = old_ver["version"]
old_path = f"{deploy_dir}/releases/{old_version}"
if conn.run(f"test -d {old_path}", warn=True, hide=True).ok:
conn.sudo(f"rm -rf {old_path}")
print(f"已删除旧版本: {old_version}")
# 保存更新后的历史
with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file:
json.dump(history, temp_file, indent=2)
temp_file_path = temp_file.name
conn.put(temp_file_path, "/tmp/version_history.json")
conn.sudo(f"mv /tmp/version_history.json {version_history}")
os.unlink(temp_file_path)
# 清理临时文件
conn.run(f"rm {remote_package}")
print(f"部署完成: {host}:{deploy_dir}/current -> releases/{version}")
return True
@task
def rollback(c, host, deploy_dir, steps=1):
"""回滚到之前的版本
参数:
host: 目标主机
deploy_dir: 部署目录
steps: 回滚步数,默认为1
"""
print(f"回滚部署: {host}:{deploy_dir}, 步数: {steps}")
# 连接到目标服务器
conn = Connection(host, config=config)
# 读取版本历史
version_history = f"{deploy_dir}/version_history.json"
if not conn.run(f"test -f {version_history}", warn=True, hide=True).ok:
print(f"错误: 版本历史文件不存在: {version_history}")
return False
# 读取历史记录
history_data = conn.sudo(f"cat {version_history}", hide=True).stdout
history = json.loads(history_data)
if len(history["versions"]) <= steps:
print(f"错误: 没有足够的历史版本可供回滚")
return False
# 获取当前版本和目标版本
current_version = history["versions"][0]["version"]
target_version = history["versions"][steps]["version"]
print(f"回滚版本: {current_version} -> {target_version}")
# 检查目标版本目录是否存在
target_path = f"{deploy_dir}/releases/{target_version}"
if not conn.run(f"test -d {target_path}", warn=True, hide=True).ok:
print(f"错误: 目标版本目录不存在: {target_path}")
return False
# 更新当前版本链接
print("更新当前版本链接...")
conn.sudo(f"ln -sfn {target_path} {deploy_dir}/current")
# 更新版本历史
print("更新版本历史...")
# 将回滚的版本移动到历史记录的顶部
rollback_version = history["versions"].pop(steps)
rollback_version["rollback"] = True
rollback_version["rollback_from"] = current_version
rollback_version["timestamp"] = int(time.time())
rollback_version["date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
history["versions"].insert(0, rollback_version)
# 保存更新后的历史
with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file:
json.dump(history, temp_file, indent=2)
temp_file_path = temp_file.name
conn.put(temp_file_path, "/tmp/version_history.json")
conn.sudo(f"mv /tmp/version_history.json {version_history}")
os.unlink(temp_file_path)
print(f"回滚完成: {host}:{deploy_dir}/current -> releases/{target_version}")
return True
@task
def status(c, host, deploy_dir):
"""查看部署状态
参数:
host: 目标主机
deploy_dir: 部署目录
"""
print(f"查看部署状态: {host}:{deploy_dir}")
# 连接到目标服务器
conn = Connection(host, config=config)
# 检查部署目录是否存在
if not conn.run(f"test -d {deploy_dir}", warn=True, hide=True).ok:
print(f"错误: 部署目录不存在: {deploy_dir}")
return False
# 检查当前版本链接
current_link = f"{deploy_dir}/current"
if conn.run(f"test -L {current_link}", warn=True, hide=True).ok:
current_path = conn.run(f"readlink {current_link}", hide=True).stdout.strip()
current_version = os.path.basename(current_path)
print(f"当前版本: {current_version}")
# 读取版本信息
version_file = f"{current_link}/{DeployConfig.VERSION_FILE}"
if conn.run(f"test -f {version_file}", warn=True, hide=True).ok:
version_data = conn.run(f"cat {version_file}", hide=True).stdout
version_info = json.loads(version_data)
print("
版本信息:")
print(f" 版本号: {version_info['version']}")
print(f" 部署时间: {version_info['date']}")
if 'branch' in version_info:
print(f" 分支: {version_info['branch']}")
if 'commit' in version_info:
print(f" 提交: {version_info['commit']}")
if 'author' in version_info:
print(f" 作者: {version_info['author']}")
if 'message' in version_info:
print(f" 提交信息: {version_info['message']}")
else:
print("警告: 当前版本链接不存在")
# 列出所有可用版本
releases_dir = f"{deploy_dir}/releases"
if conn.run(f"test -d {releases_dir}", warn=True, hide=True).ok:
versions = conn.run(f"ls -1 {releases_dir}", hide=True).stdout.strip().split('
')
print("
可用版本:")
for version in versions:
if version:
if conn.run(f"test -d {releases_dir}/{version}", warn=True, hide=True).ok:
version_file = f"{releases_dir}/{version}/{DeployConfig.VERSION_FILE}"
if conn.run(f"test -f {version_file}", warn=True, hide=True).ok:
data = conn.run(f"cat {version_file}", hide=True).stdout
info = json.loads(data)
print(f" {version} - {info.get('date', 'Unknown date')}")
# 读取版本历史
version_history = f"{deploy_dir}/version_history.json"
if conn.run(f"test -f {version_history}", warn=True, hide=True).ok:
history_data = conn.run(f"cat {version_history}", hide=True).stdout
history = json.loads(history_data)
print("
部署历史:")
for i, version in enumerate(history["versions"]):
rollback = " (回滚)" if version.get("rollback", False) else ""
print(f" {i+1}. {version['version']} - {version['date']}{rollback}")
if version.get("rollback", False) and "rollback_from" in version:
print(f" 从版本 {version['rollback_from']} 回滚")
return True
使用方法:
bash
# 构建应用包
fab build --app-dir /path/to/myapp --version v1.2.3
# 部署应用
fab deploy --host prod.example.com --package-path myapp_v1.2.3.tar.gz
--deploy-dir /var/www/myapp
# 查看部署状态
fab status --host prod.example.com --deploy-dir /var/www/myapp
# 回滚到上一个版本
fab rollback --host prod.example.com --deploy-dir /var/www/myapp
通过本章的详细介绍,应该已经掌握了Fabric的安装配置、核心API使用和实际应用示例。Fabric作为一个功能强大的系统批量运维管理工具,在服务器管理、应用部署和自动化运维中发挥着重要作用。结合前几章介绍的pexpect和paramiko,已经了解了Python在系统运维领域的强大能力,能够根据实际需求选择合适的工具,高效完成各种运维任务。
暂无评论内容