25. Python SSH Remote Execution: The Complete Guide (SSHExecutor)

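This guide is written for complete beginners. Its goal is to help you genuinely understand:

  • What SSH is, and why you would connect to remote servers from Python
  • How to install and use paramiko (the core Python SSH library)
  • How to run remote commands, read their output, and handle errors
  • How to upload and download files (SFTP)
  • How to wrap all of this in a convenient SSHExecutor class (with connection reuse, retries, and timeouts)
  • fabric: a higher-level SSH tool
  • asyncssh: asynchronous SSH execution
  • Common real-world scenarios (batch deployment, log collection, automated operations)
  • Common errors and how to troubleshoot them

It includes plenty of runnable examples, all starting from the basics.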


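Part 1: SSH Basics

1.1 What Is SSH?

SSH (Secure Shell) is a network protocol for securely operating a remote computer over an untrusted network.

An everyday analogy:
  Your computer ──────────────────────────────► Remote server
  (local)      SSH encrypted tunnel (port 22)     (Linux cloud host)

  It is like directing a robot far away over a walkie-talkie:
  everything you say is encrypted, so an eavesdropper cannot understand it.

What SSH can do:

✅ Run commands on a remote server
✅ Upload/download files (SFTP/SCP)
✅ Set up port forwarding (tunnels)
✅ Run scripts and start/stop services
✅ Manage dozens or even thousands of servers in bulk

1.2 Why Drive SSH from Python?

Pain points of manual SSH:
  ❌ Ten servers means logging in and repeating the same steps ten times
  ❌ Manual deployments are prone to skipped steps and mistakes
  ❌ Nothing is logged, and no decisions can be automated

Advantages of SSH from Python:
  ✅ Write the script once, run it on N servers
  ✅ Check command results automatically and retry on failure
  ✅ Integrate easily with other systems (databases, APIs)
  ✅ Keep a complete log of everything that was executed

1.3 Three Mainstream Approaches to SSH in Python

① paramiko (lowest level)
   - A pure-Python implementation of SSH
   - The most complete feature set and the finest control
   - A slightly steeper learning curve, but it teaches the fundamentals

② fabric (a high-level wrapper built on paramiko)
   - Concise API, well suited to deployment scripts
   - Friendly command-line tooling

③ asyncssh (asynchronous SSH)
   - Built on asyncio; well suited to connecting to many servers concurrently
   - Needs a modern Python 3 (the examples below use asyncio.run(), which requires Python 3.7+)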

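Part 2: paramiko Basics

2.1 Installation

pip install paramiko

Verify the installation:

import paramiko
print(paramiko.__version__)   # e.g. 3.4.0

2.2 The Simplest SSH Connection

import paramiko

# Step 1: create the SSH client object
client = paramiko.SSHClient()

# Step 2: set the host key policy
# AutoAddPolicy automatically trusts keys from unknown servers (fine for dev/testing).
# In production, prefer client.load_system_host_keys() and keep the default RejectPolicy.
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# Step 3: connect (password authentication)
client.connect(
    hostname='192.168.1.100',   # server IP address or domain name
    port=22,                    # SSH port (default 22)
    username='root',            # user name
    password='your_password',   # password
    timeout=10                  # connection timeout in seconds
)

# Step 4: run a command
stdin, stdout, stderr = client.exec_command('ls -la /home')

# Step 5: read the results
output = stdout.read().decode('utf-8')
error  = stderr.read().decode('utf-8')

print("Command output:")
print(output)
if error:
    print("Error output:")
    print(error)

# Step 6: close the connection (important!)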
client.close()
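The example above hardcodes the password, which is convenient for a first test but easy to leak through version control. Below is a minimal sketch of one alternative. It builds the keyword arguments for client.connect() from the environment. The environment variable names (SSH_HOST, SSH_USER, SSH_PASSWORD, SSH_PORT) and the helper connect_kwargs_from_env are hypothetical choices for this sketch, not a paramiko convention:

```python
import os
from typing import Any, Dict, Mapping, Optional


def connect_kwargs_from_env(environ: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Build keyword arguments for paramiko's SSHClient.connect() from environment variables.

    SSH_HOST, SSH_USER, SSH_PASSWORD and SSH_PORT are hypothetical names
    chosen for this sketch, not a paramiko convention.
    """
    env = os.environ if environ is None else environ
    missing = [name for name in ('SSH_HOST', 'SSH_USER') if not env.get(name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")

    kwargs: Dict[str, Any] = {
        'hostname': env['SSH_HOST'],
        'username': env['SSH_USER'],
        'port': int(env.get('SSH_PORT', '22')),   # 22 is the default SSH port
        'timeout': 10,
    }
    # Only pass a password if one is set; without it, paramiko tries keys and the agent
    if env.get('SSH_PASSWORD'):
        kwargs['password'] = env['SSH_PASSWORD']
    return kwargs


if __name__ == '__main__':
    demo_env = {'SSH_HOST': '192.168.1.100', 'SSH_USER': 'root', 'SSH_PORT': '2222'}
    print(connect_kwargs_from_env(demo_env))
    # With a real server: client.connect(**connect_kwargs_from_env())
```

Passing a mapping explicitly, as the demo does, keeps the function easy to test. In a real script you would call it with no argument so that it reads os.environ.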

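2.3 Authenticating with an SSH Key Pair (More Secure)

Key-pair authentication is more secure than passwords and is the preferred choice in production: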

import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

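# Use ONE of the three options below; they are shown together only for comparison

# Option 1: pass the path of the private key file
client.connect(
    hostname='192.168.1.100',
    username='ubuntu',
    key_filename='/home/user/.ssh/id_rsa',   # private key file path
    timeout=10
)

# Option 2: load the private key yourself
private_key = paramiko.RSAKey.from_private_key_file(
    '/home/user/.ssh/id_rsa',
    password='key_passphrase'   # only needed if the key is passphrase-protected
)
client.connect(
    hostname='192.168.1.100',
    username='ubuntu',
    pkey=private_key,
    timeout=10
)

# Option 3: load the private key from a string (useful when it comes from an environment variable or a database)
import io
key_content = """-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEA...(private key content)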
-----END RSA PRIVATE KEY-----"""

key_obj = paramiko.RSAKey.from_private_key(io.StringIO(key_content))
client.connect(
    hostname='192.168.1.100',
    username='ubuntu',
    pkey=key_obj
)

stdin, stdout, stderr = client.exec_command('whoami')
print(stdout.read().decode())   # ubuntu
client.close()

2.4 exec_command in Detail

import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('192.168.1.100', username='root', password='password')

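# exec_command returns three file-like objects:
# stdin  - standard input (you can write data to the command)
# stdout - standard output (the command's normal output)
# stderr - standard error (the command's error output)
stdin, stdout, stderr = client.exec_command('cat /etc/os-release')

# ===== Ways to read the output =====

# Method 1: read everything at once (for commands with modest output)
output = stdout.read().decode('utf-8')
print(output)

# Method 2: iterate line by line (for commands that print many lines)
stdin2, stdout2, stderr2 = client.exec_command('ls /var/log')
for line in stdout2:
    print(line.strip())   # each line is already a str; strip() removes the newline

# Method 3: read the exit status (to tell whether the command succeeded)
stdin3, stdout3, stderr3 = client.exec_command('ping -c 1 google.com')
# With large output, read stdout first: unread output can fill the channel
# window and make recv_exit_status() block
exit_code = stdout3.channel.recv_exit_status()   # 0 = success, non-zero = failure
print(f"Exit code: {exit_code}")

# ===== Feeding input to a command =====
# sudo -S reads the password from stdin (-p '' hides the prompt text);
# without -S, sudo tries to read from a terminal and ignores what you write here
stdin4, stdout4, stderr4 = client.exec_command("sudo -S -p '' cat /etc/shadow")
stdin4.write('sudo_password\n')
stdin4.flush()
# ⚠️ For programs that hold a real back-and-forth dialogue, invoke_shell() is usually a better fit

# ===== Setting a command timeout =====
stdin5, stdout5, stderr5 = client.exec_command(
    'sleep 100',
    timeout=5   # reads time out after 5 seconds and raise socket.timeout
)
try:
    stdout5.read()
except Exception as e:
    print(f"Timed out: {e}")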

client.close()
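The remote shell interprets any command string passed to exec_command. A file name that contains spaces or characters such as `;` therefore changes what actually runs. Below is a minimal sketch that builds command strings safely with the standard library's shlex.quote. shlex.join does the same job on Python 3.8+. The helper name build_command is hypothetical, and the quoting follows POSIX shell rules (bash, sh), which suits the Linux servers in this guide:

```python
import shlex
from typing import Sequence


def build_command(argv: Sequence[str]) -> str:
    """Join an argument list into one command string that a POSIX shell
    (bash, sh) will split back into exactly the same arguments."""
    return ' '.join(shlex.quote(arg) for arg in argv)


# A file name with a space and a semicolon stays ONE argument instead of
# turning into a second command
cmd = build_command(['ls', '-la', '/tmp/my file;rm -rf x'])
print(cmd)   # ls -la '/tmp/my file;rm -rf x'

# shlex.join (Python 3.8+) produces the same string
assert cmd == shlex.join(['ls', '-la', '/tmp/my file;rm -rf x'])

# With a real connection: stdin, stdout, stderr = client.exec_command(cmd)
```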

2.5 invoke_shell: An Interactive Shell

For complex interactive work (sudo prompts, database command-line clients, menu-driven programs), use invoke_shell:

import paramiko
import time

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('192.168.1.100', username='root', password='password')

# Open an interactive shell (behaves like a real terminal)
shell = client.invoke_shell()
time.sleep(0.5)   # give the shell time to start

def send_command(shell, command, wait=1.0):
    """Send a command, then collect whatever output arrives within `wait` seconds"""
    shell.send(command + '\n')
    time.sleep(wait)
    output = b''
    while shell.recv_ready():
        output += shell.recv(4096)
    return output.decode('utf-8', errors='replace')

# Example: run a command with sudo
print(send_command(shell, 'sudo whoami', wait=0.5))
# If it prompts for a password:
print(send_command(shell, 'your_sudo_password', wait=0.5))

# A multi-step session: connect to MySQL
print(send_command(shell, 'mysql -u root -ppassword', wait=1.0))
print(send_command(shell, 'show databases;', wait=0.5))
print(send_command(shell, 'exit', wait=0.5))

shell.close()
client.close()
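The fixed time.sleep() delays in send_command are fragile. A slow command gets cut off, and a fast one wastes time. A common alternative is to keep reading until the shell prompt (or another expected marker) appears, under an overall timeout. Below is a minimal sketch. It assumes only that the channel offers recv_ready() and recv(n), as paramiko's Channel does. The read_until helper, the FakeChannel class, and the prompt regex are hypothetical; the fake class exists only so the sketch runs without a server. The sketch also uses an incremental UTF-8 decoder, so a multi-byte character that is split across two recv() chunks is not garbled:

```python
import codecs
import re
import time


def read_until(channel, pattern: str, timeout: float = 10.0, poll: float = 0.05) -> str:
    """Read from `channel` until the regex `pattern` matches the text received so far.

    `channel` only needs recv_ready() and recv(n), which paramiko's Channel provides.
    Raises TimeoutError if the pattern has not appeared within `timeout` seconds.
    """
    # An incremental decoder keeps a multi-byte UTF-8 character intact even
    # when it is split across two recv() chunks
    decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
    text = ''
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if channel.recv_ready():
            text += decoder.decode(channel.recv(4096))
            if re.search(pattern, text):
                return text
        else:
            time.sleep(poll)
    raise TimeoutError(f"pattern {pattern!r} not seen within {timeout}s; got {text!r}")


class FakeChannel:
    """Hypothetical stand-in for a paramiko Channel that replays canned chunks."""

    def __init__(self, chunks):
        self._chunks = list(chunks)

    def recv_ready(self) -> bool:
        return bool(self._chunks)

    def recv(self, n: int) -> bytes:
        return self._chunks.pop(0)   # n is ignored in this fake


data = 'done ✅\nuser@host:~$ '.encode('utf-8')
# Split the bytes in the middle of the 3-byte ✅ character on purpose
chan = FakeChannel([data[:6], data[6:]])
print(read_until(chan, r'[$#] $', timeout=1.0))
```

With a real shell, you would call shell.send('ls\n') and then read_until(shell, r'[$#] $') instead of sleeping. The prompt regex depends on the server's PS1 setting, so adjust it to match.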

Part 3: SFTP File Transfer

3.1 Uploading Files to the Server

import paramiko
import os

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('192.168.1.100', username='root', password='password')

# Create the SFTP client
sftp = client.open_sftp()

# ===== Uploading files =====

# Basic upload (local path → remote path)
sftp.put('/local/path/config.json', '/remote/path/config.json')
print("Upload complete!")

# Upload with a progress bar
def upload_with_progress(sftp, local_path, remote_path):
    def progress_callback(bytes_sent, total_bytes):
        # paramiko calls this with (bytes transferred so far, total file size)
        pct = bytes_sent / total_bytes * 100 if total_bytes else 100.0
        bar = "█" * int(pct / 2)
        print(f"\r  Upload progress: [{bar:<50}] {pct:.1f}%  "
              f"({bytes_sent/1024:.0f}KB / {total_bytes/1024:.0f}KB)", end='', flush=True)

    sftp.put(local_path, remote_path, callback=progress_callback)
    print()   # move to a new line

upload_with_progress(sftp, '/local/bigfile.zip', '/remote/bigfile.zip')

# Upload an entire directory
def upload_directory(sftp, local_dir, remote_dir):
    """Recursively upload a local directory to the remote host"""
    try:
        sftp.mkdir(remote_dir)
    except IOError:
        pass   # assume the directory already exists

    for item in os.listdir(local_dir):
        local_path  = os.path.join(local_dir, item)
        remote_path = remote_dir + '/' + item   # remote (SFTP) paths always use '/'

        if os.path.isdir(local_path):
            upload_directory(sftp, local_path, remote_path)
        else:
            print(f"  Uploading: {local_path} → {remote_path}")
            sftp.put(local_path, remote_path)

sftp.close()
client.close()
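upload_directory above decides what to upload and transfers each file in the same recursive pass. Separating the "what to upload" step from the transfer makes the walk easy to test, and it lets you print a dry-run plan first. Below is a minimal sketch using a hypothetical helper, plan_upload. It walks the local tree once with os.walk and returns two things: the remote directories to create (parents first) and the (local, remote) file pairs. Remote paths are always built with posixpath, because SFTP paths use forward slashes whatever the local OS:

```python
import os
import posixpath
from typing import List, Tuple


def plan_upload(local_dir: str, remote_dir: str) -> Tuple[List[str], List[Tuple[str, str]]]:
    """Return (remote directories to create, [(local_file, remote_file), ...]).

    Directories come parents-first, so creating them in order always works.
    """
    dirs: List[str] = []
    files: List[Tuple[str, str]] = []
    for root, subdirs, filenames in os.walk(local_dir):   # top-down walk
        subdirs.sort()   # sorting in place also fixes the traversal order
        rel = os.path.relpath(root, local_dir)
        # Convert the local relative path into POSIX form for the remote side
        parts = [] if rel == os.curdir else rel.split(os.sep)
        remote_root = posixpath.join(remote_dir, *parts)
        dirs.append(remote_root)
        for name in sorted(filenames):
            files.append((os.path.join(root, name), posixpath.join(remote_root, name)))
    return dirs, files


# With a real SFTP session (sftp = client.open_sftp()):
# dirs, files = plan_upload('/local/site', '/var/www/site')
# for d in dirs:
#     try:
#         sftp.mkdir(d)
#     except IOError:
#         pass   # assumed to already exist
# for local_path, remote_path in files:
#     sftp.put(local_path, remote_path)
```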

3.2 Downloading Files from the Server

import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('192.168.1.100', username='root', password='password')

sftp = client.open_sftp()

# ===== Downloading files =====
sftp.get('/remote/path/log.txt', '/local/path/log.txt')
print("Download complete!")

# Download into memory (no disk write; work with the content directly)
import io
buffer = io.BytesIO()
sftp.getfo('/remote/path/config.json', buffer)
buffer.seek(0)

import json
config = json.loads(buffer.read().decode('utf-8'))
print(f"Server config: {config}")

# List the contents of a remote directory
files = sftp.listdir('/var/log')
print(f"Log files: {files}")

# Get file details (similar to ls -la)
for attr in sftp.listdir_attr('/var/log'):
    print(f"  {attr.filename:30s}  size: {attr.st_size:>10} bytes")

# Check whether a remote file exists
def remote_file_exists(sftp, path):
    try:
        sftp.stat(path)
        return True
    except FileNotFoundError:
        return False

print(remote_file_exists(sftp, '/etc/passwd'))   # True
print(remote_file_exists(sftp, '/not/exist'))    # False

sftp.close()
client.close()
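listdir_attr() returns SFTPAttributes objects, and their st_mode field carries the file type. The standard stat module can therefore separate directories from regular files without an extra stat() call per entry. Below is a minimal sketch. The helper name split_entries is hypothetical, and SimpleNamespace objects stand in for SFTPAttributes so it runs without a server:

```python
import stat
from types import SimpleNamespace
from typing import Iterable, List, Tuple


def split_entries(attrs: Iterable) -> Tuple[List[str], List[str]]:
    """Split listdir_attr()-style entries into (directories, regular files) using st_mode."""
    dirs: List[str] = []
    files: List[str] = []
    for attr in attrs:
        mode = attr.st_mode or 0   # st_mode can be None if the server did not send it
        if stat.S_ISDIR(mode):
            dirs.append(attr.filename)
        elif stat.S_ISREG(mode):
            files.append(attr.filename)
        # symlinks, sockets and other types are skipped in this sketch
    return sorted(dirs), sorted(files)


# Stand-ins for SFTPAttributes; with a server you would pass sftp.listdir_attr('/var/log')
fake = [
    SimpleNamespace(filename='nginx', st_mode=stat.S_IFDIR | 0o755),
    SimpleNamespace(filename='syslog', st_mode=stat.S_IFREG | 0o640),
    SimpleNamespace(filename='auth.log', st_mode=stat.S_IFREG | 0o640),
]
print(split_entries(fake))   # (['nginx'], ['auth.log', 'syslog'])
```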

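Part 4: Wrapping It in an SSHExecutor Class

4.1 Why Wrap It?

Problems with using paramiko directly:
  ❌ Every call repeats the same boilerplate (connect → exec → close)
  ❌ No automatic reconnection
  ❌ No unified error handling or logging
  ❌ Opening a new connection for every call is slow

Benefits of an SSHExecutor wrapper:
  ✅ Run a remote command in one line
  ✅ Connection lifecycle managed automatically
  ✅ Unified error handling
  ✅ Works with the with statement for automatic cleanup
  ✅ Failed connections retried automatically (advanced version, section 4.3)

4.2 Basic SSHExecutor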

import paramiko
import logging
from typing import Optional, Tuple

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)

class SSHExecutor:
    """
    SSH remote command executor (basic version)

    Usage:
        # Option 1: with statement (recommended)
        with SSHExecutor('192.168.1.100', 'root', password='xxx') as ssh:
            stdout, stderr, code = ssh.run('ls -la')
            print(stdout)

        # Option 2: manage the connection yourself
        ssh = SSHExecutor('192.168.1.100', 'root', password='xxx')
        ssh.connect()
        stdout, stderr, code = ssh.run('whoami')
        ssh.close()
    """

    def __init__(
        self,
        hostname: str,
        username: str,
        password: Optional[str] = None,
        key_filename: Optional[str] = None,
        port: int = 22,
        timeout: float = 10.0,
        default_timeout: float = 30.0
    ):
        self.hostname        = hostname
        self.username        = username
        self.password        = password
        self.key_filename    = key_filename
        self.port            = port
        self.timeout         = timeout         # connection timeout
        self.default_timeout = default_timeout # default command timeout

        self._client: Optional[paramiko.SSHClient] = None
        self._sftp:   Optional[paramiko.SFTPClient] = None

    # ==================== Connection management ====================

    def connect(self):
        """Establish the SSH connection"""
        if self.is_connected():
            return   # already connected; don't connect again

        # Any SFTP session from a dropped connection is no longer usable
        self._sftp = None
        logger.info(f"Connecting to {self.username}@{self.hostname}:{self.port}...")

        self._client = paramiko.SSHClient()
        self._client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        connect_kwargs = {
            'hostname':     self.hostname,
            'port':         self.port,
            'username':     self.username,
            'timeout':      self.timeout,
            'allow_agent':  False,
            'look_for_keys': False,
        }

        if self.password:
            connect_kwargs['password'] = self.password
        if self.key_filename:
            connect_kwargs['key_filename'] = self.key_filename

        self._client.connect(**connect_kwargs)
        logger.info(f"✅ Connected: {self.hostname}")

    def close(self):
        """Close the connection"""
        if self._sftp:
            self._sftp.close()
            self._sftp = None
        if self._client:
            self._client.close()
            self._client = None
            logger.info(f"🔌 Connection closed: {self.hostname}")

    def is_connected(self) -> bool:
        """Check whether the connection is still active"""
        if not self._client:
            return False
        transport = self._client.get_transport()
        return transport is not None and transport.is_active()

    # ==================== Command execution ====================

    def run(
        self,
        command: str,
        timeout: Optional[float] = None,
        get_pty: bool = False
    ) -> Tuple[str, str, int]:
        """
        Run a remote command

        Args:
            command  - the shell command to run
            timeout  - timeout in seconds; None means use default_timeout
            get_pty  - whether to allocate a pseudo-terminal (some commands, e.g. sudo,
                       may need one; with a pty, stderr is merged into stdout)

        Returns:
            (stdout, stderr, exit_code)
            stdout    - standard output as a string
            stderr    - standard error as a string
            exit_code - exit status (0 = success, non-zero = failure)
        """
        if not self.is_connected():
            self.connect()

        t = timeout if timeout is not None else self.default_timeout
        logger.debug(f"Running command: {command}")

        stdin, stdout, stderr = self._client.exec_command(
            command,
            timeout=t,
            get_pty=get_pty
        )

        # Read the output before asking for the exit status, so a command with
        # large output cannot block recv_exit_status()
        out  = stdout.read().decode('utf-8', errors='replace')
        err  = stderr.read().decode('utf-8', errors='replace')
        code = stdout.channel.recv_exit_status()

        logger.debug(f"Exit code: {code}")
        return out, err, code

    def run_checked(self, command: str, **kwargs) -> str:
        """
        Run a command and raise if it fails (similar to subprocess.check_output)

        Returns: stdout as a string
        Raises: RuntimeError (when the command fails)
        """
        stdout, stderr, code = self.run(command, **kwargs)
        if code != 0:
            raise RuntimeError(
                f"Command failed (exit code {code})\n"
                f"Command: {command}\n"
                f"Error: {stderr.strip()}"
            )
        return stdout

    def run_many(self, commands: list) -> list:
        """
        Run several commands in order

        Returns: a list of (stdout, stderr, code) tuples
        """
        results = []
        for cmd in commands:
            result = self.run(cmd)
            results.append(result)
            if result[2] != 0:
                logger.warning(f"Command failed: {cmd} (exit code {result[2]})")
        return results

    # ==================== File operations ====================

    @property
    def sftp(self) -> paramiko.SFTPClient:
        """Lazily create the SFTP client"""
        if not self._sftp:
            if not self.is_connected():
                self.connect()
            self._sftp = self._client.open_sftp()
        return self._sftp

    def upload(self, local_path: str, remote_path: str, verbose: bool = False):
        """Upload a local file to the remote server"""
        import os

        def _progress(sent, total):
            if verbose:
                pct = sent / total * 100 if total else 100.0
                print(f"\r  Uploading {os.path.basename(local_path)}: "
                      f"{pct:.1f}%", end='', flush=True)

        self.sftp.put(local_path, remote_path,
                      callback=_progress if verbose else None)
        if verbose:
            print()
        logger.info(f"📤 Uploaded: {local_path} → {self.hostname}:{remote_path}")

    def download(self, remote_path: str, local_path: str):
        """Download a file from the remote server"""
        self.sftp.get(remote_path, local_path)
        logger.info(f"📥 Downloaded: {self.hostname}:{remote_path} → {local_path}")

    def read_remote_file(self, remote_path: str) -> str:
        """Read a remote file and return its contents as a string"""
        with self.sftp.open(remote_path, 'r') as f:
            return f.read().decode('utf-8')   # SFTPFile.read() returns bytes

    def write_remote_file(self, remote_path: str, content: str):
        """Write a string to a remote file (overwriting it)"""
        with self.sftp.open(remote_path, 'w') as f:
            f.write(content.encode('utf-8'))
        logger.info(f"✏️  Wrote remote file: {self.hostname}:{remote_path}")

    def file_exists(self, remote_path: str) -> bool:
        """Check whether a remote file or directory exists"""
        try:
            self.sftp.stat(remote_path)
            return True
        except FileNotFoundError:
            return False

    # ==================== Context manager ====================

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False   # don't suppress exceptions

    def __repr__(self):
        status = "connected" if self.is_connected() else "not connected"
        return f"SSHExecutor({self.username}@{self.hostname}:{self.port}, {status})"

# ==================== Usage example ====================

if __name__ == '__main__':
    # Basic usage (with statement)
    with SSHExecutor('192.168.1.100', 'root', password='password123') as ssh:

        # Run a simple command
        out, err, code = ssh.run('uname -a')
        print(f"System info: {out.strip()}")

        # Run a command and check the result
        try:
            result = ssh.run_checked('python3 --version')
            print(f"Python version: {result.strip()}")
        except RuntimeError as e:
            print(f"Error: {e}")

        # Run several commands in sequence
        commands = [
            'df -h /',          # disk usage
            'free -m',          # memory usage
            'uptime',           # uptime
            'cat /proc/cpuinfo | grep "model name" | head -1'
        ]
        results = ssh.run_many(commands)
        labels  = ['Disk', 'Memory', 'Uptime', 'CPU']
        for label, (out, err, code) in zip(labels, results):
            print(f"[{label}] {out.strip()[:60]}")

        # File operations
        ssh.upload('/local/deploy.sh', '/tmp/deploy.sh')
        ssh.run_checked('chmod +x /tmp/deploy.sh')
        out, err, code = ssh.run('/tmp/deploy.sh')
        print(f"Deploy script output:\n{out}")
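run_checked raises a plain RuntimeError, so a caller can only inspect the message text. One option, modeled on the standard library's subprocess.CalledProcessError, is a dedicated exception. It keeps the command, the exit code, and both output streams as attributes. Below is a minimal sketch. RemoteCommandError and check_result are hypothetical names; they are not part of paramiko or of the class above:

```python
from typing import Tuple


class RemoteCommandError(RuntimeError):
    """Raised when a remote command exits with a non-zero status."""

    def __init__(self, command: str, exit_code: int, stdout: str, stderr: str):
        self.command = command
        self.exit_code = exit_code
        self.stdout = stdout
        self.stderr = stderr
        super().__init__(
            f"command failed (exit code {exit_code}): {command}\n{stderr.strip()}"
        )


def check_result(command: str, result: Tuple[str, str, int]) -> str:
    """Take the (stdout, stderr, exit_code) tuple returned by SSHExecutor.run()
    and return stdout, or raise RemoteCommandError on failure."""
    stdout, stderr, code = result
    if code != 0:
        raise RemoteCommandError(command, code, stdout, stderr)
    return stdout


try:
    check_result('cat /nonexistent', ('', 'cat: /nonexistent: No such file or directory\n', 1))
except RemoteCommandError as e:
    print(e.exit_code, e.stderr.strip())
```

RemoteCommandError subclasses RuntimeError, so existing `except RuntimeError` handlers keep working. This includes the one in the usage example above.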

4.3 Advanced SSHExecutor (with Retries and Automatic Reconnection)

import paramiko
import time
import logging
import threading
from typing import Optional, List, Tuple, Dict
from dataclasses import dataclass, field
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@dataclass
class SSHResult:
    """Result of one SSH command"""
    command:   str
    stdout:    str
    stderr:    str
    exit_code: int
    duration:  float   # execution time in seconds

    @property
    def success(self) -> bool:
        return self.exit_code == 0

    @property
    def output(self) -> str:
        """Return stdout if it is non-empty, otherwise stderr"""
        return self.stdout if self.stdout.strip() else self.stderr

    def __str__(self):
        status = "✅" if self.success else "❌"
        return (f"{status} [{self.exit_code}] {self.command!r} "
                f"({self.duration:.2f}s)")

    def raise_for_status(self):
        """Raise an exception if the command failed"""
        if not self.success:
            raise RuntimeError(
                f"Command failed (exit code {self.exit_code}): {self.command}\n"
                f"Error output: {self.stderr.strip()}"
            )

class SSHExecutorAdvanced:
    """
    SSH remote command executor (advanced version)

    New features:
      - Automatic connection retries (retries parameter)
      - Structured results (SSHResult)
      - Connection health checks and automatic reconnection
      - sudo support
      - Multi-line script execution (run_script)
    """

    def __init__(
        self,
        hostname: str,
        username: str,
        password: Optional[str] = None,
        key_filename: Optional[str] = None,
        port: int = 22,
        timeout: float = 10.0,
        retries: int = 3,
        retry_delay: float = 2.0
    ):
        self.hostname    = hostname
        self.username    = username
        self.password    = password
        self.key_filename = key_filename
        self.port        = port
        self.timeout     = timeout
        self.retries     = retries
        self.retry_delay = retry_delay

        self._client: Optional[paramiko.SSHClient] = None
        self._lock   = threading.Lock()   # guards reconnection across threads

    def connect(self):
        """Connect, retrying on failure"""
        last_error = None

        for attempt in range(1, self.retries + 1):
            try:
                logger.info(f"Connecting to {self.hostname} (attempt {attempt})...")
                client = paramiko.SSHClient()
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

                kwargs = {
                    'hostname':      self.hostname,
                    'port':          self.port,
                    'username':      self.username,
                    'timeout':       self.timeout,
                    'allow_agent':   False,
                    'look_for_keys': False,
                }
                if self.password:    kwargs['password']    = self.password
                if self.key_filename: kwargs['key_filename'] = self.key_filename

                client.connect(**kwargs)
                self._client = client
                logger.info(f"✅ Connected: {self.hostname}")
                return

            except paramiko.AuthenticationException as e:
                # Authentication failed: retrying will not help
                raise RuntimeError(f"Authentication failed: {e}") from e

            except Exception as e:
                last_error = e
                if attempt < self.retries:   # no point in waiting after the last attempt
                    logger.warning(f"Connection failed ({e}); retrying in {self.retry_delay}s...")
                    time.sleep(self.retry_delay)

        raise RuntimeError(
            f"Failed to connect to {self.hostname} after {self.retries} attempts: {last_error}"
        )

    def _ensure_connected(self):
        """Make sure the connection is active, reconnecting if necessary"""
        with self._lock:
            if not self._is_active():
                logger.info("Not connected; reconnecting...")
                self.connect()

    def _is_active(self) -> bool:
        if not self._client:
            return False
        t = self._client.get_transport()
        return t is not None and t.is_active()

    def run(
        self,
        command: str,
        timeout: float = 30.0,
        sudo: bool = False,
        sudo_password: Optional[str] = None,
        env: Optional[Dict[str, str]] = None
    ) -> SSHResult:
        """
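        Run a remote command and return an SSHResult

        Args:
            command       - shell command
            timeout       - execution timeout in seconds
            sudo          - whether to run the command with sudo
            sudo_password - sudo password (if empty, sudo must not need a password)
            env           - extra environment variables
        """
        import shlex   # quotes env values and the command handed to sudo

        self._ensure_connected()

        # Build the final command line
        final_cmd = command
        if env:
            # Quote each value so spaces and special characters survive the shell
            env_str   = ' '.join(f'{k}={shlex.quote(v)}' for k, v in env.items())
            final_cmd = f'export {env_str} && {command}'

        if sudo:
            # Run the whole line through sh -c so that shell builtins (export)
            # and operators (&&) also run under sudo.
            # -S reads the password from stdin and -p '' suppresses the prompt;
            # -n fails at once instead of waiting for a password nobody will type.
            prefix    = "sudo -S -p ''" if sudo_password else "sudo -n"
            final_cmd = f"{prefix} sh -c {shlex.quote(final_cmd)}"

        start = time.time()
        try:
            stdin, stdout, stderr = self._client.exec_command(
                final_cmd, timeout=timeout
            )
            if sudo and sudo_password:
                # Send the password over stdin instead of the command line, where
                # other users could see it in the process list. Closing stdin makes
                # a wrong password fail fast. (If sudo has cached credentials, the
                # command itself receives this line on its stdin.)
                stdin.write(sudo_password + '\n')
                stdin.flush()
                stdin.channel.shutdown_write()
            out  = stdout.read().decode('utf-8', errors='replace')
            err  = stderr.read().decode('utf-8', errors='replace')
            code = stdout.channel.recv_exit_status()
        except Exception as e:
            raise RuntimeError(f"Command raised an exception: {command}\n{e}") from e

        duration = time.time() - start
        result   = SSHResult(command, out, err, code, duration)
        logger.debug(str(result))
        return result

    def run_script(self, script: str, **kwargs) -> SSHResult:
        """
        Run a multi-line shell script

        The script is uploaded to a temporary file and then executed.
        """
        import uuid   # unique file names, so overlapping calls don't collide

        self._ensure_connected()
        remote_path = f'/tmp/_ssh_script_{uuid.uuid4().hex}.sh'
        try:
            sftp = self._client.open_sftp()
            with sftp.open(remote_path, 'w') as f:
                f.write(script.encode('utf-8'))
            sftp.chmod(remote_path, 0o755)
            sftp.close()

            return self.run(f'bash {remote_path}', **kwargs)
        finally:
            # Clean up the temporary file
            try:
                self.run(f'rm -f {remote_path}', timeout=5)
            except Exception:
                pass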

    def close(self):
        if self._client:
            self._client.close()
            self._client = None

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, *args):
        self.close()

    def __repr__(self):
        status = "connected" if self._is_active() else "not connected"
        return f"SSHExecutorAdvanced({self.username}@{self.hostname}, {status})"

# ==================== Usage example ====================

def demo_advanced():
    with SSHExecutorAdvanced(
        '192.168.1.100',
        username='ubuntu',
        key_filename='/home/user/.ssh/id_rsa',
        retries=3
    ) as ssh:

        # Run a command and use the structured result
        result = ssh.run('df -h /')
        print(f"Result: {result}")
        print(f"Succeeded: {result.success}")
        print(f"Duration: {result.duration:.2f}s")
        print(f"Output: {result.stdout}")

        # Raise automatically on failure
        try:
            ssh.run('cat /nonexistent/file').raise_for_status()
        except RuntimeError as e:
            print(f"Caught error: {e}")

        # Run a multi-line script
        script = """#!/bin/bash
echo "Starting system check..."
echo "Hostname: $(hostname)"
echo "Disk: $(df -h / | tail -1)"
echo "Memory: $(free -m | grep Mem)"
echo "Load: $(uptime | awk '{print $NF}')"
echo "Check complete!"
"""
        result = ssh.run_script(script)
        print(f"\nSystem check result:\n{result.stdout}")

        # Run with sudo
        result = ssh.run(
            'cat /etc/shadow',
            sudo=True,
            sudo_password='sudo_pass'
        )
        print(f"sudo result: {result}")

        # Set environment variables
        result = ssh.run(
            'echo $MY_VAR',
            env={'MY_VAR': 'Hello from Python!'}
        )
        print(f"Environment variable: {result.stdout.strip()}")
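connect() above waits a fixed retry_delay between attempts. Many clients may retry against the same struggling host at once. A common refinement in that case is exponential backoff with random jitter, which spreads the attempts out instead of having them arrive in lock-step. Below is a minimal, library-independent sketch. retry_with_backoff and its parameters are hypothetical, and sleep is injectable so the behavior can be checked without actually waiting:

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar('T')


def retry_with_backoff(
    func: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func(); on a retryable error wait a random time in [0, base_delay * 2**n]
    (capped at max_delay, i.e. "full jitter") and try again, up to `retries` attempts."""
    for attempt in range(retries):
        try:
            return func()
        except retry_on:
            if attempt == retries - 1:
                raise   # out of attempts: re-raise the last error
            cap = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, cap))
    raise ValueError('retries must be at least 1')


if __name__ == '__main__':
    attempts = {'n': 0}

    def flaky_connect() -> str:
        # Hypothetical: fails twice with a network error, then succeeds
        attempts['n'] += 1
        if attempts['n'] < 3:
            raise ConnectionRefusedError('connection refused')
        return 'connected'

    print(retry_with_backoff(flaky_connect, retries=5, base_delay=0.1))
```

With paramiko, you might pass retry_on=(OSError, paramiko.SSHException). paramiko.AuthenticationException is a subclass of SSHException, so convert it to a non-retryable error inside func first, as connect() above does. Otherwise a wrong password is retried too.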

Part 5: Managing Multiple Servers

5.1 Serial Execution (Simple Version)

# Reuses the SSHExecutor class from section 4.2 (assumed to be defined in the same module)

# Define the server list
SERVERS = [
    {'hostname': '192.168.1.100', 'username': 'root', 'password': 'pass1'},
    {'hostname': '192.168.1.101', 'username': 'root', 'password': 'pass2'},
    {'hostname': '192.168.1.102', 'username': 'root', 'password': 'pass3'},
    {'hostname': '192.168.1.103', 'username': 'ubuntu', 'key_filename': '/home/user/.ssh/id_rsa'},
]

def run_on_server(server_info: dict, command: str) -> dict:
    """Run a command on a single server and return a result dict"""
    host = server_info['hostname']
    try:
        with SSHExecutor(**server_info) as ssh:
            out, err, code = ssh.run(command, timeout=15)
            return {
                'host':    host,
                'success': code == 0,
                'output':  out.strip(),
                'error':   err.strip(),
                'code':    code
            }
    except Exception as e:
        return {
            'host':    host,
            'success': False,
            'output':  '',
            'error':   str(e),
            'code':    -1
        }

# Serial execution
def run_on_all_serial(servers: list, command: str):
    """Run the command on every server serially (one after another)"""
    results = []
    for server in servers:
        print(f"  Running on {server['hostname']}...")
        result = run_on_server(server, command)
        results.append(result)
    return results

print("=== Serial execution ===")
results = run_on_all_serial(SERVERS, 'uptime')
for r in results:
    status = "✅" if r['success'] else "❌"
    print(f"{status} {r['host']:20s}: {r['output'][:50]}")
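With dozens of servers, one line per host is hard to scan. Many fleet tools instead group hosts that returned identical output, so the odd one out stands out. Below is a minimal sketch that works on the result dicts returned by run_on_server above (keys host, success, output, error). group_by_output and the sample data are hypothetical:

```python
from collections import defaultdict
from typing import Dict, List


def group_by_output(results: List[dict]) -> Dict[str, List[str]]:
    """Map each distinct output (or error) to the sorted hosts that produced it,
    largest group first."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for r in results:
        key = r['output'] if r['success'] else f"ERROR: {r['error']}"
        groups[key].append(r['host'])
    ordered = sorted(groups.items(), key=lambda kv: -len(kv[1]))   # stable sort
    return {key: sorted(hosts) for key, hosts in ordered}


sample = [
    {'host': '192.168.1.101', 'success': True, 'output': 'Ubuntu 22.04', 'error': ''},
    {'host': '192.168.1.100', 'success': True, 'output': 'Ubuntu 22.04', 'error': ''},
    {'host': '192.168.1.102', 'success': False, 'output': '', 'error': 'timed out'},
]
for output, hosts in group_by_output(sample).items():
    print(f"{len(hosts)} host(s) {hosts}: {output}")
```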

5.2 Concurrent Execution (Recommended)

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def run_on_all_parallel(servers: list, command: str, max_workers: int = 10):
    """Run the command on all servers concurrently (one thread per server, up to max_workers)"""
    results  = []
    start    = time.time()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_server = {
            executor.submit(run_on_server, server, command): server
            for server in servers
        }

        # Collect results in the order they finish
        for future in as_completed(future_to_server):
            server = future_to_server[future]
            try:
                result = future.result(timeout=30)
            except Exception as e:
                result = {
                    'host':    server['hostname'],
                    'success': False,
                    'output':  '',
                    'error':   str(e),
                    'code':    -1
                }
            results.append(result)

    elapsed = time.time() - start
    print(f"\nConcurrent run finished: {len(servers)} servers in {elapsed:.2f}s")
    return results

# Summarize the results
def print_summary(results: list):
    succeeded = [r for r in results if r['success']]
    failed    = [r for r in results if not r['success']]

    print(f"\n{'='*50}")
    print(f"Summary: {len(succeeded)}/{len(results)} servers succeeded")
    print(f"{'='*50}")

    if succeeded:
        print(f"\n✅ Succeeded ({len(succeeded)}):")
        for r in succeeded:
            print(f"   {r['host']:20s}: {r['output'][:60]}")

    if failed:
        print(f"\n❌ Failed ({len(failed)}):")
        for r in failed:
            print(f"   {r['host']:20s}: {r['error'][:60]}")

# Usage example
print("=== Concurrent run (disk usage on each server) ===")
results = run_on_all_parallel(SERVERS, "df -h / | tail -1 | awk '{print $5}'")
print_summary(results)
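Threads help here because each worker spends nearly all of its time waiting on the network. The total time therefore tends toward that of the slowest server rather than the sum over all servers. Below is a minimal sketch that makes this visible without any servers. It uses a hypothetical fake_remote_call that simply sleeps to simulate network latency:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict


def fake_remote_call(host: str, latency: float) -> str:
    """Hypothetical stand-in for run_on_server: sleeps to simulate network waiting."""
    time.sleep(latency)
    return host


def timed_run(hosts: Dict[str, float], max_workers: int) -> float:
    """Run fake_remote_call for every host in a thread pool; return wall-clock seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fake_remote_call, h, lat) for h, lat in hosts.items()]
        for future in as_completed(futures):
            future.result()   # re-raises any exception from the worker
    return time.perf_counter() - start


hosts = {f'10.0.0.{i}': 0.2 for i in range(1, 6)}   # five hosts, 0.2 s each
serial = timed_run(hosts, max_workers=1)     # about 5 x 0.2 s
parallel = timed_run(hosts, max_workers=5)   # about 0.2 s
print(f"one worker: {serial:.2f}s, five workers: {parallel:.2f}s")
```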

5.3 In Practice: A Batch Deployment Script

import paramiko
from concurrent.futures import ThreadPoolExecutor
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')

DEPLOY_SCRIPT = """#!/bin/bash
set -e   # exit immediately if any command fails

echo "[1/4] Pulling the latest code..."
cd /opt/myapp
git pull origin main

echo "[2/4] Installing dependencies..."
pip3 install -r requirements.txt -q

echo "[3/4] Running database migrations..."
python3 manage.py migrate --no-input

echo "[4/4] Restarting the service..."
systemctl restart myapp

echo "Deployment complete! Version: $(git rev-parse --short HEAD)"
"""

def deploy_to_server(server: dict) -> dict:
    """Deploy to a single server"""
    host = server['hostname']
    logging.info(f"Starting deployment to {host}...")

    try:
        with SSHExecutorAdvanced(**server) as ssh:
            # First record the service state and the currently deployed commit
            check = ssh.run('systemctl is-active myapp')
            was_running = check.stdout.strip() == 'active'
            previous = ssh.run('cd /opt/myapp && git rev-parse HEAD').stdout.strip()

            # Run the deployment script
            result = ssh.run_script(DEPLOY_SCRIPT, timeout=120)

            if result.success:
                logging.info(f"✅ {host} deployed successfully")
                return {'host': host, 'success': True, 'output': result.stdout}
            else:
                # Deployment failed: roll back if the service was running before.
                # Reset to the commit recorded above; HEAD~1 would be wrong if the
                # pull brought in several commits or never happened at all.
                if was_running and previous:
                    logging.warning(f"⚠️ {host} deployment failed; rolling back...")
                    ssh.run(f'cd /opt/myapp && git reset --hard {previous}')
                    ssh.run('systemctl restart myapp')
                return {'host': host, 'success': False, 'error': result.stderr}

    except Exception as e:
        logging.error(f"❌ {host} deployment raised an exception: {e}")
        return {'host': host, 'success': False, 'error': str(e)}

# Deploy to all servers concurrently
production_servers = [
    {'hostname': f'10.0.0.{i}', 'username': 'deploy',
     'key_filename': '/home/ci/.ssh/deploy_key'}
    for i in range(1, 6)   # 5 production servers
]

with ThreadPoolExecutor(max_workers=3) as executor:   # deploy to at most 3 servers at a time
    futures = [executor.submit(deploy_to_server, s) for s in production_servers]
    results = [f.result() for f in futures]

success_count = sum(1 for r in results if r['success'])
print(f"\nDeployment finished: {success_count}/{len(production_servers)} servers succeeded")
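max_workers=3 above limits how many servers are deployed at the same time. Even so, every server is still deployed when the first ones fail. A common alternative in production is a rolling deployment: deploy in fixed-size batches, and stop before the next batch once too many servers have failed. Below is a minimal sketch. rolling_deploy, its parameters, and fake_deploy are hypothetical. deploy_fn is assumed to return a dict with a success key, as deploy_to_server does:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def rolling_deploy(
    servers: List[dict],
    deploy_fn: Callable[[dict], Dict],
    batch_size: int = 2,
    max_failures: int = 0,
) -> List[Dict]:
    """Deploy batch by batch; stop once more than max_failures servers have failed."""
    results: List[Dict] = []
    failures = 0
    for i in range(0, len(servers), batch_size):
        batch = servers[i:i + batch_size]
        with ThreadPoolExecutor(max_workers=batch_size) as executor:
            batch_results = list(executor.map(deploy_fn, batch))   # keeps input order
        results.extend(batch_results)
        failures += sum(1 for r in batch_results if not r['success'])
        if failures > max_failures:
            remaining = len(servers) - len(results)
            print(f"Stopping: {failures} failure(s), {remaining} server(s) not deployed")
            break
    return results


def fake_deploy(server: dict) -> Dict:
    # Hypothetical: pretend 10.0.0.3 fails and everything else succeeds
    host = server['hostname']
    return {'host': host, 'success': host != '10.0.0.3'}


servers = [{'hostname': f'10.0.0.{i}'} for i in range(1, 6)]
done = rolling_deploy(servers, fake_deploy, batch_size=2)
print([r['host'] for r in done])   # batches 1-2 and 3-4 ran; 10.0.0.5 was skipped
```

In real use, you would pass deploy_to_server as deploy_fn. Keeping the first batch small effectively turns it into a canary.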

Part 6: fabric, a More Elegant SSH Tool

6.1 Installation

pip install fabric

6.2 fabric Basics

from fabric import Connection, Config

# Basic connection
conn = Connection(
    host='192.168.1.100',
    user='ubuntu',
    connect_kwargs={'password': 'mypassword'}
)

# Run a command (the Result object carries more information).
# By default a non-zero exit status raises invoke.exceptions.UnexpectedExit;
# pass warn=True to get the Result back instead.
result = conn.run('uname -a', hide=True)   # hide=True: don't echo output to the console
print(f"Output: {result.stdout.strip()}")
print(f"Exit code: {result.return_code}")
print(f"Succeeded: {result.ok}")

# Run sudo commands
# Option 1: set the sudo password through Config
config = Config(overrides={'sudo': {'password': 'sudo_pass'}})
conn2  = Connection('192.168.1.100', user='ubuntu', config=config,
                    connect_kwargs={'password': 'mypassword'})
conn2.sudo('apt-get update', hide=True)

# Option 2: pass the password directly
conn.sudo('systemctl restart nginx', password='sudo_pass')

# File upload/download
conn.put('/local/config.json', '/remote/config.json')   # upload
conn.get('/remote/logs/app.log', '/local/app.log')      # download

# Close the connection
conn.close()

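6.3 fabric as a Context Manager

from fabric import Connection

# The with statement manages the connection automatically
with Connection('192.168.1.100', user='root',
                connect_kwargs={'password': 'password'}) as conn:

    # Change the remote working directory
    with conn.cd('/opt/myapp'):
        conn.run('git status')
        conn.run('ls -la')

    # Run a command on the local machine
    conn.local('echo "Local command: $(pwd)"')

    # Transfer a file and set its permissions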
    conn.put('/local/script.sh', '/tmp/script.sh')
    conn.run('chmod +x /tmp/script.sh && /tmp/script.sh')

6.4 Managing Many Servers with fabric

from fabric import Connection, SerialGroup, ThreadingGroup

servers = ['web1.example.com', 'web2.example.com', 'web3.example.com']

# Serial execution (one server after another)
group_serial = SerialGroup(
    *servers,
    user='ubuntu',
    connect_kwargs={'key_filename': '/home/user/.ssh/id_rsa'}
)
group_serial.run('uptime')

# Concurrent execution (all servers at once)
group_parallel = ThreadingGroup(
    *servers,
    user='ubuntu',
    connect_kwargs={'key_filename': '/home/user/.ssh/id_rsa'}
)
# If any host fails, run() raises fabric.exceptions.GroupException;
# its .result attribute still holds the per-host results
results = group_parallel.run('df -h /', hide=True)

for conn, result in results.items():
    print(f"{conn.host}: {result.stdout.strip().splitlines()[-1]}")

Part 7: asyncssh, High-Concurrency Asynchronous SSH

7.1 Installation

pip install asyncssh

7.2 asyncssh Basics

import asyncio
import asyncssh

async def run_command_async(host, username, password, command):
    """Run a single command asynchronously"""
    async with asyncssh.connect(
        host,
        username=username,
        password=password,
        known_hosts=None   # skip host key verification (development only)
    ) as conn:
        result = await conn.run(command)
        return {
            'host':   host,
            'stdout': result.stdout,
            'stderr': result.stderr,
            'code':   result.exit_status
        }

async def run_on_many_servers(servers, command):
    """Run the command concurrently on several servers"""
    tasks = [
        run_command_async(s['host'], s['username'], s['password'], command)
        for s in servers
    ]
    # gather runs all tasks concurrently; with return_exceptions=True a failing
    # server appears in the results as an exception object instead of being raised
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

# Run it
servers = [
    {'host': '192.168.1.100', 'username': 'root', 'password': 'pass1'},
    {'host': '192.168.1.101', 'username': 'root', 'password': 'pass2'},
    {'host': '192.168.1.102', 'username': 'root', 'password': 'pass3'},
]

async def main():
    print("Checking memory on all servers concurrently...")
    results = await run_on_many_servers(servers, 'free -m | grep Mem')

    for r in results:
        if isinstance(r, Exception):
            print(f"❌ Exception: {r}")
        else:
            print(f"✅ {r['host']}: {r['stdout'].strip()}")

asyncio.run(main())
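return_exceptions=True is what keeps one unreachable host from aborting the whole batch: failed tasks come back as exception objects in the results list, in the same order as the inputs. Without it, awaiting gather() raises the first exception and you lose the other results. The offline sketch below swaps asyncssh.connect for a hypothetical fake_run coroutine so the behavior can be observed without any servers.

```python
import asyncio


async def fake_run(host: str, delay: float, fail: bool) -> dict:
    """Hypothetical stand-in for run_command_async: no network involved."""
    await asyncio.sleep(delay)
    if fail:
        raise ConnectionRefusedError(f"{host}: connection refused")
    return {'host': host, 'stdout': 'fake output', 'code': 0}


async def demo():
    tasks = [
        fake_run('192.168.1.100', 0.03, False),
        fake_run('192.168.1.101', 0.01, True),    # finishes (and fails) first
        fake_run('192.168.1.102', 0.02, False),
    ]
    # Results keep the order of `tasks`, not the order of completion
    return await asyncio.gather(*tasks, return_exceptions=True)


results = asyncio.run(demo())
for r in results:
    if isinstance(r, Exception):
        print(f"❌ {type(r).__name__}: {r}")
    else:
        print(f"✅ {r['host']}")
```

Because the order is preserved, zipping results with the input server list is a simple way to tell which host an exception belongs to.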

7.3 Batch execution with a concurrency limit in asyncssh

import asyncio
import time

import asyncssh

async def run_with_semaphore(semaphore, host, command, conn_kwargs):
    """Use a semaphore to cap the number of concurrent connections"""
    async with semaphore:
        try:
            async with asyncssh.connect(host, **conn_kwargs) as conn:
                result = await conn.run(command)
                return host, result.stdout.strip(), None
        except Exception as e:
            return host, None, str(e)

async def batch_execute(hosts: list, command: str, max_concurrent: int = 50):
    """
    Run a command on many servers concurrently, with at most
    max_concurrent connections open at the same time
    """
    semaphore   = asyncio.Semaphore(max_concurrent)
    conn_kwargs = {
        'username':    'ubuntu',
        'known_hosts': None,        # development only: skip host key verification
        'client_keys': ['/home/user/.ssh/id_rsa']
    }

    tasks = [
        run_with_semaphore(semaphore, host, command, conn_kwargs)
        for host in hosts
    ]

    results = await asyncio.gather(*tasks)
    return results

# Check 100 servers concurrently, with at most 50 connections at a time
all_hosts = [f'10.0.0.{i}' for i in range(1, 101)]

async def main():
    start = time.time()

    results = await batch_execute(all_hosts, 'uptime', max_concurrent=50)

    elapsed   = time.time() - start
    successes = sum(1 for _, out, err in results if err is None)
    print(f"Done: {successes}/{len(all_hosts)} hosts succeeded in {elapsed:.1f}s")

asyncio.run(main())
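To convince yourself that the semaphore really caps concurrency, you can replace the SSH call with a short sleep and record how many tasks are inside the async with semaphore block at the same time. A minimal offline sketch; limited_task, measure_peak and the counters are illustrative and not part of asyncssh.

```python
import asyncio


async def limited_task(semaphore: asyncio.Semaphore, state: dict) -> None:
    async with semaphore:
        state['running'] += 1
        state['peak'] = max(state['peak'], state['running'])
        await asyncio.sleep(0.01)   # stands in for asyncssh.connect + conn.run
        state['running'] -= 1


async def measure_peak(n_tasks: int, max_concurrent: int) -> int:
    semaphore = asyncio.Semaphore(max_concurrent)
    state = {'running': 0, 'peak': 0}
    await asyncio.gather(*(limited_task(semaphore, state) for _ in range(n_tasks)))
    return state['peak']


peak = asyncio.run(measure_peak(n_tasks=100, max_concurrent=50))
print(f"peak concurrency: {peak}")   # never exceeds 50
```

In the real batch, this peak is what bounds the number of open SSH connections (and file descriptors) on the machine running the script.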

Part 8: Common practical applications

8.1 Automated log collection

from datetime import datetime, timedelta
import os
import shlex

def collect_logs(servers: list, log_path: str, local_dir: str, date: str = None):
    """
    Collect the log file for a given date from multiple servers

    Parameters:
        servers   - list of server connection dicts (passed to SSHExecutor)
        log_path  - remote log path (may contain a {date} placeholder)
        local_dir - local directory to save the files into
        date      - date string in YYYYMMDD format (defaults to yesterday)
    """
    if date is None:
        date = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')

    os.makedirs(local_dir, exist_ok=True)

    for server in servers:
        host        = server['hostname']
        remote_path = log_path.format(date=date)
        local_path  = os.path.join(local_dir, f"{host}_{date}_app.log")

        print(f"Collecting {host}:{remote_path} ...")
        try:
            with SSHExecutor(**server) as ssh:
                # Check that the file exists first
                if not ssh.file_exists(remote_path):
                    print(f"  ⚠️ File not found: {remote_path}")
                    continue

                # Get the file size (quote the path for the remote shell)
                out, _, _ = ssh.run(f'du -sh {shlex.quote(remote_path)}')
                size      = out.split()[0]

                # Download
                ssh.download(remote_path, local_path)
                print(f"  ✅ Saved: {local_path} ({size})")

        except Exception as e:
            print(f"  ❌ Collection failed: {e}")

# Collect logs for the last 7 days (i=0 is today)
for i in range(7):
    date = (datetime.now() - timedelta(days=i)).strftime('%Y%m%d')
    collect_logs(
        servers=SERVERS,
        log_path='/var/log/myapp/app_{date}.log',
        local_dir='/backup/logs/',
        date=date
    )
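Before touching any server, it can help to build the list of (host, remote path, local path) jobs and check it. A minimal sketch with a hypothetical plan_log_jobs helper: it reuses the same {date} placeholder and {host}_{date}_app.log naming as collect_logs, and takes today as a parameter so the result is reproducible.

```python
from datetime import date, timedelta
import os
from typing import List, Tuple


def plan_log_jobs(hosts: List[str], log_path: str, local_dir: str,
                  days: int, today: date) -> List[Tuple[str, str, str]]:
    """Return (host, remote_path, local_path) for each host and each of the last `days` days.

    Day 0 is `today`, matching the range(7) loop above.
    """
    jobs = []
    for i in range(days):
        day = (today - timedelta(days=i)).strftime('%Y%m%d')
        remote_path = log_path.format(date=day)
        for host in hosts:
            local_path = os.path.join(local_dir, f"{host}_{day}_app.log")
            jobs.append((host, remote_path, local_path))
    return jobs


jobs = plan_log_jobs(['web1', 'web2'], '/var/log/myapp/app_{date}.log',
                     '/backup/logs/', days=2, today=date(2024, 3, 1))
for job in jobs:
    print(job)
```

Printing the plan first makes mistakes in the path template (a wrong date format, a missing placeholder) visible without opening a single connection.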

8.2 Server health check

from dataclasses import dataclass
from typing import List
import time

@dataclass
class HealthStatus:
    host:        str
    reachable:   bool
    cpu_usage:   float = 0.0
    mem_usage:   float = 0.0
    disk_usage:  float = 0.0
    load_avg:    str   = ""
    error:       str   = ""

    def is_healthy(self) -> bool:
        return (self.reachable and
                self.cpu_usage  < 90 and
                self.mem_usage  < 90 and
                self.disk_usage < 85)

    def __str__(self):
        if not self.reachable:
            return f"❌ {self.host:20s} unreachable: {self.error}"
        status = "✅" if self.is_healthy() else "⚠️"
        return (f"{status} {self.host:20s}  "
                f"CPU:{self.cpu_usage:5.1f}%  "
                f"Mem:{self.mem_usage:5.1f}%  "
                f"Disk:{self.disk_usage:5.1f}%  "
                f"Load:{self.load_avg}")

def check_server_health(server: dict) -> HealthStatus:
    host   = server['hostname']
    status = HealthStatus(host=host, reachable=False)

    try:
        # Assumes `server` does not already contain a 'timeout' key
        with SSHExecutor(**server, timeout=5) as ssh:
            status.reachable = True

            # CPU usage
            out, _, _ = ssh.run(
                "top -bn1 | grep 'Cpu(s)' | awk '{print $2+$4}'",
                timeout=5
            )
            try:
                status.cpu_usage = float(out.strip())
            except ValueError:
                pass

            # Memory usage (used / total)
            # Triple quotes: the awk program contains both ' and "
            out, _, _ = ssh.run(
                """free | awk '/Mem:/{printf "%.1f", ($3/$2)*100}'""",
                timeout=5
            )
            try:
                status.mem_usage = float(out.strip())
            except ValueError:
                pass

            # Disk usage (root filesystem)
            out, _, _ = ssh.run(
                "df / | awk 'NR==2{print $5}' | tr -d '%'",
                timeout=5
            )
            try:
                status.disk_usage = float(out.strip())
            except ValueError:
                pass

            # System load
            out, _, _ = ssh.run("uptime | awk -F'load average:' '{print $2}'", timeout=5)
            status.load_avg = out.strip()

    except Exception as e:
        status.error = str(e)

    return status

# Main health check function
from concurrent.futures import ThreadPoolExecutor

def health_check_all(servers: list):
    print(f"{'='*70}")
    print(f"  Server health check  {len(servers)} hosts  {time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'='*70}")

    with ThreadPoolExecutor(max_workers=20) as executor:
        statuses = list(executor.map(check_server_health, servers))

    for s in statuses:
        print(s)

    unhealthy = [s for s in statuses if not s.is_healthy()]
    print(f"\n{'='*70}")
    print(f"Healthy: {len(statuses)-len(unhealthy)}/{len(statuses)} hosts")
    if unhealthy:
        print(f"⚠️  Needs attention: {', '.join(s.host for s in unhealthy)}")

health_check_all(SERVERS)
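The memory check in check_server_health embeds an awk program that has its own double quotes inside a Python string, which is easy to get wrong. One alternative is to run plain free and do the arithmetic in Python. A minimal sketch, assuming the procps free layout where the Mem: row lists total and used as its first two numbers; parse_mem_usage is a hypothetical helper, and its formula (used / total) matches the awk version.

```python
from typing import Optional


def parse_mem_usage(free_output: str) -> Optional[float]:
    """Return used / total * 100 from the 'Mem:' row of `free` output, or None."""
    for line in free_output.splitlines():
        fields = line.split()
        if fields and fields[0] == 'Mem:' and len(fields) >= 3:
            total, used = float(fields[1]), float(fields[2])
            return round(used / total * 100, 1) if total else None
    return None


free_sample = """               total        used        free      shared  buff/cache   available
Mem:        16000000     4000000     8000000      100000     4000000    11500000
Swap:        2097148           0     2097148"""
print(parse_mem_usage(free_sample))   # 25.0
```

Inside check_server_health this would become out, _, _ = ssh.run('free', timeout=5) followed by status.mem_usage = parse_mem_usage(out) or 0.0, with no shell quoting involved.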

Part 9: Common errors and troubleshooting

9.1 Error quick reference

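import socket

import paramiko
from paramiko.ssh_exception import NoValidConnectionsError

def safe_connect_demo():
    """Demonstrates how to catch and handle each kind of connection error"""
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        client.connect('192.168.1.100', username='root', password='wrong', timeout=5)

    except paramiko.AuthenticationException:
        # Wrong username or password, or the server does not allow password login.
        # Must come before SSHException, because it is a subclass of it.
        print("❌ Authentication failed: wrong username/password, or password login is disabled on the server")
        print("   Fix: check the credentials, or switch to key-based authentication")

    except paramiko.SSHException as e:
        # SSH protocol-level error (e.g. incompatible server version)
        print(f"❌ SSH error: {e}")

    except socket.timeout:
        # Connection timed out (server unreachable or blocked by a firewall)
        print("❌ Connection timed out: the server did not respond")
        print("   Fix: check the IP/port, firewall rules and network connectivity")

    except NoValidConnectionsError:
        # paramiko wraps "connection refused" / "host unreachable" in this exception,
        # so a plain `except ConnectionRefusedError` would not match here
        print("❌ Connection refused or host unreachable: the target port is not open")
        print("   Fix: check whether sshd is running (systemctl status sshd)")

    except socket.gaierror:
        # DNS resolution failed (hostname does not exist)
        print("❌ Hostname resolution failed")
        print("   Fix: check that the hostname is correct")

    except OSError as e:
        # Other system-level errors, such as network unreachable
        print(f"❌ Network error: {e}")

    finally:
        client.close()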
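The order of the except clauses matters: Python uses the first clause that matches, and several of these exceptions are subclasses of OSError (in paramiko, AuthenticationException is likewise a subclass of SSHException). Broad classes therefore have to come last. The standard-library part of that hierarchy can be checked directly; classify_socket_error is a hypothetical helper that covers only stdlib exceptions, as raised by raw socket code such as the TCP probe in 9.4.

```python
import socket


def classify_socket_error(exc: BaseException) -> str:
    """Hypothetical helper: map stdlib network exceptions to a short label.

    Specific subclasses are tested before OSError, mirroring the except-clause order.
    """
    if isinstance(exc, socket.timeout):
        return 'timeout'
    if isinstance(exc, socket.gaierror):
        return 'dns'
    if isinstance(exc, ConnectionRefusedError):
        return 'refused'
    if isinstance(exc, OSError):
        return 'network'
    return 'other'


# All three are OSError subclasses, so an `except OSError` placed first would swallow them
print(issubclass(socket.timeout, OSError))          # True
print(issubclass(socket.gaierror, OSError))         # True
print(issubclass(ConnectionRefusedError, OSError))  # True

print(classify_socket_error(socket.timeout()))                                # timeout
print(classify_socket_error(socket.gaierror(-2, 'Name or service not known')))  # dns
print(classify_socket_error(OSError('Network is unreachable')))               # network
```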

9.2 Host key warnings

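import base64

import paramiko

client = paramiko.SSHClient()
# The options below are alternatives; pick one.

# Option 1: AutoAddPolicy (development only, no verification)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# ⚠️ Security risk: exposes you to man-in-the-middle attacks

# Option 2: RejectPolicy (safest; paramiko's default)
client.set_missing_host_key_policy(paramiko.RejectPolicy())
# The server's public key must already be in known_hosts

# Option 3: load the system known_hosts (recommended for production)
client.load_system_host_keys()   # loads ~/.ssh/known_hosts
client.set_missing_host_key_policy(paramiko.RejectPolicy())

# Option 4: manually register a trusted host key
# known_hosts stores the key as base64 text; 'AABB...' is a placeholder for it
host_key = paramiko.RSAKey(data=base64.b64decode('AABB...'))
client.get_host_keys().add('192.168.1.100', 'ssh-rsa', host_key)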
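In ~/.ssh/known_hosts each line has the form "hostnames keytype base64-blob", and the decoded blob is the key in SSH wire format: a 4-byte big-endian length followed by the key type name again (RFC 4253, section 6.6). The stdlib-only sketch below parses such a line, which shows why the key text is base64-decoded before being handed to a key class such as paramiko.RSAKey(data=...). parse_known_hosts_line is hypothetical, and the sample blob is synthetic, not a usable key.

```python
import base64
import struct
from typing import List, Tuple


def parse_known_hosts_line(line: str) -> Tuple[List[str], str, bytes]:
    """Split a plain known_hosts line into (hostnames, key type, raw key blob).

    Does not handle @cert-authority/@revoked markers, hashed hosts or comment lines.
    """
    hostnames, key_type, b64_blob = line.split()[:3]
    blob = base64.b64decode(b64_blob)
    # SSH wire format: uint32 length, then that many bytes of the key type name
    (name_len,) = struct.unpack('>I', blob[:4])
    embedded_type = blob[4:4 + name_len].decode('ascii')
    if embedded_type != key_type:
        raise ValueError(f"key type mismatch: {key_type!r} vs {embedded_type!r}")
    return hostnames.split(','), key_type, blob


# Synthetic blob: only the type header plus filler bytes
fake_blob = struct.pack('>I', 7) + b'ssh-rsa' + b'\x00' * 8
line = f"192.168.1.100,myserver ssh-rsa {base64.b64encode(fake_blob).decode()}"
hosts, key_type, blob = parse_known_hosts_line(line)
print(hosts, key_type, len(blob))   # ['192.168.1.100', 'myserver'] ssh-rsa 19
```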

9.3 Encoding problems

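# Problem: server output contains Chinese or other special characters and decode() fails
# Note: stdout.read() consumes the stream, so use only one of the approaches below per command

# ❌ May raise UnicodeDecodeError
# output = stdout.read().decode('utf-8')

# ✅ Safe decoding: undecodable bytes become the replacement character
output = stdout.read().decode('utf-8', errors='replace')

# ✅ Or try utf-8 first and fall back to latin-1 (never fails, but non-Latin text may come out garbled)
raw = stdout.read()
try:
    output = raw.decode('utf-8')
except UnicodeDecodeError:
    output = raw.decode('latin-1')

# ✅ Set the locale for the remote command (addresses the root cause; the locale must be installed on the server)
stdin, stdout, stderr = client.exec_command(
    'LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 your_command'
)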
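The two fallbacks can be combined into one helper that tries a list of encodings in order and only then falls back to a single-byte decode that cannot fail. A minimal sketch; decode_output is hypothetical, and adding gbk to the list is an assumption that fits servers running a non-UTF-8 Chinese locale. Keep utf-8 first: GBK will often "successfully" decode UTF-8 bytes into the wrong characters.

```python
from typing import Sequence


def decode_output(raw: bytes, encodings: Sequence[str] = ('utf-8',),
                  fallback: str = 'latin-1') -> str:
    """Decode command output with the first encoding that works.

    latin-1 maps every byte to a character, so the fallback never raises,
    but multi-byte text (e.g. GBK Chinese) will come out garbled.
    """
    for enc in encodings:
        try:
            return raw.decode(enc)
        except UnicodeDecodeError:
            continue
    return raw.decode(fallback)


print(decode_output('中文'.encode('utf-8')))                            # 中文
print(decode_output('中文'.encode('gbk'), encodings=('utf-8', 'gbk')))  # 中文
print(decode_output(b'\xff\xfe'))                                       # ÿþ
```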

9.4 Debugging tips

import logging
import socket

import paramiko

# Enable verbose paramiko logging (very useful for debugging connection problems)
logging.basicConfig()
logging.getLogger("paramiko").setLevel(logging.DEBUG)

# A simple script for testing a connection
def test_connection(host, port=22, username='root', password=None, key_file=None):
    """Quickly check whether an SSH connection works"""
    print(f"Testing connection: {username}@{host}:{port}")

    # 1. Test the TCP port first
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(5)
    try:
        sock.connect((host, port))
        print(f"  ✅ TCP port {port} is reachable")
    except Exception as e:
        print(f"  ❌ TCP connection failed: {e}")
        return False
    finally:
        sock.close()

    # 2. Test SSH authentication
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        kwargs = {'hostname': host, 'port': port, 'username': username, 'timeout': 5}
        if password:   kwargs['password']     = password
        if key_file:   kwargs['key_filename'] = key_file
        client.connect(**kwargs)
        print("  ✅ SSH authentication succeeded")

        # 3. Test command execution
        _, out, _ = client.exec_command('echo hello')
        print(f"  ✅ Command execution works: {out.read().decode().strip()}")
        return True

    except paramiko.AuthenticationException:
        print("  ❌ Authentication failed")
    except Exception as e:
        print(f"  ❌ Connection failed: {e}")
    finally:
        client.close()

    return False

test_connection('192.168.1.100', username='root', password='password')
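The TCP step can also be written with socket.create_connection, which resolves the host name, tries each returned address (IPv4 or IPv6) in turn, and closes the socket automatically when used in a with block. A minimal sketch; tcp_port_open is a hypothetical helper, and the demo checks a listener it opens itself on 127.0.0.1, so no SSH server is needed.

```python
import socket


def tcp_port_open(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:   # covers timeouts, refused connections and DNS failures
        return False


# Demo without an SSH server: open a local listener on a free port
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))
listener.listen(1)
open_port = listener.getsockname()[1]

is_open = tcp_port_open('127.0.0.1', open_port, timeout=2)
listener.close()
print(is_open)   # True
```

A successful TCP check only proves that something is listening; the SSH authentication step is still needed to confirm it is sshd and that the credentials work.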

Part 10: Complete cheat sheet

10.1 paramiko core API

📌 Connection management
  SSHClient()                        → create a client
  .set_missing_host_key_policy(...)  → set the host key policy
  .load_system_host_keys()           → load the system known_hosts
  .connect(hostname, port, ...)      → open a connection
  .close()                           → close the connection
  .get_transport().is_active()       → check whether the connection is alive

📌 Running commands
  .exec_command(cmd, timeout=...)    → run a command, returns (stdin, stdout, stderr)
  stdout.read()                      → read all output (bytes)
  stdout.channel.recv_exit_status()  → get the exit status
  .invoke_shell()                    → open an interactive shell

📌 File transfer (SFTP)
  .open_sftp()                       → create an SFTP client
  sftp.put(local, remote)            → upload a file
  sftp.get(remote, local)            → download a file
  sftp.open(path, 'r')               → open a remote file
  sftp.listdir(path)                 → list a directory
  sftp.listdir_attr(path)            → list a directory (with attributes)
  sftp.mkdir(path)                   → create a directory
  sftp.stat(path)                    → get file information
  sftp.chmod(path, mode)             → change permissions

📌 Key authentication
  paramiko.RSAKey.from_private_key_file(path)     → load an RSA private key
  paramiko.Ed25519Key.from_private_key_file(path) → load an Ed25519 private key
  paramiko.AutoAddPolicy()           → accept new hosts automatically (development)
  paramiko.RejectPolicy()            → reject unknown hosts (production)

10.2 Choosing an approach

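What do you need?

Just run a few commands?
  → paramiko (use it directly; a few lines of code)

Writing deployment/ops scripts?
  → fabric (concise API, readable code)

Managing tens to hundreds of servers in batch?
  → SSHExecutor + ThreadPoolExecutor (multithreaded concurrency)

Managing hundreds to thousands of servers in batch?
  → asyncssh + asyncio (asynchronous, high concurrency)

Need a production-grade tool with retries and automatic reconnection?
  → wrap your own SSHExecutor class (Part 4 of this guide)

Enterprise-scale server management?
  → Ansible (a dedicated automation tool; it uses OpenSSH by default, with paramiko available as an alternative connection plugin)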

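Summary

After this chapter, you should have mastered:

  1. SSH basics: what the SSH protocol does and when Python needs SSH
  2. paramiko basics: connecting (password/key), running commands, reading results, closing connections
  3. exec_command in detail: reading stdout/stderr/exit code, setting timeouts
  4. SFTP file transfer: upload/download/read/write/exists operations
  5. The SSHExecutor wrapper: a class for common operations, supporting the with statement, retries and structured results
  6. Concurrent batch execution: managing many servers with ThreadPoolExecutor
  7. fabric: a higher-level SSH tool suited to deployment scripts
  8. asyncssh: asynchronous SSH for very large-scale concurrency
  9. Error handling: recognizing and resolving connection, authentication and encoding errors
  10. Worked examples: complete log collection, health check and batch deployment examples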
