23.pytest的接口自动化封装实战

Pytest 接口自动化封装实战 – 新手详细教程

1. 前言

1.1 本教程适合谁

本教程专为新手设计,如果你:

  • 刚开始学习 pytest 接口自动化测试
  • 想要了解如何封装一个完整的接口测试框架
  • 需要详细的示例和步骤说明
  • 希望掌握从零到一的完整实战过程

那么这份教程非常适合你!

1.2 本教程包含什么

本教程将带你从零开始,构建一个完整的接口自动化测试框架,包括:

  1. 使用 YAML 作为用例:将测试用例与代码分离
  2. 使用 Requests:发送 HTTP 请求
  3. 自动请求接口:封装统一的请求方法
  4. 断言接口 responses_validator:验证响应数据
  5. 变量提取 jsonpath:从响应中提取数据
  6. 自动记录 HTTP 报文 logging:记录请求和响应
  7. 自动生成 Allure 测试报告:生成美观的测试报告
  8. 文件上传测试:支持文件上传接口测试
  9. 数据驱动测试:使用 YAML 实现数据驱动
  10. 自定义断言:扩展断言功能
  11. 数据库查询:集成数据库验证

1.3 学习目标

完成本教程后,你将能够:

  • 理解接口自动化测试的封装思路
  • 搭建一个完整的接口测试框架
  • 编写和维护 YAML 格式的测试用例
  • 使用各种工具进行接口测试
  • 生成专业的测试报告

2. 环境准备

2.1 Python 环境

确保你已经安装了 Python 3.8 或更高版本:

python --version
# 应该显示:Python 3.8.x 或更高版本

2.2 安装依赖包

创建 requirements.txt 文件:

# 测试框架
pytest==7.4.3
pytest-html==4.1.1
pytest-xdist==3.5.0
pytest-rerunfailures==12.0

# HTTP 请求
requests==2.31.0

# 数据处理
pyyaml==6.0.1
jsonpath==0.82

# 响应验证
responses-validator==0.3.0

# 日志
loguru==0.7.2

# 测试报告
allure-pytest==2.13.2

# 数据库
pymysql==1.1.0
sqlalchemy==2.0.23

# 其他工具
python-dotenv==1.0.0

安装所有依赖:

pip install -r requirements.txt

2.3 安装 Allure

