006-系统批量运维管理器 paramiko 详解

系统批量运维管理器 paramiko 详解

Paramiko是Python的一个强大的SSH协议库,提供了SSH客户端和服务器的功能,是进行远程服务器管理、自动化运维的重要工具。与前一章介绍的pexpect不同,paramiko不依赖于交互式程序,而是直接实现了SSH协议,提供了更加稳定和灵活的远程操作能力。本章将详细介绍paramiko的安装、核心组件及其在系统批量运维中的应用。

6.1 paramiko 的安装

Paramiko是一个纯Python实现的SSH协议库,安装简便,兼容性良好。以下是详细的安装步骤和注意事项。

使用pip安装

最简单的安装方式是通过pip包管理器:

bash

pip install paramiko

对于特定Python版本,可以使用:

bash

pip3 install paramiko  # 指定使用Python 3

系统包管理器安装

在各种Linux发行版中,也可以使用系统包管理器安装:

Debian/Ubuntu:

bash

sudo apt-get install python3-paramiko

CentOS/RHEL:

bash

sudo yum install python3-paramiko

Fedora:

bash

sudo dnf install python3-paramiko

依赖关系

Paramiko依赖于以下Python库:

cryptography: 提供密码学算法支持
bcrypt: 用于密码散列
pynacl: 提供加密功能

安装paramiko时,pip通常会自动安装这些依赖。如果出现问题,可以手动安装:

bash

pip install cryptography bcrypt pynacl

验证安装

安装完成后,可以通过以下Python代码验证paramiko是否正确安装:

python

import paramiko
print(paramiko.__version__)

如果显示版本号,说明安装成功。

可能遇到的问题及解决方案

加密库编译问题

在某些系统上,cryptography库需要编译,可能会遇到缺少编译工具或开发库的问题。解决方案:

bash

# Debian/Ubuntu
sudo apt-get install build-essential libssl-dev libffi-dev python3-dev

# CentOS/RHEL
sudo yum install gcc openssl-devel libffi-devel python3-devel

Windows平台安装问题

在Windows上,可能需要安装Visual C++ Build Tools。最简单的方法是使用预编译的wheel包:

bash

pip install --only-binary=:all: paramiko

版本兼容性问题

如果遇到版本兼容性问题,可以尝试安装特定版本:

bash

pip install paramiko==2.7.2  # 安装特定版本

源码安装

对于需要最新开发版本或者特定修改的用户,也可以从源码安装:

bash

git clone https://github.com/paramiko/paramiko.git
cd paramiko
pip install -e .

6.2 paramiko 的核心组件

Paramiko提供了多个核心组件,用于实现SSH客户端和服务器的功能。下面将详细介绍这些组件的使用方法和关键特性。

6.2.1 SSHClient 类

SSHClient是paramiko最常用的类之一,提供了类似于OpenSSH的高级SSH客户端功能。

基本使用

以下是使用SSHClient的基本示例:

python

import paramiko

# 创建SSH客户端实例
client = paramiko.SSHClient()

# 设置未知主机密钥的策略
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

try:
    # 连接到远程服务器
    client.connect(
        hostname='example.com', 
        port=22, 
        username='user', 
        password='password'
    )
    
    # 执行命令
    stdin, stdout, stderr = client.exec_command('ls -l')
    
    # 获取命令输出
    output = stdout.read().decode()
    error = stderr.read().decode()
    
    # 打印输出
    print("输出:", output)
    if error:
        print("错误:", error)
        
finally:
    # 关闭连接
    client.close()
主要方法和参数

SSHClient类提供了以下重要方法:

connect(hostname, port=22, username=None, password=None, pkey=None, …)

建立与SSH服务器的连接。主要参数包括:

hostname: 主机名或IP地址
port: SSH端口,默认为22
username: 用户名
password: 密码(与pkey二选一)
pkey: 私钥对象(与password二选一)
key_filename: 私钥文件路径
timeout: 连接超时时间
allow_agent: 是否允许使用SSH代理
look_for_keys: 是否在~/.ssh中查找密钥文件

exec_command(command, bufsize=-1, timeout=None, …)

在远程服务器上执行命令,并返回stdin, stdout和stderr的文件对象。

open_sftp()

打开SFTP会话,返回SFTPClient对象,用于文件传输操作。

set_missing_host_key_policy(policy)

设置处理未知主机密钥的策略。常用的策略包括:

paramiko.AutoAddPolicy(): 自动添加新主机密钥
paramiko.RejectPolicy(): 拒绝未知主机
paramiko.WarningPolicy(): 记录警告但接受新主机

close()

关闭SSH连接。

高级功能:使用密钥认证

使用SSH密钥认证通常比密码认证更安全:

python

import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# 方法1:指定私钥文件路径
client.connect(
    hostname='example.com',
    username='user',
    key_filename='/path/to/private_key'
)

# 方法2:使用RSAKey对象
key = paramiko.RSAKey.from_private_key_file('/path/to/private_key', password='key_password')
client.connect(
    hostname='example.com',
    username='user',
    pkey=key
)
处理主机密钥

SSH安全的一个关键方面是验证主机密钥,防止中间人攻击。Paramiko提供了几种处理主机密钥的方式:

python

import paramiko
import os

client = paramiko.SSHClient()

# 方法1:自动添加(适合测试环境)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# 方法2:从known_hosts加载
client.load_system_host_keys()  # 加载系统级known_hosts
client.load_host_keys(os.path.expanduser('~/.ssh/known_hosts'))  # 加载用户级known_hosts

# 方法3:自定义处理策略
class MyPolicy(paramiko.MissingHostKeyPolicy):
    def missing_host_key(self, client, hostname, key):
        print(f"警告:未知主机 {hostname}!")
        print(f"指纹:{key.get_fingerprint().hex()}")
        response = input("是否继续连接? (yes/no): ")
        if response.lower() == 'yes':
            client._host_keys.add(hostname, key.get_name(), key)
            client._host_keys_filename = os.path.expanduser('~/.ssh/known_hosts')
            client.save_host_keys(client._host_keys_filename)
            return
        raise paramiko.SSHException("主机密钥未被验证")

