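Python SSH Remote Execution: The Complete Guide (SSHExecutor)
This guide is written for complete beginners. Its goal is to help you genuinely understand:
- What SSH is, and why you would connect to remote servers from Python
- Installing and using paramiko, the core Python SSH library
- How to run remote commands, read their results, and handle errors
- How to upload and download files (SFTP)
- How to wrap all of this in a convenient SSHExecutor class (with connection reuse, retries, and timeouts)
- fabric: a higher-level SSH tool
- asyncssh: asynchronous SSH execution
- Common real-world scenarios (batch deployment, log collection, automated operations)
- Common errors and how to troubleshoot them
Every topic starts from the basics and comes with plenty of runnable examples.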
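Part 1: SSH Basics
1.1 What is SSH?
SSH (Secure Shell) is a network protocol for securely operating a remote computer over an untrusted network.
An everyday analogy:
```
Your computer ──────────────────────────────────► Remote server
  (local)       SSH encrypted tunnel (port 22)     (Linux cloud host)
```
It is like directing a faraway robot over a walkie-talkie: everything you say is encrypted, so anyone listening in cannot understand it.
What SSH can do:
✅ Run commands on a remote server
✅ Upload and download files (SFTP/SCP)
✅ Set up port forwarding (tunnels)
✅ Run scripts and start or stop services
✅ Manage dozens or even thousands of servers in bulk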
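1.2 Why control SSH from Python?
Pain points of manual SSH:
❌ Logging in to 10 servers means repeating every step by hand 10 times
❌ Deployments easily miss a step or go wrong
❌ No execution log and no automated decision-making
Advantages of Python SSH:
✅ Write a script once and run it on N servers
✅ Check command results automatically and retry on failure
✅ Integrate seamlessly with other systems (databases, APIs)
✅ Keep a complete execution log
1.3 Three mainstream approaches to SSH in Python
① paramiko (lowest level)
- A pure-Python implementation of the SSHv2 protocol
- The most complete feature set and the finest control
- A slightly steeper learning curve, but it is the foundation for the others
② fabric (a high-level wrapper built on paramiko)
- A concise API, well suited to deployment scripts
- Friendly command-line tooling
③ asyncssh (asynchronous SSH)
- Built on asyncio, well suited to high-concurrency connections to many servers
- The examples in this guide use asyncio.run(), which requires Python 3.7+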
Part 2: paramiko Basics
2.1 Installation
```bash
pip install paramiko
```
Verify the installation:
```python
import paramiko
print(paramiko.__version__)  # e.g. 3.4.0
```
2.2 The simplest SSH connection
```python
import paramiko

# Step 1: create the SSH client object
client = paramiko.SSHClient()

# Step 2: set the host key policy
# AutoAddPolicy silently accepts unknown server keys (fine for development/testing; use with care in production)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# Step 3: connect (password authentication)
client.connect(
    hostname='192.168.1.100',   # server IP or hostname
    port=22,                    # SSH port (default 22)
    username='root',            # user name
    password='your_password',   # password
    timeout=10                  # connection timeout in seconds
)

# Step 4: run a command
stdin, stdout, stderr = client.exec_command('ls -la /home')

# Step 5: read the results
output = stdout.read().decode('utf-8')
error = stderr.read().decode('utf-8')
print("Command output:")
print(output)
if error:
    print("Error output:")
    print(error)

# Step 6: close the connection (important!)
client.close()
```
2.3 Authenticating with an SSH key pair (more secure)
Key-pair authentication is more secure than passwords and is the first choice in production:
```python
import io
import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# The three options below are alternatives: use exactly one of them per connection.

# Option 1: pass the path of the private key file
client.connect(
    hostname='192.168.1.100',
    username='ubuntu',
    key_filename='/home/user/.ssh/id_rsa',   # path to the private key
    timeout=10
)

# Option 2: load the private key yourself
private_key = paramiko.RSAKey.from_private_key_file(
    '/home/user/.ssh/id_rsa',
    password='key_passphrase'   # only needed if the key is passphrase-protected
)
client.connect(
    hostname='192.168.1.100',
    username='ubuntu',
    pkey=private_key,
    timeout=10
)

# Option 3: load the private key from a string (e.g. read from an environment variable or a database)
key_content = """-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEA...(private key content)
-----END RSA PRIVATE KEY-----"""
key_obj = paramiko.RSAKey.from_private_key(io.StringIO(key_content))
client.connect(
    hostname='192.168.1.100',
    username='ubuntu',
    pkey=key_obj
)

stdin, stdout, stderr = client.exec_command('whoami')
print(stdout.read().decode())   # ubuntu
client.close()
```
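2.4 exec_command in detail
```python
import socket
import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('192.168.1.100', username='root', password='password')

# exec_command returns three file-like objects:
#   stdin  - standard input (you can write data to the command)
#   stdout - standard output (the command's normal output)
#   stderr - standard error (the command's error output)
stdin, stdout, stderr = client.exec_command('cat /etc/os-release')

# ===== Ways to read output =====
# Option 1: read everything at once (fine when the output is not huge)
output = stdout.read().decode('utf-8')
print(output)

# Option 2: iterate line by line (better for commands that print many lines)
stdin2, stdout2, stderr2 = client.exec_command('ls /var/log')
for line in stdout2:
    print(line.strip())   # iterating yields str lines; strip() removes the newline

# Option 3: read the exit code (to tell whether the command succeeded)
stdin3, stdout3, stderr3 = client.exec_command('ping -c 1 google.com')
exit_code = stdout3.channel.recv_exit_status()   # 0 = success, non-zero = failure
print(f"Exit code: {exit_code}")

# ===== Feeding input to a command =====
# Example: a command that needs a password. Plain `sudo` reads the password from a
# terminal, so -S makes it read stdin instead, and -p '' suppresses the prompt text.
stdin4, stdout4, stderr4 = client.exec_command("sudo -S -p '' cat /etc/shadow")
stdin4.write('sudo_password\n')
stdin4.flush()
print(stdout4.read().decode('utf-8'))
# ⚠️ For genuinely interactive programs, invoke_shell() (next section) is usually a better fit

# ===== Setting a command timeout =====
stdin5, stdout5, stderr5 = client.exec_command(
    'sleep 100',
    timeout=5   # a read that blocks for more than 5 s raises socket.timeout
)
try:
    stdout5.read()
except socket.timeout as e:
    print(f"Timed out: {e}")

client.close()
```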
2.5 invoke_shell: an interactive shell
For complex interactive work (such as sudo, database command-line clients, or menu-driven programs), use invoke_shell:
```python
import time
import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('192.168.1.100', username='root', password='password')

# Open an interactive shell (behaves like a real terminal)
shell = client.invoke_shell()
time.sleep(0.5)   # give the shell time to start


def send_command(shell, command, wait=1.0):
    """Send a command, wait a fixed time, then collect whatever output has arrived."""
    shell.send(command + '\n')
    time.sleep(wait)
    output = b''
    while shell.recv_ready():
        output += shell.recv(4096)
    return output.decode('utf-8', errors='replace')


# Example: run a command with sudo
print(send_command(shell, 'sudo whoami', wait=0.5))
# If it prompts for a password:
print(send_command(shell, 'your_sudo_password', wait=0.5))

# Multi-step session: connect to MySQL
print(send_command(shell, 'mysql -u root -ppassword', wait=1.0))
print(send_command(shell, 'show databases;', wait=0.5))
print(send_command(shell, 'exit', wait=0.5))

shell.close()
client.close()
```
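The fixed `time.sleep()` calls in send_command are fragile. A slow command returns only part of its output, and a fast one wastes time waiting. A common alternative is to keep reading until a known marker, such as the shell prompt, appears or a deadline passes. The sketch below assumes only that the channel offers `recv_ready()` and `recv(n)`, which paramiko's `Channel` does. The function name `read_until`, the polling interval, and the `FakeChannel` test double are illustrative choices, not part of paramiko.

```python
import time


def read_until(channel, marker: str, timeout: float = 10.0, poll: float = 0.05) -> str:
    """Read from an interactive channel until `marker` appears or `timeout` expires.

    `channel` only needs recv_ready() and recv(n), as paramiko.Channel provides.
    Raises TimeoutError if the marker never shows up.
    """
    deadline = time.monotonic() + timeout
    buffer = b''
    encoded = marker.encode('utf-8')
    while time.monotonic() < deadline:
        if channel.recv_ready():
            buffer += channel.recv(4096)
            if encoded in buffer:
                return buffer.decode('utf-8', errors='replace')
        else:
            time.sleep(poll)   # nothing buffered yet; avoid a busy loop
    raise TimeoutError(f"marker {marker!r} not seen within {timeout}s; got {buffer[-200:]!r}")


class FakeChannel:
    """Hypothetical stand-in for paramiko.Channel that replays pre-recorded chunks."""

    def __init__(self, chunks):
        self._chunks = list(chunks)

    def recv_ready(self):
        return bool(self._chunks)

    def recv(self, n):
        return self._chunks.pop(0)[:n]


if __name__ == '__main__':
    chan = FakeChannel([b'root\r\n', b'[root@web1 ~]# '])
    print(read_until(chan, '# ', timeout=1.0))
```

With a real shell you would call `shell.send('whoami\n')` and then `read_until(shell, '$ ')`. Prompts differ between users and systems, so choose a marker that matches your server.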
Part 3: SFTP File Transfer
3.1 Uploading files to the server
```python
import os
import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('192.168.1.100', username='root', password='password')

# Create an SFTP client
sftp = client.open_sftp()

# ===== Uploading files =====
# Basic upload (local path → remote path)
sftp.put('/local/path/config.json', '/remote/path/config.json')
print("Upload complete!")


# Upload with a progress bar
def upload_with_progress(sftp, local_path, remote_path):
    def progress_callback(bytes_sent, total_bytes):
        # paramiko calls this repeatedly with (bytes sent so far, total bytes)
        if total_bytes == 0:
            return   # empty file: avoid dividing by zero
        pct = bytes_sent / total_bytes * 100
        bar = "█" * int(pct / 2)
        print(f"\r  Upload progress: [{bar:<50}] {pct:.1f}% "
              f"({bytes_sent/1024:.0f}KB / {total_bytes/1024:.0f}KB)", end='')

    sftp.put(local_path, remote_path, callback=progress_callback)
    print()   # newline after the progress bar


upload_with_progress(sftp, '/local/bigfile.zip', '/remote/bigfile.zip')


# Upload an entire directory
def upload_directory(sftp, local_dir, remote_dir):
    """Recursively upload a local directory to the remote host."""
    try:
        sftp.mkdir(remote_dir)
    except IOError:
        pass   # most likely the directory already exists
    for item in os.listdir(local_dir):
        local_path = os.path.join(local_dir, item)
        remote_path = remote_dir + '/' + item   # remote paths always use '/'
        if os.path.isdir(local_path):
            upload_directory(sftp, local_path, remote_path)
        else:
            print(f"  Uploading: {local_path} → {remote_path}")
            sftp.put(local_path, remote_path)


sftp.close()
client.close()
```
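upload_directory builds remote paths by hand with '/'. A variation walks the local tree once with os.walk and builds every remote path with posixpath. posixpath always joins with '/', even when the script runs on Windows. The helper name `plan_upload` is hypothetical. It only computes the directories to create and the (local, remote) file pairs, so you can print or test the plan before running `sftp.mkdir` and `sftp.put` for each entry.

```python
import os
import posixpath


def plan_upload(local_dir: str, remote_dir: str):
    """Return (remote_dirs, file_pairs) needed to mirror local_dir under remote_dir.

    remote_dirs: remote directories to create, parents before children
    file_pairs:  (local_path, remote_path) tuples to pass to sftp.put()
    Remote paths always use '/' via posixpath, regardless of the local OS.
    """
    remote_dirs = [remote_dir]
    file_pairs = []
    for root, dirs, files in os.walk(local_dir):
        dirs.sort()   # deterministic order; os.walk still visits parents before children
        rel = os.path.relpath(root, local_dir)
        parts = [] if rel == os.curdir else rel.split(os.sep)
        remote_root = posixpath.join(remote_dir, *parts)
        for d in dirs:
            remote_dirs.append(posixpath.join(remote_root, d))
        for name in sorted(files):
            file_pairs.append((os.path.join(root, name), posixpath.join(remote_root, name)))
    return remote_dirs, file_pairs
```

To use it, call `dirs, pairs = plan_upload(local_dir, remote_dir)`. Then call `sftp.mkdir(d)` for each `d` in `dirs`, ignoring IOError for directories that already exist, and call `sftp.put(local_path, remote_path)` for each pair.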
3.2 Downloading files from the server
```python
import io
import json
import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('192.168.1.100', username='root', password='password')
sftp = client.open_sftp()

# ===== Downloading files =====
sftp.get('/remote/path/log.txt', '/local/path/log.txt')
print("Download complete!")

# Download into memory (nothing is written to disk; process the content directly)
buffer = io.BytesIO()
sftp.getfo('/remote/path/config.json', buffer)
buffer.seek(0)
config = json.loads(buffer.read().decode('utf-8'))
print(f"Server config: {config}")

# List the contents of a remote directory
files = sftp.listdir('/var/log')
print(f"Log files: {files}")

# Get file details (similar to ls -la)
for attr in sftp.listdir_attr('/var/log'):
    print(f"  {attr.filename:30s} size: {attr.st_size:>10} bytes")


# Check whether a remote file exists
def remote_file_exists(sftp, path):
    try:
        sftp.stat(path)
        return True
    except FileNotFoundError:
        return False


print(remote_file_exists(sftp, '/etc/passwd'))   # True
print(remote_file_exists(sftp, '/not/exist'))    # False

sftp.close()
client.close()
```
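listdir_attr() returns SFTPAttributes objects. Their st_mode, st_size and st_mtime fields mirror what os.stat() returns. A common follow-up, for example before downloading logs, is to keep only regular files modified recently. The sketch below does that with the standard stat module. `recent_files` is a hypothetical helper, and SimpleNamespace stands in for SFTPAttributes so the example runs without a server.

```python
import stat
import time
from types import SimpleNamespace


def recent_files(attrs, max_age_seconds: float, now: float = None):
    """Return sorted (filename, size) pairs for regular files modified within max_age_seconds.

    `attrs` is any iterable of objects with filename, st_mode, st_size and st_mtime,
    such as the list returned by sftp.listdir_attr(path).
    """
    now = time.time() if now is None else now
    result = []
    for a in attrs:
        if not stat.S_ISREG(a.st_mode):
            continue   # skip directories and any other non-regular entries
        if now - a.st_mtime <= max_age_seconds:
            result.append((a.filename, a.st_size))
    return sorted(result)


if __name__ == '__main__':
    now = 1_700_000_000
    fake = [
        SimpleNamespace(filename='app.log', st_mode=stat.S_IFREG | 0o644, st_size=2048, st_mtime=now - 60),
        SimpleNamespace(filename='old.log', st_mode=stat.S_IFREG | 0o644, st_size=10, st_mtime=now - 10 * 86400),
        SimpleNamespace(filename='nginx', st_mode=stat.S_IFDIR | 0o755, st_size=4096, st_mtime=now),
    ]
    print(recent_files(fake, max_age_seconds=86400, now=now))   # [('app.log', 2048)]
```

With a live connection, the call would be `recent_files(sftp.listdir_attr('/var/log'), 86400)`.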
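Part 4: Wrapping It in an SSHExecutor Class
4.1 Why wrap it?
Problems with using paramiko directly:
❌ Every call repeats the same boilerplate (connect → exec → close)
❌ No automatic reconnection
❌ No consistent error handling or logging
❌ Opening a new connection for every call is inefficient
Benefits of an SSHExecutor wrapper:
✅ Run a remote command in one line
✅ The connection lifecycle is managed automatically
✅ Consistent error handling
✅ Supports the with statement for automatic cleanup
✅ Automatically retries failed connections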
4.2 Basic SSHExecutor
```python
import os
import logging
from typing import Optional, Tuple

import paramiko

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)


class SSHExecutor:
    """
    Remote SSH command executor, basic version.

    Usage:
        # Option 1: with statement (recommended)
        with SSHExecutor('192.168.1.100', 'root', password='xxx') as ssh:
            stdout, stderr, code = ssh.run('ls -la')
            print(stdout)

        # Option 2: manual management
        ssh = SSHExecutor('192.168.1.100', 'root', password='xxx')
        ssh.connect()
        stdout, stderr, code = ssh.run('whoami')
        ssh.close()
    """

    def __init__(
        self,
        hostname: str,
        username: str,
        password: Optional[str] = None,
        key_filename: Optional[str] = None,
        port: int = 22,
        timeout: float = 10.0,
        default_timeout: float = 30.0
    ):
        self.hostname = hostname
        self.username = username
        self.password = password
        self.key_filename = key_filename
        self.port = port
        self.timeout = timeout                   # connection timeout
        self.default_timeout = default_timeout   # default command timeout
        self._client: Optional[paramiko.SSHClient] = None
        self._sftp: Optional[paramiko.SFTPClient] = None

    # ==================== Connection management ====================
    def connect(self):
        """Open the SSH connection (does nothing if already connected)."""
        if self.is_connected():
            return   # already connected; do not connect again
        logger.info(f"Connecting to {self.username}@{self.hostname}:{self.port}...")
        self._sftp = None   # an SFTP session from a dropped connection is no longer usable
        self._client = paramiko.SSHClient()
        self._client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        connect_kwargs = {
            'hostname': self.hostname,
            'port': self.port,
            'username': self.username,
            'timeout': self.timeout,
            'allow_agent': False,     # do not use keys from a running ssh-agent
            'look_for_keys': False,   # do not search ~/.ssh for keys automatically
        }
        if self.password:
            connect_kwargs['password'] = self.password
        if self.key_filename:
            connect_kwargs['key_filename'] = self.key_filename
        self._client.connect(**connect_kwargs)
        logger.info(f"✅ Connected: {self.hostname}")

    def close(self):
        """Close the connection."""
        if self._sftp:
            self._sftp.close()
            self._sftp = None
        if self._client:
            self._client.close()
            self._client = None
            logger.info(f"🔌 Connection closed: {self.hostname}")

    def is_connected(self) -> bool:
        """Check whether the connection is still alive."""
        if not self._client:
            return False
        transport = self._client.get_transport()
        return transport is not None and transport.is_active()

    # ==================== Command execution ====================
    def run(
        self,
        command: str,
        timeout: Optional[float] = None,
        get_pty: bool = False
    ) -> Tuple[str, str, int]:
        """
        Run a remote command.
        Parameters:
            command - the shell command to run
            timeout - timeout in seconds; None uses default_timeout
            get_pty - allocate a pseudo-terminal (some commands, such as sudo, may need one;
                      with a pty, stderr is merged into stdout)
        Returns:
            (stdout, stderr, exit_code)
            stdout    - standard output as a string
            stderr    - standard error as a string
            exit_code - exit status (0 = success, non-zero = failure)
        """
        if not self.is_connected():
            self.connect()
        t = timeout if timeout is not None else self.default_timeout
        logger.debug(f"Running command: {command}")
        stdin, stdout, stderr = self._client.exec_command(
            command,
            timeout=t,
            get_pty=get_pty
        )
        out = stdout.read().decode('utf-8', errors='replace')
        err = stderr.read().decode('utf-8', errors='replace')
        code = stdout.channel.recv_exit_status()
        logger.debug(f"Exit code: {code}")
        return out, err, code

    def run_checked(self, command: str, **kwargs) -> str:
        """
        Run a command and raise if it fails (similar to subprocess.check_output).
        Returns: stdout as a string
        Raises:  RuntimeError if the command fails
        """
        stdout, stderr, code = self.run(command, **kwargs)
        if code != 0:
            raise RuntimeError(
                f"Command failed (exit code {code})\n"
                f"Command: {command}\n"
                f"Error: {stderr.strip()}"
            )
        return stdout

    def run_many(self, commands: list) -> list:
        """
        Run several commands in order.
        Returns: a list of (stdout, stderr, code) tuples
        """
        results = []
        for cmd in commands:
            result = self.run(cmd)
            results.append(result)
            if result[2] != 0:
                logger.warning(f"Command failed: {cmd} (exit code {result[2]})")
        return results

    # ==================== File operations ====================
    @property
    def sftp(self) -> paramiko.SFTPClient:
        """Lazily create the SFTP client."""
        if not self._sftp:
            if not self.is_connected():
                self.connect()
            self._sftp = self._client.open_sftp()
        return self._sftp

    def upload(self, local_path: str, remote_path: str, verbose: bool = False):
        """Upload a local file to the remote server."""
        def _progress(sent, total):
            if total:   # guard against empty files
                pct = sent / total * 100
                print(f"\r  Uploading {os.path.basename(local_path)}: "
                      f"{pct:.1f}%", end='', flush=True)

        self.sftp.put(local_path, remote_path,
                      callback=_progress if verbose else None)
        if verbose:
            print()
        logger.info(f"📤 Upload complete: {local_path} → {self.hostname}:{remote_path}")

    def download(self, remote_path: str, local_path: str):
        """Download a file from the remote server."""
        self.sftp.get(remote_path, local_path)
        logger.info(f"📥 Download complete: {self.hostname}:{remote_path} → {local_path}")

    def read_remote_file(self, remote_path: str) -> str:
        """Read a remote file and return its content as a string."""
        with self.sftp.open(remote_path, 'r') as f:
            return f.read().decode('utf-8')

    def write_remote_file(self, remote_path: str, content: str):
        """Write a string to a remote file."""
        with self.sftp.open(remote_path, 'w') as f:
            f.write(content.encode('utf-8'))
        logger.info(f"✏️ Wrote remote file: {self.hostname}:{remote_path}")

    def file_exists(self, remote_path: str) -> bool:
        """Check whether a remote file or directory exists."""
        try:
            self.sftp.stat(remote_path)
            return True
        except FileNotFoundError:
            return False

    # ==================== Context manager ====================
    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False   # do not suppress exceptions

    def __repr__(self):
        status = "connected" if self.is_connected() else "not connected"
        return f"SSHExecutor({self.username}@{self.hostname}:{self.port}, {status})"


# ==================== Usage example ====================
if __name__ == '__main__':
    # Basic use (with statement)
    with SSHExecutor('192.168.1.100', 'root', password='password123') as ssh:
        # Run a simple command
        out, err, code = ssh.run('uname -a')
        print(f"System info: {out.strip()}")

        # Run a command and check the result
        try:
            result = ssh.run_checked('python3 --version')
            print(f"Python version: {result.strip()}")
        except RuntimeError as e:
            print(f"Error: {e}")

        # Run several commands
        commands = [
            'df -h /',   # disk usage
            'free -m',   # memory usage
            'uptime',    # system uptime
            'cat /proc/cpuinfo | grep "model name" | head -1'
        ]
        results = ssh.run_many(commands)
        labels = ['Disk', 'Memory', 'Uptime', 'CPU']
        for label, (out, err, code) in zip(labels, results):
            print(f"[{label}] {out.strip()[:60]}")

        # File operations
        ssh.upload('/local/deploy.sh', '/tmp/deploy.sh')
        ssh.run_checked('chmod +x /tmp/deploy.sh')
        out, err, code = ssh.run('/tmp/deploy.sh')
        print(f"Deploy script output:\n{out}")
```
4.3 Advanced SSHExecutor (retries and automatic reconnection)
```python
import shlex
import time
import uuid
import logging
import threading
from dataclasses import dataclass
from typing import Optional, Dict

import paramiko

logger = logging.getLogger(__name__)


@dataclass
class SSHResult:
    """Result of one SSH command."""
    command: str
    stdout: str
    stderr: str
    exit_code: int
    duration: float   # execution time in seconds

    @property
    def success(self) -> bool:
        return self.exit_code == 0

    @property
    def output(self) -> str:
        """stdout if it is not blank, otherwise stderr."""
        return self.stdout if self.stdout.strip() else self.stderr

    def __str__(self):
        status = "✅" if self.success else "❌"
        return (f"{status} [{self.exit_code}] {self.command!r} "
                f"({self.duration:.2f}s)")

    def raise_for_status(self):
        """Raise an exception if the command failed."""
        if not self.success:
            raise RuntimeError(
                f"Command failed (exit code {self.exit_code}): {self.command}\n"
                f"stderr: {self.stderr.strip()}"
            )


class SSHExecutorAdvanced:
    """
    Remote SSH command executor, advanced version.
    New features:
    - Automatic connection retries (retries parameter)
    - Structured results (SSHResult)
    - Connection health checks and automatic reconnection
    - sudo support
    - Multi-line scripts (run_script)
    """

    def __init__(
        self,
        hostname: str,
        username: str,
        password: Optional[str] = None,
        key_filename: Optional[str] = None,
        port: int = 22,
        timeout: float = 10.0,
        retries: int = 3,
        retry_delay: float = 2.0
    ):
        self.hostname = hostname
        self.username = username
        self.password = password
        self.key_filename = key_filename
        self.port = port
        self.timeout = timeout
        self.retries = retries
        self.retry_delay = retry_delay
        self._client: Optional[paramiko.SSHClient] = None
        self._lock = threading.Lock()   # guards reconnection

    def connect(self):
        """Connect, retrying failures other than authentication errors."""
        last_error = None
        for attempt in range(1, self.retries + 1):
            try:
                logger.info(f"Connecting to {self.hostname} (attempt {attempt})...")
                client = paramiko.SSHClient()
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                kwargs = {
                    'hostname': self.hostname,
                    'port': self.port,
                    'username': self.username,
                    'timeout': self.timeout,
                    'allow_agent': False,
                    'look_for_keys': False,
                }
                if self.password:
                    kwargs['password'] = self.password
                if self.key_filename:
                    kwargs['key_filename'] = self.key_filename
                client.connect(**kwargs)
                self._client = client
                logger.info(f"✅ Connected: {self.hostname}")
                return
            except paramiko.AuthenticationException as e:
                # Authentication failures will not fix themselves: do not retry
                raise RuntimeError(f"Authentication failed: {e}") from e
            except Exception as e:
                last_error = e
                if attempt < self.retries:   # no point sleeping after the last attempt
                    logger.warning(f"Connection failed ({e}); retrying in {self.retry_delay}s...")
                    time.sleep(self.retry_delay)
        raise RuntimeError(
            f"Could not connect to {self.hostname} after {self.retries} attempts: {last_error}"
        )

    def _ensure_connected(self):
        """Make sure the connection is alive, reconnecting if necessary."""
        with self._lock:
            if not self._is_active():
                logger.info("Connection lost, reconnecting...")
                self.connect()

    def _is_active(self) -> bool:
        if not self._client:
            return False
        t = self._client.get_transport()
        return t is not None and t.is_active()

    def run(
        self,
        command: str,
        timeout: float = 30.0,
        sudo: bool = False,
        sudo_password: Optional[str] = None,
        env: Optional[Dict[str, str]] = None
    ) -> SSHResult:
        """
        Run a remote command and return an SSHResult.
        Parameters:
            command       - shell command
            timeout       - execution timeout in seconds
            sudo          - run the command with sudo
            sudo_password - sudo password (if empty, passwordless sudo is assumed)
            env           - extra environment variables
        """
        self._ensure_connected()
        # Build the final command line
        final_cmd = command
        if env:
            # shlex.quote keeps values containing spaces or special characters intact
            env_str = ' '.join(f'{k}={shlex.quote(v)}' for k, v in env.items())
            final_cmd = f'export {env_str} && {command}'
        if sudo:
            # Wrap in `bash -c` so shell syntax (export, &&, pipes) also runs as root
            inner = shlex.quote(final_cmd)
            if sudo_password:
                # -S reads the password from stdin; -p '' hides the prompt
                final_cmd = f"sudo -S -p '' bash -c {inner}"
            else:
                # -n fails immediately instead of hanging if a password is required
                final_cmd = f"sudo -n bash -c {inner}"
        start = time.time()
        try:
            stdin, stdout, stderr = self._client.exec_command(
                final_cmd, timeout=timeout
            )
            if sudo and sudo_password:
                # Send the password over stdin rather than `echo`-ing it in the command
                # line, where it would be visible in the remote process list
                stdin.write(sudo_password + '\n')
                stdin.flush()
            out = stdout.read().decode('utf-8', errors='replace')
            err = stderr.read().decode('utf-8', errors='replace')
            code = stdout.channel.recv_exit_status()
        except Exception as e:
            raise RuntimeError(f"Error while running command: {command}\n{e}") from e
        duration = time.time() - start
        result = SSHResult(command, out, err, code, duration)
        logger.debug(str(result))
        return result

    def run_script(self, script: str, **kwargs) -> SSHResult:
        """
        Run a multi-line shell script.
        The script is uploaded to a temporary file and executed with bash.
        """
        self._ensure_connected()
        # A random name keeps concurrent calls from overwriting each other's script
        remote_path = f'/tmp/_ssh_script_{uuid.uuid4().hex}.sh'
        try:
            sftp = self._client.open_sftp()
            try:
                with sftp.open(remote_path, 'w') as f:
                    f.write(script.encode('utf-8'))
                sftp.chmod(remote_path, 0o700)
            finally:
                sftp.close()
            return self.run(f'bash {remote_path}', **kwargs)
        finally:
            # Remove the temporary file
            try:
                self.run(f'rm -f {remote_path}', timeout=5)
            except Exception:
                pass

    def close(self):
        if self._client:
            self._client.close()
            self._client = None

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, *args):
        self.close()

    def __repr__(self):
        status = "connected" if self._is_active() else "not connected"
        return f"SSHExecutorAdvanced({self.username}@{self.hostname}, {status})"


# ==================== Usage example ====================
def demo_advanced():
    with SSHExecutorAdvanced(
        '192.168.1.100',
        username='ubuntu',
        key_filename='/home/user/.ssh/id_rsa',
        retries=3
    ) as ssh:
        # Run a command and use the structured result
        result = ssh.run('df -h /')
        print(f"Result: {result}")
        print(f"Succeeded: {result.success}")
        print(f"Duration: {result.duration:.2f}s")
        print(f"Output: {result.stdout}")

        # Raise automatically on failure
        try:
            ssh.run('cat /nonexistent/file').raise_for_status()
        except RuntimeError as e:
            print(f"Caught error: {e}")

        # Run a multi-line script
        script = """#!/bin/bash
echo "Starting system check..."
echo "Hostname: $(hostname)"
echo "Disk: $(df -h / | tail -1)"
echo "Memory: $(free -m | grep Mem)"
echo "Load: $(uptime | awk '{print $NF}')"
echo "Check complete!"
"""
        result = ssh.run_script(script)
        print(f"\nSystem check result:\n{result.stdout}")

        # Run with sudo
        result = ssh.run(
            'cat /etc/shadow',
            sudo=True,
            sudo_password='sudo_pass'
        )
        print(f"sudo result: {result}")

        # Set environment variables
        result = ssh.run(
            'echo $MY_VAR',
            env={'MY_VAR': 'Hello from Python!'}
        )
        print(f"Environment variable: {result.stdout.strip()}")
```
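SSHExecutorAdvanced keeps exactly one connection per executor, so it is not a connection pool. When one process talks to many hosts repeatedly, a small cache keyed by host lets callers reuse live connections instead of reconnecting every time. The sketch below is a hypothetical `ConnectionCache`. It knows nothing about paramiko: the caller supplies a `factory` that builds a connection object with `connect()` and `close()`, and an `is_alive` predicate that decides whether a cached connection can be reused.

```python
import threading
from typing import Callable, Dict, Hashable


class ConnectionCache:
    """Thread-safe cache that hands out one live connection per key.

    factory(key) must return an object with connect() and close();
    is_alive(conn) says whether a cached connection can still be reused.
    """

    def __init__(self, factory: Callable, is_alive: Callable):
        self._factory = factory
        self._is_alive = is_alive
        self._conns: Dict[Hashable, object] = {}
        self._lock = threading.Lock()

    def get(self, key):
        """Return the cached connection for key, creating or replacing it if needed."""
        with self._lock:
            conn = self._conns.get(key)
            if conn is None or not self._is_alive(conn):
                conn = self._factory(key)
                conn.connect()
                self._conns[key] = conn
            return conn

    def close_all(self):
        """Close every cached connection and empty the cache."""
        with self._lock:
            for conn in self._conns.values():
                conn.close()
            self._conns.clear()
```

As an example, `ConnectionCache(factory=lambda host: SSHExecutorAdvanced(host, username='ubuntu', key_filename='/home/user/.ssh/id_rsa'), is_alive=lambda ssh: ssh._is_active())` keeps one executor per hostname. `_is_active` is a private method, so you may prefer to add a public alias. For simplicity, the lock is held while connecting, which serializes new connections across hosts.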
Part 5: Managing Many Servers
5.1 Serial execution (simple version)
```python
# Uses the SSHExecutor class from section 4.2
from typing import List

# Server list
SERVERS = [
    {'hostname': '192.168.1.100', 'username': 'root', 'password': 'pass1'},
    {'hostname': '192.168.1.101', 'username': 'root', 'password': 'pass2'},
    {'hostname': '192.168.1.102', 'username': 'root', 'password': 'pass3'},
    {'hostname': '192.168.1.103', 'username': 'ubuntu', 'key_filename': '/home/user/.ssh/id_rsa'},
]


def run_on_server(server_info: dict, command: str) -> dict:
    """Run a command on one server and return a result dict."""
    host = server_info['hostname']
    try:
        with SSHExecutor(**server_info) as ssh:
            out, err, code = ssh.run(command, timeout=15)
            return {
                'host': host,
                'success': code == 0,
                'output': out.strip(),
                'error': err.strip(),
                'code': code
            }
    except Exception as e:
        return {
            'host': host,
            'success': False,
            'output': '',
            'error': str(e),
            'code': -1
        }


# Serial execution
def run_on_all_serial(servers: List[dict], command: str) -> List[dict]:
    """Run the command on every server, one after another."""
    results = []
    for server in servers:
        print(f"  Running on: {server['hostname']}...")
        results.append(run_on_server(server, command))
    return results


print("=== Serial execution ===")
results = run_on_all_serial(SERVERS, 'uptime')
for r in results:
    status = "✅" if r['success'] else "❌"
    print(f"{status} {r['host']:20s}: {r['output'][:50]}")
```
5.2 Parallel execution (recommended)
```python
# Reuses run_on_server and SERVERS from section 5.1
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_on_all_parallel(servers: list, command: str, max_workers: int = 10):
    """Run the command on all servers concurrently."""
    results = []
    start = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit every task
        future_to_server = {
            executor.submit(run_on_server, server, command): server
            for server in servers
        }
        # Collect results in completion order (futures from as_completed are already done)
        for future in as_completed(future_to_server):
            server = future_to_server[future]
            try:
                result = future.result()
            except Exception as e:
                result = {
                    'host': server['hostname'],
                    'success': False,
                    'output': '',
                    'error': str(e),
                    'code': -1
                }
            results.append(result)
    elapsed = time.time() - start
    print(f"\nParallel run finished: {len(servers)} servers in {elapsed:.2f}s")
    return results


# Summarize the results
def print_summary(results: list):
    succeeded = [r for r in results if r['success']]
    failed = [r for r in results if not r['success']]
    print(f"\n{'='*50}")
    print(f"Summary: {len(succeeded)}/{len(results)} servers succeeded")
    print(f"{'='*50}")
    if succeeded:
        print(f"\n✅ Succeeded ({len(succeeded)}):")
        for r in succeeded:
            print(f"  {r['host']:20s}: {r['output'][:60]}")
    if failed:
        print(f"\n❌ Failed ({len(failed)}):")
        for r in failed:
            print(f"  {r['host']:20s}: {r['error'][:60]}")


# Usage example
print("=== Parallel execution (checking disk usage on each server) ===")
results = run_on_all_parallel(SERVERS, "df -h / | tail -1 | awk '{print $5}'")
print_summary(results)
```
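as_completed() yields results in completion order, so the printed list is shuffled relative to SERVERS. If you want results in input order, which is handy for reports and diffs, while still running in parallel, note that `executor.map` already preserves input order. The sketch below wraps it and turns exceptions into result dicts so one bad host cannot abort the batch. `run_ordered` and the single-argument `worker(host)` signature are illustrative assumptions; `worker` plays the role of run_on_server.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def run_ordered(hosts: List[str], worker: Callable[[str], str], max_workers: int = 10) -> List[Dict]:
    """Run worker(host) concurrently and return one result dict per host, in input order.

    Exceptions raised by worker are captured in the dict instead of aborting the batch.
    """
    def safe(host: str) -> Dict:
        try:
            return {'host': host, 'success': True, 'output': worker(host), 'error': ''}
        except Exception as e:   # one bad host must not sink the others
            return {'host': host, 'success': False, 'output': '', 'error': str(e)}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the order of `hosts`, not completion order
        return list(executor.map(safe, hosts))
```

The trade-off is that a slow host delays printing for every host listed after it, even though all of them run concurrently.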
5.3 In practice: a batch deployment script
```python
# Uses SSHExecutorAdvanced from section 4.3
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')

DEPLOY_SCRIPT = """#!/bin/bash
set -e  # exit immediately if any command fails
echo "[1/4] Pulling the latest code..."
cd /opt/myapp
git pull origin main
echo "[2/4] Installing dependencies..."
pip3 install -r requirements.txt -q
echo "[3/4] Running database migrations..."
python3 manage.py migrate --no-input
echo "[4/4] Restarting the service..."
systemctl restart myapp
echo "Deployment complete! Version: $(git rev-parse --short HEAD)"
"""


def deploy_to_server(server: dict) -> dict:
    """Deploy to one server."""
    host = server['hostname']
    logging.info(f"Starting deployment to {host}...")
    try:
        with SSHExecutorAdvanced(**server) as ssh:
            # Record the service state and the current commit before deploying
            check = ssh.run('systemctl is-active myapp')
            was_running = check.stdout.strip() == 'active'
            previous_commit = ssh.run('cd /opt/myapp && git rev-parse HEAD').stdout.strip()
            # Run the deployment script
            result = ssh.run_script(DEPLOY_SCRIPT, timeout=120)
            if result.success:
                logging.info(f"✅ {host} deployed successfully")
                return {'host': host, 'success': True, 'output': result.stdout}
            # Deployment failed: roll back if the service was running before
            if was_running and previous_commit:
                logging.warning(f"⚠️ {host} deployment failed, rolling back...")
                # Reset to the commit recorded above; HEAD~1 would be wrong
                # if `git pull` brought in more than one commit
                ssh.run(f'cd /opt/myapp && git reset --hard {previous_commit}')
                ssh.run('systemctl restart myapp')
            return {'host': host, 'success': False, 'error': result.stderr}
    except Exception as e:
        logging.error(f"❌ {host} deployment error: {e}")
        return {'host': host, 'success': False, 'error': str(e)}


# Deploy to all servers concurrently
production_servers = [
    {'hostname': f'10.0.0.{i}', 'username': 'deploy',
     'key_filename': '/home/ci/.ssh/deploy_key'}
    for i in range(1, 6)   # 5 production servers
]
with ThreadPoolExecutor(max_workers=3) as executor:   # deploy to at most 3 servers at a time
    futures = [executor.submit(deploy_to_server, s) for s in production_servers]
    results = [f.result() for f in futures]

success_count = sum(1 for r in results if r['success'])
print(f"\nDeployment finished: {success_count}/{len(production_servers)} servers succeeded")
```
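The thread pool above can have up to three servers mid-deploy at once, and it keeps going even if every server fails. A common safer pattern is a rolling deploy. You deploy in fixed-size batches and stop as soon as a batch has a failure, so a bad release reaches only a few hosts. The sketch keeps the deploy function pluggable (deploy_to_server above returns a dict with 'host' and 'success'). The names `rolling_deploy` and `batch_size`, and the stop-after-a-failed-batch rule, are illustrative choices rather than a standard.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def rolling_deploy(servers: List[dict], deploy: Callable[[dict], Dict], batch_size: int = 2) -> Dict:
    """Deploy in batches of batch_size; stop before the next batch if any host failed.

    deploy(server) must return a dict with at least 'host' and 'success' keys.
    Returns {'results': [...], 'skipped': [hostnames never attempted]}.
    """
    results: List[Dict] = []
    for start in range(0, len(servers), batch_size):
        batch = servers[start:start + batch_size]
        with ThreadPoolExecutor(max_workers=len(batch)) as executor:
            batch_results = list(executor.map(deploy, batch))
        results.extend(batch_results)
        if not all(r['success'] for r in batch_results):
            skipped = [s['hostname'] for s in servers[start + batch_size:]]
            return {'results': results, 'skipped': skipped}
    return {'results': results, 'skipped': []}
```

With the example above, `rolling_deploy(production_servers, deploy_to_server, batch_size=2)` deploys two servers at a time. It leaves the remaining servers on the old version as soon as any deployment fails.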
Part 6: fabric, a More Elegant SSH Tool
6.1 Installation
```bash
pip install fabric
```
6.2 fabric basics
```python
from fabric import Connection, Config

# Basic connection
conn = Connection(
    host='192.168.1.100',
    user='ubuntu',
    connect_kwargs={'password': 'mypassword'}
)

# Run a command (the result object is richer than paramiko's)
# By default a non-zero exit code raises invoke.exceptions.UnexpectedExit;
# pass warn=True to get a result object back instead
result = conn.run('uname -a', hide=True)   # hide=True: don't echo output to the console
print(f"Output: {result.stdout.strip()}")
print(f"Exit code: {result.return_code}")
print(f"Succeeded: {result.ok}")

# Running sudo commands
# Option 1: set the sudo password through Config
config = Config(overrides={'sudo': {'password': 'sudo_pass'}})
conn2 = Connection('192.168.1.100', user='ubuntu', config=config,
                   connect_kwargs={'password': 'mypassword'})
conn2.sudo('apt-get update', hide=True)

# Option 2: pass the password directly
conn.sudo('systemctl restart nginx', password='sudo_pass')

# File upload/download
conn.put('/local/config.json', '/remote/config.json')   # upload
conn.get('/remote/logs/app.log', '/local/app.log')      # download

# Close the connections
conn.close()
conn2.close()
```
6.3 fabric as a context manager
```python
from fabric import Connection

# The with statement manages the connection automatically
with Connection('192.168.1.100', user='root',
                connect_kwargs={'password': 'password'}) as conn:
    # Change the remote working directory
    with conn.cd('/opt/myapp'):
        conn.run('git status')
        conn.run('ls -la')
    # Run a command on the local machine
    conn.local('echo "Local command: $(pwd)"')
    # Transfer a file and make it executable
    conn.put('/local/script.sh', '/tmp/script.sh')
    conn.run('chmod +x /tmp/script.sh && /tmp/script.sh')
```
6.4 Managing many servers with fabric
```python
from fabric import SerialGroup, ThreadingGroup

servers = ['web1.example.com', 'web2.example.com', 'web3.example.com']

# Serial execution (one host after another)
group_serial = SerialGroup(
    *servers,
    user='ubuntu',
    connect_kwargs={'key_filename': '/home/user/.ssh/id_rsa'}
)
group_serial.run('uptime')

# Parallel execution (all hosts at the same time)
group_parallel = ThreadingGroup(
    *servers,
    user='ubuntu',
    connect_kwargs={'key_filename': '/home/user/.ssh/id_rsa'}
)
# Note: if any host fails, run() raises fabric.exceptions.GroupException
results = group_parallel.run('df -h /', hide=True)
for conn, result in results.items():
    print(f"{conn.host}: {result.stdout.strip().splitlines()[-1]}")
```
Part 7: asyncssh, Asynchronous High-Concurrency SSH
7.1 Installation
```bash
pip install asyncssh
```
7.2 asyncssh basics
```python
import asyncio
import asyncssh


async def run_command_async(host, username, password, command):
    """Run one command asynchronously."""
    async with asyncssh.connect(
        host,
        username=username,
        password=password,
        known_hosts=None   # skip host key verification (development only)
    ) as conn:
        result = await conn.run(command)
        return {
            'host': host,
            'stdout': result.stdout,
            'stderr': result.stderr,
            'code': result.exit_status
        }


async def run_on_many_servers(servers, command):
    """Run the command on several servers concurrently."""
    tasks = [
        run_command_async(s['host'], s['username'], s['password'], command)
        for s in servers
    ]
    # gather runs every task concurrently; exceptions are returned instead of raised
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results


# Run it
servers = [
    {'host': '192.168.1.100', 'username': 'root', 'password': 'pass1'},
    {'host': '192.168.1.101', 'username': 'root', 'password': 'pass2'},
    {'host': '192.168.1.102', 'username': 'root', 'password': 'pass3'},
]


async def main():
    print("Checking memory on all servers concurrently...")
    results = await run_on_many_servers(servers, 'free -m | grep Mem')
    for r in results:
        if isinstance(r, Exception):
            print(f"❌ Error: {r}")
        else:
            print(f"✅ {r['host']}: {r['stdout'].strip()}")


asyncio.run(main())
```
7.3 Rate-limited batch execution with asyncssh
```python
import asyncio
import time
import asyncssh


async def run_with_semaphore(semaphore, host, command, conn_kwargs):
    """Use a semaphore to limit how many connections run at once."""
    async with semaphore:
        try:
            async with asyncssh.connect(host, **conn_kwargs) as conn:
                result = await conn.run(command)
                return host, result.stdout.strip(), None
        except Exception as e:
            return host, None, str(e)


async def batch_execute(hosts: list, command: str, max_concurrent: int = 50):
    """
    Run the command on all hosts, with at most max_concurrent connections open at once.
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    conn_kwargs = {
        'username': 'ubuntu',
        'known_hosts': None,
        'client_keys': ['/home/user/.ssh/id_rsa']
    }
    tasks = [
        run_with_semaphore(semaphore, host, command, conn_kwargs)
        for host in hosts
    ]
    results = await asyncio.gather(*tasks)
    return results


# Check 100 servers concurrently, with at most 50 connections at a time
all_hosts = [f'10.0.0.{i}' for i in range(1, 101)]


async def main():
    start = time.time()
    results = await batch_execute(all_hosts, 'uptime', max_concurrent=50)
    elapsed = time.time() - start
    successes = sum(1 for _, out, err in results if err is None)
    print(f"Done: {successes}/{len(all_hosts)} succeeded in {elapsed:.1f}s")


asyncio.run(main())
```
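The semaphore caps how many connections are open at once, but a single hung host can still hold a slot indefinitely. Wrapping each task in `asyncio.wait_for` adds a per-host deadline. The sketch below is generic: `bounded_gather` accepts any `job(host)` coroutine function, and the test uses a hypothetical fake job instead of asyncssh so it runs anywhere. In real code, `job` would contain the `async with asyncssh.connect(...)` block above.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple


async def bounded_gather(
    hosts: List[str],
    job: Callable[[str], Awaitable[str]],
    max_concurrent: int = 50,
    per_host_timeout: float = 30.0,
) -> List[Tuple[str, Optional[str], Optional[str]]]:
    """Run job(host) for every host with at most max_concurrent in flight.

    Returns (host, output, error) tuples in input order; error is None on success.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(host: str):
        async with semaphore:
            try:
                # wait_for cancels the job if it runs past the deadline
                out = await asyncio.wait_for(job(host), timeout=per_host_timeout)
                return host, out, None
            except asyncio.TimeoutError:
                return host, None, f'timed out after {per_host_timeout}s'
            except Exception as e:
                return host, None, str(e)

    return await asyncio.gather(*(guarded(h) for h in hosts))
```

Because `wait_for` cancels the timed-out job, its slot is released immediately and the remaining hosts keep moving.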
Part 8: Common Real-World Applications
8.1 Automated log collection
```python
# Uses SSHExecutor (section 4.2) and SERVERS (section 5.1)
import os
from datetime import datetime, timedelta
from typing import Optional


def collect_logs(servers: list, log_path: str, local_dir: str, date: Optional[str] = None):
    """
    Collect the log file for a given date from several servers.
    Parameters:
        servers   - list of server dicts
        log_path  - remote log path (may contain a {date} placeholder)
        local_dir - local directory to save into
        date      - date string (defaults to yesterday)
    """
    if date is None:
        date = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')
    os.makedirs(local_dir, exist_ok=True)
    for server in servers:
        host = server['hostname']
        remote_path = log_path.format(date=date)
        local_path = os.path.join(local_dir, f"{host}_{date}_app.log")
        print(f"Collecting {host}:{remote_path} ...")
        try:
            with SSHExecutor(**server) as ssh:
                # Check that the file exists first
                if not ssh.file_exists(remote_path):
                    print(f"  ⚠️ File not found: {remote_path}")
                    continue
                # Get the file size
                out, _, _ = ssh.run(f'du -sh {remote_path}')
                size = out.split()[0]
                # Download
                ssh.download(remote_path, local_path)
                print(f"  ✅ Saved: {local_path} ({size})")
        except Exception as e:
            print(f"  ❌ Collection failed: {e}")


# Collect the logs for the last 7 days
for i in range(7):
    date = (datetime.now() - timedelta(days=i)).strftime('%Y%m%d')
    collect_logs(
        servers=SERVERS,
        log_path='/var/log/myapp/app_{date}.log',
        local_dir='/backup/logs/',
        date=date
    )
```
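Commands like `du -sh {remote_path}` splice a path straight into a shell command line. That breaks on paths containing spaces, and it becomes a command-injection risk if any part of the path comes from user input. `shlex.quote` from the standard library escapes a single argument for POSIX shells such as the bash or sh running on the server. The helper names below are hypothetical; parsing assumes GNU `du`, which separates size and path with a tab.

```python
import shlex


def du_command(remote_path: str) -> str:
    """Build a `du -sh` command with the path safely quoted for a POSIX shell."""
    return f'du -sh {shlex.quote(remote_path)}'


def parse_du_size(du_output: str) -> str:
    """Extract the size column from `du -sh` output such as '1.5M\\t/var/log/app.log'."""
    first_line = du_output.strip().splitlines()[0]
    return first_line.split('\t', 1)[0]


if __name__ == '__main__':
    print(du_command('/var/log/myapp/app_20240101.log'))
    print(du_command('/tmp/my logs/a.log; rm -rf ~'))   # the `;` stays inside quotes
    print(parse_du_size('1.5M\t/var/log/myapp/app_20240101.log\n'))
```

Paths made only of "safe" characters are left unquoted. Anything else is wrapped in single quotes, so the `; rm -rf ~` in the second example is treated as part of the file name rather than as a second command.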
8.2 Server health checks
```python
# Uses SSHExecutor (section 4.2) and SERVERS (section 5.1)
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor


@dataclass
class HealthStatus:
    host: str
    reachable: bool
    cpu_usage: float = 0.0
    mem_usage: float = 0.0
    disk_usage: float = 0.0
    load_avg: str = ""
    error: str = ""

    def is_healthy(self) -> bool:
        return (self.reachable and
                self.cpu_usage < 90 and
                self.mem_usage < 90 and
                self.disk_usage < 85)

    def __str__(self):
        if not self.reachable:
            return f"❌ {self.host:20s} unreachable: {self.error}"
        status = "✅" if self.is_healthy() else "⚠️"
        return (f"{status} {self.host:20s} "
                f"CPU: {self.cpu_usage:5.1f}% "
                f"Mem: {self.mem_usage:5.1f}% "
                f"Disk: {self.disk_usage:5.1f}% "
                f"Load: {self.load_avg}")


def check_server_health(server: dict) -> HealthStatus:
    host = server['hostname']
    status = HealthStatus(host=host, reachable=False)
    try:
        with SSHExecutor(**server, timeout=5) as ssh:
            status.reachable = True
            # CPU usage (user + system, from top)
            out, _, _ = ssh.run(
                "top -bn1 | grep 'Cpu(s)' | awk '{print $2+$4}'",
                timeout=5
            )
            try:
                status.cpu_usage = float(out.strip())
            except ValueError:
                pass
            # Memory usage (escaped inner double quotes keep the Python string valid)
            out, _, _ = ssh.run(
                "free | awk '/Mem:/{printf \"%.1f\", ($3/$2)*100}'",
                timeout=5
            )
            try:
                status.mem_usage = float(out.strip())
            except ValueError:
                pass
            # Disk usage (root filesystem)
            out, _, _ = ssh.run(
                "df / | awk 'NR==2{print $5}' | tr -d '%'",
                timeout=5
            )
            try:
                status.disk_usage = float(out.strip())
            except ValueError:
                pass
            # System load
            out, _, _ = ssh.run("uptime | awk -F'load average:' '{print $2}'", timeout=5)
            status.load_avg = out.strip()
    except Exception as e:
        status.error = str(e)
    return status


# Main health-check function
def health_check_all(servers: list):
    print(f"{'='*70}")
    print(f"  Server health check  {len(servers)} servers  {time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'='*70}")
    with ThreadPoolExecutor(max_workers=20) as executor:
        statuses = list(executor.map(check_server_health, servers))
    for s in statuses:
        print(s)
    unhealthy = [s for s in statuses if not s.is_healthy()]
    print(f"\n{'='*70}")
    print(f"Healthy: {len(statuses)-len(unhealthy)}/{len(statuses)}")
    if unhealthy:
        print(f"⚠️ Needs attention: {', '.join(s.host for s in unhealthy)}")


health_check_all(SERVERS)
```
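Part 9: Common Errors and Troubleshooting
9.1 Error quick reference
```python
import socket
import paramiko
from paramiko.ssh_exception import NoValidConnectionsError


def safe_connect_demo():
    """Show how to catch and handle each kind of connection error."""
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect('192.168.1.100', username='root', password='wrong', timeout=5)
    except paramiko.AuthenticationException:
        # Wrong user name or password, or the server does not allow password login
        print("❌ Authentication failed: wrong user name/password, or password login is disabled")
        print("   Fix: check the credentials, or switch to key authentication")
    except paramiko.SSHException as e:
        # Protocol-level SSH errors (e.g. an incompatible server version)
        print(f"❌ SSH error: {e}")
    except socket.timeout:
        # Connection timed out (server unreachable, or blocked by a firewall)
        print("❌ Connection timed out: no response from the server")
        print("   Fix: check the IP/port, firewall rules and network connectivity")
    except NoValidConnectionsError:
        # paramiko raises this (rather than ConnectionRefusedError) when every address
        # refused the connection, e.g. because sshd is not running on that port
        print("❌ Connection refused: the target port is not open")
        print("   Fix: check that sshd is running (systemctl status sshd)")
    except socket.gaierror:
        # DNS resolution failed (the host name does not exist)
        print("❌ Host name resolution failed")
        print("   Fix: check that the host name is correct")
    except OSError as e:
        # Other system-level errors, such as network unreachable
        print(f"❌ Network error: {e}")
    finally:
        client.close()
```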
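The errors above fall into two groups. Transient network failures, such as timeouts and refused or reset connections, can succeed if you retry after a pause. Permanent failures, such as bad credentials or unknown host names, will not, and retrying them only wastes time. A generic retry helper with exponential backoff makes that split explicit. `retry_call` and its parameters are illustrative. The default `retry_on` covers Python's built-in `TimeoutError` and `ConnectionError` families. For paramiko you would probably add `socket.timeout` (a distinct class before Python 3.10) and `NoValidConnectionsError`; adjust the tuple to the errors you actually see.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar('T')


def retry_call(
    func: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError),
) -> T:
    """Call func(); if it raises one of retry_on, wait and try again.

    The delay doubles on each attempt (base_delay, 2*base_delay, ...), is capped at
    max_delay, and gets up to 10% random jitter so many clients do not retry in
    lockstep. Exceptions not listed in retry_on propagate immediately.
    """
    for attempt in range(1, retries + 1):
        try:
            return func()
        except retry_on:
            if attempt == retries:
                raise   # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            time.sleep(delay + random.uniform(0, delay * 0.1))
    raise ValueError('retries must be >= 1')
```

Wrapped around the connection call, it reads `retry_call(lambda: client.connect(host, username=user, key_filename=key, timeout=5))`. An `AuthenticationException` is not in `retry_on`, so it surfaces on the first attempt.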
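9.2 Host key warnings
```python
import base64
import paramiko

client = paramiko.SSHClient()

# Option 1: AutoAddPolicy (development only; no verification)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# ⚠️ Security risk: exposes you to man-in-the-middle attacks

# Option 2: RejectPolicy (the safest; this is paramiko's default)
client.set_missing_host_key_policy(paramiko.RejectPolicy())
# The server's public key must already be in known_hosts

# Option 3: load the system known_hosts (recommended for production)
client.load_system_host_keys()   # loads ~/.ssh/known_hosts
client.set_missing_host_key_policy(paramiko.RejectPolicy())

# Option 4: register a trusted host key manually
# The key field of a known_hosts line is base64-encoded, so decode it before use
key_b64 = 'AABB...'   # placeholder: the server's base64 public key
host_key = paramiko.RSAKey(data=base64.b64decode(key_b64))
client.get_host_keys().add('192.168.1.100', 'ssh-rsa', host_key)
```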
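9.3 Encoding problems
```python
# Assumes `client` is connected and `stdout` comes from an earlier exec_command call
# Problem: the server's output contains Chinese or other special characters, and decode() fails

# ❌ May raise UnicodeDecodeError
# output = stdout.read().decode('utf-8')

# ✅ Safe decoding: replace undecodable bytes with a replacement character
output = stdout.read().decode('utf-8', errors='replace')

# ✅ Alternative (use instead of the line above, since stdout can only be read once):
# try UTF-8 first and fall back to latin-1
raw = stdout.read()
try:
    output = raw.decode('utf-8')
except UnicodeDecodeError:
    output = raw.decode('latin-1')

# ✅ Set the locale on the server side (the most fundamental fix;
# the locale must be installed on the server)
stdin, stdout, stderr = client.exec_command(
    'LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 your_command'
)
```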
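latin-1 never raises, but it also never gives you correct Chinese text, because every byte is mapped to some Western European character. On servers whose locale is GBK, which is common on older Chinese-language systems, trying gbk before giving up usually recovers readable text. The encoding list below is an assumption; adjust it to the servers you manage. A successful gbk decode does not prove the text really was GBK, since many byte sequences happen to be valid GBK.

```python
def decode_output(raw: bytes, encodings=('utf-8', 'gbk')) -> str:
    """Decode command output, trying each encoding in turn.

    If none decodes cleanly, fall back to UTF-8 with replacement characters,
    so this function never raises on bad bytes.
    """
    for enc in encodings:
        try:
            return raw.decode(enc)
        except UnicodeDecodeError:
            continue
    return raw.decode('utf-8', errors='replace')
```

For example, `decode_output(stdout.read())` would replace the try/except fallback above.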
9.4 Debugging tips
```python
import socket
import logging
import paramiko

# Turn on detailed paramiko logging (very useful for debugging connection problems)
logging.basicConfig()
logging.getLogger("paramiko").setLevel(logging.DEBUG)


# A simple connection test script
def test_connection(host, port=22, username='root', password=None, key_file=None):
    """Quickly check whether an SSH connection works."""
    print(f"Testing connection: {username}@{host}:{port}")
    # 1. Test the TCP port first
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(5)
    try:
        sock.connect((host, port))
        print(f"  ✅ TCP port {port} is reachable")
    except Exception as e:
        print(f"  ❌ TCP connection failed: {e}")
        return False
    finally:
        sock.close()
    # 2. Test SSH authentication
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        kwargs = {'hostname': host, 'port': port, 'username': username, 'timeout': 5}
        if password:
            kwargs['password'] = password
        if key_file:
            kwargs['key_filename'] = key_file
        client.connect(**kwargs)
        print("  ✅ SSH authentication succeeded")
        # 3. Test command execution
        _, out, _ = client.exec_command('echo hello')
        print(f"  ✅ Command execution works: {out.read().decode().strip()}")
        return True
    except paramiko.AuthenticationException:
        print("  ❌ Authentication failed")
    except Exception as e:
        print(f"  ❌ Connection failed: {e}")
    finally:
        client.close()
    return False


test_connection('192.168.1.100', username='root', password='password')
```
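Step 1 of test_connection can be written more compactly with `socket.create_connection`. Unlike the hand-built AF_INET socket, it resolves host names and tries every address returned (IPv6 as well as IPv4). The helper name `port_open` is illustrative.

```python
import socket


def port_open(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:   # covers refused, unreachable, timeout and DNS (gaierror) failures
        return False
```

A check such as `port_open('192.168.1.100', 22)` quickly separates network and firewall problems from SSH authentication problems.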
Part 10: Complete Cheat Sheet
10.1 paramiko core API
📌 Connection management
SSHClient()                            → create a client
.set_missing_host_key_policy(...)      → set the host key policy
.load_system_host_keys()               → load the system known_hosts
.connect(hostname, port, ...)          → open a connection
.close()                               → close the connection
.get_transport().is_active()           → check whether the connection is alive
📌 Command execution
.exec_command(cmd, timeout=...)        → run a command; returns (stdin, stdout, stderr)
stdout.read()                          → read all output (bytes)
stdout.channel.recv_exit_status()      → get the exit code
.invoke_shell()                        → open an interactive shell
📌 File transfer (SFTP)
.open_sftp()                           → create an SFTP client
sftp.put(local, remote)                → upload a file
sftp.get(remote, local)                → download a file
sftp.open(path, 'r')                   → open a remote file
sftp.listdir(path)                     → list a directory
sftp.listdir_attr(path)                → list a directory with file details
sftp.mkdir(path)                       → create a directory
sftp.stat(path)                        → get file information
sftp.chmod(path, mode)                 → change permissions
📌 Key authentication
paramiko.RSAKey.from_private_key_file(path)      → load an RSA private key
paramiko.Ed25519Key.from_private_key_file(path)  → load an Ed25519 private key
paramiko.AutoAddPolicy()               → automatically accept new hosts (development)
paramiko.RejectPolicy()                → reject unknown hosts (production)
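10.2 Choosing an approach
What do you need?
Just run a few commands?
  → paramiko (use it directly; a few lines of code)
Writing deployment or operations scripts?
  → fabric (concise API, readable code)
Managing tens to a few hundred servers in bulk?
  → SSHExecutor + ThreadPoolExecutor (multi-threaded concurrency)
Managing hundreds to thousands of servers in bulk?
  → asyncssh + asyncio (asynchronous, high concurrency)
Need a production-grade tool with retries and automatic reconnection?
  → write your own SSHExecutor class (Part 4 of this guide)
Enterprise-scale server management?
  → Ansible (a dedicated automation tool; it uses OpenSSH by default and can use paramiko as an alternative connection plugin)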
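Summary
After working through this guide, you should be comfortable with:
- SSH fundamentals: what the SSH protocol does and when Python needs SSH
- paramiko basics: connecting (password or key), running commands, reading results, closing connections
- exec_command in detail: reading stdout, stderr and the exit code, and setting timeouts
- SFTP file transfer: upload, download, read, write and existence checks
- The SSHExecutor wrapper: a class around common operations, with the with statement, retries and structured results
- Batch concurrent execution: managing many servers concurrently with ThreadPoolExecutor
- fabric: a higher-level SSH tool, well suited to deployment scripts
- asyncssh: asynchronous SSH for very large-scale concurrency
- Common error handling: recognizing and fixing connection, authentication and encoding errors
- Practical case studies: complete examples of log collection, health checks and batch deployment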