Windows 系统:

  1. 下载 Allure:https://github.com/allure-framework/allure2/releases
  2. 解压到某个目录(如 C:allure
  3. C:allurebin 添加到系统环境变量 PATH 中
  4. 验证安装:
allure --version

Mac/Linux 系统:

# 使用 Homebrew (Mac)
brew install allure

# 或使用 Scoop (Windows)
scoop install allure

2.4 项目结构

创建以下项目结构:

api_test_framework/
├── config/                      # 配置目录
│   ├── __init__.py
│   ├── config.py               # 配置管理类
│   ├── test_config.yaml        # 测试环境配置
│   └── dev_config.yaml         # 开发环境配置
│
├── clients/                    # HTTP 客户端目录
│   ├── __init__.py
│   └── http_client.py          # HTTP 客户端封装
│
├── api/                        # 业务接口目录
│   ├── __init__.py
│   ├── base_api.py             # 基础 API 类
│   └── user_api.py             # 用户相关接口
│
├── utils/                      # 工具类目录
│   ├── __init__.py
│   ├── logger.py               # 日志工具
│   ├── response_handler.py     # 响应处理器
│   ├── assertion.py            # 自定义断言
│   └── db_helper.py            # 数据库辅助工具
│
├── tests/                      # 测试用例目录
│   ├── __init__.py
│   ├── conftest.py             # pytest 配置
│   ├── test_cases/             # YAML 测试用例
│   │   ├── test_login.yaml
│   │   ├── test_user.yaml
│   │   └── test_upload.yaml
│   └── test_user_api.py        # 测试文件
│
├── data/                       # 测试数据目录
│   ├── test_users.yaml
│   └── upload_files/           # 上传文件目录
│       └── test.txt
│
├── logs/                       # 日志目录(自动生成)
│   └── .gitkeep
│
├── reports/                    # 测试报告目录(自动生成)
│   └── .gitkeep
│
├── requirements.txt            # 依赖文件
├── pytest.ini                  # pytest 配置
└── README.md                   # 项目说明

3. 核心模块封装

3.1 配置管理模块

3.1.1 配置文件示例

创建 config/test_config.yaml

# 测试环境配置
api:
  base_url: https://api.test.example.com
  timeout: 30

database:
  host: localhost
  port: 3306
  username: test_user
  password: test_password
  database: test_db

logging:
  level: DEBUG
  file: logs/test.log
  format: "%(asctime)s [%(levelname)8s] %(name)s: %(message)s"
  date_format: "%Y-%m-%d %H:%M:%S"

allure:
  report_dir: reports/allure-results
  report_url: reports/allure-report

3.1.2 配置管理类

创建 config/config.py

"""
配置管理模块
用于加载和管理测试配置
"""
import os
import yaml
from pathlib import Path
from typing import Dict, Any, Optional

class Config:
    """配置管理类"""

    def __init__(self, env: str = "test"):
        """
        初始化配置

        参数:
            env: 环境名称(test、dev、prod)
        """
        self.env = env
        self.base_dir = Path(__file__).parent.parent
        self.config_file = self.base_dir / "config" / f"{env}_config.yaml"
        self._config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """
        加载配置文件

        返回:
            配置字典
        """
        if not self.config_file.exists():
            raise FileNotFoundError(f"配置文件不存在: {self.config_file}")

        with open(self.config_file, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f)

        # 环境变量可以覆盖配置文件
        if "API_BASE_URL" in os.environ:
            config["api"]["base_url"] = os.environ["API_BASE_URL"]

        return config

    @property
    def base_url(self) -> str:
        """获取 API 基础 URL"""
        return self._config["api"]["base_url"]

    @property
    def timeout(self) -> int:
        """获取超时时间"""
        return self._config["api"]["timeout"]

    @property
    def db_config(self) -> Dict[str, Any]:
        """获取数据库配置"""
        return self._config["database"]

    @property
    def logging_config(self) -> Dict[str, Any]:
        """获取日志配置"""
        return self._config["logging"]

    @property
    def allure_config(self) -> Dict[str, Any]:
        """获取 Allure 配置"""
        return self._config.get("allure", {})

    def get(self, key: str, default: Any = None) -> Any:
        """
        获取配置值(支持点号分隔的键)

        参数:
            key: 配置键,支持点号分隔,如 "api.base_url"
            default: 默认值

        返回:
            配置值
        示例:
            config.get("api.base_url")
            config.get("database.host")
        """
        keys = key.split(".")
        value = self._config

        for k in keys:
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default

        return value

3.2 日志模块封装

3.2.1 日志工具类

创建 utils/logger.py

"""
日志工具模块
用于记录 HTTP 请求和响应的详细信息
"""
import logging
import json
import os
from pathlib import Path
from typing import Optional
from datetime import datetime

class HTTPLogger:
    """HTTP 日志记录器"""

    def __init__(self, name: str = "http_client", config: Optional[Dict] = None):
        """
        初始化日志记录器

        参数:
            name: 日志记录器名称
            config: 日志配置字典
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)

        # 避免重复添加处理器
        if not self.logger.handlers:
            self._setup_handlers(config)

    def _setup_handlers(self, config: Optional[Dict] = None):
        """设置日志处理器"""
        # 默认配置
        log_level = logging.DEBUG
        log_file = "logs/http_client.log"
        log_format = "%(asctime)s [%(levelname)8s] %(name)s: %(message)s"
        date_format = "%Y-%m-%d %H:%M:%S"

        # 使用传入的配置
        if config:
            level_map = {
                "DEBUG": logging.DEBUG,
                "INFO": logging.INFO,
                "WARNING": logging.WARNING,
                "ERROR": logging.ERROR,
                "CRITICAL": logging.CRITICAL
            }
            log_level = level_map.get(config.get("level", "DEBUG"), logging.DEBUG)
            log_file = config.get("file", log_file)
            log_format = config.get("format", log_format)
            date_format = config.get("date_format", date_format)

        # 确保日志目录存在
        log_path = Path(log_file)
        log_path.parent.mkdir(parents=True, exist_ok=True)

        # 控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_formatter = logging.Formatter(log_format, date_format)
        console_handler.setFormatter(console_formatter)

        # 文件处理器
        file_handler = logging.FileHandler(log_file, encoding="utf-8")
        file_handler.setLevel(log_level)
        file_formatter = logging.Formatter(log_format, date_format)
        file_handler.setFormatter(file_formatter)

        # 添加处理器
        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)

    def log_request(self, method: str, url: str, headers: Optional[Dict] = None,
                   params: Optional[Dict] = None, json_data: Optional[Dict] = None,
                   data: Optional[Dict] = None, files: Optional[Dict] = None):
        """
        记录 HTTP 请求

        参数:
            method: HTTP 方法
            url: 请求 URL
            headers: 请求头
            params: URL 参数
            json_data: JSON 数据
            data: 表单数据
            files: 文件数据
        """
        self.logger.info("=" * 80)
        self.logger.info(f"[请求] {method} {url}")
        self.logger.info("-" * 80)

        if headers:
            self.logger.debug(f"[请求头]n{json.dumps(dict(headers), ensure_ascii=False, indent=2)}")

        if params:
            self.logger.debug(f"[URL 参数]n{json.dumps(params, ensure_ascii=False, indent=2)}")

        if json_data:
            self.logger.debug(f"[请求体 JSON]n{json.dumps(json_data, ensure_ascii=False, indent=2)}")

        if data:
            self.logger.debug(f"[请求体 Form]n{json.dumps(data, ensure_ascii=False, indent=2)}")

        if files:
            file_info = {k: f"<文件: {v.name if hasattr(v, 'name') else 'binary'}>" 
                        for k, v in files.items()}
            self.logger.debug(f"[上传文件]n{json.dumps(file_info, ensure_ascii=False, indent=2)}")

        self.logger.info("-" * 80)

    def log_response(self, response, elapsed_time: Optional[float] = None):
        """
        记录 HTTP 响应

        参数:
            response: requests.Response 对象
            elapsed_time: 响应时间(秒)
        """
        self.logger.info("-" * 80)
        self.logger.info(f"[响应] 状态码: {response.status_code}")

        if elapsed_time:
            self.logger.info(f"[响应时间] {elapsed_time:.3f} 秒")

        self.logger.debug(f"[响应头]n{json.dumps(dict(response.headers), ensure_ascii=False, indent=2)}")

        # 尝试解析 JSON
        try:
            response_json = response.json()
            self.logger.debug(f"[响应体 JSON]n{json.dumps(response_json, ensure_ascii=False, indent=2)}")
        except:
            # 如果不是 JSON,记录文本内容(限制长度)
            text = response.text
            if len(text) > 1000:
                text = text[:1000] + "... (内容过长,已截断)"
            self.logger.debug(f"[响应体 Text]n{text}")

        self.logger.info("=" * 80)

    def log_error(self, message: str, exc_info: bool = False):
        """
        记录错误信息

        参数:
            message: 错误消息
            exc_info: 是否记录异常信息
        """
        self.logger.error(message, exc_info=exc_info)

    def info(self, message: str):
        """记录 INFO 级别日志"""
        self.logger.info(message)

    def debug(self, message: str):
        """记录 DEBUG 级别日志"""
        self.logger.debug(message)

    def warning(self, message: str):
        """记录 WARNING 级别日志"""
        self.logger.warning(message)

    def error(self, message: str):
        """记录 ERROR 级别日志"""
        self.logger.error(message)

3.3 HTTP 客户端封装

3.3.1 HTTP 客户端类

创建 clients/http_client.py

"""
HTTP 客户端封装模块
提供统一的 HTTP 请求接口,自动记录日志
"""
import requests
import json
import time
from typing import Dict, Optional, Any
from utils.logger import HTTPLogger

class HttpClient:
    """HTTP 客户端封装类"""

    def __init__(self, base_url: str, timeout: int = 30, logger: Optional[HTTPLogger] = None):
        """
        初始化 HTTP 客户端

        参数:
            base_url: 基础 URL
            timeout: 超时时间(秒)
            logger: 日志记录器
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()
        self.logger = logger or HTTPLogger("http_client")

        # 设置默认请求头
        self.session.headers.update({
            "Content-Type": "application/json",
            "User-Agent": "Python-Automation-Test/1.0"
        })

    def _build_url(self, endpoint: str) -> str:
        """
        构建完整 URL

        参数:
            endpoint: 接口路径

        返回:
            完整 URL
        """
        endpoint = endpoint.lstrip("/")
        return f"{self.base_url}/{endpoint}"

    def request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict] = None,
        json_data: Optional[Dict] = None,
        data: Optional[Dict] = None,
        headers: Optional[Dict] = None,
        files: Optional[Dict] = None,
        **kwargs
    ) -> requests.Response:
        """
        发送 HTTP 请求

        参数:
            method: HTTP 方法(GET、POST、PUT、DELETE 等)
            endpoint: 接口路径
            params: URL 参数
            json_data: JSON 格式的请求体
            data: 表单格式的请求体
            headers: 请求头
            files: 文件数据
            **kwargs: 其他 requests 参数

        返回:
            响应对象
        """
        url = self._build_url(endpoint)

        # 合并请求头
        request_headers = self.session.headers.copy()
        if headers:
            request_headers.update(headers)

        # 记录请求日志
        self.logger.log_request(
            method=method,
            url=url,
            headers=request_headers,
            params=params,
            json_data=json_data,
            data=data,
            files=files
        )

        # 记录开始时间
        start_time = time.time()

        try:
            # 发送请求
            response = self.session.request(
                method=method.upper(),
                url=url,
                params=params,
                json=json_data,
                data=data,
                headers=request_headers,
                files=files,
                timeout=self.timeout,
                **kwargs
            )

            # 计算响应时间
            elapsed_time = time.time() - start_time

            # 记录响应日志
            self.logger.log_response(response, elapsed_time)

            return response

        except requests.exceptions.Timeout:
            elapsed_time = time.time() - start_time
            self.logger.log_error(f"请求超时:{method} {url},耗时:{elapsed_time:.3f} 秒")
            raise
        except requests.exceptions.ConnectionError:
            self.logger.log_error(f"连接错误:{method} {url}")
            raise
        except Exception as e:
            self.logger.log_error(f"请求异常:{method} {url},错误:{str(e)}", exc_info=True)
            raise

    def get(self, endpoint: str, params: Optional[Dict] = None, **kwargs) -> requests.Response:
        """发送 GET 请求"""
        return self.request("GET", endpoint, params=params, **kwargs)

    def post(self, endpoint: str, json_data: Optional[Dict] = None, 
             data: Optional[Dict] = None, files: Optional[Dict] = None, **kwargs) -> requests.Response:
        """发送 POST 请求"""
        return self.request("POST", endpoint, json_data=json_data, data=data, files=files, **kwargs)

    def put(self, endpoint: str, json_data: Optional[Dict] = None, **kwargs) -> requests.Response:
        """发送 PUT 请求"""
        return self.request("PUT", endpoint, json_data=json_data, **kwargs)

    def delete(self, endpoint: str, **kwargs) -> requests.Response:
        """发送 DELETE 请求"""
        return self.request("DELETE", endpoint, **kwargs)

    def patch(self, endpoint: str, json_data: Optional[Dict] = None, **kwargs) -> requests.Response:
        """发送 PATCH 请求"""
        return self.request("PATCH", endpoint, json_data=json_data, **kwargs)

    def set_header(self, key: str, value: str):
        """设置请求头"""
        self.session.headers[key] = value
        self.logger.debug(f"设置请求头:{key} = {value}")

    def remove_header(self, key: str):
        """移除请求头"""
        if key in self.session.headers:
            del self.session.headers[key]
            self.logger.debug(f"移除请求头:{key}")

    def set_auth_token(self, token: str):
        """设置认证 Token"""
        self.set_header("Authorization", f"Bearer {token}")

3.4 响应处理器封装

3.4.1 响应处理器类

创建 utils/response_handler.py

"""
响应处理器模块
封装响应对象的常用操作,如断言、数据提取等
"""
import requests
import json
import jsonpath
from typing import Any, Optional, List
from responses_validator import validator as responses_validator