client.set_missing_host_key_policy(MyPolicy())

6.2.2 SFTPClient 类

SFTPClient提供了SFTP协议的实现,用于安全的文件传输操作。

基本使用

通常,SFTPClient对象是通过SSHClientopen_sftp方法获取的:

python

import paramiko

# 创建SSH客户端
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('example.com', username='user', password='password')

# 打开SFTP会话
sftp = client.open_sftp()

try:
    # 上传文件
    sftp.put('local_file.txt', 'remote_file.txt')
    
    # 下载文件
    sftp.get('remote_file2.txt', 'local_file2.txt')
    
    # 列出目录内容
    file_list = sftp.listdir('.')
    print("远程目录内容:", file_list)
    
finally:
    # 关闭SFTP会话
    sftp.close()
    
    # 关闭SSH连接
    client.close()
主要方法

SFTPClient类提供了以下重要方法:

put(localpath, remotepath, callback=None, confirm=True)

将本地文件上传到远程服务器。参数说明:

localpath: 本地文件路径
remotepath: 远程文件路径
callback: 进度回调函数,形如func(bytes_transferred, total_bytes)
confirm: 是否检查远程文件大小以确认传输成功

get(remotepath, localpath, callback=None)

从远程服务器下载文件到本地。参数说明:

remotepath: 远程文件路径
localpath: 本地文件路径
callback: 进度回调函数

listdir(path='.')

列出指定目录的内容,返回文件名列表。

mkdir(path, mode=511)

在远程服务器上创建目录。

remove(path)

删除远程文件。

rename(oldpath, newpath)

重命名远程文件或目录。

stat(path)

获取远程文件的状态,类似于os.stat()。

chmod(path, mode)

更改远程文件的权限。

chown(path, uid, gid)

更改远程文件的所有者和组。

处理大文件和显示传输进度

对于大文件传输,可以使用回调函数显示进度:

python

import paramiko
import os

