系统批量运维管理器 paramiko 详解
Paramiko是Python的一个强大的SSH协议库,提供了SSH客户端和服务器的功能,是进行远程服务器管理、自动化运维的重要工具。与前一章介绍的pexpect不同,paramiko不依赖于交互式程序,而是直接实现了SSH协议,提供了更加稳定和灵活的远程操作能力。本章将详细介绍paramiko的安装、核心组件及其在系统批量运维中的应用。
6.1 paramiko 的安装
Paramiko是一个纯Python实现的SSH协议库,安装简便,兼容性良好。以下是详细的安装步骤和注意事项。
使用pip安装
最简单的安装方式是通过pip包管理器:
bash
pip install paramiko
对于特定Python版本,可以使用:
bash
pip3 install paramiko # 指定使用Python 3
系统包管理器安装
在各种Linux发行版中,也可以使用系统包管理器安装:
Debian/Ubuntu:
bash
sudo apt-get install python3-paramiko
CentOS/RHEL:
bash
sudo yum install python3-paramiko
Fedora:
bash
sudo dnf install python3-paramiko
依赖关系
Paramiko依赖于以下Python库:
cryptography: 提供密码学算法支持
bcrypt: 用于密码散列
pynacl: 提供加密功能
安装paramiko时,pip通常会自动安装这些依赖。如果出现问题,可以手动安装:
bash
pip install cryptography bcrypt pynacl
验证安装
安装完成后,可以通过以下Python代码验证paramiko是否正确安装:
python
import paramiko
print(paramiko.__version__)
如果显示版本号,说明安装成功。
可能遇到的问题及解决方案
加密库编译问题:
在某些系统上,cryptography库需要编译,可能会遇到缺少编译工具或开发库的问题。解决方案:
bash
# Debian/Ubuntu
sudo apt-get install build-essential libssl-dev libffi-dev python3-dev
# CentOS/RHEL
sudo yum install gcc openssl-devel libffi-devel python3-devel
Windows平台安装问题:
在Windows上,可能需要安装Visual C++ Build Tools。最简单的方法是使用预编译的wheel包:
bash
pip install --only-binary=:all: paramiko
版本兼容性问题:
如果遇到版本兼容性问题,可以尝试安装特定版本:
bash
pip install paramiko==2.7.2 # 安装特定版本
源码安装
对于需要最新开发版本或者特定修改的用户,也可以从源码安装:
bash
git clone https://github.com/paramiko/paramiko.git
cd paramiko
pip install -e .
6.2 paramiko 的核心组件
Paramiko提供了多个核心组件,用于实现SSH客户端和服务器的功能。下面将详细介绍这些组件的使用方法和关键特性。
6.2.1 SSHClient 类
SSHClient
是paramiko最常用的类之一,提供了类似于OpenSSH的高级SSH客户端功能。
基本使用
以下是使用SSHClient
的基本示例:
python
import paramiko
# 创建SSH客户端实例
client = paramiko.SSHClient()
# 设置未知主机密钥的策略
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# 连接到远程服务器
client.connect(
hostname='example.com',
port=22,
username='user',
password='password'
)
# 执行命令
stdin, stdout, stderr = client.exec_command('ls -l')
# 获取命令输出
output = stdout.read().decode()
error = stderr.read().decode()
# 打印输出
print("输出:", output)
if error:
print("错误:", error)
finally:
# 关闭连接
client.close()
主要方法和参数
SSHClient
类提供了以下重要方法:
connect(hostname, port=22, username=None, password=None, pkey=None, …)
建立与SSH服务器的连接。主要参数包括:
hostname
: 主机名或IP地址
port
: SSH端口,默认为22
username
: 用户名
password
: 密码(与pkey二选一)
pkey
: 私钥对象(与password二选一)
key_filename
: 私钥文件路径
timeout
: 连接超时时间
allow_agent
: 是否允许使用SSH代理
look_for_keys
: 是否在~/.ssh中查找密钥文件
exec_command(command, bufsize=-1, timeout=None, …)
在远程服务器上执行命令,并返回stdin, stdout和stderr的文件对象。
open_sftp()
打开SFTP会话,返回SFTPClient
对象,用于文件传输操作。
set_missing_host_key_policy(policy)
设置处理未知主机密钥的策略。常用的策略包括:
paramiko.AutoAddPolicy()
: 自动添加新主机密钥
paramiko.RejectPolicy()
: 拒绝未知主机
paramiko.WarningPolicy()
: 记录警告但接受新主机
close()
关闭SSH连接。
高级功能:使用密钥认证
使用SSH密钥认证通常比密码认证更安全:
python
import paramiko
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 方法1:指定私钥文件路径
client.connect(
hostname='example.com',
username='user',
key_filename='/path/to/private_key'
)
# 方法2:使用RSAKey对象
key = paramiko.RSAKey.from_private_key_file('/path/to/private_key', password='key_password')
client.connect(
hostname='example.com',
username='user',
pkey=key
)
处理主机密钥
SSH安全的一个关键方面是验证主机密钥,防止中间人攻击。Paramiko提供了几种处理主机密钥的方式:
python
import paramiko
import os
client = paramiko.SSHClient()
# 方法1:自动添加(适合测试环境)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 方法2:从known_hosts加载
client.load_system_host_keys() # 加载系统级known_hosts
client.load_host_keys(os.path.expanduser('~/.ssh/known_hosts')) # 加载用户级known_hosts
# 方法3:自定义处理策略
class MyPolicy(paramiko.MissingHostKeyPolicy):
def missing_host_key(self, client, hostname, key):
print(f"警告:未知主机 {hostname}!")
print(f"指纹:{key.get_fingerprint().hex()}")
response = input("是否继续连接? (yes/no): ")
if response.lower() == 'yes':
client._host_keys.add(hostname, key.get_name(), key)
client._host_keys_filename = os.path.expanduser('~/.ssh/known_hosts')
client.save_host_keys(client._host_keys_filename)
return
raise paramiko.SSHException("主机密钥未被验证")
client.set_missing_host_key_policy(MyPolicy())
6.2.2 SFTPClient 类
SFTPClient
提供了SFTP协议的实现,用于安全的文件传输操作。
基本使用
通常,SFTPClient
对象是通过SSHClient
的open_sftp
方法获取的:
python
import paramiko
# 创建SSH客户端
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('example.com', username='user', password='password')
# 打开SFTP会话
sftp = client.open_sftp()
try:
# 上传文件
sftp.put('local_file.txt', 'remote_file.txt')
# 下载文件
sftp.get('remote_file2.txt', 'local_file2.txt')
# 列出目录内容
file_list = sftp.listdir('.')
print("远程目录内容:", file_list)
finally:
# 关闭SFTP会话
sftp.close()
# 关闭SSH连接
client.close()
主要方法
SFTPClient
类提供了以下重要方法:
put(localpath, remotepath, callback=None, confirm=True)
将本地文件上传到远程服务器。参数说明:
localpath
: 本地文件路径
remotepath
: 远程文件路径
callback
: 进度回调函数,形如func(bytes_transferred, total_bytes)
confirm
: 是否检查远程文件大小以确认传输成功
get(remotepath, localpath, callback=None)
从远程服务器下载文件到本地。参数说明:
remotepath
: 远程文件路径
localpath
: 本地文件路径
callback
: 进度回调函数
listdir(path='.')
列出指定目录的内容,返回文件名列表。
mkdir(path, mode=511)
在远程服务器上创建目录。
remove(path)
删除远程文件。
rename(oldpath, newpath)
重命名远程文件或目录。
stat(path)
获取远程文件的状态,类似于os.stat()。
chmod(path, mode)
更改远程文件的权限。
chown(path, uid, gid)
更改远程文件的所有者和组。
处理大文件和显示传输进度
对于大文件传输,可以使用回调函数显示进度:
python
import paramiko
import os
def progress_callback(bytes_transferred, total_bytes):
percentage = (bytes_transferred / total_bytes) * 100
print(f"
传输进度: {percentage:.2f}%", end="")
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('example.com', username='user', password='password')
sftp = client.open_sftp()
try:
# 获取本地文件大小
local_file = 'large_file.zip'
file_size = os.path.getsize(local_file)
print(f"开始上传文件: {local_file} ({file_size} 字节)")
# 上传文件并显示进度
sftp.put(local_file, 'remote_large_file.zip', callback=progress_callback)
print("
文件上传完成!")
finally:
sftp.close()
client.close()
递归操作目录
Paramiko的SFTP客户端没有直接提供递归目录操作的方法,但可以自己实现:
python
import paramiko
import os
def upload_directory(sftp, local_dir, remote_dir):
"""递归上传目录内容"""
# 确保远程目录存在
try:
sftp.stat(remote_dir)
except FileNotFoundError:
sftp.mkdir(remote_dir)
# 遍历本地目录
for item in os.listdir(local_dir):
local_path = os.path.join(local_dir, item)
remote_path = os.path.join(remote_dir, item)
if os.path.isfile(local_path):
# 上传文件
sftp.put(local_path, remote_path)
print(f"上传文件: {local_path} -> {remote_path}")
elif os.path.isdir(local_path):
# 递归上传子目录
upload_directory(sftp, local_path, remote_path)
def download_directory(sftp, remote_dir, local_dir):
"""递归下载目录内容"""
# 确保本地目录存在
if not os.path.exists(local_dir):
os.makedirs(local_dir)
# 遍历远程目录
for item in sftp.listdir(remote_dir):
remote_path = os.path.join(remote_dir, item)
local_path = os.path.join(local_dir, item)
try:
# 检查是否为目录
sftp.stat(remote_path)
# 尝试作为目录列出内容
try:
sftp.listdir(remote_path)
# 是目录,递归下载
download_directory(sftp, remote_path, local_path)
except (IOError, PermissionError):
# 是文件,直接下载
sftp.get(remote_path, local_path)
print(f"下载文件: {remote_path} -> {local_path}")
except (IOError, PermissionError):
# 是文件,直接下载
sftp.get(remote_path, local_path)
print(f"下载文件: {remote_path} -> {local_path}")
# 使用示例
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('example.com', username='user', password='password')
sftp = client.open_sftp()
try:
# 上传整个目录
upload_directory(sftp, './local_folder', '/remote/folder')
# 下载整个目录
download_directory(sftp, '/remote/folder', './downloaded_folder')
finally:
sftp.close()
client.close()
6.3 paramiko 应用示例
下面通过实际的应用示例,展示paramiko在系统批量运维中的强大功能。
6.3.1 实现密钥方式登录远程主机
使用SSH密钥登录是一种更安全的认证方式,下面是一个完整的实现示例:
python
#!/usr/bin/env python3
"""
使用SSH密钥登录远程主机并执行命令
"""
import paramiko
import os
import sys
import argparse
import getpass
import logging
import socket
import time
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('ssh_key_login')
class SSHKeyLogin:
def __init__(self, hostname, username, key_file=None, port=22, passphrase=None):
"""初始化SSH密钥登录客户端
Args:
hostname: 远程主机名或IP
username: 用户名
key_file: 私钥文件路径,默认为~/.ssh/id_rsa
port: SSH端口,默认为22
passphrase: 私钥密码,如果有的话
"""
self.hostname = hostname
self.username = username
self.port = port
self.key_file = key_file or os.path.expanduser('~/.ssh/id_rsa')
self.passphrase = passphrase
self.client = None
def connect(self):
"""连接到远程主机"""
try:
# 创建SSH客户端
self.client = paramiko.SSHClient()
# 加载系统主机密钥
self.client.load_system_host_keys()
# 对于未知的主机,添加密钥
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 加载私钥
try:
key = paramiko.RSAKey.from_private_key_file(self.key_file, password=self.passphrase)
except paramiko.ssh_exception.PasswordRequiredException:
# 如果私钥需要密码但未提供
if not self.passphrase:
self.passphrase = getpass.getpass(f"请输入私钥'{self.key_file}'的密码: ")
key = paramiko.RSAKey.from_private_key_file(self.key_file, password=self.passphrase)
# 建立连接
logger.info(f"正在连接到 {self.username}@{self.hostname}:{self.port}")
self.client.connect(
hostname=self.hostname,
port=self.port,
username=self.username,
pkey=key,
timeout=10
)
logger.info(f"成功连接到 {self.hostname}")
return True
except paramiko.AuthenticationException:
logger.error(f"认证失败: {self.username}@{self.hostname}")
return False
except paramiko.SSHException as e:
logger.error(f"SSH连接错误: {str(e)}")
return False
except socket.error as e:
logger.error(f"网络连接错误: {str(e)}")
return False
except Exception as e:
logger.error(f"连接时发生错误: {str(e)}")
return False
def execute_command(self, command, timeout=30):
"""在远程主机上执行命令
Args:
command: 要执行的命令
timeout: 命令超时时间,默认30秒
Returns:
tuple: (exit_status, stdout, stderr)
"""
if not self.client:
logger.error("未连接到远程主机")
return -1, "", "未连接到远程主机"
try:
logger.info(f"执行命令: {command}")
# 执行命令
stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
# 获取退出状态
exit_status = stdout.channel.recv_exit_status()
# 读取输出
stdout_content = stdout.read().decode().strip()
stderr_content = stderr.read().decode().strip()
if exit_status == 0:
logger.info("命令执行成功")
else:
logger.warning(f"命令执行失败,退出状态: {exit_status}")
if stderr_content:
logger.warning(f"错误输出: {stderr_content}")
return exit_status, stdout_content, stderr_content
except socket.timeout:
logger.error(f"命令执行超时: {command}")
return -1, "", "命令执行超时"
except Exception as e:
logger.error(f"执行命令时发生错误: {str(e)}")
return -1, "", str(e)
def close(self):
"""关闭SSH连接"""
if self.client:
self.client.close()
logger.info(f"已关闭与 {self.hostname} 的连接")
def main():
parser = argparse.ArgumentParser(description='使用SSH密钥登录远程主机并执行命令')
parser.add_argument('-H', '--host', required=True, help='远程主机名或IP')
parser.add_argument('-u', '--user', required=True, help='用户名')
parser.add_argument('-k', '--key', help='私钥文件路径,默认为~/.ssh/id_rsa')
parser.add_argument('-p', '--port', type=int, default=22, help='SSH端口,默认为22')
parser.add_argument('-c', '--command', help='要执行的命令')
parser.add_argument('-f', '--file', help='从文件读取要执行的命令')
parser.add_argument('--passphrase', help='私钥密码')
args = parser.parse_args()
# 检查命令或命令文件
if not args.command and not args.file:
parser.error("必须提供要执行的命令(-c)或命令文件(-f)")
# 创建SSH连接
ssh = SSHKeyLogin(
hostname=args.host,
username=args.user,
key_file=args.key,
port=args.port,
passphrase=args.passphrase
)
try:
# 连接到远程主机
if not ssh.connect():
sys.exit(1)
# 执行命令
if args.command:
# 执行单个命令
exit_status, stdout, stderr = ssh.execute_command(args.command)
print("
--- 命令输出 ---")
if stdout:
print(stdout)
if stderr:
print("
--- 错误输出 ---")
print(stderr)
if exit_status != 0:
print(f"
命令失败,退出状态: {exit_status}")
sys.exit(exit_status)
elif args.file:
# 从文件读取命令并执行
try:
with open(args.file, 'r') as f:
commands = [line.strip() for line in f if line.strip() and not line.startswith('#')]
for i, cmd in enumerate(commands, 1):
print(f"
=== 执行命令 {i}/{len(commands)}: {cmd} ===")
exit_status, stdout, stderr = ssh.execute_command(cmd)
print("
--- 命令输出 ---")
if stdout:
print(stdout)
if stderr:
print("
--- 错误输出 ---")
print(stderr)
if exit_status != 0:
print(f"
命令失败,退出状态: {exit_status}")
if i < len(commands):
if input("是否继续执行后续命令? (y/n): ").lower() != 'y':
break
except FileNotFoundError:
logger.error(f"找不到命令文件: {args.file}")
sys.exit(1)
finally:
# 关闭连接
ssh.close()
if __name__ == "__main__":
main()
这个示例实现了以下功能:
使用SSH密钥登录远程主机
支持执行单个命令或从文件读取多个命令
详细的日志记录和错误处理
交互式提示输入私钥密码(如果需要)
使用方法示例:
bash
# 执行单个命令
python ssh_key_login.py -H server.example.com -u username -c "df -h"
# 从文件读取命令执行
python ssh_key_login.py -H server.example.com -u username -f commands.txt
# 指定私钥文件
python ssh_key_login.py -H server.example.com -u username -k ~/.ssh/custom_key -c "ls -l"
6.3.2 实现堡垒机模式下的远程命令执行
在企业环境中,经常需要通过堡垒机(跳板机)来访问内部服务器。以下示例展示了如何使用paramiko实现通过堡垒机执行远程命令:
python
#!/usr/bin/env python3
"""
通过堡垒机执行远程命令
"""
import paramiko
import os
import sys
import logging
import argparse
import getpass
import time
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('bastion_executor')
class BastionExecutor:
def __init__(self, bastion_host, bastion_user, target_host, target_user,
bastion_key=None, target_key=None, bastion_password=None,
target_password=None, bastion_port=22, target_port=22):
"""初始化堡垒机命令执行器
Args:
bastion_host: 堡垒机主机名或IP
bastion_user: 堡垒机用户名
target_host: 目标主机名或IP
target_user: 目标主机用户名
bastion_key: 堡垒机私钥文件路径
target_key: 目标主机私钥文件路径
bastion_password: 堡垒机密码
target_password: 目标主机密码
bastion_port: 堡垒机SSH端口
target_port: 目标主机SSH端口
"""
self.bastion_host = bastion_host
self.bastion_user = bastion_user
self.bastion_port = bastion_port
self.bastion_key = bastion_key
self.bastion_password = bastion_password
self.target_host = target_host
self.target_user = target_user
self.target_port = target_port
self.target_key = target_key
self.target_password = target_password
self.bastion_client = None
self.target_client = None
self.transport = None
def connect_to_bastion(self):
"""连接到堡垒机"""
try:
logger.info(f"正在连接到堡垒机: {self.bastion_user}@{self.bastion_host}:{self.bastion_port}")
self.bastion_client = paramiko.SSHClient()
self.bastion_client.load_system_host_keys()
self.bastion_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 尝试使用密钥登录
if self.bastion_key:
try:
self.bastion_client.connect(
hostname=self.bastion_host,
port=self.bastion_port,
username=self.bastion_user,
key_filename=self.bastion_key,
timeout=10
)
logger.info(f"使用密钥成功连接到堡垒机 {self.bastion_host}")
return True
except paramiko.AuthenticationException:
logger.warning("使用密钥认证堡垒机失败,尝试密码认证")
# 如果没有提供密钥或密钥认证失败,尝试密码认证
if not self.bastion_password:
self.bastion_password = getpass.getpass(f"请输入堡垒机 {self.bastion_user}@{self.bastion_host} 的密码: ")
self.bastion_client.connect(
hostname=self.bastion_host,
port=self.bastion_port,
username=self.bastion_user,
password=self.bastion_password,
timeout=10
)
logger.info(f"使用密码成功连接到堡垒机 {self.bastion_host}")
return True
except Exception as e:
logger.error(f"连接堡垒机失败: {str(e)}")
if self.bastion_client:
self.bastion_client.close()
return False
def connect_to_target(self):
"""通过堡垒机连接到目标主机"""
if not self.bastion_client:
logger.error("未连接到堡垒机")
return False
try:
logger.info(f"通过堡垒机连接到目标主机: {self.target_user}@{self.target_host}:{self.target_port}")
# 获取堡垒机的transport
self.transport = self.bastion_client.get_transport()
# 创建从堡垒机到目标主机的通道
dst_addr = (self.target_host, self.target_port)
src_addr = (self.bastion_host, self.bastion_port)
channel = self.transport.open_channel("direct-tcpip", dst_addr, src_addr)
# 创建到目标主机的SSH连接
self.target_client = paramiko.SSHClient()
self.target_client.load_system_host_keys()
self.target_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 尝试使用密钥登录目标主机
if self.target_key:
try:
key = paramiko.RSAKey.from_private_key_file(self.target_key)
self.target_client.connect(
hostname=self.target_host,
port=self.target_port,
username=self.target_user,
pkey=key,
sock=channel,
timeout=10
)
logger.info(f"使用密钥成功连接到目标主机 {self.target_host}")
return True
except paramiko.AuthenticationException:
logger.warning("使用密钥认证目标主机失败,尝试密码认证")
# 如果没有提供密钥或密钥认证失败,尝试密码认证
if not self.target_password:
self.target_password = getpass.getpass(f"请输入目标主机 {self.target_user}@{self.target_host} 的密码: ")
self.target_client.connect(
hostname=self.target_host,
port=self.target_port,
username=self.target_user,
password=self.target_password,
sock=channel,
timeout=10
)
logger.info(f"使用密码成功连接到目标主机 {self.target_host}")
return True
except Exception as e:
logger.error(f"连接目标主机失败: {str(e)}")
return False
def execute_command(self, command, timeout=30):
"""在目标主机上执行命令
Args:
command: 要执行的命令
timeout: 命令超时时间,默认30秒
Returns:
tuple: (exit_status, stdout, stderr)
"""
if not self.target_client:
logger.error("未连接到目标主机")
return -1, "", "未连接到目标主机"
try:
logger.info(f"在目标主机 {self.target_host} 上执行命令: {command}")
# 执行命令
stdin, stdout, stderr = self.target_client.exec_command(command, timeout=timeout)
# 获取退出状态
exit_status = stdout.channel.recv_exit_status()
# 读取输出
stdout_content = stdout.read().decode().strip()
stderr_content = stderr.read().decode().strip()
if exit_status == 0:
logger.info("命令执行成功")
else:
logger.warning(f"命令执行失败,退出状态: {exit_status}")
if stderr_content:
logger.warning(f"错误输出: {stderr_content}")
return exit_status, stdout_content, stderr_content
except Exception as e:
logger.error(f"执行命令时发生错误: {str(e)}")
return -1, "", str(e)
def close(self):
"""关闭所有连接"""
if self.target_client:
self.target_client.close()
logger.info(f"已关闭与目标主机 {self.target_host} 的连接")
if self.bastion_client:
self.bastion_client.close()
logger.info(f"已关闭与堡垒机 {self.bastion_host} 的连接")
def execute_commands(self, commands, continue_on_error=False):
"""执行多个命令
Args:
commands: 命令列表
continue_on_error: 如果为True,则在命令失败时继续执行
Returns:
list: 包含每个命令结果的元组列表[(exit_status, stdout, stderr), ...]
"""
results = []
for i, cmd in enumerate(commands, 1):
logger.info(f"执行命令 {i}/{len(commands)}: {cmd}")
exit_status, stdout, stderr = self.execute_command(cmd)
results.append((exit_status, stdout, stderr))
# 如果命令失败且不继续执行,则中断
if exit_status != 0 and not continue_on_error:
logger.warning(f"命令 '{cmd}' 失败,退出状态: {exit_status},停止执行后续命令")
break
return results
def main():
parser = argparse.ArgumentParser(description='通过堡垒机执行远程命令')
# 堡垒机参数
parser.add_argument('--bastion-host', required=True, help='堡垒机主机名或IP')
parser.add_argument('--bastion-user', required=True, help='堡垒机用户名')
parser.add_argument('--bastion-key', help='堡垒机私钥文件路径')
parser.add_argument('--bastion-password', help='堡垒机密码')
parser.add_argument('--bastion-port', type=int, default=22, help='堡垒机SSH端口,默认22')
# 目标主机参数
parser.add_argument('--target-host', required=True, help='目标主机名或IP')
parser.add_argument('--target-user', required=True, help='目标主机用户名')
parser.add_argument('--target-key', help='目标主机私钥文件路径')
parser.add_argument('--target-password', help='目标主机密码')
parser.add_argument('--target-port', type=int, default=22, help='目标主机SSH端口,默认22')
# 命令参数
parser.add_argument('-c', '--command', help='要执行的命令')
parser.add_argument('-f', '--file', help='从文件读取要执行的命令')
parser.add_argument('--continue-on-error', action='store_true', help='命令失败时继续执行')
args = parser.parse_args()
# 检查命令或命令文件
if not args.command and not args.file:
parser.error("必须提供要执行的命令(-c)或命令文件(-f)")
# 创建执行器
executor = BastionExecutor(
bastion_host=args.bastion_host,
bastion_user=args.bastion_user,
bastion_key=args.bastion_key,
bastion_password=args.bastion_password,
bastion_port=args.bastion_port,
target_host=args.target_host,
target_user=args.target_user,
target_key=args.target_key,
target_password=args.target_password,
target_port=args.target_port
)
try:
# 连接到堡垒机
if not executor.connect_to_bastion():
sys.exit(1)
# 通过堡垒机连接到目标主机
if not executor.connect_to_target():
sys.exit(1)
# 执行命令
if args.command:
# 执行单个命令
exit_status, stdout, stderr = executor.execute_command(args.command)
print("
--- 命令输出 ---")
if stdout:
print(stdout)
if stderr:
print("
--- 错误输出 ---")
print(stderr)
if exit_status != 0:
print(f"
命令失败,退出状态: {exit_status}")
sys.exit(exit_status)
elif args.file:
# 从文件读取命令并执行
try:
with open(args.file, 'r') as f:
commands = [line.strip() for line in f if line.strip() and not line.startswith('#')]
results = executor.execute_commands(commands, args.continue_on_error)
for i, (exit_status, stdout, stderr) in enumerate(results, 1):
print(f"
=== 命令 {i}/{len(results)} 结果 ===")
print("
--- 命令输出 ---")
if stdout:
print(stdout)
if stderr:
print("
--- 错误输出 ---")
print(stderr)
print(f"
退出状态: {exit_status}")
except FileNotFoundError:
logger.error(f"找不到命令文件: {args.file}")
sys.exit(1)
finally:
# 关闭连接
executor.close()
if __name__ == "__main__":
main()
这个示例实现了以下功能:
先连接到堡垒机,然后通过堡垒机连接到目标主机
支持密码和密钥两种认证方式
支持执行单个命令或从文件读取多个命令
支持在命令失败时继续执行或停止执行后续命令
详细的日志记录和错误处理
使用方法示例:
bash
# 使用密码认证(交互式输入密码)
python bastion_executor.py --bastion-host bastion.example.com --bastion-user admin
--target-host internal.example.com --target-user user
-c "df -h"
# 使用密钥认证
python bastion_executor.py --bastion-host bastion.example.com --bastion-user admin
--bastion-key ~/.ssh/bastion_key
--target-host internal.example.com --target-user user
--target-key ~/.ssh/target_key
-f commands.txt
6.3.3 实现堡垒机模式下的远程文件上传
在运维工作中,通过堡垒机上传文件到内部服务器是一个常见需求。以下是一个实现示例:
python
#!/usr/bin/env python3
"""
通过堡垒机上传文件到目标主机
"""
import paramiko
import os
import sys
import logging
import argparse
import getpass
import socket
import time
import tempfile
import uuid
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('bastion_file_transfer')
class BastionFileTransfer:
def __init__(self, bastion_host, bastion_user, target_host, target_user,
bastion_key=None, target_key=None, bastion_password=None,
target_password=None, bastion_port=22, target_port=22):
"""初始化堡垒机文件传输工具
Args:
bastion_host: 堡垒机主机名或IP
bastion_user: 堡垒机用户名
target_host: 目标主机名或IP
target_user: 目标主机用户名
bastion_key: 堡垒机私钥文件路径
target_key: 目标主机私钥文件路径
bastion_password: 堡垒机密码
target_password: 目标主机密码
bastion_port: 堡垒机SSH端口
target_port: 目标主机SSH端口
"""
self.bastion_host = bastion_host
self.bastion_user = bastion_user
self.bastion_port = bastion_port
self.bastion_key = bastion_key
self.bastion_password = bastion_password
self.target_host = target_host
self.target_user = target_user
self.target_port = target_port
self.target_key = target_key
self.target_password = target_password
self.bastion_client = None
self.target_client = None
self.bastion_sftp = None
self.target_sftp = None
self.transport = None
# 临时目录和文件
self.temp_dir = None
def connect_to_bastion(self):
"""连接到堡垒机"""
try:
logger.info(f"正在连接到堡垒机: {self.bastion_user}@{self.bastion_host}:{self.bastion_port}")
self.bastion_client = paramiko.SSHClient()
self.bastion_client.load_system_host_keys()
self.bastion_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 尝试使用密钥登录
if self.bastion_key:
try:
self.bastion_client.connect(
hostname=self.bastion_host,
port=self.bastion_port,
username=self.bastion_user,
key_filename=self.bastion_key,
timeout=10
)
logger.info(f"使用密钥成功连接到堡垒机 {self.bastion_host}")
self.bastion_sftp = self.bastion_client.open_sftp()
return True
except paramiko.AuthenticationException:
logger.warning("使用密钥认证堡垒机失败,尝试密码认证")
# 如果没有提供密钥或密钥认证失败,尝试密码认证
if not self.bastion_password:
self.bastion_password = getpass.getpass(f"请输入堡垒机 {self.bastion_user}@{self.bastion_host} 的密码: ")
self.bastion_client.connect(
hostname=self.bastion_host,
port=self.bastion_port,
username=self.bastion_user,
password=self.bastion_password,
timeout=10
)
logger.info(f"使用密码成功连接到堡垒机 {self.bastion_host}")
self.bastion_sftp = self.bastion_client.open_sftp()
return True
except Exception as e:
logger.error(f"连接堡垒机失败: {str(e)}")
if self.bastion_client:
self.bastion_client.close()
return False
def connect_to_target(self):
"""通过堡垒机连接到目标主机"""
if not self.bastion_client:
logger.error("未连接到堡垒机")
return False
try:
logger.info(f"通过堡垒机连接到目标主机: {self.target_user}@{self.target_host}:{self.target_port}")
# 获取堡垒机的transport
self.transport = self.bastion_client.get_transport()
# 创建从堡垒机到目标主机的通道
dst_addr = (self.target_host, self.target_port)
src_addr = (self.bastion_host, self.bastion_port)
channel = self.transport.open_channel("direct-tcpip", dst_addr, src_addr)
# 创建到目标主机的SSH连接
self.target_client = paramiko.SSHClient()
self.target_client.load_system_host_keys()
self.target_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 尝试使用密钥登录目标主机
if self.target_key:
try:
key = paramiko.RSAKey.from_private_key_file(self.target_key)
self.target_client.connect(
hostname=self.target_host,
port=self.target_port,
username=self.target_user,
pkey=key,
sock=channel,
timeout=10
)
logger.info(f"使用密钥成功连接到目标主机 {self.target_host}")
self.target_sftp = self.target_client.open_sftp()
return True
except paramiko.AuthenticationException:
logger.warning("使用密钥认证目标主机失败,尝试密码认证")
# 如果没有提供密钥或密钥认证失败,尝试密码认证
if not self.target_password:
self.target_password = getpass.getpass(f"请输入目标主机 {self.target_user}@{self.target_host} 的密码: ")
self.target_client.connect(
hostname=self.target_host,
port=self.target_port,
username=self.target_user,
password=self.target_password,
sock=channel,
timeout=10
)
logger.info(f"使用密码成功连接到目标主机 {self.target_host}")
self.target_sftp = self.target_client.open_sftp()
return True
except Exception as e:
logger.error(f"连接目标主机失败: {str(e)}")
return False
def create_temp_dir_on_bastion(self):
"""在堡垒机上创建临时目录"""
if not self.bastion_client:
logger.error("未连接到堡垒机")
return False
try:
# 生成唯一的临时目录名
temp_dir_name = f"/tmp/file_transfer_{uuid.uuid4().hex}"
# 创建临时目录
self.bastion_client.exec_command(f"mkdir -p {temp_dir_name}")
# 验证目录是否创建成功
stdin, stdout, stderr = self.bastion_client.exec_command(f"ls -d {temp_dir_name}")
if temp_dir_name in stdout.read().decode():
self.temp_dir = temp_dir_name
logger.info(f"在堡垒机上创建临时目录: {self.temp_dir}")
return True
else:
logger.error(f"在堡垒机上创建临时目录失败: {stderr.read().decode()}")
return False
except Exception as e:
logger.error(f"在堡垒机上创建临时目录时出错: {str(e)}")
return False
def upload_file(self, local_path, target_path, callback=None):
"""通过堡垒机上传文件到目标主机
Args:
local_path: 本地文件路径
target_path: 目标主机上的文件路径
callback: 上传进度回调函数,形如func(bytes_transferred, total_bytes)
Returns:
bool: 上传是否成功
"""
if not self.bastion_sftp or not self.target_sftp:
logger.error("SFTP连接未建立")
return False
if not os.path.exists(local_path):
logger.error(f"本地文件不存在: {local_path}")
return False
if not self.temp_dir:
if not self.create_temp_dir_on_bastion():
return False
try:
# 计算文件在堡垒机上的临时路径
filename = os.path.basename(local_path)
bastion_temp_path = os.path.join(self.temp_dir, filename)
file_size = os.path.getsize(local_path)
logger.info(f"开始上传文件: {local_path} -> {target_path} (文件大小: {file_size} 字节)")
# 第一步:将文件从本地上传到堡垒机
logger.info(f"上传到堡垒机: {local_path} -> {bastion_temp_path}")
start_time = time.time()
# 定义进度回调函数
def progress_callback(bytes_so_far, total_bytes):
if callback:
callback(bytes_so_far, total_bytes)
else:
percent = (bytes_so_far / total_bytes) * 100
sys.stdout.write(f"
上传到堡垒机: {percent:.2f}% ({bytes_so_far}/{total_bytes} 字节)")
sys.stdout.flush()
# 上传文件到堡垒机
self.bastion_sftp.put(local_path, bastion_temp_path, callback=progress_callback)
if not callback:
print() # 打印换行
logger.info(f"文件成功上传到堡垒机,耗时: {time.time() - start_time:.2f}秒")
# 第二步:将文件从堡垒机传输到目标主机
logger.info(f"从堡垒机传输到目标主机: {bastion_temp_path} -> {target_path}")
start_time = time.time()
# 检查目标路径是否是目录
try:
target_stat = self.target_sftp.stat(target_path)
is_dir = target_stat.st_mode & 0o40000 # 检查是否为目录
if is_dir:
# 如果是目录,使用原始文件名
target_path = os.path.join(target_path, filename)
except FileNotFoundError:
# 如果路径不存在,假设是文件路径
# 确保目标目录存在
target_dir = os.path.dirname(target_path)
if target_dir:
try:
self.target_sftp.stat(target_dir)
except FileNotFoundError:
# 创建目录
logger.info(f"在目标主机上创建目录: {target_dir}")
self.target_client.exec_command(f"mkdir -p {target_dir}")
# 获取堡垒机上临时文件的大小
bastion_file_size = self.bastion_sftp.stat(bastion_temp_path).st_size
# 读取堡垒机上的文件并写入目标主机
with self.bastion_sftp.open(bastion_temp_path, 'rb') as bastion_file:
with self.target_sftp.file(target_path, 'wb') as target_file:
bytes_so_far = 0
chunk_size = 32768 # 32KB
while True:
data = bastion_file.read(chunk_size)
if not data:
break
target_file.write(data)
bytes_so_far += len(data)
# 更新进度
if callback:
callback(bytes_so_far, bastion_file_size)
else:
percent = (bytes_so_far / bastion_file_size) * 100
sys.stdout.write(f"
从堡垒机传输到目标主机: {percent:.2f}% ({bytes_so_far}/{bastion_file_size} 字节)")
sys.stdout.flush()
if not callback:
print() # 打印换行
logger.info(f"文件成功传输到目标主机,耗时: {time.time() - start_time:.2f}秒")
# 清理堡垒机上的临时文件
logger.info(f"清理堡垒机上的临时文件: {bastion_temp_path}")
self.bastion_sftp.remove(bastion_temp_path)
# 验证目标文件大小
try:
target_file_size = self.target_sftp.stat(target_path).st_size
if target_file_size == file_size:
logger.info(f"文件大小验证成功: {target_file_size} 字节")
return True
else:
logger.error(f"文件大小不匹配: 原始大小 {file_size} 字节,目标大小 {target_file_size} 字节")
return False
except Exception as e:
logger.error(f"验证目标文件时出错: {str(e)}")
return False
except Exception as e:
logger.error(f"上传文件时出错: {str(e)}")
return False
def upload_directory(self, local_dir, target_dir, callback=None):
"""递归上传目录到目标主机
Args:
local_dir: 本地目录路径
target_dir: 目标主机上的目录路径
callback: 上传进度回调函数
Returns:
bool: 上传是否成功
"""
if not os.path.isdir(local_dir):
logger.error(f"本地目录不存在: {local_dir}")
return False
try:
# 确保目标目录存在
try:
self.target_sftp.stat(target_dir)
except FileNotFoundError:
logger.info(f"在目标主机上创建目录: {target_dir}")
self.target_client.exec_command(f"mkdir -p {target_dir}")
# 遍历本地目录
success = True
for root, dirs, files in os.walk(local_dir):
# 计算相对路径
rel_path = os.path.relpath(root, local_dir)
if rel_path == '.':
rel_path = ''
# 创建对应的远程目录
remote_dir = os.path.join(target_dir, rel_path).replace('\', '/')
if rel_path:
try:
self.target_sftp.stat(remote_dir)
except FileNotFoundError:
logger.info(f"在目标主机上创建目录: {remote_dir}")
self.target_client.exec_command(f"mkdir -p {remote_dir}")
# 上传文件
for file in files:
local_path = os.path.join(root, file)
remote_path = os.path.join(remote_dir, file).replace('\', '/')
if not self.upload_file(local_path, remote_path, callback):
logger.error(f"上传文件失败: {local_path} -> {remote_path}")
success = False
if not callback:
response = input("是否继续上传其他文件? (y/n): ")
if response.lower() != 'y':
return False
return success
except Exception as e:
logger.error(f"上传目录时出错: {str(e)}")
return False
def close(self):
"""关闭所有连接和清理临时文件"""
# 清理临时目录
if self.temp_dir and self.bastion_client and self.bastion_client.get_transport().is_active():
try:
logger.info(f"清理堡垒机上的临时目录: {self.temp_dir}")
self.bastion_client.exec_command(f"rm -rf {self.temp_dir}")
except:
pass
# 关闭SFTP连接
if self.target_sftp:
self.target_sftp.close()
if self.bastion_sftp:
self.bastion_sftp.close()
# 关闭SSH连接
if self.target_client:
self.target_client.close()
logger.info(f"已关闭与目标主机 {self.target_host} 的连接")
if self.bastion_client:
self.bastion_client.close()
logger.info(f"已关闭与堡垒机 {self.bastion_host} 的连接")
def main():
parser = argparse.ArgumentParser(description='通过堡垒机上传文件到目标主机')
# 堡垒机参数
parser.add_argument('--bastion-host', required=True, help='堡垒机主机名或IP')
parser.add_argument('--bastion-user', required=True, help='堡垒机用户名')
parser.add_argument('--bastion-key', help='堡垒机私钥文件路径')
parser.add_argument('--bastion-password', help='堡垒机密码')
parser.add_argument('--bastion-port', type=int, default=22, help='堡垒机SSH端口,默认22')
# 目标主机参数
parser.add_argument('--target-host', required=True, help='目标主机名或IP')
parser.add_argument('--target-user', required=True, help='目标主机用户名')
parser.add_argument('--target-key', help='目标主机私钥文件路径')
parser.add_argument('--target-password', help='目标主机密码')
parser.add_argument('--target-port', type=int, default=22, help='目标主机SSH端口,默认22')
# 文件传输参数
parser.add_argument('--local-path', required=True, help='要上传的本地文件或目录路径')
parser.add_argument('--remote-path', required=True, help='目标主机上的文件或目录路径')
parser.add_argument('--recursive', action='store_true', help='递归上传目录')
args = parser.parse_args()
# 创建文件传输工具
transfer = BastionFileTransfer(
bastion_host=args.bastion_host,
bastion_user=args.bastion_user,
bastion_key=args.bastion_key,
bastion_password=args.bastion_password,
bastion_port=args.bastion_port,
target_host=args.target_host,
target_user=args.target_user,
target_key=args.target_key,
target_password=args.target_password,
target_port=args.target_port
)
try:
# 连接到堡垒机
if not transfer.connect_to_bastion():
sys.exit(1)
# 通过堡垒机连接到目标主机
if not transfer.connect_to_target():
sys.exit(1)
# 上传文件或目录
if os.path.isdir(args.local_path):
if args.recursive:
print(f"递归上传目录: {args.local_path} -> {args.remote_path}")
if transfer.upload_directory(args.local_path, args.remote_path):
print("目录上传成功")
else:
print("目录上传失败")
sys.exit(1)
else:
print("指定的本地路径是目录,但未指定--recursive参数")
sys.exit(1)
else:
print(f"上传文件: {args.local_path} -> {args.remote_path}")
if transfer.upload_file(args.local_path, args.remote_path):
print("文件上传成功")
else:
print("文件上传失败")
sys.exit(1)
finally:
# 关闭连接和清理临时文件
transfer.close()
if __name__ == "__main__":
main()
这个示例实现了以下功能:
通过堡垒机将文件上传到目标主机
支持单个文件上传和递归目录上传
显示详细的上传进度
使用临时目录作为中转,并在完成后自动清理
验证上传文件的完整性
使用方法示例:
bash
# 上传单个文件
python bastion_file_transfer.py --bastion-host bastion.example.com --bastion-user admin
--target-host internal.example.com --target-user user
--local-path /path/to/local/file.txt --remote-path /path/on/target/file.txt
# 递归上传目录
python bastion_file_transfer.py --bastion-host bastion.example.com --bastion-user admin
--target-host internal.example.com --target-user user
--local-path /path/to/local/dir --remote-path /path/on/target/dir
--recursive
通过本章的详细介绍,您应该已经掌握了paramiko的基本原理和使用方法,包括SSHClient和SFTPClient等核心组件,以及它们在系统批量运维中的应用。相比于前一章介绍的pexpect,paramiko提供了更加稳定和高效的SSH操作能力,特别适合需要可靠文件传输和命令执行的场景。通过实现密钥登录、堡垒机模式下的命令执行和文件传输等实际案例,展示了paramiko在现代运维环境中的强大功能。
暂无评论内容