class ResponseHandler:
    """响应处理器"""

    def __init__(self, response: requests.Response):
        """
        初始化响应处理器

        参数:
            response: requests 响应对象
        """
        self.response = response
        self._json_data = None

    @property
    def status_code(self) -> int:
        """获取状态码"""
        return self.response.status_code

    @property
    def text(self) -> str:
        """获取响应文本"""
        return self.response.text

    @property
    def headers(self) -> Dict:
        """获取响应头"""
        return dict(self.response.headers)

    @property
    def json(self) -> dict:
        """
        获取 JSON 数据(缓存)

        返回:
            JSON 字典

        异常:
            ValueError: 如果响应不是有效的 JSON 格式
        """
        if self._json_data is None:
            try:
                self._json_data = self.response.json()
            except json.JSONDecodeError:
                raise ValueError(f"响应不是有效的 JSON 格式:{self.response.text[:200]}")
        return self._json_data

    def assert_status_code(self, expected_code: int):
        """
        断言状态码

        参数:
            expected_code: 期望的状态码

        返回:
            self(支持链式调用)
        """
        actual_code = self.status_code
        assert actual_code == expected_code, 
            f"状态码断言失败:期望 {expected_code},实际 {actual_code}"
        return self

    def assert_json_path(self, json_path: str, expected_value: Any):
        """
        断言 JSON 路径的值

        参数:
            json_path: JSON 路径表达式(如 "$.data.id")
            expected_value: 期望的值

        返回:
            self(支持链式调用)
        """
        actual_value = self.extract(json_path)
        assert actual_value == expected_value, 
            f"JSON 路径断言失败:路径 {json_path},期望 {expected_value},实际 {actual_value}"
        return self

    def assert_contains(self, text: str):
        """
        断言响应包含指定文本

        参数:
            text: 要查找的文本

        返回:
            self(支持链式调用)
        """
        assert text in self.text, 
            f"响应中不包含文本 '{text}':{self.text[:200]}"
        return self

    def extract(self, json_path: str) -> Any:
        """
        提取 JSON 路径的值

        参数:
            json_path: JSON 路径表达式(如 "$.data.id")

        返回:
            提取的值

        异常:
            ValueError: 如果路径不存在
        """
        try:
            data = self.json
            result = jsonpath.jsonpath(data, json_path)
            if not result:
                raise ValueError(f"JSON 路径 {json_path} 不存在")
            return result[0]
        except Exception as e:
            raise ValueError(f"提取 JSON 路径失败:{json_path},错误:{str(e)}")

    def extract_all(self, json_path: str) -> List[Any]:
        """
        提取 JSON 路径的所有匹配值

        参数:
            json_path: JSON 路径表达式

        返回:
            提取的值列表
        """
        try:
            data = self.json
            result = jsonpath.jsonpath(data, json_path)
            return result if result else []
        except Exception as e:
            raise ValueError(f"提取 JSON 路径失败:{json_path},错误:{str(e)}")

    def validate(self, **validation_rules):
        """
        使用 responses-validator 进行响应验证

        参数:
            **validation_rules: 验证规则
                支持的规则:
                - status_code: 状态码验证
                - headers: 响应头验证
                - json: JSON 验证
                - text: 文本验证
                - cookies: Cookies 验证
                - function: 函数验证

        返回:
            self(支持链式调用)

        示例:
            response.validate(
                status_code=200,
                json={"code": 0, "message": "success"}
            )
        """
        responses_validator(self.response, **validation_rules)
        return self

    def get_elapsed_time(self) -> float:
        """
        获取响应时间(秒)

        返回:
            响应时间
        """
        return self.response.elapsed.total_seconds()

3.5 自定义断言模块

3.5.1 自定义断言类

创建 utils/assertion.py

"""
自定义断言模块
扩展断言功能,提供更友好的错误信息
"""
import json
from typing import Any, Dict, List, Optional

class CustomAssertion:
    """自定义断言类"""

    @staticmethod
    def assert_dict_contains(expected: Dict, actual: Dict, path: str = ""):
        """
        断言字典包含指定的键值对

        参数:
            expected: 期望的字典
            actual: 实际的字典
            path: 当前路径(用于错误信息)

        示例:
            expected = {"code": 0, "message": "success"}
            actual = {"code": 0, "message": "success", "data": {...}}
            CustomAssertion.assert_dict_contains(expected, actual)  # 通过
        """
        for key, expected_value in expected.items():
            current_path = f"{path}.{key}" if path else key

            assert key in actual, 
                f"字典中缺少键 '{current_path}',实际字典:{json.dumps(actual, ensure_ascii=False)}"

            actual_value = actual[key]

            if isinstance(expected_value, dict) and isinstance(actual_value, dict):
                CustomAssertion.assert_dict_contains(expected_value, actual_value, current_path)
            elif isinstance(expected_value, list) and isinstance(actual_value, list):
                CustomAssertion.assert_list_contains(expected_value, actual_value, current_path)
            else:
                assert actual_value == expected_value, 
                    f"键 '{current_path}' 的值不匹配:期望 {expected_value},实际 {actual_value}"

    @staticmethod
    def assert_list_contains(expected: List, actual: List, path: str = ""):
        """
        断言列表包含指定的元素

        参数:
            expected: 期望的元素列表
            actual: 实际的列表
            path: 当前路径(用于错误信息)
        """
        assert len(actual) >= len(expected), 
            f"列表长度不足:期望至少 {len(expected)} 个元素,实际 {len(actual)} 个"

        for i, expected_item in enumerate(expected):
            current_path = f"{path}[{i}]"
            actual_item = actual[i] if i < len(actual) else None

            assert actual_item is not None, 
                f"列表索引 {i} 处缺少元素"

            if isinstance(expected_item, dict) and isinstance(actual_item, dict):
                CustomAssertion.assert_dict_contains(expected_item, actual_item, current_path)
            elif isinstance(expected_item, list) and isinstance(actual_item, list):
                CustomAssertion.assert_list_contains(expected_item, actual_item, current_path)
            else:
                assert actual_item == expected_item, 
                    f"列表索引 {i} 处的值不匹配:期望 {expected_item},实际 {actual_item}"

    @staticmethod
    def assert_response_time(response, max_time: float):
        """
        断言响应时间

        参数:
            response: ResponseHandler 对象或 requests.Response 对象
            max_time: 最大响应时间(秒)
        """
        if hasattr(response, 'get_elapsed_time'):
            elapsed_time = response.get_elapsed_time()
        elif hasattr(response, 'elapsed'):
            elapsed_time = response.elapsed.total_seconds()
        else:
            raise ValueError("响应对象不支持获取响应时间")

        assert elapsed_time <= max_time, 
            f"响应时间超时:期望 <= {max_time} 秒,实际 {elapsed_time:.3f} 秒"

    @staticmethod
    def assert_status_code_range(response, min_code: int = 200, max_code: int = 299):
        """
        断言状态码范围

        参数:
            response: ResponseHandler 对象或 requests.Response 对象
            min_code: 最小状态码
            max_code: 最大状态码
        """
        if hasattr(response, 'status_code'):
            status_code = response.status_code
        else:
            status_code = response.status_code

        assert min_code <= status_code <= max_code, 
            f"状态码不在范围内:期望 {min_code}-{max_code},实际 {status_code}"

    @staticmethod
    def assert_json_schema(response, schema: Dict):
        """
        断言 JSON Schema(简化版)

        参数:
            response: ResponseHandler 对象
            schema: Schema 字典,键为字段名,值为类型或值

        示例:
            schema = {
                "code": int,
                "message": str,
                "data": {"id": int, "name": str}
            }
        """
        data = response.json

        def _check_schema(data_part: Any, schema_part: Any, path: str = ""):
            if isinstance(schema_part, dict):
                assert isinstance(data_part, dict), 
                    f"路径 '{path}' 期望字典类型,实际 {type(data_part).__name__}"

                for key, value_schema in schema_part.items():
                    current_path = f"{path}.{key}" if path else key
                    assert key in data_part, 
                        f"路径 '{current_path}' 不存在"
                    _check_schema(data_part[key], value_schema, current_path)
            elif isinstance(schema_part, type):
                assert isinstance(data_part, schema_part), 
                    f"路径 '{path}' 类型不匹配:期望 {schema_part.__name__},实际 {type(data_part).__name__}"
            else:
                assert data_part == schema_part, 
                    f"路径 '{path}' 值不匹配:期望 {schema_part},实际 {data_part}"

        _check_schema(data, schema)

3.6 数据库辅助工具

3.6.1 数据库辅助类

创建 utils/db_helper.py

"""
数据库辅助工具模块
用于数据库查询和验证
"""
import pymysql
from typing import Dict, List, Optional, Any
from contextlib import contextmanager