def progress_callback(bytes_transferred, total_bytes):
    percentage = (bytes_transferred / total_bytes) * 100
    print(f"
传输进度: {percentage:.2f}%", end="")

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('example.com', username='user', password='password')

sftp = client.open_sftp()

try:
    # 获取本地文件大小
    local_file = 'large_file.zip'
    file_size = os.path.getsize(local_file)
    
    print(f"开始上传文件: {local_file} ({file_size} 字节)")
    
    # 上传文件并显示进度
    sftp.put(local_file, 'remote_large_file.zip', callback=progress_callback)
    
    print("
文件上传完成!")
    
finally:
    sftp.close()
    client.close()
递归操作目录

Paramiko的SFTP客户端没有直接提供递归目录操作的方法,但可以自己实现:

python

import paramiko
import os

def upload_directory(sftp, local_dir, remote_dir):
    """递归上传目录内容"""
    # 确保远程目录存在
    try:
        sftp.stat(remote_dir)
    except FileNotFoundError:
        sftp.mkdir(remote_dir)
    
    # 遍历本地目录
    for item in os.listdir(local_dir):
        local_path = os.path.join(local_dir, item)
        remote_path = os.path.join(remote_dir, item)
        
        if os.path.isfile(local_path):
            # 上传文件
            sftp.put(local_path, remote_path)
            print(f"上传文件: {local_path} -> {remote_path}")
        elif os.path.isdir(local_path):
            # 递归上传子目录
            upload_directory(sftp, local_path, remote_path)

def download_directory(sftp, remote_dir, local_dir):
    """递归下载目录内容"""
    # 确保本地目录存在
    if not os.path.exists(local_dir):
        os.makedirs(local_dir)
    
    # 遍历远程目录
    for item in sftp.listdir(remote_dir):
        remote_path = os.path.join(remote_dir, item)
        local_path = os.path.join(local_dir, item)
        
        try:
            # 检查是否为目录
            sftp.stat(remote_path)
            
            # 尝试作为目录列出内容
            try:
                sftp.listdir(remote_path)
                # 是目录,递归下载
                download_directory(sftp, remote_path, local_path)
            except (IOError, PermissionError):
                # 是文件,直接下载
                sftp.get(remote_path, local_path)
                print(f"下载文件: {remote_path} -> {local_path}")
        except (IOError, PermissionError):
            # 是文件,直接下载
            sftp.get(remote_path, local_path)
            print(f"下载文件: {remote_path} -> {local_path}")

# 使用示例
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('example.com', username='user', password='password')

sftp = client.open_sftp()

try:
    # 上传整个目录
    upload_directory(sftp, './local_folder', '/remote/folder')
    
    # 下载整个目录
    download_directory(sftp, '/remote/folder', './downloaded_folder')
    
finally:
    sftp.close()
    client.close()

6.3 paramiko 应用示例

下面通过实际的应用示例,展示paramiko在系统批量运维中的强大功能。

6.3.1 实现密钥方式登录远程主机

使用SSH密钥登录是一种更安全的认证方式,下面是一个完整的实现示例:

python

#!/usr/bin/env python3
"""
使用SSH密钥登录远程主机并执行命令
"""

import paramiko
import os
import sys
import argparse
import getpass
import logging
import socket
import time

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('ssh_key_login')

class SSHKeyLogin:
    def __init__(self, hostname, username, key_file=None, port=22, passphrase=None):
        """初始化SSH密钥登录客户端
        
        Args:
            hostname: 远程主机名或IP
            username: 用户名
            key_file: 私钥文件路径,默认为~/.ssh/id_rsa
            port: SSH端口,默认为22
            passphrase: 私钥密码,如果有的话
        """
        self.hostname = hostname
        self.username = username
        self.port = port
        self.key_file = key_file or os.path.expanduser('~/.ssh/id_rsa')
        self.passphrase = passphrase
        self.client = None
    
    def connect(self):
        """连接到远程主机"""
        try:
            # 创建SSH客户端
            self.client = paramiko.SSHClient()
            
            # 加载系统主机密钥
            self.client.load_system_host_keys()
            
            # 对于未知的主机,添加密钥
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            
            # 加载私钥
            try:
                key = paramiko.RSAKey.from_private_key_file(self.key_file, password=self.passphrase)
            except paramiko.ssh_exception.PasswordRequiredException:
                # 如果私钥需要密码但未提供
                if not self.passphrase:
                    self.passphrase = getpass.getpass(f"请输入私钥'{self.key_file}'的密码: ")
                    key = paramiko.RSAKey.from_private_key_file(self.key_file, password=self.passphrase)
            
            # 建立连接
            logger.info(f"正在连接到 {self.username}@{self.hostname}:{self.port}")
            
            self.client.connect(
                hostname=self.hostname,
                port=self.port,
                username=self.username,
                pkey=key,
                timeout=10
            )
            
            logger.info(f"成功连接到 {self.hostname}")
            return True
        
        except paramiko.AuthenticationException:
            logger.error(f"认证失败: {self.username}@{self.hostname}")
            return False
        except paramiko.SSHException as e:
            logger.error(f"SSH连接错误: {str(e)}")
            return False
        except socket.error as e:
            logger.error(f"网络连接错误: {str(e)}")
            return False
        except Exception as e:
            logger.error(f"连接时发生错误: {str(e)}")
            return False
    
    def execute_command(self, command, timeout=30):
        """在远程主机上执行命令
        
        Args:
            command: 要执行的命令
            timeout: 命令超时时间,默认30秒
            
        Returns:
            tuple: (exit_status, stdout, stderr)
        """
        if not self.client:
            logger.error("未连接到远程主机")
            return -1, "", "未连接到远程主机"
        
        try:
            logger.info(f"执行命令: {command}")
            
            # 执行命令
            stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
            
            # 获取退出状态
            exit_status = stdout.channel.recv_exit_status()
            
            # 读取输出
            stdout_content = stdout.read().decode().strip()
            stderr_content = stderr.read().decode().strip()
            
            if exit_status == 0:
                logger.info("命令执行成功")
            else:
                logger.warning(f"命令执行失败,退出状态: {exit_status}")
                if stderr_content:
                    logger.warning(f"错误输出: {stderr_content}")
            
            return exit_status, stdout_content, stderr_content
            
        except socket.timeout:
            logger.error(f"命令执行超时: {command}")
            return -1, "", "命令执行超时"
        except Exception as e:
            logger.error(f"执行命令时发生错误: {str(e)}")
            return -1, "", str(e)
    
    def close(self):
        """关闭SSH连接"""
        if self.client:
            self.client.close()
            logger.info(f"已关闭与 {self.hostname} 的连接")

def main():
    parser = argparse.ArgumentParser(description='使用SSH密钥登录远程主机并执行命令')
    parser.add_argument('-H', '--host', required=True, help='远程主机名或IP')
    parser.add_argument('-u', '--user', required=True, help='用户名')
    parser.add_argument('-k', '--key', help='私钥文件路径,默认为~/.ssh/id_rsa')
    parser.add_argument('-p', '--port', type=int, default=22, help='SSH端口,默认为22')
    parser.add_argument('-c', '--command', help='要执行的命令')
    parser.add_argument('-f', '--file', help='从文件读取要执行的命令')
    parser.add_argument('--passphrase', help='私钥密码')
    
    args = parser.parse_args()
    
    # 检查命令或命令文件
    if not args.command and not args.file:
        parser.error("必须提供要执行的命令(-c)或命令文件(-f)")
    
    # 创建SSH连接
    ssh = SSHKeyLogin(
        hostname=args.host,
        username=args.user,
        key_file=args.key,
        port=args.port,
        passphrase=args.passphrase
    )
    
    try:
        # 连接到远程主机
        if not ssh.connect():
            sys.exit(1)
        
        # 执行命令
        if args.command:
            # 执行单个命令
            exit_status, stdout, stderr = ssh.execute_command(args.command)
            
            print("
--- 命令输出 ---")
            if stdout:
                print(stdout)
            
            if stderr:
                print("
--- 错误输出 ---")
                print(stderr)
            
            if exit_status != 0:
                print(f"
命令失败,退出状态: {exit_status}")
                sys.exit(exit_status)
        
        elif args.file:
            # 从文件读取命令并执行
            try:
                with open(args.file, 'r') as f:
                    commands = [line.strip() for line in f if line.strip() and not line.startswith('#')]
                
                for i, cmd in enumerate(commands, 1):
                    print(f"
=== 执行命令 {i}/{len(commands)}: {cmd} ===")
                    exit_status, stdout, stderr = ssh.execute_command(cmd)
                    
                    print("
--- 命令输出 ---")
                    if stdout:
                        print(stdout)
                    
                    if stderr:
                        print("
--- 错误输出 ---")
                        print(stderr)
                    
                    if exit_status != 0:
                        print(f"
命令失败,退出状态: {exit_status}")
                        if i < len(commands):
                            if input("是否继续执行后续命令? (y/n): ").lower() != 'y':
                                break
            
            except FileNotFoundError:
                logger.error(f"找不到命令文件: {args.file}")
                sys.exit(1)
        
    finally:
        # 关闭连接
        ssh.close()

if __name__ == "__main__":
    main()

这个示例实现了以下功能:

使用SSH密钥登录远程主机
支持执行单个命令或从文件读取多个命令
详细的日志记录和错误处理
交互式提示输入私钥密码(如果需要)

使用方法示例:

bash

# 执行单个命令
python ssh_key_login.py -H server.example.com -u username -c "df -h"

# 从文件读取命令执行
python ssh_key_login.py -H server.example.com -u username -f commands.txt

# 指定私钥文件
python ssh_key_login.py -H server.example.com -u username -k ~/.ssh/custom_key -c "ls -l"

6.3.2 实现堡垒机模式下的远程命令执行

在企业环境中,经常需要通过堡垒机(跳板机)来访问内部服务器。以下示例展示了如何使用paramiko实现通过堡垒机执行远程命令:

python

#!/usr/bin/env python3
"""
通过堡垒机执行远程命令
"""

import paramiko
import os
import sys
import logging
import argparse
import getpass
import time

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('bastion_executor')

class BastionExecutor:
    def __init__(self, bastion_host, bastion_user, target_host, target_user, 
                 bastion_key=None, target_key=None, bastion_password=None, 
                 target_password=None, bastion_port=22, target_port=22):
        """初始化堡垒机命令执行器
        
        Args:
            bastion_host: 堡垒机主机名或IP
            bastion_user: 堡垒机用户名
            target_host: 目标主机名或IP
            target_user: 目标主机用户名
            bastion_key: 堡垒机私钥文件路径
            target_key: 目标主机私钥文件路径
            bastion_password: 堡垒机密码
            target_password: 目标主机密码
            bastion_port: 堡垒机SSH端口
            target_port: 目标主机SSH端口
        """
        self.bastion_host = bastion_host
        self.bastion_user = bastion_user
        self.bastion_port = bastion_port
        self.bastion_key = bastion_key
        self.bastion_password = bastion_password
        
        self.target_host = target_host
        self.target_user = target_user
        self.target_port = target_port
        self.target_key = target_key
        self.target_password = target_password
        
        self.bastion_client = None
        self.target_client = None
        self.transport = None
    
    def connect_to_bastion(self):
        """连接到堡垒机"""
        try:
            logger.info(f"正在连接到堡垒机: {self.bastion_user}@{self.bastion_host}:{self.bastion_port}")
            
            self.bastion_client = paramiko.SSHClient()
            self.bastion_client.load_system_host_keys()
            self.bastion_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            
            # 尝试使用密钥登录
            if self.bastion_key:
                try:
                    self.bastion_client.connect(
                        hostname=self.bastion_host,
                        port=self.bastion_port,
                        username=self.bastion_user,
                        key_filename=self.bastion_key,
                        timeout=10
                    )
                    logger.info(f"使用密钥成功连接到堡垒机 {self.bastion_host}")
                    return True
                except paramiko.AuthenticationException:
                    logger.warning("使用密钥认证堡垒机失败,尝试密码认证")
            
            # 如果没有提供密钥或密钥认证失败,尝试密码认证
            if not self.bastion_password:
                self.bastion_password = getpass.getpass(f"请输入堡垒机 {self.bastion_user}@{self.bastion_host} 的密码: ")
            
            self.bastion_client.connect(
                hostname=self.bastion_host,
                port=self.bastion_port,
                username=self.bastion_user,
                password=self.bastion_password,
                timeout=10
            )
            
            logger.info(f"使用密码成功连接到堡垒机 {self.bastion_host}")
            return True
            
        except Exception as e:
            logger.error(f"连接堡垒机失败: {str(e)}")
            if self.bastion_client:
                self.bastion_client.close()
            return False
    
    def connect_to_target(self):
        """通过堡垒机连接到目标主机"""
        if not self.bastion_client:
            logger.error("未连接到堡垒机")
            return False
        
        try:
            logger.info(f"通过堡垒机连接到目标主机: {self.target_user}@{self.target_host}:{self.target_port}")
            
            # 获取堡垒机的transport
            self.transport = self.bastion_client.get_transport()
            
            # 创建从堡垒机到目标主机的通道
            dst_addr = (self.target_host, self.target_port)
            src_addr = (self.bastion_host, self.bastion_port)
            channel = self.transport.open_channel("direct-tcpip", dst_addr, src_addr)
            
            # 创建到目标主机的SSH连接
            self.target_client = paramiko.SSHClient()
            self.target_client.load_system_host_keys()
            self.target_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            
            # 尝试使用密钥登录目标主机
            if self.target_key:
                try:
                    key = paramiko.RSAKey.from_private_key_file(self.target_key)
                    self.target_client.connect(
                        hostname=self.target_host,
                        port=self.target_port,
                        username=self.target_user,
                        pkey=key,
                        sock=channel,
                        timeout=10
                    )
                    logger.info(f"使用密钥成功连接到目标主机 {self.target_host}")
                    return True
                except paramiko.AuthenticationException:
                    logger.warning("使用密钥认证目标主机失败,尝试密码认证")
            
            # 如果没有提供密钥或密钥认证失败,尝试密码认证
            if not self.target_password:
                self.target_password = getpass.getpass(f"请输入目标主机 {self.target_user}@{self.target_host} 的密码: ")
            
            self.target_client.connect(
                hostname=self.target_host,
                port=self.target_port,
                username=self.target_user,
                password=self.target_password,
                sock=channel,
                timeout=10
            )
            
            logger.info(f"使用密码成功连接到目标主机 {self.target_host}")
            return True
            
        except Exception as e:
            logger.error(f"连接目标主机失败: {str(e)}")
            return False
    
    def execute_command(self, command, timeout=30):
        """在目标主机上执行命令
        
        Args:
            command: 要执行的命令
            timeout: 命令超时时间,默认30秒
            
        Returns:
            tuple: (exit_status, stdout, stderr)
        """
        if not self.target_client:
            logger.error("未连接到目标主机")
            return -1, "", "未连接到目标主机"
        
        try:
            logger.info(f"在目标主机 {self.target_host} 上执行命令: {command}")
            
            # 执行命令
            stdin, stdout, stderr = self.target_client.exec_command(command, timeout=timeout)
            
            # 获取退出状态
            exit_status = stdout.channel.recv_exit_status()
            
            # 读取输出
            stdout_content = stdout.read().decode().strip()
            stderr_content = stderr.read().decode().strip()
            
            if exit_status == 0:
                logger.info("命令执行成功")
            else:
                logger.warning(f"命令执行失败,退出状态: {exit_status}")
                if stderr_content:
                    logger.warning(f"错误输出: {stderr_content}")
            
            return exit_status, stdout_content, stderr_content
            
        except Exception as e:
            logger.error(f"执行命令时发生错误: {str(e)}")
            return -1, "", str(e)
    
    def close(self):
        """关闭所有连接"""
        if self.target_client:
            self.target_client.close()
            logger.info(f"已关闭与目标主机 {self.target_host} 的连接")
        
        if self.bastion_client:
            self.bastion_client.close()
            logger.info(f"已关闭与堡垒机 {self.bastion_host} 的连接")
    
    def execute_commands(self, commands, continue_on_error=False):
        """执行多个命令
        
        Args:
            commands: 命令列表
            continue_on_error: 如果为True,则在命令失败时继续执行
            
        Returns:
            list: 包含每个命令结果的元组列表[(exit_status, stdout, stderr), ...]
        """
        results = []
        
        for i, cmd in enumerate(commands, 1):
            logger.info(f"执行命令 {i}/{len(commands)}: {cmd}")
            exit_status, stdout, stderr = self.execute_command(cmd)
            results.append((exit_status, stdout, stderr))
            
            # 如果命令失败且不继续执行,则中断
            if exit_status != 0 and not continue_on_error:
                logger.warning(f"命令 '{cmd}' 失败,退出状态: {exit_status},停止执行后续命令")
                break
        
        return results

def main():
    parser = argparse.ArgumentParser(description='通过堡垒机执行远程命令')
    
    # 堡垒机参数
    parser.add_argument('--bastion-host', required=True, help='堡垒机主机名或IP')
    parser.add_argument('--bastion-user', required=True, help='堡垒机用户名')
    parser.add_argument('--bastion-key', help='堡垒机私钥文件路径')
    parser.add_argument('--bastion-password', help='堡垒机密码')
    parser.add_argument('--bastion-port', type=int, default=22, help='堡垒机SSH端口,默认22')
    
    # 目标主机参数
    parser.add_argument('--target-host', required=True, help='目标主机名或IP')
    parser.add_argument('--target-user', required=True, help='目标主机用户名')
    parser.add_argument('--target-key', help='目标主机私钥文件路径')
    parser.add_argument('--target-password', help='目标主机密码')
    parser.add_argument('--target-port', type=int, default=22, help='目标主机SSH端口,默认22')
    
    # 命令参数
    parser.add_argument('-c', '--command', help='要执行的命令')
    parser.add_argument('-f', '--file', help='从文件读取要执行的命令')
    parser.add_argument('--continue-on-error', action='store_true', help='命令失败时继续执行')
    
    args = parser.parse_args()
    
    # 检查命令或命令文件
    if not args.command and not args.file:
        parser.error("必须提供要执行的命令(-c)或命令文件(-f)")
    
    # 创建执行器
    executor = BastionExecutor(
        bastion_host=args.bastion_host,
        bastion_user=args.bastion_user,
        bastion_key=args.bastion_key,
        bastion_password=args.bastion_password,
        bastion_port=args.bastion_port,
        target_host=args.target_host,
        target_user=args.target_user,
        target_key=args.target_key,
        target_password=args.target_password,
        target_port=args.target_port
    )
    
    try:
        # 连接到堡垒机
        if not executor.connect_to_bastion():
            sys.exit(1)
        
        # 通过堡垒机连接到目标主机
        if not executor.connect_to_target():
            sys.exit(1)
        
        # 执行命令
        if args.command:
            # 执行单个命令
            exit_status, stdout, stderr = executor.execute_command(args.command)
            
            print("
--- 命令输出 ---")
            if stdout:
                print(stdout)
            
            if stderr:
                print("
--- 错误输出 ---")
                print(stderr)
            
            if exit_status != 0:
                print(f"
命令失败,退出状态: {exit_status}")
                sys.exit(exit_status)
        
        elif args.file:
            # 从文件读取命令并执行
            try:
                with open(args.file, 'r') as f:
                    commands = [line.strip() for line in f if line.strip() and not line.startswith('#')]
                
                results = executor.execute_commands(commands, args.continue_on_error)
                
                for i, (exit_status, stdout, stderr) in enumerate(results, 1):
                    print(f"
=== 命令 {i}/{len(results)} 结果 ===")
                    
                    print("
--- 命令输出 ---")
                    if stdout:
                        print(stdout)
                    
                    if stderr:
                        print("
--- 错误输出 ---")
                        print(stderr)
                    
                    print(f"
退出状态: {exit_status}")
            
            except FileNotFoundError:
                logger.error(f"找不到命令文件: {args.file}")
                sys.exit(1)
        
    finally:
        # 关闭连接
        executor.close()

if __name__ == "__main__":
    main()

这个示例实现了以下功能:

先连接到堡垒机,然后通过堡垒机连接到目标主机
支持密码和密钥两种认证方式
支持执行单个命令或从文件读取多个命令
支持在命令失败时继续执行或停止执行后续命令
详细的日志记录和错误处理

使用方法示例:

bash

# 使用密码认证(交互式输入密码)
python bastion_executor.py --bastion-host bastion.example.com --bastion-user admin 
                          --target-host internal.example.com --target-user user 
                          -c "df -h"

# 使用密钥认证
python bastion_executor.py --bastion-host bastion.example.com --bastion-user admin 
                          --bastion-key ~/.ssh/bastion_key 
                          --target-host internal.example.com --target-user user 
                          --target-key ~/.ssh/target_key 
                          -f commands.txt

6.3.3 实现堡垒机模式下的远程文件上传

在运维工作中,通过堡垒机上传文件到内部服务器是一个常见需求。以下是一个实现示例:

python

#!/usr/bin/env python3
"""
通过堡垒机上传文件到目标主机
"""

import paramiko
import os
import sys
import logging
import argparse
import getpass
import socket
import time
import tempfile
import uuid

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('bastion_file_transfer')

class BastionFileTransfer:
    def __init__(self, bastion_host, bastion_user, target_host, target_user, 
                 bastion_key=None, target_key=None, bastion_password=None, 
                 target_password=None, bastion_port=22, target_port=22):
        """初始化堡垒机文件传输工具
        
        Args:
            bastion_host: 堡垒机主机名或IP
            bastion_user: 堡垒机用户名
            target_host: 目标主机名或IP
            target_user: 目标主机用户名
            bastion_key: 堡垒机私钥文件路径
            target_key: 目标主机私钥文件路径
            bastion_password: 堡垒机密码
            target_password: 目标主机密码
            bastion_port: 堡垒机SSH端口
            target_port: 目标主机SSH端口
        """
        self.bastion_host = bastion_host
        self.bastion_user = bastion_user
        self.bastion_port = bastion_port
        self.bastion_key = bastion_key
        self.bastion_password = bastion_password
        
        self.target_host = target_host
        self.target_user = target_user
        self.target_port = target_port
        self.target_key = target_key
        self.target_password = target_password
        
        self.bastion_client = None
        self.target_client = None
        self.bastion_sftp = None
        self.target_sftp = None
        self.transport = None
        
        # 临时目录和文件
        self.temp_dir = None
    
    def connect_to_bastion(self):
        """连接到堡垒机"""
        try:
            logger.info(f"正在连接到堡垒机: {self.bastion_user}@{self.bastion_host}:{self.bastion_port}")
            
            self.bastion_client = paramiko.SSHClient()
            self.bastion_client.load_system_host_keys()
            self.bastion_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            
            # 尝试使用密钥登录
            if self.bastion_key:
                try:
                    self.bastion_client.connect(
                        hostname=self.bastion_host,
                        port=self.bastion_port,
                        username=self.bastion_user,
                        key_filename=self.bastion_key,
                        timeout=10
                    )
                    logger.info(f"使用密钥成功连接到堡垒机 {self.bastion_host}")
                    self.bastion_sftp = self.bastion_client.open_sftp()
                    return True
                except paramiko.AuthenticationException:
                    logger.warning("使用密钥认证堡垒机失败,尝试密码认证")
            
            # 如果没有提供密钥或密钥认证失败,尝试密码认证
            if not self.bastion_password:
                self.bastion_password = getpass.getpass(f"请输入堡垒机 {self.bastion_user}@{self.bastion_host} 的密码: ")
            
            self.bastion_client.connect(
                hostname=self.bastion_host,
                port=self.bastion_port,
                username=self.bastion_user,
                password=self.bastion_password,
                timeout=10
            )
            
            logger.info(f"使用密码成功连接到堡垒机 {self.bastion_host}")
            self.bastion_sftp = self.bastion_client.open_sftp()
            return True
            
        except Exception as e:
            logger.error(f"连接堡垒机失败: {str(e)}")
            if self.bastion_client:
                self.bastion_client.close()
            return False
    
    def connect_to_target(self):
        """通过堡垒机连接到目标主机"""
        if not self.bastion_client:
            logger.error("未连接到堡垒机")
            return False
        
        try:
            logger.info(f"通过堡垒机连接到目标主机: {self.target_user}@{self.target_host}:{self.target_port}")
            
            # 获取堡垒机的transport
            self.transport = self.bastion_client.get_transport()
            
            # 创建从堡垒机到目标主机的通道
            dst_addr = (self.target_host, self.target_port)
            src_addr = (self.bastion_host, self.bastion_port)
            channel = self.transport.open_channel("direct-tcpip", dst_addr, src_addr)
            
            # 创建到目标主机的SSH连接
            self.target_client = paramiko.SSHClient()
            self.target_client.load_system_host_keys()
            self.target_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            
            # 尝试使用密钥登录目标主机
            if self.target_key:
                try:
                    key = paramiko.RSAKey.from_private_key_file(self.target_key)
                    self.target_client.connect(
                        hostname=self.target_host,
                        port=self.target_port,
                        username=self.target_user,
                        pkey=key,
                        sock=channel,
                        timeout=10
                    )
                    logger.info(f"使用密钥成功连接到目标主机 {self.target_host}")
                    self.target_sftp = self.target_client.open_sftp()
                    return True
                except paramiko.AuthenticationException:
                    logger.warning("使用密钥认证目标主机失败,尝试密码认证")
            
            # 如果没有提供密钥或密钥认证失败,尝试密码认证
            if not self.target_password:
                self.target_password = getpass.getpass(f"请输入目标主机 {self.target_user}@{self.target_host} 的密码: ")
            
            self.target_client.connect(
                hostname=self.target_host,
                port=self.target_port,
                username=self.target_user,
                password=self.target_password,
                sock=channel,
                timeout=10
            )
            
            logger.info(f"使用密码成功连接到目标主机 {self.target_host}")
            self.target_sftp = self.target_client.open_sftp()
            return True
            
        except Exception as e:
            logger.error(f"连接目标主机失败: {str(e)}")
            return False
    
    def create_temp_dir_on_bastion(self):
        """在堡垒机上创建临时目录"""
        if not self.bastion_client:
            logger.error("未连接到堡垒机")
            return False
        
        try:
            # 生成唯一的临时目录名
            temp_dir_name = f"/tmp/file_transfer_{uuid.uuid4().hex}"
            
            # 创建临时目录
            self.bastion_client.exec_command(f"mkdir -p {temp_dir_name}")
            
            # 验证目录是否创建成功
            stdin, stdout, stderr = self.bastion_client.exec_command(f"ls -d {temp_dir_name}")
            if temp_dir_name in stdout.read().decode():
                self.temp_dir = temp_dir_name
                logger.info(f"在堡垒机上创建临时目录: {self.temp_dir}")
                return True
            else:
                logger.error(f"在堡垒机上创建临时目录失败: {stderr.read().decode()}")
                return False
            
        except Exception as e:
            logger.error(f"在堡垒机上创建临时目录时出错: {str(e)}")
            return False
    
    def upload_file(self, local_path, target_path, callback=None):
        """通过堡垒机上传文件到目标主机
        
        Args:
            local_path: 本地文件路径
            target_path: 目标主机上的文件路径
            callback: 上传进度回调函数,形如func(bytes_transferred, total_bytes)
        
        Returns:
            bool: 上传是否成功
        """
        if not self.bastion_sftp or not self.target_sftp:
            logger.error("SFTP连接未建立")
            return False
        
        if not os.path.exists(local_path):
            logger.error(f"本地文件不存在: {local_path}")
            return False
        
        if not self.temp_dir:
            if not self.create_temp_dir_on_bastion():
                return False
        
        try:
            # 计算文件在堡垒机上的临时路径
            filename = os.path.basename(local_path)
            bastion_temp_path = os.path.join(self.temp_dir, filename)
            
            file_size = os.path.getsize(local_path)
            logger.info(f"开始上传文件: {local_path} -> {target_path} (文件大小: {file_size} 字节)")
            
            # 第一步:将文件从本地上传到堡垒机
            logger.info(f"上传到堡垒机: {local_path} -> {bastion_temp_path}")
            start_time = time.time()
            
            # 定义进度回调函数
            def progress_callback(bytes_so_far, total_bytes):
                if callback:
                    callback(bytes_so_far, total_bytes)
                else:
                    percent = (bytes_so_far / total_bytes) * 100
                    sys.stdout.write(f"
上传到堡垒机: {percent:.2f}% ({bytes_so_far}/{total_bytes} 字节)")
                    sys.stdout.flush()
            
            # 上传文件到堡垒机
            self.bastion_sftp.put(local_path, bastion_temp_path, callback=progress_callback)
            
            if not callback:
                print()  # 打印换行
            
            logger.info(f"文件成功上传到堡垒机,耗时: {time.time() - start_time:.2f}秒")
            
            # 第二步:将文件从堡垒机传输到目标主机
            logger.info(f"从堡垒机传输到目标主机: {bastion_temp_path} -> {target_path}")
            start_time = time.time()
            
            # 检查目标路径是否是目录
            try:
                target_stat = self.target_sftp.stat(target_path)
                is_dir = target_stat.st_mode & 0o40000  # 检查是否为目录
                
                if is_dir:
                    # 如果是目录,使用原始文件名
                    target_path = os.path.join(target_path, filename)
            except FileNotFoundError:
                # 如果路径不存在,假设是文件路径
                # 确保目标目录存在
                target_dir = os.path.dirname(target_path)
                if target_dir:
                    try:
                        self.target_sftp.stat(target_dir)
                    except FileNotFoundError:
                        # 创建目录
                        logger.info(f"在目标主机上创建目录: {target_dir}")
                        self.target_client.exec_command(f"mkdir -p {target_dir}")
            
            # 获取堡垒机上临时文件的大小
            bastion_file_size = self.bastion_sftp.stat(bastion_temp_path).st_size
            
            # 读取堡垒机上的文件并写入目标主机
            with self.bastion_sftp.open(bastion_temp_path, 'rb') as bastion_file:
                with self.target_sftp.file(target_path, 'wb') as target_file:
                    bytes_so_far = 0
                    chunk_size = 32768  # 32KB
                    
                    while True:
                        data = bastion_file.read(chunk_size)
                        if not data:
                            break
                        
                        target_file.write(data)
                        bytes_so_far += len(data)
                        
                        # 更新进度
                        if callback:
                            callback(bytes_so_far, bastion_file_size)
                        else:
                            percent = (bytes_so_far / bastion_file_size) * 100
                            sys.stdout.write(f"
从堡垒机传输到目标主机: {percent:.2f}% ({bytes_so_far}/{bastion_file_size} 字节)")
                            sys.stdout.flush()
            
            if not callback:
                print()  # 打印换行
            
            logger.info(f"文件成功传输到目标主机,耗时: {time.time() - start_time:.2f}秒")
            
            # 清理堡垒机上的临时文件
            logger.info(f"清理堡垒机上的临时文件: {bastion_temp_path}")
            self.bastion_sftp.remove(bastion_temp_path)
            
            # 验证目标文件大小
            try:
                target_file_size = self.target_sftp.stat(target_path).st_size
                if target_file_size == file_size:
                    logger.info(f"文件大小验证成功: {target_file_size} 字节")
                    return True
                else:
                    logger.error(f"文件大小不匹配: 原始大小 {file_size} 字节,目标大小 {target_file_size} 字节")
                    return False
            except Exception as e:
                logger.error(f"验证目标文件时出错: {str(e)}")
                return False
            
        except Exception as e:
            logger.error(f"上传文件时出错: {str(e)}")
            return False
    
    def upload_directory(self, local_dir, target_dir, callback=None):
        """递归上传目录到目标主机
        
        Args:
            local_dir: 本地目录路径
            target_dir: 目标主机上的目录路径
            callback: 上传进度回调函数
        
        Returns:
            bool: 上传是否成功
        """
        if not os.path.isdir(local_dir):
            logger.error(f"本地目录不存在: {local_dir}")
            return False
        
        try:
            # 确保目标目录存在
            try:
                self.target_sftp.stat(target_dir)
            except FileNotFoundError:
                logger.info(f"在目标主机上创建目录: {target_dir}")
                self.target_client.exec_command(f"mkdir -p {target_dir}")
            
            # 遍历本地目录
            success = True
            for root, dirs, files in os.walk(local_dir):
                # 计算相对路径
                rel_path = os.path.relpath(root, local_dir)
                if rel_path == '.':
                    rel_path = ''
                
                # 创建对应的远程目录
                remote_dir = os.path.join(target_dir, rel_path).replace('\', '/')
                if rel_path:
                    try:
                        self.target_sftp.stat(remote_dir)
                    except FileNotFoundError:
                        logger.info(f"在目标主机上创建目录: {remote_dir}")
                        self.target_client.exec_command(f"mkdir -p {remote_dir}")
                
                # 上传文件
                for file in files:
                    local_path = os.path.join(root, file)
                    remote_path = os.path.join(remote_dir, file).replace('\', '/')
                    
                    if not self.upload_file(local_path, remote_path, callback):
                        logger.error(f"上传文件失败: {local_path} -> {remote_path}")
                        success = False
                        if not callback:
                            response = input("是否继续上传其他文件? (y/n): ")
                            if response.lower() != 'y':
                                return False
            
            return success
            
        except Exception as e:
            logger.error(f"上传目录时出错: {str(e)}")
            return False
    
    def close(self):
        """关闭所有连接和清理临时文件"""
        # 清理临时目录
        if self.temp_dir and self.bastion_client and self.bastion_client.get_transport().is_active():
            try:
                logger.info(f"清理堡垒机上的临时目录: {self.temp_dir}")
                self.bastion_client.exec_command(f"rm -rf {self.temp_dir}")
            except:
                pass
        
        # 关闭SFTP连接
        if self.target_sftp:
            self.target_sftp.close()
        
        if self.bastion_sftp:
            self.bastion_sftp.close()
        
        # 关闭SSH连接
        if self.target_client:
            self.target_client.close()
            logger.info(f"已关闭与目标主机 {self.target_host} 的连接")
        
        if self.bastion_client:
            self.bastion_client.close()
            logger.info(f"已关闭与堡垒机 {self.bastion_host} 的连接")

def main():
    parser = argparse.ArgumentParser(description='通过堡垒机上传文件到目标主机')
    
    # 堡垒机参数
    parser.add_argument('--bastion-host', required=True, help='堡垒机主机名或IP')
    parser.add_argument('--bastion-user', required=True, help='堡垒机用户名')
    parser.add_argument('--bastion-key', help='堡垒机私钥文件路径')
    parser.add_argument('--bastion-password', help='堡垒机密码')
    parser.add_argument('--bastion-port', type=int, default=22, help='堡垒机SSH端口,默认22')
    
    # 目标主机参数
    parser.add_argument('--target-host', required=True, help='目标主机名或IP')
    parser.add_argument('--target-user', required=True, help='目标主机用户名')
    parser.add_argument('--target-key', help='目标主机私钥文件路径')
    parser.add_argument('--target-password', help='目标主机密码')
    parser.add_argument('--target-port', type=int, default=22, help='目标主机SSH端口,默认22')
    
    # 文件传输参数
    parser.add_argument('--local-path', required=True, help='要上传的本地文件或目录路径')
    parser.add_argument('--remote-path', required=True, help='目标主机上的文件或目录路径')
    parser.add_argument('--recursive', action='store_true', help='递归上传目录')
    
    args = parser.parse_args()
    
    # 创建文件传输工具
    transfer = BastionFileTransfer(
        bastion_host=args.bastion_host,
        bastion_user=args.bastion_user,
        bastion_key=args.bastion_key,
        bastion_password=args.bastion_password,
        bastion_port=args.bastion_port,
        target_host=args.target_host,
        target_user=args.target_user,
        target_key=args.target_key,
        target_password=args.target_password,
        target_port=args.target_port
    )
    
    try:
        # 连接到堡垒机
        if not transfer.connect_to_bastion():
            sys.exit(1)
        
        # 通过堡垒机连接到目标主机
        if not transfer.connect_to_target():
            sys.exit(1)
        
        # 上传文件或目录
        if os.path.isdir(args.local_path):
            if args.recursive:
                print(f"递归上传目录: {args.local_path} -> {args.remote_path}")
                if transfer.upload_directory(args.local_path, args.remote_path):
                    print("目录上传成功")
                else:
                    print("目录上传失败")
                    sys.exit(1)
            else:
                print("指定的本地路径是目录,但未指定--recursive参数")
                sys.exit(1)
        else:
            print(f"上传文件: {args.local_path} -> {args.remote_path}")
            if transfer.upload_file(args.local_path, args.remote_path):
                print("文件上传成功")
            else:
                print("文件上传失败")
                sys.exit(1)
        
    finally:
        # 关闭连接和清理临时文件
        transfer.close()

if __name__ == "__main__":
    main()

这个示例实现了以下功能:

通过堡垒机将文件上传到目标主机
支持单个文件上传和递归目录上传
显示详细的上传进度
使用临时目录作为中转,并在完成后自动清理
验证上传文件的完整性

使用方法示例:

bash

# 上传单个文件
python bastion_file_transfer.py --bastion-host bastion.example.com --bastion-user admin 
                               --target-host internal.example.com --target-user user 
                               --local-path /path/to/local/file.txt --remote-path /path/on/target/file.txt

# 递归上传目录
python bastion_file_transfer.py --bastion-host bastion.example.com --bastion-user admin 
                               --target-host internal.example.com --target-user user 
                               --local-path /path/to/local/dir --remote-path /path/on/target/dir 
                               --recursive

通过本章的详细介绍,您应该已经掌握了paramiko的基本原理和使用方法,包括SSHClient和SFTPClient等核心组件,以及它们在系统批量运维中的应用。相比于前一章介绍的pexpect,paramiko提供了更加稳定和高效的SSH操作能力,特别适合需要可靠文件传输和命令执行的场景。通过实现密钥登录、堡垒机模式下的命令执行和文件传输等实际案例,展示了paramiko在现代运维环境中的强大功能。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容