class DBHelper:
    """数据库辅助类"""

    def __init__(self, host: str, port: int, user: str, password: str, database: str):
        """
        初始化数据库连接配置

        参数:
            host: 数据库主机
            port: 数据库端口
            user: 用户名
            password: 密码
            database: 数据库名
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database

    @contextmanager
    def get_connection(self):
        """
        获取数据库连接(上下文管理器)

        使用示例:
            with db_helper.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
                result = cursor.fetchone()
        """
        conn = None
        try:
            conn = pymysql.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor
            )
            yield conn
            conn.commit()
        except Exception as e:
            if conn:
                conn.rollback()
            raise
        finally:
            if conn:
                conn.close()

    def query_one(self, sql: str, params: Optional[tuple] = None) -> Optional[Dict]:
        """
        查询单条记录

        参数:
            sql: SQL 语句
            params: SQL 参数

        返回:
            查询结果字典,如果没有结果返回 None

        示例:
            user = db_helper.query_one("SELECT * FROM users WHERE id = %s", (1,))
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            return cursor.fetchone()

    def query_all(self, sql: str, params: Optional[tuple] = None) -> List[Dict]:
        """
        查询多条记录

        参数:
            sql: SQL 语句
            params: SQL 参数

        返回:
            查询结果列表

        示例:
            users = db_helper.query_all("SELECT * FROM users WHERE status = %s", ('active',))
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            return cursor.fetchall()

    def execute(self, sql: str, params: Optional[tuple] = None) -> int:
        """
        执行 SQL 语句(INSERT、UPDATE、DELETE)

        参数:
            sql: SQL 语句
            params: SQL 参数

        返回:
            受影响的行数

        示例:
            rows = db_helper.execute("UPDATE users SET status = %s WHERE id = %s", ('inactive', 1))
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            return cursor.rowcount

    def assert_record_exists(self, table: str, conditions: Dict[str, Any]):
        """
        断言记录存在

        参数:
            table: 表名
            conditions: 查询条件字典

        示例:
            db_helper.assert_record_exists("users", {"id": 1, "status": "active"})
        """
        where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
        sql = f"SELECT * FROM {table} WHERE {where_clause}"
        params = tuple(conditions.values())

        result = self.query_one(sql, params)
        assert result is not None, 
            f"记录不存在:表 {table},条件 {conditions}"

    def assert_record_not_exists(self, table: str, conditions: Dict[str, Any]):
        """
        断言记录不存在

        参数:
            table: 表名
            conditions: 查询条件字典
        """
        where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
        sql = f"SELECT * FROM {table} WHERE {where_clause}"
        params = tuple(conditions.values())

        result = self.query_one(sql, params)
        assert result is None, 
            f"记录不应该存在:表 {table},条件 {conditions},但找到了:{result}"

4. 业务接口封装

4.1 基础 API 类

创建 api/base_api.py

"""
基础 API 类
所有业务 API 的基类
"""
from clients.http_client import HttpClient
from utils.response_handler import ResponseHandler

class BaseAPI:
    """API 基类"""

    def __init__(self, http_client: HttpClient):
        """
        初始化 API

        参数:
            http_client: HTTP 客户端实例
        """
        self.client = http_client

    def _get(self, endpoint: str, **kwargs) -> ResponseHandler:
        """GET 请求"""
        response = self.client.get(endpoint, **kwargs)
        return ResponseHandler(response)

    def _post(self, endpoint: str, json_data: Optional[Dict] = None, 
              data: Optional[Dict] = None, files: Optional[Dict] = None, **kwargs) -> ResponseHandler:
        """POST 请求"""
        response = self.client.post(endpoint, json_data=json_data, data=data, files=files, **kwargs)
        return ResponseHandler(response)

    def _put(self, endpoint: str, json_data: Optional[Dict] = None, **kwargs) -> ResponseHandler:
        """PUT 请求"""
        response = self.client.put(endpoint, json_data=json_data, **kwargs)
        return ResponseHandler(response)

    def _delete(self, endpoint: str, **kwargs) -> ResponseHandler:
        """DELETE 请求"""
        response = self.client.delete(endpoint, **kwargs)
        return ResponseHandler(response)

4.2 用户 API 封装

创建 api/user_api.py

"""
用户相关接口封装
"""
from typing import Optional, Dict
from api.base_api import BaseAPI
from utils.response_handler import ResponseHandler

class UserAPI(BaseAPI):
    """用户相关接口"""

    def login(self, username: str, password: str) -> ResponseHandler:
        """
        用户登录

        参数:
            username: 用户名
            password: 密码

        返回:
            响应处理器
        """
        endpoint = "/api/user/login"
        data = {
            "username": username,
            "password": password
        }
        return self._post(endpoint, json_data=data)

    def register(self, username: str, password: str, email: str) -> ResponseHandler:
        """
        用户注册

        参数:
            username: 用户名
            password: 密码
            email: 邮箱

        返回:
            响应处理器
        """
        endpoint = "/api/user/register"
        data = {
            "username": username,
            "password": password,
            "email": email
        }
        return self._post(endpoint, json_data=data)

    def get_user_info(self, user_id: Optional[int] = None) -> ResponseHandler:
        """
        获取用户信息

        参数:
            user_id: 用户 ID,如果为 None 则获取当前用户信息

        返回:
            响应处理器
        """
        if user_id:
            endpoint = f"/api/user/{user_id}"
        else:
            endpoint = "/api/user/info"
        return self._get(endpoint)

    def update_user_info(self, user_id: int, **kwargs) -> ResponseHandler:
        """
        更新用户信息

        参数:
            user_id: 用户 ID
            **kwargs: 要更新的用户信息字段

        返回:
            响应处理器
        """
        endpoint = f"/api/user/{user_id}"
        return self._put(endpoint, json_data=kwargs)

    def delete_user(self, user_id: int) -> ResponseHandler:
        """
        删除用户

        参数:
            user_id: 用户 ID

        返回:
            响应处理器
        """
        endpoint = f"/api/user/{user_id}"
        return self._delete(endpoint)

    def upload_avatar(self, user_id: int, file_path: str) -> ResponseHandler:
        """
        上传用户头像

        参数:
            user_id: 用户 ID
            file_path: 文件路径

        返回:
            响应处理器
        """
        endpoint = f"/api/user/{user_id}/avatar"

        with open(file_path, 'rb') as f:
            files = {'file': (file_path.split('/')[-1], f, 'image/jpeg')}
            return self._post(endpoint, files=files)

5. YAML 测试用例设计

5.1 YAML 用例结构

5.1.1 登录测试用例

创建 tests/test_cases/test_login.yaml

# 登录接口测试用例
test_cases:
  # 用例 1:登录成功
  - name: "登录成功"
    description: "使用正确的用户名和密码登录"
    request:
      method: POST
      endpoint: /api/user/login
      json:
        username: admin
        password: 123456
    validate:
      status_code: 200
      json:
        code: 0
        message: "登录成功"
        data:
          token: "*"  # 通配符,表示任意值
          user_id: "*"
    extract:
      token: "$.data.token"
      user_id: "$.data.user_id"

  # 用例 2:密码错误
  - name: "密码错误"
    description: "使用错误的密码登录"
    request:
      method: POST
      endpoint: /api/user/login
      json:
        username: admin
        password: wrong_password
    validate:
      status_code: 200
      json:
        code: 1001
        message: "密码错误"

  # 用例 3:用户名不存在
  - name: "用户名不存在"
    description: "使用不存在的用户名登录"
    request:
      method: POST
      endpoint: /api/user/login
      json:
        username: not_exist_user
        password: 123456
    validate:
      status_code: 200
      json:
        code: 1002
        message: "用户名不存在"

  # 用例 4:参数缺失
  - name: "参数缺失"
    description: "缺少密码参数"
    request:
      method: POST
      endpoint: /api/user/login
      json:
        username: admin
    validate:
      status_code: 400
      json:
        code: 1003
        message: "*参数错误*"  # 使用通配符匹配

5.1.2 用户管理测试用例

创建 tests/test_cases/test_user.yaml

# 用户管理接口测试用例
test_cases:
  # 用例 1:获取用户信息
  - name: "获取用户信息"
    description: "获取指定用户的信息"
    depends_on: login  # 依赖登录用例
    request:
      method: GET
      endpoint: /api/user/{user_id}
      path_params:
        user_id: "{user_id}"  # 使用变量
      headers:
        Authorization: "Bearer {token}"  # 使用登录返回的 token
    validate:
      status_code: 200
      json:
        code: 0
        data:
          id: "{user_id}"
          username: "*"
          email: "*@*"  # 邮箱格式验证
    extract:
      username: "$.data.username"
      email: "$.data.email"

  # 用例 2:创建用户
  - name: "创建用户"
    description: "创建新用户"
    request:
      method: POST
      endpoint: /api/user/register
      json:
        username: "test_user_{timestamp}"  # 使用时间戳生成唯一用户名
        password: "123456"
        email: "test_{timestamp}@example.com"
    validate:
      status_code: 201
      json:
        code: 0
        message: "创建成功"
        data:
          id: "*"
    extract:
      new_user_id: "$.data.id"

  # 用例 3:更新用户信息
  - name: "更新用户信息"
    description: "更新用户信息"
    depends_on: login
    request:
      method: PUT
      endpoint: /api/user/{user_id}
      path_params:
        user_id: "{user_id}"
      headers:
        Authorization: "Bearer {token}"
      json:
        nickname: "新昵称"
        email: "newemail@example.com"
    validate:
      status_code: 200
      json:
        code: 0
        message: "更新成功"

  # 用例 4:删除用户
  - name: "删除用户"
    description: "删除用户"
    depends_on: login
    request:
      method: DELETE
      endpoint: /api/user/{user_id}
      path_params:
        user_id: "{user_id}"
      headers:
        Authorization: "Bearer {token}"
    validate:
      status_code: 200
      json:
        code: 0
        message: "删除成功"

5.1.3 文件上传测试用例

创建 tests/test_cases/test_upload.yaml

# 文件上传接口测试用例
test_cases:
  # 用例 1:上传头像
  - name: "上传用户头像"
    description: "上传用户头像文件"
    depends_on: login
    request:
      method: POST
      endpoint: /api/user/{user_id}/avatar
      path_params:
        user_id: "{user_id}"
      headers:
        Authorization: "Bearer {token}"
      files:
        file: "data/upload_files/avatar.jpg"  # 文件路径
    validate:
      status_code: 200
      json:
        code: 0
        message: "上传成功"
        data:
          file_url: "*"  # 文件 URL
    extract:
      avatar_url: "$.data.file_url"

  # 用例 2:上传文档
  - name: "上传文档"
    description: "上传文档文件"
    depends_on: login
    request:
      method: POST
      endpoint: /api/user/{user_id}/documents
      path_params:
        user_id: "{user_id}"
      headers:
        Authorization: "Bearer {token}"
      files:
        file: "data/upload_files/document.pdf"
        description: "测试文档"  # 额外的表单字段
    validate:
      status_code: 200
      json:
        code: 0
        message: "上传成功"

  # 用例 3:上传多个文件
  - name: "上传多个文件"
    description: "同时上传多个文件"
    depends_on: login
    request:
      method: POST
      endpoint: /api/user/{user_id}/files
      path_params:
        user_id: "{user_id}"
      headers:
        Authorization: "Bearer {token}"
      files:
        file1: "data/upload_files/file1.txt"
        file2: "data/upload_files/file2.txt"
    validate:
      status_code: 200
      json:
        code: 0
        message: "上传成功"

5.2 YAML 用例加载器

创建 utils/yaml_loader.py

"""
YAML 用例加载器
用于加载和解析 YAML 格式的测试用例
"""
import yaml
import os
import re
import time
from pathlib import Path
from typing import Dict, List, Any, Optional

class YAMLLoader:
    """YAML 用例加载器"""

    @staticmethod
    def load_test_cases(file_path: str) -> List[Dict[str, Any]]:
        """
        加载测试用例文件

        参数:
            file_path: YAML 文件路径

        返回:
            测试用例列表
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"测试用例文件不存在:{file_path}")

        with open(file_path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)

        if "test_cases" not in data:
            raise ValueError(f"YAML 文件中缺少 'test_cases' 键:{file_path}")

        return data["test_cases"]

    @staticmethod
    def replace_variables(text: str, variables: Dict[str, Any]) -> str:
        """
        替换文本中的变量

        参数:
            text: 包含变量的文本
            variables: 变量字典

        返回:
            替换后的文本

        示例:
            text = "/api/user/{user_id}"
            variables = {"user_id": 123}
            result = "/api/user/123"
        """
        # 处理特殊变量
        if "{timestamp}" in text:
            text = text.replace("{timestamp}", str(int(time.time())))

        # 处理普通变量
        pattern = r'{([^}]+)}'

        def replace_match(match):
            var_name = match.group(1)
            if var_name in variables:
                return str(variables[var_name])
            return match.group(0)  # 如果变量不存在,保持原样

        return re.sub(pattern, replace_match, text)

    @staticmethod
    def process_test_case(case: Dict[str, Any], variables: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        处理测试用例,替换变量

        参数:
            case: 测试用例字典
            variables: 变量字典

        返回:
            处理后的测试用例
        """
        if variables is None:
            variables = {}

        processed_case = case.copy()

        # 处理请求中的变量
        if "request" in processed_case:
            request = processed_case["request"]

            # 处理 endpoint
            if "endpoint" in request:
                request["endpoint"] = YAMLLoader.replace_variables(request["endpoint"], variables)

            # 处理 path_params
            if "path_params" in request:
                for key, value in request["path_params"].items():
                    request["path_params"][key] = YAMLLoader.replace_variables(str(value), variables)

            # 处理 headers
            if "headers" in request:
                for key, value in request["headers"].items():
                    request["headers"][key] = YAMLLoader.replace_variables(str(value), variables)

            # 处理 json
            if "json" in request:
                request["json"] = YAMLLoader._replace_dict_variables(request["json"], variables)

        return processed_case

    @staticmethod
    def _replace_dict_variables(data: Any, variables: Dict[str, Any]) -> Any:
        """递归替换字典中的变量"""
        if isinstance(data, dict):
            return {k: YAMLLoader._replace_dict_variables(v, variables) for k, v in data.items()}
        elif isinstance(data, list):
            return [YAMLLoader._replace_dict_variables(item, variables) for item in data]
        elif isinstance(data, str):
            return YAMLLoader.replace_variables(data, variables)
        else:
            return data

6. pytest 配置和 Fixture

6.1 conftest.py 配置

创建 tests/conftest.py

"""
pytest 配置文件
定义全局 fixture 和配置
"""
import pytest
import os
from pathlib import Path
from config.config import Config
from clients.http_client import HttpClient
from utils.logger import HTTPLogger
from api.user_api import UserAPI
from utils.db_helper import DBHelper

def pytest_addoption(parser):
    """添加命令行参数"""
    parser.addoption(
        "--env",
        action="store",
        default="test",
        help="测试环境:test、dev、prod"
    )

@pytest.fixture(scope="session")
def config(request):
    """全局配置 fixture"""
    env = request.config.getoption("--env")
    return Config(env=env)

@pytest.fixture(scope="session")
def logger(config):
    """全局日志记录器 fixture"""
    logging_config = config.logging_config
    return HTTPLogger("test", logging_config)

@pytest.fixture(scope="session")
def http_client(config, logger):
    """全局 HTTP 客户端 fixture"""
    return HttpClient(
        base_url=config.base_url,
        timeout=config.timeout,
        logger=logger
    )

@pytest.fixture(scope="session")
def user_api(http_client):
    """用户 API fixture"""
    return UserAPI(http_client)

@pytest.fixture(scope="session")
def db_helper(config):
    """数据库辅助工具 fixture"""
    db_config = config.db_config
    return DBHelper(
        host=db_config["host"],
        port=db_config["port"],
        user=db_config["username"],
        password=db_config["password"],
        database=db_config["database"]
    )

@pytest.fixture(scope="function")
def login_token(user_api, config):
    """
    登录并返回 token fixture

    注意:这是一个示例 fixture,实际使用时需要根据你的登录接口调整
    """
    # 从配置或环境变量获取测试账号
    test_username = os.getenv("TEST_USERNAME", "admin")
    test_password = os.getenv("TEST_PASSWORD", "123456")

    try:
        response = user_api.login(test_username, test_password)
        response.assert_status_code(200)
        token = response.extract("$.data.token")
        return token
    except Exception as e:
        pytest.skip(f"登录失败,跳过测试:{str(e)}")

@pytest.fixture(scope="function")
def login_user_id(user_api, login_token):
    """获取登录用户的 ID"""
    try:
        response = user_api.get_user_info()
        response.assert_status_code(200)
        user_id = response.extract("$.data.id")
        return user_id
    except Exception as e:
        pytest.skip(f"获取用户信息失败,跳过测试:{str(e)}")

6.2 pytest.ini 配置

创建 pytest.ini

[pytest]
# 测试发现配置
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*

# 日志配置
log_cli = true
log_cli_level = INFO
log_cli_format = %(asctime)s [%(levelname)8s] %(name)s: %(message)s
log_cli_date_format = %Y-%m-%d %H:%M:%S

log_file = logs/pytest.log
log_file_level = DEBUG
log_file_format = %(asctime)s [%(levelname)8s] %(filename)s:%(lineno)d - %(funcName)s() - %(message)s
log_file_date_format = %Y-%m-%d %H:%M:%S

# Allure 配置
addopts = 
    -v
    --tb=short
    --strict-markers
    --alluredir=reports/allure-results

# 标记定义
markers =
    smoke: 冒烟测试
    regression: 回归测试
    api: API 测试
    database: 数据库测试
    upload: 文件上传测试

7. 测试用例实现

7.1 基于 YAML 的测试用例

创建 tests/test_user_api.py

"""
用户 API 测试用例
基于 YAML 文件实现数据驱动测试
"""
import pytest
import os
from pathlib import Path
from utils.yaml_loader import YAMLLoader
from clients.http_client import HttpClient
from utils.response_handler import ResponseHandler

class TestUserAPI:
    """用户 API 测试类"""

    @pytest.fixture(scope="class")
    def test_cases_dir(self):
        """测试用例目录"""
        return Path(__file__).parent / "test_cases"

    @pytest.fixture(scope="class")
    def login_cases(self, test_cases_dir):
        """加载登录测试用例"""
        yaml_file = test_cases_dir / "test_login.yaml"
        return YAMLLoader.load_test_cases(str(yaml_file))

    @pytest.fixture(scope="class")
    def user_cases(self, test_cases_dir):
        """加载用户管理测试用例"""
        yaml_file = test_cases_dir / "test_user.yaml"
        return YAMLLoader.load_test_cases(str(yaml_file))

    @pytest.mark.parametrize("case", [
        pytest.param(case, id=case["name"])
        for case in YAMLLoader.load_test_cases(
            str(Path(__file__).parent / "test_cases" / "test_login.yaml")
        )
    ])
    @pytest.mark.api
    def test_login(self, http_client, case):
        """
        登录接口测试(数据驱动)

        参数:
            http_client: HTTP 客户端
            case: 测试用例数据
        """
        # 处理测试用例
        processed_case = YAMLLoader.process_test_case(case)
        request = processed_case["request"]

        # 发送请求
        method = request["method"]
        endpoint = request["endpoint"]
        json_data = request.get("json")
        headers = request.get("headers", {})

        response = http_client.request(
            method=method,
            endpoint=endpoint,
            json_data=json_data,
            headers=headers
        )

        # 创建响应处理器
        handler = ResponseHandler(response)

        # 验证响应
        validate = processed_case.get("validate", {})

        # 状态码验证
        if "status_code" in validate:
            handler.assert_status_code(validate["status_code"])

        # JSON 验证
        if "json" in validate:
            handler.validate(json=validate["json"])

        # 提取变量(如果需要)
        extract = processed_case.get("extract", {})
        variables = {}
        for var_name, json_path in extract.items():
            try:
                value = handler.extract(json_path)
                variables[var_name] = value
                pytest.current_variables = variables  # 保存到 pytest 上下文
            except Exception as e:
                pytest.fail(f"提取变量失败:{var_name} = {json_path},错误:{str(e)}")

    @pytest.mark.api
    def test_get_user_info(self, user_api, login_token, login_user_id):
        """
        测试获取用户信息

        参数:
            user_api: 用户 API
            login_token: 登录 token
            user_id: 用户 ID
        """
        # 设置认证头
        user_api.client.set_auth_token(login_token)

        # 获取用户信息
        response = user_api.get_user_info(login_user_id)

        # 验证响应
        response.assert_status_code(200)
        response.assert_json_path("$.code", 0)
        response.assert_json_path("$.data.id", login_user_id)

        # 提取用户信息
        username = response.extract("$.data.username")
        email = response.extract("$.data.email")

        assert username is not None
        assert "@" in email

    @pytest.mark.api
    def test_create_user(self, user_api):
        """测试创建用户"""
        import time

        # 生成唯一的用户名和邮箱
        timestamp = int(time.time())
        username = f"test_user_{timestamp}"
        email = f"test_{timestamp}@example.com"

        # 创建用户
        response = user_api.register(
            username=username,
            password="123456",
            email=email
        )

        # 验证响应
        response.assert_status_code(201)
        response.assert_json_path("$.code", 0)
        response.assert_json_path("$.message", "创建成功")

        # 提取用户 ID
        user_id = response.extract("$.data.id")
        assert user_id is not None

        return user_id

    @pytest.mark.api
    def test_update_user(self, user_api, login_token, login_user_id):
        """测试更新用户信息"""
        # 设置认证头
        user_api.client.set_auth_token(login_token)

        # 更新用户信息
        response = user_api.update_user_info(
            user_id=login_user_id,
            nickname="新昵称",
            email="newemail@example.com"
        )

        # 验证响应
        response.assert_status_code(200)
        response.assert_json_path("$.code", 0)
        response.assert_json_path("$.message", "更新成功")

        # 验证更新后的信息
        info_response = user_api.get_user_info(login_user_id)
        assert info_response.extract("$.data.nickname") == "新昵称"
        assert info_response.extract("$.data.email") == "newemail@example.com"

    @pytest.mark.upload
    def test_upload_avatar(self, user_api, login_token, login_user_id):
        """测试上传用户头像"""
        import os

        # 设置认证头
        user_api.client.set_auth_token(login_token)

        # 准备测试文件
        test_file = Path(__file__).parent.parent / "data" / "upload_files" / "test.txt"

        # 如果文件不存在,创建一个
        if not test_file.exists():
            test_file.parent.mkdir(parents=True, exist_ok=True)
            test_file.write_text("测试文件内容", encoding="utf-8")

        # 上传文件
        response = user_api.upload_avatar(login_user_id, str(test_file))

        # 验证响应
        response.assert_status_code(200)
        response.assert_json_path("$.code", 0)

        # 提取文件 URL
        file_url = response.extract("$.data.file_url")
        assert file_url is not None
        assert file_url.startswith("http")

7.2 数据库验证测试

创建 tests/test_database.py

"""
数据库验证测试
验证接口操作后的数据库状态
"""
import pytest
from utils.db_helper import DBHelper

@pytest.mark.database
class TestDatabase:
    """数据库测试类"""

    def test_user_created_in_db(self, db_helper, user_api):
        """
        测试创建用户后,数据库中是否有对应记录

        参数:
            db_helper: 数据库辅助工具
            user_api: 用户 API
        """
        import time

        # 创建用户
        timestamp = int(time.time())
        username = f"db_test_user_{timestamp}"
        email = f"db_test_{timestamp}@example.com"

        response = user_api.register(
            username=username,
            password="123456",
            email=email
        )

        user_id = response.extract("$.data.id")

        # 验证数据库中的记录
        db_helper.assert_record_exists(
            "users",
            {"id": user_id, "username": username, "email": email}
        )

        # 查询并验证详细信息
        user = db_helper.query_one(
            "SELECT * FROM users WHERE id = %s",
            (user_id,)
        )

        assert user is not None
        assert user["username"] == username
        assert user["email"] == email

    def test_user_deleted_from_db(self, db_helper, user_api, login_token, login_user_id):
        """
        测试删除用户后,数据库中的记录是否被删除

        参数:
            db_helper: 数据库辅助工具
            user_api: 用户 API
            login_token: 登录 token
            login_user_id: 用户 ID
        """
        # 设置认证头
        user_api.client.set_auth_token(login_token)

        # 先创建测试用户
        import time
        timestamp = int(time.time())
        username = f"delete_test_user_{timestamp}"
        email = f"delete_test_{timestamp}@example.com"

        create_response = user_api.register(
            username=username,
            password="123456",
            email=email
        )
        test_user_id = create_response.extract("$.data.id")

        # 验证用户存在
        db_helper.assert_record_exists("users", {"id": test_user_id})

        # 删除用户
        delete_response = user_api.delete_user(test_user_id)
        delete_response.assert_status_code(200)

        # 验证用户已从数据库删除
        db_helper.assert_record_not_exists("users", {"id": test_user_id})

    def test_user_updated_in_db(self, db_helper, user_api, login_token, login_user_id):
        """
        测试更新用户后,数据库中的记录是否更新

        参数:
            db_helper: 数据库辅助工具
            user_api: 用户 API
            login_token: 登录 token
            login_user_id: 用户 ID
        """
        # 设置认证头
        user_api.client.set_auth_token(login_token)

        # 更新用户信息
        new_nickname = "数据库测试昵称"
        new_email = "dbtest@example.com"

        update_response = user_api.update_user_info(
            user_id=login_user_id,
            nickname=new_nickname,
            email=new_email
        )
        update_response.assert_status_code(200)

        # 验证数据库中的记录已更新
        user = db_helper.query_one(
            "SELECT * FROM users WHERE id = %s",
            (login_user_id,)
        )

        assert user is not None
        assert user["nickname"] == new_nickname
        assert user["email"] == new_email

8. Allure 测试报告

8.1 Allure 集成

8.1.1 安装 Allure 插件

确保已安装 allure-pytest

pip install allure-pytest

8.1.2 在测试中使用 Allure 装饰器

更新 tests/test_user_api.py,添加 Allure 装饰器:

"""
用户 API 测试用例(带 Allure 报告)
"""
import pytest
import allure
from pathlib import Path
from utils.yaml_loader import YAMLLoader
from clients.http_client import HttpClient
from utils.response_handler import ResponseHandler

@allure.epic("用户管理")
@allure.feature("用户 API")
class TestUserAPI:
    """用户 API 测试类"""

    @allure.story("用户登录")
    @allure.title("测试用户登录接口")
    @allure.description("验证用户登录接口的各种场景")
    @pytest.mark.parametrize("case", [
        pytest.param(case, id=case["name"])
        for case in YAMLLoader.load_test_cases(
            str(Path(__file__).parent / "test_cases" / "test_login.yaml")
        )
    ])
    @pytest.mark.api
    def test_login(self, http_client, case):
        """登录接口测试"""
        with allure.step(f"执行测试用例:{case['name']}"):
            # 处理测试用例
            processed_case = YAMLLoader.process_test_case(case)
            request = processed_case["request"]

            # 记录请求信息到 Allure
            allure.attach(
                f"请求方法: {request['method']}n"
                f"请求路径: {request['endpoint']}n"
                f"请求体: {request.get('json', {})}",
                name="请求信息",
                attachment_type=allure.attachment_type.TEXT
            )

            # 发送请求
            method = request["method"]
            endpoint = request["endpoint"]
            json_data = request.get("json")
            headers = request.get("headers", {})

            response = http_client.request(
                method=method,
                endpoint=endpoint,
                json_data=json_data,
                headers=headers
            )

            # 记录响应信息到 Allure
            allure.attach(
                f"状态码: {response.status_code}n"
                f"响应体: {response.text}",
                name="响应信息",
                attachment_type=allure.attachment_type.TEXT
            )

            # 创建响应处理器
            handler = ResponseHandler(response)

            # 验证响应
            validate = processed_case.get("validate", {})

            with allure.step("验证状态码"):
                if "status_code" in validate:
                    handler.assert_status_code(validate["status_code"])

            with allure.step("验证响应数据"):
                if "json" in validate:
                    handler.validate(json=validate["json"])

    @allure.story("用户信息")
    @allure.title("获取用户信息")
    @allure.description("测试获取指定用户的信息")
    @pytest.mark.api
    def test_get_user_info(self, user_api, login_token, login_user_id):
        """测试获取用户信息"""
        with allure.step("设置认证 Token"):
            user_api.client.set_auth_token(login_token)

        with allure.step("发送获取用户信息请求"):
            response = user_api.get_user_info(login_user_id)

        with allure.step("验证响应"):
            response.assert_status_code(200)
            response.assert_json_path("$.code", 0)

            # 记录提取的用户信息
            username = response.extract("$.data.username")
            email = response.extract("$.data.email")

            allure.attach(
                f"用户名: {username}n邮箱: {email}",
                name="用户信息",
                attachment_type=allure.attachment_type.TEXT
            )

            assert username is not None
            assert "@" in email

    @allure.story("用户创建")
    @allure.title("创建新用户")
    @allure.description("测试创建新用户接口")
    @pytest.mark.api
    def test_create_user(self, user_api):
        """测试创建用户"""
        import time

        with allure.step("准备测试数据"):
            timestamp = int(time.time())
            username = f"test_user_{timestamp}"
            email = f"test_{timestamp}@example.com"

            allure.attach(
                f"用户名: {username}n邮箱: {email}",
                name="测试数据",
                attachment_type=allure.attachment_type.TEXT
            )

        with allure.step("发送创建用户请求"):
            response = user_api.register(
                username=username,
                password="123456",
                email=email
            )

        with allure.step("验证响应"):
            response.assert_status_code(201)
            response.assert_json_path("$.code", 0)

            user_id = response.extract("$.data.id")
            allure.attach(
                f"创建的用户 ID: {user_id}",
                name="创建结果",
                attachment_type=allure.attachment_type.TEXT
            )

            assert user_id is not None

    @allure.story("文件上传")
    @allure.title("上传用户头像")
    @allure.description("测试上传用户头像文件")
    @pytest.mark.upload
    def test_upload_avatar(self, user_api, login_token, login_user_id):
        """测试上传用户头像"""
        import os

        with allure.step("设置认证 Token"):
            user_api.client.set_auth_token(login_token)

        with allure.step("准备上传文件"):
            test_file = Path(__file__).parent.parent / "data" / "upload_files" / "test.txt"

            if not test_file.exists():
                test_file.parent.mkdir(parents=True, exist_ok=True)
                test_file.write_text("测试文件内容", encoding="utf-8")

            allure.attach.file(
                str(test_file),
                name="上传文件",
                attachment_type=allure.attachment_type.TEXT
            )

        with allure.step("上传文件"):
            response = user_api.upload_avatar(login_user_id, str(test_file))

        with allure.step("验证上传结果"):
            response.assert_status_code(200)
            response.assert_json_path("$.code", 0)

            file_url = response.extract("$.data.file_url")
            allure.attach(
                f"文件 URL: {file_url}",
                name="上传结果",
                attachment_type=allure.attachment_type.TEXT
            )

            assert file_url is not None
            assert file_url.startswith("http")

8.2 生成 Allure 报告

8.2.1 运行测试并生成报告

# 运行测试,生成 Allure 结果
pytest tests/test_user_api.py --alluredir=reports/allure-results

# 生成 Allure 报告
allure generate reports/allure-results -o reports/allure-report --clean

# 打开报告(会自动在浏览器中打开)
allure open reports/allure-report

8.2.2 持续集成配置

创建 .github/workflows/test.yml(GitHub Actions 示例):

name: API Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v2

    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.8'

    - name: Install dependencies
      run: |
        pip install -r requirements.txt

    - name: Install Allure
      run: |
        wget https://github.com/allure-framework/allure2/releases/download/2.13.2/allure-2.13.2.tgz
        tar -zxvf allure-2.13.2.tgz
        sudo mv allure-2.13.2 /opt/allure
        sudo ln -s /opt/allure/bin/allure /usr/local/bin/allure

    - name: Run tests
      run: |
        pytest tests/ --alluredir=reports/allure-results

    - name: Generate Allure Report
      run: |
        allure generate reports/allure-results -o reports/allure-report --clean

    - name: Upload Allure Report
      uses: actions/upload-artifact@v2
      with:
        name: allure-report
        path: reports/allure-report

9. 完整示例:端到端测试

9.1 完整的业务流程测试

创建 tests/test_complete_flow.py

"""
完整的业务流程测试
演示如何串联多个接口进行端到端测试
"""
import pytest
import allure
import time
from api.user_api import UserAPI
from utils.db_helper import DBHelper

@allure.epic("业务流程")
@allure.feature("用户完整流程")
class TestCompleteFlow:
    """完整流程测试类"""

    @allure.story("用户注册到删除完整流程")
    @allure.title("用户完整生命周期测试")
    @pytest.mark.api
    def test_user_lifecycle(self, user_api, db_helper):
        """
        测试用户的完整生命周期:
        1. 注册用户
        2. 登录获取 Token
        3. 获取用户信息
        4. 更新用户信息
        5. 上传头像
        6. 删除用户
        7. 验证数据库
        """
        timestamp = int(time.time())
        username = f"lifecycle_user_{timestamp}"
        email = f"lifecycle_{timestamp}@example.com"
        password = "123456"

        # 步骤 1:注册用户
        with allure.step("1. 注册新用户"):
            register_response = user_api.register(
                username=username,
                password=password,
                email=email
            )
            register_response.assert_status_code(201)
            user_id = register_response.extract("$.data.id")

            allure.attach(
                f"用户 ID: {user_id}n用户名: {username}n邮箱: {email}",
                name="注册信息",
                attachment_type=allure.attachment_type.TEXT
            )

            # 验证数据库
            db_helper.assert_record_exists("users", {"id": user_id, "username": username})

        # 步骤 2:登录
        with allure.step("2. 用户登录"):
            login_response = user_api.login(username, password)
            login_response.assert_status_code(200)
            token = login_response.extract("$.data.token")

            allure.attach(
                f"Token: {token[:20]}...",
                name="登录 Token",
                attachment_type=allure.attachment_type.TEXT
            )

        # 步骤 3:获取用户信息
        with allure.step("3. 获取用户信息"):
            user_api.client.set_auth_token(token)
            info_response = user_api.get_user_info(user_id)
            info_response.assert_status_code(200)

            user_info = {
                "id": info_response.extract("$.data.id"),
                "username": info_response.extract("$.data.username"),
                "email": info_response.extract("$.data.email")
            }

            allure.attach(
                f"用户信息: {user_info}",
                name="用户信息",
                attachment_type=allure.attachment_type.JSON
            )

        # 步骤 4:更新用户信息
        with allure.step("4. 更新用户信息"):
            new_nickname = "生命周期测试昵称"
            new_email = f"updated_{timestamp}@example.com"

            update_response = user_api.update_user_info(
                user_id=user_id,
                nickname=new_nickname,
                email=new_email
            )
            update_response.assert_status_code(200)

            # 验证数据库更新
            db_user = db_helper.query_one("SELECT * FROM users WHERE id = %s", (user_id,))
            assert db_user["nickname"] == new_nickname
            assert db_user["email"] == new_email

        # 步骤 5:上传头像
        with allure.step("5. 上传用户头像"):
            from pathlib import Path
            test_file = Path(__file__).parent.parent / "data" / "upload_files" / "test.txt"

            if not test_file.exists():
                test_file.parent.mkdir(parents=True, exist_ok=True)
                test_file.write_text("测试文件内容", encoding="utf-8")

            upload_response = user_api.upload_avatar(user_id, str(test_file))
            upload_response.assert_status_code(200)

            avatar_url = upload_response.extract("$.data.file_url")
            allure.attach(
                f"头像 URL: {avatar_url}",
                name="上传结果",
                attachment_type=allure.attachment_type.TEXT
            )

        # 步骤 6:删除用户
        with allure.step("6. 删除用户"):
            delete_response = user_api.delete_user(user_id)
            delete_response.assert_status_code(200)

            # 验证数据库删除
            db_helper.assert_record_not_exists("users", {"id": user_id})

            allure.attach(
                "用户已成功删除",
                name="删除结果",
                attachment_type=allure.attachment_type.TEXT
            )

10. 运行测试和生成报告

10.1 运行测试

10.1.1 基本运行

# 运行所有测试
pytest

# 运行指定测试文件
pytest tests/test_user_api.py

# 运行指定测试类
pytest tests/test_user_api.py::TestUserAPI

# 运行指定测试方法
pytest tests/test_user_api.py::TestUserAPI::test_login

10.1.2 带标记运行

# 运行标记为 api 的测试
pytest -m api

# 运行标记为 upload 的测试
pytest -m upload

# 运行标记为 database 的测试
pytest -m database

# 运行多个标记的测试
pytest -m "api or upload"

10.1.3 指定环境运行

# 使用测试环境
pytest --env=test

# 使用开发环境
pytest --env=dev

# 使用生产环境
pytest --env=prod

10.2 生成报告

10.2.1 HTML 报告

# 生成 HTML 报告
pytest --html=reports/report.html --self-contained-html

# 打开报告
# Windows
start reports/report.html

# Mac/Linux
open reports/report.html

10.2.2 Allure 报告

# 1. 运行测试并生成 Allure 结果
pytest tests/ --alluredir=reports/allure-results

# 2. 生成 Allure 报告
allure generate reports/allure-results -o reports/allure-report --clean

# 3. 打开报告
allure open reports/allure-report

10.3 查看日志

# 运行测试并显示日志
pytest -v -s --log-cli-level=INFO

# 查看日志文件
# Windows
type logspytest.log

# Mac/Linux
cat logs/pytest.log

11. 最佳实践和注意事项

11.1 YAML 用例编写规范

  1. 用例命名清晰:使用描述性的用例名称
  2. 结构统一:保持所有用例的结构一致
  3. 变量使用:合理使用变量,避免硬编码
  4. 验证完整:包含充分的验证规则
  5. 依赖明确:明确标注用例之间的依赖关系

11.2 代码封装原则

  1. 单一职责:每个类和方法只负责一个功能
  2. DRY 原则:避免重复代码
  3. 配置分离:配置与代码分离
  4. 错误处理:统一的错误处理机制
  5. 日志记录:完善的日志记录

11.3 测试数据管理

  1. 数据分离:测试数据与代码分离
  2. 数据清理:测试后清理测试数据
  3. 数据复用:合理复用测试数据
  4. 数据安全:不要提交敏感数据到版本控制

11.4 性能优化

  1. 使用 Session:复用 HTTP 连接
  2. 合理超时:设置合理的超时时间
  3. 并行执行:使用 pytest-xdist 并行执行测试
  4. 日志级别:生产环境使用较高的日志级别

12. 常见问题解答

12.1 如何调试失败的测试?

问题:测试失败时如何快速定位问题?

解决方案

  1. 查看日志:检查 logs/pytest.loglogs/http_client.log
  2. 使用 -s 参数pytest -s 显示所有输出
  3. 使用 --pdbpytest --pdb 在失败时进入调试器
  4. 查看 Allure 报告:报告中包含详细的请求和响应信息

12.2 如何处理接口依赖?

问题:测试用例之间有依赖关系怎么办?

解决方案

  1. 使用 Fixture:在 conftest.py 中定义依赖的 fixture
  2. 使用变量提取:从上一个接口的响应中提取数据
  3. 使用 YAML 的 depends_on:在 YAML 用例中标注依赖关系

12.3 如何管理测试环境?

问题:如何在不同环境之间切换?

解决方案

  1. 使用配置文件:为每个环境创建独立的配置文件
  2. 使用环境变量:通过环境变量覆盖配置
  3. 使用命令行参数pytest --env=test

12.4 如何处理文件上传?

问题:如何测试文件上传接口?

解决方案

  1. 准备测试文件:在 data/upload_files/ 目录下准备测试文件
  2. 使用 files 参数:在请求中使用 files 参数
  3. 验证上传结果:验证返回的文件 URL 或文件信息

13. 总结

13.1 本教程涵盖的内容

通过本教程,你学会了:

  1. 配置管理:如何管理不同环境的配置
  2. HTTP 客户端封装:如何封装统一的 HTTP 请求方法
  3. 日志记录:如何记录详细的 HTTP 请求和响应日志
  4. 响应处理:如何处理和验证响应数据
  5. YAML 用例:如何编写和维护 YAML 格式的测试用例
  6. 数据驱动:如何实现数据驱动的测试
  7. 变量提取:如何从响应中提取数据
  8. 自定义断言:如何扩展断言功能
  9. 数据库验证:如何验证数据库状态
  10. 文件上传:如何测试文件上传接口
  11. Allure 报告:如何生成美观的测试报告

13.2 下一步学习

完成本教程后,你可以:

  1. 扩展功能:根据实际需求扩展框架功能
  2. 优化性能:优化测试执行性能
  3. 集成 CI/CD:将测试集成到持续集成流程
  4. 学习高级特性:学习 pytest 的高级特性

13.3 参考资料


恭喜你完成了 pytest 接口自动化封装实战的学习!希望这份详细的教程能够帮助你构建一个强大的接口测试框架! 🎉

发表评论