26.pytest的接口自动化封装实战

Pytest 接口自动化封装实战 – 零代码框架完整教程

前言

本教程适合谁

本教程专为新手设计,如果你:

  • 刚开始学习 pytest 接口自动化测试
  • 想要了解如何从零开始构建一个完整的零代码测试框架
  • 需要详细的示例和步骤说明
  • 希望掌握企业级接口自动化测试的最佳实践

那么这份教程非常适合你!

本教程包含什么

本教程将带你从零开始,构建一个完整的零代码接口自动化测试框架,涵盖以下16个核心主题:

  1. YAML测试用例规范封装 – 使用YAML编写测试用例,实现代码与用例分离
  2. 用例执行顺序 – 控制测试用例的执行顺序,支持依赖关系
  3. 接口关联改进封装 – 自动提取和使用接口返回的数据
  4. 标准化流程封装 – 统一测试流程,提高可维护性
  5. 热加载封装 – 动态加载配置和用例,无需重启
  6. 统一断言封装 – 封装各种断言方法,简化验证逻辑
  7. 数据库断言封装 – 验证数据库中的数据是否符合预期
  8. Exception异常处理封装 – 统一的异常处理机制
  9. Allure企业级测试报告改进 – 生成美观的测试报告
  10. 全局配置文件封装 – 统一管理配置信息
  11. 流程用例封装 – 支持复杂的业务流程测试
  12. 数据驱动封装 – 使用YAML实现数据驱动测试
  13. MD5、Base64、RSA等加密封装 – 支持各种加密算法
  14. 接口签名验证封装 – 自动生成和验证接口签名
  15. 基础路径Base_URL封装 – 统一管理API基础路径
  16. 完整零代码极限封装 – 实现真正的零代码测试

学习目标

完成本教程后,你将能够:

  • 理解零代码测试框架的设计思路
  • 搭建一个完整的接口测试框架
  • 编写和维护YAML格式的测试用例
  • 实现各种高级功能(加密、签名、数据库验证等)
  • 生成专业的测试报告
  • 掌握企业级测试框架的最佳实践

1. 环境准备

1.1 Python环境

确保你已经安装了Python 3.8或更高版本:

python --version
# 应该显示:Python 3.8.x 或更高版本

1.2 安装依赖包

创建 requirements.txt 文件:

# 测试框架
pytest==7.4.3
pytest-html==4.1.1
pytest-xdist==3.5.0
pytest-rerunfailures==12.0
pytest-ordering==0.6

# HTTP请求
requests==2.31.0

# 数据处理
pyyaml==6.0.1
jsonpath==0.82

# 响应验证
responses-validator==0.3.0

# 日志
loguru==0.7.2

# 测试报告
allure-pytest==2.13.2

# 数据库
pymysql==1.1.0
sqlalchemy==2.0.23

# 加密相关
cryptography==41.0.7
pycryptodome==3.19.0

# 其他工具
python-dotenv==1.0.0
faker==20.1.0

安装所有依赖:

pip install -r requirements.txt

1.3 安装Allure

Windows系统:

  1. 下载Allure:https://github.com/allure-framework/allure2/releases
  2. 解压到某个目录(如 C:allure
  3. C:allurebin 添加到系统环境变量PATH中
  4. 验证安装:
allure --version

Mac/Linux系统:

# 使用Homebrew (Mac)
brew install allure

# 或使用Scoop (Windows)
scoop install allure

1.4 项目结构

创建以下项目结构:

api_test_framework/
├── config/                      # 配置目录
│   ├── __init__.py
│   ├── config.py               # 配置管理类
│   ├── test_config.yaml        # 测试环境配置
│   └── dev_config.yaml         # 开发环境配置
│
├── clients/                    # HTTP客户端目录
│   ├── __init__.py
│   └── http_client.py          # HTTP客户端封装
│
├── api/                        # 业务接口目录
│   ├── __init__.py
│   ├── base_api.py             # 基础API类
│   └── user_api.py             # 用户相关接口
│
├── utils/                      # 工具类目录
│   ├── __init__.py
│   ├── logger.py               # 日志工具
│   ├── response_handler.py     # 响应处理器
│   ├── assertion.py            # 自定义断言
│   ├── db_helper.py            # 数据库辅助工具
│   ├── extractor.py            # 变量提取工具
│   ├── encrypt.py              # 加密工具
│   └── signature.py            # 签名工具
│
├── tests/                      # 测试用例目录
│   ├── __init__.py
│   ├── conftest.py             # pytest配置
│   ├── test_cases/             # YAML测试用例
│   │   ├── test_login.yaml
│   │   ├── test_user.yaml
│   │   └── test_flow.yaml
│   └── test_user_api.py        # 测试文件
│
├── data/                       # 测试数据目录
│   ├── test_users.yaml
│   └── upload_files/           # 上传文件目录
│       └── test.txt
│
├── logs/                       # 日志目录(自动生成)
│   └── .gitkeep
│
├── reports/                    # 测试报告目录(自动生成)
│   └── .gitkeep
│
├── requirements.txt            # 依赖文件
├── pytest.ini                  # pytest配置
└── README.md                   # 项目说明

2. 主题1:YAML测试用例规范封装

2.1 什么是YAML测试用例

YAML(YAML Ain’t Markup Language)是一种人类可读的数据序列化标准。在接口自动化测试中,使用YAML编写测试用例可以实现代码与用例分离,让测试用例更加清晰易懂。

2.2 为什么使用YAML

传统方式(代码编写用例):

# 传统方式 - 用例和代码混在一起
def test_login():
    url = "https://api.example.com/login"
    headers = {"Content-Type": "application/json"}
    data = {"username": "admin", "password": "123456"}
    response = requests.post(url, json=data, headers=headers)
    assert response.status_code == 200
    assert response.json()["code"] == 0

YAML方式(用例与代码分离):

# YAML方式 - 用例清晰,易于维护
name: 用户登录测试
request:
  method: POST
  url: /login
  headers:
    Content-Type: application/json
  json:
    username: admin
    password: "123456"
validate:
  - eq: [status_code, 200]
  - eq: [body.code, 0]

YAML方式的优点:

  1. 可读性强:非技术人员也能看懂测试用例
  2. 易于维护:修改用例不需要改代码
  3. 支持数据驱动:可以轻松实现数据驱动测试
  4. 版本控制友好:YAML文件易于进行版本管理

2.3 YAML测试用例规范设计

2.3.1 基础结构

一个标准的YAML测试用例应该包含以下部分:

# 测试用例基础结构
name: 测试用例名称                    # 用例名称
description: 测试用例描述              # 用例描述(可选)
tags: [smoke, login]                 # 标签(可选)
order: 1                             # 执行顺序(可选)
request:                             # 请求部分
  method: GET                        # HTTP方法
  url: /api/user/info                # 接口路径
  headers:                           # 请求头(可选)
    Content-Type: application/json
  params:                            # URL参数(可选)
    id: 123
  json:                              # JSON请求体(可选)
    key: value
validate:                            # 断言部分
  - eq: [status_code, 200]           # 断言:状态码等于200
extract:                             # 变量提取(可选)
  token: body.data.token             # 提取token

2.3.2 完整示例

示例1:GET请求

name: 获取用户信息
description: 测试获取用户信息的接口
request:
  method: GET
  url: /api/user/info
  headers:
    Authorization: Bearer ${token}   # 使用变量
  params:
    user_id: 123
validate:
  - eq: [status_code, 200]
  - eq: [body.code, 0]
  - eq: [body.data.username, "admin"]
extract:
  user_id: body.data.id

示例2:POST请求

name: 用户登录
description: 测试用户登录接口
request:
  method: POST
  url: /api/login
  headers:
    Content-Type: application/json
  json:
    username: admin
    password: "123456"
validate:
  - eq: [status_code, 200]
  - eq: [body.code, 0]
  - eq: [body.message, "登录成功"]
extract:
  token: body.data.token
  user_id: body.data.user_id

示例3:PUT请求

name: 更新用户信息
description: 测试更新用户信息的接口
request:
  method: PUT
  url: /api/user/update
  headers:
    Content-Type: application/json
    Authorization: Bearer ${token}
  json:
    username: new_admin
    email: new_admin@example.com
validate:
  - eq: [status_code, 200]
  - eq: [body.code, 0]

示例4:DELETE请求

name: 删除用户
description: 测试删除用户的接口
request:
  method: DELETE
  url: /api/user/delete
  headers:
    Authorization: Bearer ${token}
  params:
    user_id: ${user_id}
validate:
  - eq: [status_code, 200]
  - eq: [body.code, 0]

示例5:文件上传

name: 上传文件
description: 测试文件上传接口
request:
  method: POST
  url: /api/upload
  headers:
    Authorization: Bearer ${token}
  files:
    file: data/upload_files/test.txt
validate:
  - eq: [status_code, 200]
  - eq: [body.code, 0]
extract:
  file_url: body.data.url

2.4 YAML用例解析器实现

创建 utils/yaml_loader.py

"""
YAML测试用例加载器
用于加载和解析YAML格式的测试用例
"""
import yaml
import os
from pathlib import Path
from typing import Dict, List, Any, Optional

class YamlLoader:
    """YAML用例加载器"""

    def __init__(self, base_dir: Optional[str] = None):
        """
        初始化YAML加载器

        参数:
            base_dir: 用例文件的基础目录
        """
        if base_dir:
            self.base_dir = Path(base_dir)
        else:
            # 默认使用项目根目录下的tests/test_cases目录
            self.base_dir = Path(__file__).parent.parent / "tests" / "test_cases"

    def load_file(self, file_path: str) -> Dict[str, Any]:
        """
        加载单个YAML文件

        参数:
            file_path: YAML文件路径(相对路径或绝对路径)

        返回:
            解析后的字典

        示例:
            loader = YamlLoader()
            case = loader.load_file("test_login.yaml")
        """
        # 如果是相对路径,使用base_dir
        if not os.path.isabs(file_path):
            file_path = self.base_dir / file_path

        file_path = Path(file_path)

        if not file_path.exists():
            raise FileNotFoundError(f"用例文件不存在: {file_path}")

        with open(file_path, "r", encoding="utf-8") as f:
            content = yaml.safe_load(f)

        return content

    def load_dir(self, dir_path: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        加载目录下所有YAML文件

        参数:
            dir_path: 目录路径(可选,默认使用base_dir)

        返回:
            所有用例的列表

        示例:
            loader = YamlLoader()
            cases = loader.load_dir()
        """
        if dir_path:
            target_dir = Path(dir_path)
        else:
            target_dir = self.base_dir

        if not target_dir.exists():
            raise FileNotFoundError(f"用例目录不存在: {target_dir}")

        cases = []
        # 遍历目录下所有.yaml和.yml文件
        for file_path in target_dir.glob("*.yaml"):
            try:
                case = self.load_file(file_path)
                # 添加文件路径信息
                case["_file_path"] = str(file_path)
                cases.append(case)
            except Exception as e:
                print(f"加载用例文件失败 {file_path}: {e}")

        for file_path in target_dir.glob("*.yml"):
            try:
                case = self.load_file(file_path)
                case["_file_path"] = str(file_path)
                cases.append(case)
            except Exception as e:
                print(f"加载用例文件失败 {file_path}: {e}")

        return cases

    def validate_case(self, case: Dict[str, Any]) -> bool:
        """
        验证用例格式是否正确

        参数:
            case: 用例字典

        返回:
            是否有效

        示例:
            loader = YamlLoader()
            case = loader.load_file("test_login.yaml")
            if loader.validate_case(case):
                print("用例格式正确")
        """
        # 必须包含name和request
        if "name" not in case:
            raise ValueError("用例必须包含name字段")

        if "request" not in case:
            raise ValueError("用例必须包含request字段")

        request = case["request"]

        # request必须包含method和url
        if "method" not in request:
            raise ValueError("request必须包含method字段")

        if "url" not in request:
            raise ValueError("request必须包含url字段")

        # method必须是有效的HTTP方法
        valid_methods = ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"]
        if request["method"].upper() not in valid_methods:
            raise ValueError(f"无效的HTTP方法: {request['method']}")

        return True

2.5 使用示例

创建测试用例文件 tests/test_cases/test_login.yaml

name: 用户登录测试
description: 测试用户登录功能,验证登录成功后返回token
tags: [smoke, login]
order: 1
request:
  method: POST
  url: /api/login
  headers:
    Content-Type: application/json
  json:
    username: admin
    password: "123456"
validate:
  - eq: [status_code, 200]
  - eq: [body.code, 0]
  - eq: [body.message, "登录成功"]
  - type: [body.data.token, str]
extract:
  token: body.data.token
  user_id: body.data.user_id

在测试代码中使用:

import pytest
from utils.yaml_loader import YamlLoader
from clients.http_client import HttpClient

class TestLogin:
    """登录测试类"""

    def setup_method(self):
        """每个测试方法执行前的初始化"""
        self.loader = YamlLoader()
        self.client = HttpClient(base_url="https://api.example.com")

    def test_login(self):
        """测试登录接口"""
        # 加载YAML用例
        case = self.loader.load_file("test_login.yaml")

        # 验证用例格式
        self.loader.validate_case(case)

        # 执行请求
        request_data = case["request"]
        response = self.client.request(
            method=request_data["method"],
            url=request_data["url"],
            headers=request_data.get("headers"),
            json=request_data.get("json")
        )

        # 执行断言
        for assertion in case.get("validate", []):
            # 这里需要实现断言逻辑(后面会详细讲解)
            pass

        # 提取变量
        for key, path in case.get("extract", {}).items():
            # 这里需要实现变量提取逻辑(后面会详细讲解)
            pass

2.6 高级特性

2.6.1 支持变量引用

name: 获取用户信息
request:
  method: GET
  url: /api/user/info
  headers:
    Authorization: Bearer ${token}      # 引用变量token
  params:
    user_id: ${user_id}                # 引用变量user_id

2.6.2 支持函数调用

name: 创建用户
request:
  method: POST
  url: /api/user/create
  json:
    username: ${random_string(10)}     # 生成随机字符串
    email: ${random_email()}           # 生成随机邮箱
    timestamp: ${timestamp()}          # 获取当前时间戳

2.6.3 支持条件执行

name: 条件测试
if: ${env} == "test"                   # 只在test环境执行
request:
  method: GET
  url: /api/test/endpoint

3. 主题2:用例执行顺序

3.1 为什么需要控制执行顺序

在接口自动化测试中,有些测试用例之间存在依赖关系:

  • 登录用例必须在获取用户信息用例之前执行
  • 创建用户用例必须在更新用户用例之前执行
  • 创建订单用例必须在支付订单用例之前执行

如果不控制执行顺序,可能会导致测试失败。

3.2 pytest-ordering插件

pytest默认不保证测试用例的执行顺序,但我们可以使用pytest-ordering插件来控制执行顺序。

3.2.1 安装插件

pip install pytest-ordering

3.2.2 使用装饰器控制顺序

方法1:使用order标记

import pytest

@pytest.mark.order(1)
def test_login():
    """第一个执行"""
    pass

@pytest.mark.order(2)
def test_get_user_info():
    """第二个执行"""
    pass

@pytest.mark.order(3)
def test_logout():
    """第三个执行"""
    pass

方法2:使用first/last标记

import pytest

@pytest.mark.first
def test_setup():
    """最先执行"""
    pass

@pytest.mark.last
def test_teardown():
    """最后执行"""
    pass

def test_normal():
    """中间执行"""
    pass

方法3:使用second_to_last等标记

import pytest

@pytest.mark.first
def test_setup():
    pass

@pytest.mark.second_to_last
def test_cleanup():
    pass

@pytest.mark.last
def test_teardown():
    pass

3.3 在YAML用例中指定执行顺序

3.3.1 YAML用例中添加order字段

# test_login.yaml
name: 用户登录
order: 1
request:
  method: POST
  url: /api/login
# test_user_info.yaml
name: 获取用户信息
order: 2
request:
  method: GET
  url: /api/user/info
# test_logout.yaml
name: 用户登出
order: 3
request:
  method: POST
  url: /api/logout

3.3.2 实现用例排序功能

创建 utils/case_sorter.py

"""
用例排序工具
根据order字段对用例进行排序
"""
from typing import List, Dict, Any

class CaseSorter:
    """用例排序器"""

    @staticmethod
    def sort_cases(cases: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        对用例列表进行排序

        参数:
            cases: 用例列表

        返回:
            排序后的用例列表

        排序规则:
            1. 有order字段的用例优先,按order值排序
            2. 没有order字段的用例排在后面,保持原有顺序

        示例:
            sorter = CaseSorter()
            sorted_cases = sorter.sort_cases(cases)
        """
        # 分离有order和没有order的用例
        cases_with_order = []
        cases_without_order = []

        for case in cases:
            if "order" in case and case["order"] is not None:
                cases_with_order.append(case)
            else:
                cases_without_order.append(case)

        # 对有order的用例进行排序
        cases_with_order.sort(key=lambda x: x["order"])

        # 合并结果
        return cases_with_order + cases_without_order

    @staticmethod
    def validate_order(cases: List[Dict[str, Any]]) -> bool:
        """
        验证用例的order值是否有效

        参数:
            cases: 用例列表

        返回:
            是否有效

        验证规则:
            1. order必须是数字
            2. order不能重复(可选,根据需求决定)
        """
        orders = []
        for case in cases:
            if "order" in case:
                order = case["order"]
                if not isinstance(order, (int, float)):
                    raise ValueError(f"用例 {case.get('name', 'unknown')} 的order必须是数字")
                if order in orders:
                    # 如果允许重复order,可以注释掉这部分
                    raise ValueError(f"用例order值重复: {order}")
                orders.append(order)

        return True

3.4 在pytest中使用排序

3.4.1 修改conftest.py

创建 tests/conftest.py

"""
pytest配置文件
"""
import pytest
from utils.yaml_loader import YamlLoader
from utils.case_sorter import CaseSorter

def pytest_collection_modifyitems(config, items):
    """
    修改测试用例收集顺序
    这个hook会在收集完所有测试用例后执行
    """
    # 获取所有YAML用例
    loader = YamlLoader()
    yaml_cases = loader.load_dir()

    # 对用例进行排序
    sorter = CaseSorter()
    sorter.validate_order(yaml_cases)
    sorted_cases = sorter.sort_cases(yaml_cases)

    # 创建用例名称到order的映射
    case_order_map = {}
    for case in sorted_cases:
        case_name = case.get("name", "")
        order = case.get("order", 9999)  # 没有order的用例order为9999
        case_order_map[case_name] = order

    # 对pytest收集的测试用例进行排序
    def get_test_order(item):
        """获取测试用例的执行顺序"""
        # 从测试用例名称中提取用例名
        test_name = item.name

        # 尝试从marker中获取order
        order_marker = item.get_closest_marker("order")
        if order_marker:
            return order_marker.args[0] if order_marker.args else 9999

        # 尝试从用例名称匹配YAML用例的order
        for case_name, order in case_order_map.items():
            if case_name in test_name or test_name in case_name:
                return order

        return 9999

    # 对items进行排序
    items.sort(key=get_test_order)

3.4.2 在测试类中使用

import pytest
from utils.yaml_loader import YamlLoader
from clients.http_client import HttpClient

class TestUserFlow:
    """用户流程测试"""

    @pytest.mark.order(1)
    def test_login(self):
        """测试登录 - 第一个执行"""
        loader = YamlLoader()
        case = loader.load_file("test_login.yaml")
        # 执行测试...

    @pytest.mark.order(2)
    def test_get_user_info(self):
        """获取用户信息 - 第二个执行"""
        loader = YamlLoader()
        case = loader.load_file("test_user_info.yaml")
        # 执行测试...

    @pytest.mark.order(3)
    def test_logout(self):
        """测试登出 - 第三个执行"""
        loader = YamlLoader()
        case = loader.load_file("test_logout.yaml")
        # 执行测试...

3.5 依赖关系管理

3.5.1 在YAML中定义依赖

name: 获取用户信息
depends_on: [登录]                    # 依赖登录用例
order: 2
request:
  method: GET
  url: /api/user/info

3.5.2 实现依赖检查

创建 utils/dependency_checker.py

"""
依赖关系检查器
检查用例之间的依赖关系是否满足
"""
from typing import List, Dict, Any, Set

class DependencyChecker:
    """依赖关系检查器"""

    @staticmethod
    def check_dependencies(cases: List[Dict[str, Any]]) -> bool:
        """
        检查用例依赖关系是否满足

        参数:
            cases: 用例列表

        返回:
            是否满足依赖关系

        示例:
            checker = DependencyChecker()
            if checker.check_dependencies(cases):
                print("依赖关系满足")
        """
        # 创建用例名称到用例的映射
        case_map = {case["name"]: case for case in cases}

        # 检查每个用例的依赖
        for case in cases:
            depends_on = case.get("depends_on", [])

            if not depends_on:
                continue

            # 检查依赖的用例是否存在
            for dep_name in depends_on:
                if dep_name not in case_map:
                    raise ValueError(
                        f"用例 '{case['name']}' 依赖的用例 '{dep_name}' 不存在"
                    )

            # 检查依赖的用例是否在当前用例之前
            current_order = case.get("order", 9999)
            for dep_name in depends_on:
                dep_case = case_map[dep_name]
                dep_order = dep_case.get("order", 9999)

                if dep_order >= current_order:
                    raise ValueError(
                        f"用例 '{case['name']}' (order={current_order}) "
                        f"依赖的用例 '{dep_name}' (order={dep_order}) "
                        f"必须在当前用例之前执行"
                    )

        return True

    @staticmethod
    def get_execution_order(cases: List[Dict[str, Any]]) -> List[str]:
        """
        根据依赖关系获取执行顺序

        参数:
            cases: 用例列表

        返回:
            用例名称的执行顺序列表
        """
        # 先按order排序
        from utils.case_sorter import CaseSorter
        sorted_cases = CaseSorter.sort_cases(cases)

        # 检查依赖关系
        DependencyChecker.check_dependencies(sorted_cases)

        # 返回用例名称列表
        return [case["name"] for case in sorted_cases]

3.6 完整示例

创建多个测试用例:

# test_01_login.yaml
name: 用户登录
order: 1
request:
  method: POST
  url: /api/login
  json:
    username: admin
    password: "123456"
extract:
  token: body.data.token
# test_02_user_info.yaml
name: 获取用户信息
depends_on: [用户登录]
order: 2
request:
  method: GET
  url: /api/user/info
  headers:
    Authorization: Bearer ${token}
# test_03_logout.yaml
name: 用户登出
depends_on: [用户登录, 获取用户信息]
order: 3
request:
  method: POST
  url: /api/logout
  headers:
    Authorization: Bearer ${token}

使用示例:

from utils.yaml_loader import YamlLoader
from utils.case_sorter import CaseSorter
from utils.dependency_checker import DependencyChecker

# 加载用例
loader = YamlLoader()
cases = loader.load_dir()

# 检查依赖关系
checker = DependencyChecker()
checker.check_dependencies(cases)

# 获取执行顺序
execution_order = checker.get_execution_order(cases)
print("执行顺序:", execution_order)
# 输出: ['用户登录', '获取用户信息', '用户登出']

# 排序用例
sorter = CaseSorter()
sorted_cases = sorter.sort_cases(cases)

# 按顺序执行用例
for case in sorted_cases:
    print(f"执行用例: {case['name']}")
    # 执行测试...

4. 主题3:接口关联改进封装

4.1 什么是接口关联

接口关联是指一个接口的返回数据被另一个接口使用。例如:

  1. 登录接口返回token
  2. 获取用户信息接口需要使用这个token作为请求头

这就是接口关联。

4.2 为什么需要接口关联封装

不封装的方式(手动传递):

def test_login_and_get_user_info():
    # 登录
    login_response = requests.post("/api/login", json={"username": "admin"})
    token = login_response.json()["data"]["token"]

    # 获取用户信息(手动传递token)
    headers = {"Authorization": f"Bearer {token}"}
    user_response = requests.get("/api/user/info", headers=headers)

封装后的方式(自动关联):

def test_login_and_get_user_info():
    # 登录并自动提取token
    login_response = client.login(username="admin")
    token = login_response.extract("data.token")

    # 获取用户信息(自动使用token)
    user_response = client.get_user_info()  # token自动从上下文中获取

4.3 变量提取器实现

创建 utils/extractor.py

"""
变量提取器
从响应中提取数据并存储到上下文中
"""
import jsonpath
from typing import Any, Dict, Optional

class VariableContext:
    """变量上下文(全局变量存储)"""

    _instance = None
    _variables = {}

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def set(self, key: str, value: Any):
        """设置变量"""
        self._variables[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        """获取变量"""
        return self._variables.get(key, default)

    def clear(self):
        """清空所有变量"""
        self._variables.clear()

    def get_all(self) -> Dict[str, Any]:
        """获取所有变量"""
        return self._variables.copy()

class Extractor:
    """变量提取器"""

    def __init__(self):
        self.context = VariableContext()

    def extract(self, response: Any, extract_config: Dict[str, str]) -> Dict[str, Any]:
        """
        从响应中提取变量

        参数:
            response: requests.Response对象或字典
            extract_config: 提取配置,格式:{变量名: jsonpath路径}

        返回:
            提取的变量字典

        示例:
            extractor = Extractor()
            variables = extractor.extract(
                response,
                {"token": "body.data.token", "user_id": "body.data.user_id"}
            )
        """
        # 将response转换为字典格式
        if hasattr(response, "json"):
            try:
                response_dict = {
                    "status_code": response.status_code,
                    "headers": dict(response.headers),
                    "body": response.json()
                }
            except:
                response_dict = {
                    "status_code": response.status_code,
                    "headers": dict(response.headers),
                    "body": response.text
                }
        else:
            response_dict = response

        extracted_vars = {}

        for var_name, path in extract_config.items():
            value = self._extract_value(response_dict, path)
            if value is not None:
                # 存储到上下文
                self.context.set(var_name, value)
                extracted_vars[var_name] = value

        return extracted_vars

    def _extract_value(self, data: Dict, path: str) -> Any:
        """
        从数据中提取值

        支持的路径格式:
            - body.data.token          # 点号分隔的路径
            - body.data.items[0].id    # 支持数组索引
            - $..token                 # jsonpath表达式
            - headers.Authorization    # 提取响应头
            - status_code              # 提取状态码
        """
        # 处理jsonpath表达式
        if path.startswith("$"):
            try:
                result = jsonpath.jsonpath(data, path)
                if result:
                    return result[0] if len(result) == 1 else result
            except:
                pass

        # 处理点号分隔的路径
        parts = path.split(".")
        current = data

        for part in parts:
            # 处理数组索引,如 items[0]
            if "[" in part and "]" in part:
                key = part[:part.index("[")]
                index = int(part[part.index("[") + 1:part.index("]")])
                if key in current:
                    current = current[key]
                    if isinstance(current, list) and 0 <= index < len(current):
                        current = current[index]
                    else:
                        return None
                else:
                    return None
            else:
                if isinstance(current, dict) and part in current:
                    current = current[part]
                else:
                    return None

        return current

    def replace_variables(self, data: Any) -> Any:
        """
        替换数据中的变量引用

        支持的格式:
            - ${variable_name}         # 简单变量
            - ${variable_name:default} # 带默认值

        参数:
            data: 需要替换的数据(可以是字符串、字典、列表等)

        返回:
            替换后的数据

        示例:
            extractor = Extractor()
            extractor.context.set("token", "abc123")
            result = extractor.replace_variables("Bearer ${token}")
            # 结果: "Bearer abc123"
        """
        import re

        if isinstance(data, str):
            # 替换字符串中的变量
            pattern = r'${([^}:]+)(?::([^}]+))?}'

            def replace_match(match):
                var_name = match.group(1)
                default = match.group(2)
                value = self.context.get(var_name)
                if value is not None:
                    return str(value)
                elif default is not None:
                    return default
                else:
                    return match.group(0)  # 找不到变量,保持原样

            return re.sub(pattern, replace_match, data)

        elif isinstance(data, dict):
            # 递归处理字典
            return {k: self.replace_variables(v) for k, v in data.items()}

        elif isinstance(data, list):
            # 递归处理列表
            return [self.replace_variables(item) for item in data]

        else:
            return data

4.4 在HTTP客户端中集成变量提取

修改 clients/http_client.py

"""
HTTP客户端封装模块(增强版)
支持变量提取和自动替换
"""
import requests
from typing import Dict, Optional, Any
from utils.extractor import Extractor
from utils.logger import HTTPLogger

class HttpClient:
    """HTTP客户端封装类(支持变量提取)"""

    def __init__(self, base_url: str = ""):
        """
        初始化HTTP客户端

        参数:
            base_url: API基础URL
        """
        self.base_url = base_url
        self.session = requests.Session()
        self.extractor = Extractor()
        self.logger = HTTPLogger()

    def request(self, method: str, url: str, extract: Optional[Dict[str, str]] = None,
                **kwargs) -> requests.Response:
        """
        发送HTTP请求

        参数:
            method: HTTP方法
            url: 请求URL
            extract: 变量提取配置
            **kwargs: 其他requests参数

        返回:
            Response对象
        """
        # 替换URL中的变量
        url = self.extractor.replace_variables(url)

        # 替换请求参数中的变量
        kwargs = self._replace_kwargs_variables(kwargs)

        # 构建完整URL
        full_url = f"{self.base_url}{url}" if url.startswith("/") else url

        # 记录请求
        self.logger.log_request(method, full_url, **kwargs)

        # 发送请求
        response = self.session.request(method, full_url, **kwargs)

        # 记录响应
        self.logger.log_response(response)

        # 提取变量
        if extract:
            self.extractor.extract(response, extract)

        return response

    def _replace_kwargs_variables(self, kwargs: Dict) -> Dict:
        """替换kwargs中的变量"""
        result = {}
        for key, value in kwargs.items():
            result[key] = self.extractor.replace_variables(value)
        return result

    def get(self, url: str, extract: Optional[Dict[str, str]] = None, **kwargs):
        """GET请求"""
        return self.request("GET", url, extract=extract, **kwargs)

    def post(self, url: str, extract: Optional[Dict[str, str]] = None, **kwargs):
        """POST请求"""
        return self.request("POST", url, extract=extract, **kwargs)

    def put(self, url: str, extract: Optional[Dict[str, str]] = None, **kwargs):
        """PUT请求"""
        return self.request("PUT", url, extract=extract, **kwargs)

    def delete(self, url: str, extract: Optional[Dict[str, str]] = None, **kwargs):
        """DELETE请求"""
        return self.request("DELETE", url, extract=extract, **kwargs)

4.5 使用示例

YAML用例示例:

# test_login.yaml
name: 用户登录
request:
  method: POST
  url: /api/login
  json:
    username: admin
    password: "123456"
extract:
  token: body.data.token
  user_id: body.data.user_id
# test_user_info.yaml
name: 获取用户信息
request:
  method: GET
  url: /api/user/info
  headers:
    Authorization: Bearer ${token}      # 自动使用登录时提取的token
  params:
    user_id: ${user_id}                  # 自动使用登录时提取的user_id

测试代码示例:

import pytest
from utils.yaml_loader import YamlLoader
from clients.http_client import HttpClient
from utils.extractor import VariableContext

class TestUserFlow:
    """用户流程测试"""

    def setup_method(self):
        """每个测试方法执行前的初始化"""
        self.loader = YamlLoader()
        self.client = HttpClient(base_url="https://api.example.com")
        # 清空变量上下文(可选,根据需求决定)
        # VariableContext().clear()

    def test_login_and_get_user_info(self):
        """测试登录后获取用户信息"""
        # 1. 执行登录用例
        login_case = self.loader.load_file("test_login.yaml")
        login_request = login_case["request"]

        login_response = self.client.request(
            method=login_request["method"],
            url=login_request["url"],
            json=login_request.get("json"),
            extract=login_case.get("extract")  # 提取token和user_id
        )

        # 2. 执行获取用户信息用例(自动使用提取的变量)
        user_info_case = self.loader.load_file("test_user_info.yaml")
        user_info_request = user_info_case["request"]

        user_info_response = self.client.request(
            method=user_info_request["method"],
            url=user_info_request["url"],
            headers=user_info_request.get("headers"),  # 这里的${token}会自动替换
            params=user_info_request.get("params")      # 这里的${user_id}会自动替换
        )

        # 3. 验证响应
        assert user_info_response.status_code == 200
        assert user_info_response.json()["code"] == 0

4.6 高级特性

4.6.1 支持嵌套变量

name: 创建订单
request:
  method: POST
  url: /api/order/create
  json:
    user_id: ${user_id}
    product_id: ${product_id}
extract:
  order_id: body.data.order_id
name: 支付订单
request:
  method: POST
  url: /api/order/pay
  json:
    order_id: ${order_id}        # 使用创建订单时提取的order_id
    user_id: ${user_id}          # 使用之前提取的user_id

4.6.2 支持默认值

name: 获取用户信息
request:
  method: GET
  url: /api/user/info
  headers:
    Authorization: Bearer ${token:default_token}  # 如果token不存在,使用default_token

4.6.3 支持表达式

name: 创建用户
request:
  method: POST
  url: /api/user/create
  json:
    username: ${random_string(10)}
    email: ${random_email()}
    timestamp: ${timestamp()}

5. 主题4:标准化流程封装

5.1 什么是标准化流程

标准化流程是指将测试用例的执行流程统一化,每个用例都按照相同的步骤执行:

  1. 加载用例
  2. 替换变量
  3. 发送请求
  4. 提取变量
  5. 执行断言
  6. 记录日志

5.2 为什么需要标准化流程

不标准化的方式(每个用例写法不同):

def test_login():
    # 方式1:直接写代码
    response = requests.post("/api/login", json={"username": "admin"})
    assert response.status_code == 200

def test_user_info():
    # 方式2:使用封装的方法
    response = client.get("/api/user/info")
    assert response.json()["code"] == 0

def test_create_user():
    # 方式3:又是另一种写法
    data = {"username": "test"}
    result = api.create_user(data)
    assert result["success"] == True

标准化后的方式(统一流程):

def test_login():
    # 所有用例都使用相同的流程
    case = loader.load("test_login.yaml")
    result = executor.execute(case)
    assert result.success == True

5.3 用例执行器实现

创建 utils/case_executor.py

"""
用例执行器
统一执行YAML测试用例的流程
"""
from typing import Dict, Any, Optional
from clients.http_client import HttpClient
from utils.yaml_loader import YamlLoader
from utils.extractor import Extractor
from utils.assertion import Assertion
from utils.logger import HTTPLogger

class CaseExecutor:
    """用例执行器"""

    def __init__(self, base_url: str = ""):
        """
        初始化用例执行器

        参数:
            base_url: API基础URL
        """
        self.client = HttpClient(base_url=base_url)
        self.loader = YamlLoader()
        self.extractor = Extractor()
        self.assertion = Assertion()
        self.logger = HTTPLogger()

    def execute(self, case: Dict[str, Any]) -> Dict[str, Any]:
        """
        执行单个测试用例

        参数:
            case: 用例字典(可以是YAML加载的结果)

        返回:
            执行结果字典

        执行流程:
            1. 加载用例(如果case是文件路径)
            2. 验证用例格式
            3. 替换变量
            4. 发送请求
            5. 提取变量
            6. 执行断言
            7. 返回结果
        """
        # 1. 如果case是字符串(文件路径),先加载
        if isinstance(case, str):
            case = self.loader.load_file(case)

        # 2. 验证用例格式
        self.loader.validate_case(case)

        # 3. 获取请求配置
        request_config = case["request"]
        method = request_config["method"].upper()
        url = request_config["url"]

        # 4. 准备请求参数
        kwargs = {}

        # 处理headers
        if "headers" in request_config:
            kwargs["headers"] = self.extractor.replace_variables(
                request_config["headers"]
            )

        # 处理params
        if "params" in request_config:
            kwargs["params"] = self.extractor.replace_variables(
                request_config["params"]
            )

        # 处理json
        if "json" in request_config:
            kwargs["json"] = self.extractor.replace_variables(
                request_config["json"]
            )

        # 处理data(表单数据)
        if "data" in request_config:
            kwargs["data"] = self.extractor.replace_variables(
                request_config["data"]
            )

        # 处理files(文件上传)
        if "files" in request_config:
            kwargs["files"] = self._prepare_files(
                request_config["files"]
            )

        # 5. 发送请求
        try:
            response = self.client.request(
                method=method,
                url=url,
                extract=case.get("extract"),
                **kwargs
            )
        except Exception as e:
            self.logger.error(f"请求失败: {e}")
            return {
                "success": False,
                "error": str(e),
                "case_name": case.get("name", "unknown")
            }

        # 6. 执行断言
        assertion_results = []
        if "validate" in case:
            for assertion_config in case["validate"]:
                try:
                    result = self.assertion.assert_response(
                        response,
                        assertion_config
                    )
                    assertion_results.append(result)
                except AssertionError as e:
                    assertion_results.append({
                        "success": False,
                        "error": str(e)
                    })

        # 7. 返回结果
        all_passed = all(
            r.get("success", False) for r in assertion_results
        ) if assertion_results else True

        return {
            "success": all_passed,
            "case_name": case.get("name", "unknown"),
            "response": response,
            "assertions": assertion_results,
            "extracted_vars": self.extractor.context.get_all()
        }

    def _prepare_files(self, files_config: Dict[str, str]) -> Dict[str, Any]:
        """
        准备文件上传参数

        参数:
            files_config: 文件配置,格式:{字段名: 文件路径}

        返回:
            文件字典
        """
        from pathlib import Path

        files = {}
        for field_name, file_path in files_config.items():
            # 替换变量
            file_path = self.extractor.replace_variables(file_path)

            # 如果是相对路径,转换为绝对路径
            if not Path(file_path).is_absolute():
                base_dir = Path(__file__).parent.parent
                file_path = base_dir / file_path

            # 检查文件是否存在
            if not Path(file_path).exists():
                raise FileNotFoundError(f"文件不存在: {file_path}")

            # 打开文件
            files[field_name] = open(file_path, "rb")

        return files

    def execute_batch(self, cases: list) -> list:
        """
        批量执行用例

        参数:
            cases: 用例列表(可以是文件路径列表或用例字典列表)

        返回:
            执行结果列表
        """
        results = []
        for case in cases:
            result = self.execute(case)
            results.append(result)
        return results

5.4 在pytest中使用标准化流程

创建 tests/test_yaml_cases.py

"""
YAML用例测试文件
使用标准化流程执行YAML用例
"""
import pytest
from utils.case_executor import CaseExecutor
from config.config import Config

class TestYamlCases:
    """YAML用例测试类"""

    @pytest.fixture(scope="class")
    def executor(self):
        """创建用例执行器"""
        config = Config()
        return CaseExecutor(base_url=config.base_url)

    @pytest.fixture(scope="function", autouse=True)
    def setup(self):
        """每个测试方法执行前的初始化"""
        # 清空变量上下文(可选)
        from utils.extractor import VariableContext
        VariableContext().clear()

    def test_login(self, executor):
        """测试登录用例"""
        result = executor.execute("test_login.yaml")
        assert result["success"] == True

    def test_user_info(self, executor):
        """测试获取用户信息用例"""
        result = executor.execute("test_user_info.yaml")
        assert result["success"] == True

    def test_all_cases(self, executor):
        """批量执行所有用例"""
        from utils.yaml_loader import YamlLoader

        loader = YamlLoader()
        cases = loader.load_dir()

        results = executor.execute_batch(cases)

        # 统计结果
        passed = sum(1 for r in results if r["success"])
        failed = len(results) - passed

        print(f"n执行结果: 通过 {passed}, 失败 {failed}")

        # 所有用例都应该通过
        assert failed == 0

5.5 使用pytest参数化实现数据驱动

import pytest
from utils.case_executor import CaseExecutor
from utils.yaml_loader import YamlLoader

class TestDataDriven:
    """数据驱动测试"""

    @pytest.fixture(scope="class")
    def executor(self):
        config = Config()
        return CaseExecutor(base_url=config.base_url)

    @pytest.mark.parametrize("case_file", [
        "test_login.yaml",
        "test_user_info.yaml",
        "test_logout.yaml"
    ])
    def test_cases(self, executor, case_file):
        """参数化执行用例"""
        result = executor.execute(case_file)
        assert result["success"] == True

6. 主题5:热加载封装

6.1 什么是热加载

热加载(Hot Reload)是指在不重启程序的情况下,动态加载更新的配置或代码。在测试框架中,热加载可以让我们:

  • 修改YAML用例后立即生效,无需重启测试
  • 修改配置文件后立即生效
  • 提高开发和调试效率

6.2 为什么需要热加载

没有热加载的问题:

# 修改了YAML用例后,需要重启测试才能生效
def test_login():
    case = loader.load_file("test_login.yaml")  # 用例已经加载到内存
    # 如果修改了YAML文件,需要重启Python进程才能看到变化

有热加载的优势:

# 修改了YAML用例后,自动重新加载
def test_login():
    case = loader.load_file("test_login.yaml", reload=True)  # 每次都重新加载
    # 修改YAML文件后,下次执行会自动使用新内容

6.3 文件监控实现

创建 utils/file_watcher.py

"""
文件监控工具
监控文件变化,实现热加载功能
"""
import os
import time
from pathlib import Path
from typing import Dict, Callable, Optional
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class FileWatcher:
    """文件监控器"""

    def __init__(self):
        self.observers = {}
        self.file_cache = {}  # 文件内容缓存
        self.file_mtimes = {}  # 文件修改时间缓存

    def watch_file(self, file_path: str, callback: Callable):
        """
        监控单个文件

        参数:
            file_path: 文件路径
            callback: 文件变化时的回调函数
        """
        file_path = Path(file_path).resolve()

        if not file_path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")

        # 记录初始修改时间
        self.file_mtimes[str(file_path)] = file_path.stat().st_mtime

        # 创建观察者
        observer = Observer()
        observer.schedule(
            FileChangeHandler(file_path, callback),
            str(file_path.parent),
            recursive=False
        )
        observer.start()

        self.observers[str(file_path)] = observer

    def watch_directory(self, dir_path: str, callback: Callable, pattern: str = "*.yaml"):
        """
        监控目录下的文件

        参数:
            dir_path: 目录路径
            callback: 文件变化时的回调函数
            pattern: 文件匹配模式
        """
        dir_path = Path(dir_path).resolve()

        if not dir_path.exists():
            raise FileNotFoundError(f"目录不存在: {dir_path}")

        # 创建观察者
        observer = Observer()
        observer.schedule(
            DirectoryChangeHandler(dir_path, callback, pattern),
            str(dir_path),
            recursive=True
        )
        observer.start()

        self.observers[str(dir_path)] = observer

    def stop_all(self):
        """停止所有监控"""
        for observer in self.observers.values():
            observer.stop()
            observer.join()
        self.observers.clear()

    def is_file_changed(self, file_path: str) -> bool:
        """
        检查文件是否发生变化

        参数:
            file_path: 文件路径

        返回:
            是否发生变化
        """
        file_path = Path(file_path).resolve()
        file_str = str(file_path)

        if not file_path.exists():
            return False

        current_mtime = file_path.stat().st_mtime

        if file_str not in self.file_mtimes:
            self.file_mtimes[file_str] = current_mtime
            return True

        if current_mtime != self.file_mtimes[file_str]:
            self.file_mtimes[file_str] = current_mtime
            return True

        return False

    def clear_cache(self, file_path: Optional[str] = None):
        """
        清空缓存

        参数:
            file_path: 文件路径(如果指定,只清空该文件的缓存)
        """
        if file_path:
            file_str = str(Path(file_path).resolve())
            self.file_cache.pop(file_str, None)
            self.file_mtimes.pop(file_str, None)
        else:
            self.file_cache.clear()
            self.file_mtimes.clear()

class FileChangeHandler(FileSystemEventHandler):
    """文件变化处理器"""

    def __init__(self, target_file: Path, callback: Callable):
        self.target_file = target_file.resolve()
        self.callback = callback

    def on_modified(self, event):
        if Path(event.src_path).resolve() == self.target_file:
            self.callback(event.src_path)

class DirectoryChangeHandler(FileSystemEventHandler):
    """目录变化处理器"""

    def __init__(self, target_dir: Path, callback: Callable, pattern: str):
        self.target_dir = target_dir.resolve()
        self.callback = callback
        self.pattern = pattern

    def on_modified(self, event):
        if event.is_directory:
            return

        file_path = Path(event.src_path)
        if file_path.match(self.pattern):
            self.callback(event.src_path)

注意: 需要安装watchdog库:

pip install watchdog

6.4 热加载YAML加载器

修改 utils/yaml_loader.py,添加热加载功能:

"""
YAML测试用例加载器(支持热加载)
"""
import yaml
import os
from pathlib import Path
from typing import Dict, List, Any, Optional
from utils.file_watcher import FileWatcher

class YamlLoader:
    """YAML用例加载器(支持热加载)"""

    def __init__(self, base_dir: Optional[str] = None, enable_hot_reload: bool = True):
        """
        初始化YAML加载器

        参数:
            base_dir: 用例文件的基础目录
            enable_hot_reload: 是否启用热加载
        """
        if base_dir:
            self.base_dir = Path(base_dir)
        else:
            self.base_dir = Path(__file__).parent.parent / "tests" / "test_cases"

        self.enable_hot_reload = enable_hot_reload
        self.file_cache = {}  # 文件内容缓存
        self.file_watcher = FileWatcher() if enable_hot_reload else None

    def load_file(self, file_path: str, reload: bool = False) -> Dict[str, Any]:
        """
        加载单个YAML文件(支持热加载)

        参数:
            file_path: YAML文件路径
            reload: 是否强制重新加载(忽略缓存)

        返回:
            解析后的字典
        """
        # 如果是相对路径,使用base_dir
        if not os.path.isabs(file_path):
            file_path = self.base_dir / file_path

        file_path = Path(file_path).resolve()
        file_str = str(file_path)

        if not file_path.exists():
            raise FileNotFoundError(f"用例文件不存在: {file_path}")

        # 检查是否需要重新加载
        if reload or not self.enable_hot_reload:
            # 强制重新加载
            return self._load_file_content(file_path)

        # 检查文件是否发生变化
        if self.file_watcher and self.file_watcher.is_file_changed(file_str):
            # 文件已变化,清除缓存并重新加载
            self.file_cache.pop(file_str, None)

        # 从缓存加载或重新加载
        if file_str in self.file_cache:
            return self.file_cache[file_str]

        # 加载文件内容
        content = self._load_file_content(file_path)

        # 缓存内容
        if self.enable_hot_reload:
            self.file_cache[file_str] = content

        return content

    def _load_file_content(self, file_path: Path) -> Dict[str, Any]:
        """加载文件内容(内部方法)"""
        with open(file_path, "r", encoding="utf-8") as f:
            content = yaml.safe_load(f)
        return content

    def load_dir(self, dir_path: Optional[str] = None, reload: bool = False) -> List[Dict[str, Any]]:
        """
        加载目录下所有YAML文件(支持热加载)

        参数:
            dir_path: 目录路径
            reload: 是否强制重新加载
        """
        if dir_path:
            target_dir = Path(dir_path)
        else:
            target_dir = self.base_dir

        if not target_dir.exists():
            raise FileNotFoundError(f"用例目录不存在: {target_dir}")

        cases = []
        for file_path in target_dir.glob("*.yaml"):
            try:
                case = self.load_file(file_path, reload=reload)
                case["_file_path"] = str(file_path)
                cases.append(case)
            except Exception as e:
                print(f"加载用例文件失败 {file_path}: {e}")

        for file_path in target_dir.glob("*.yml"):
            try:
                case = self.load_file(file_path, reload=reload)
                case["_file_path"] = str(file_path)
                cases.append(case)
            except Exception as e:
                print(f"加载用例文件失败 {file_path}: {e}")

        return cases

6.5 使用示例

from utils.yaml_loader import YamlLoader

# 启用热加载
loader = YamlLoader(enable_hot_reload=True)

# 第一次加载
case1 = loader.load_file("test_login.yaml")
print("第一次加载:", case1)

# 修改test_login.yaml文件后...

# 第二次加载(自动检测变化并重新加载)
case2 = loader.load_file("test_login.yaml")
print("第二次加载(已更新):", case2)

# 强制重新加载
case3 = loader.load_file("test_login.yaml", reload=True)

7. 主题6:统一断言封装

7.1 什么是统一断言

统一断言是指将各种断言逻辑封装成统一的方法,让断言更加简洁和易用。

7.2 为什么需要统一断言

不统一的断言方式:

# 方式1:直接使用assert
assert response.status_code == 200
assert response.json()["code"] == 0

# 方式2:使用pytest的assert
assert response.status_code == 200, "状态码不正确"

# 方式3:自定义断言函数
def assert_status_code(response, expected):
    assert response.status_code == expected

统一断言的优势:

# 统一的方式
assertion.assert_status_code(response, 200)
assertion.assert_json_path(response, "code", 0)
assertion.assert_contains(response, "success")

7.3 断言器实现

创建 utils/assertion.py

"""
统一断言工具
封装各种断言方法
"""
import jsonpath
from typing import Any, Dict, List, Optional
from requests import Response

class Assertion:
    """断言器"""

    def assert_status_code(self, response: Response, expected: int):
        """
        断言状态码

        参数:
            response: Response对象
            expected: 期望的状态码

        示例:
            assertion.assert_status_code(response, 200)
        """
        actual = response.status_code
        assert actual == expected, f"状态码断言失败: 期望 {expected}, 实际 {actual}"

    def assert_json_path(self, response: Response, path: str, expected: Any):
        """
        断言JSON路径的值

        参数:
            response: Response对象
            path: JSON路径(支持jsonpath表达式或点号分隔路径)
            expected: 期望的值

        示例:
            assertion.assert_json_path(response, "body.code", 0)
            assertion.assert_json_path(response, "$.data.token", "abc123")
        """
        try:
            response_json = response.json()
        except:
            raise AssertionError("响应不是JSON格式")

        # 提取值
        value = self._extract_value(response_json, path)

        assert value == expected, f"JSON路径断言失败: {path} 期望 {expected}, 实际 {value}"

    def assert_contains(self, response: Response, expected: Any, field: Optional[str] = None):
        """
        断言包含某个值

        参数:
            response: Response对象
            expected: 期望包含的值
            field: 字段名(如果指定,只在该字段中查找)

        示例:
            assertion.assert_contains(response, "success")
            assertion.assert_contains(response, "token", field="body.data")
        """
        if field:
            try:
                response_json = response.json()
                data = self._extract_value(response_json, field)
            except:
                raise AssertionError(f"无法提取字段: {field}")
        else:
            data = response.text

        assert expected in str(data), f"包含断言失败: 期望包含 {expected}"

    def assert_type(self, response: Response, path: str, expected_type: type):
        """
        断言类型

        参数:
            response: Response对象
            path: JSON路径
            expected_type: 期望的类型

        示例:
            assertion.assert_type(response, "body.data.token", str)
        """
        try:
            response_json = response.json()
        except:
            raise AssertionError("响应不是JSON格式")

        value = self._extract_value(response_json, path)
        actual_type = type(value)

        assert isinstance(value, expected_type), 
            f"类型断言失败: {path} 期望类型 {expected_type.__name__}, 实际类型 {actual_type.__name__}"

    def assert_length(self, response: Response, path: str, expected_length: int):
        """
        断言长度

        参数:
            response: Response对象
            path: JSON路径(应该指向列表或字符串)
            expected_length: 期望的长度

        示例:
            assertion.assert_length(response, "body.data.items", 10)
        """
        try:
            response_json = response.json()
        except:
            raise AssertionError("响应不是JSON格式")

        value = self._extract_value(response_json, path)

        if not isinstance(value, (list, str, dict)):
            raise AssertionError(f"{path} 不是列表、字符串或字典类型")

        actual_length = len(value)
        assert actual_length == expected_length, 
            f"长度断言失败: {path} 期望长度 {expected_length}, 实际长度 {actual_length}"

    def assert_regex(self, response: Response, path: str, pattern: str):
        """
        断言正则表达式匹配

        参数:
            response: Response对象
            path: JSON路径或"text"(表示响应文本)
            pattern: 正则表达式模式

        示例:
            assertion.assert_regex(response, "body.data.email", r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,}$")
        """
        import re

        if path == "text":
            value = response.text
        else:
            try:
                response_json = response.json()
                value = str(self._extract_value(response_json, path))
            except:
                raise AssertionError("响应不是JSON格式")

        assert re.search(pattern, value), 
            f"正则表达式断言失败: {path} 不匹配模式 {pattern}"

    def assert_response(self, response: Response, assertion_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        根据配置执行断言(用于YAML用例)

        参数:
            response: Response对象
            assertion_config: 断言配置

        支持的配置格式:
            - eq: [path, expected]          # 等于
            - ne: [path, expected]          # 不等于
            - gt: [path, expected]          # 大于
            - ge: [path, expected]          # 大于等于
            - lt: [path, expected]          # 小于
            - le: [path, expected]          # 小于等于
            - contains: [path, expected]     # 包含
            - type: [path, type_name]       # 类型
            - length: [path, length]        # 长度
            - regex: [path, pattern]        # 正则表达式

        返回:
            断言结果字典
        """
        assertion_type = list(assertion_config.keys())[0]
        args = assertion_config[assertion_type]

        try:
            if assertion_type == "eq":
                self.assert_json_path(response, args[0], args[1])
            elif assertion_type == "ne":
                value = self._extract_value_from_response(response, args[0])
                assert value != args[1], f"不等于断言失败: {args[0]} 等于 {args[1]}"
            elif assertion_type == "gt":
                value = self._extract_value_from_response(response, args[0])
                assert value > args[1], f"大于断言失败: {args[0]} ({value}) 不大于 {args[1]}"
            elif assertion_type == "ge":
                value = self._extract_value_from_response(response, args[0])
                assert value >= args[1], f"大于等于断言失败: {args[0]} ({value}) 不大于等于 {args[1]}"
            elif assertion_type == "lt":
                value = self._extract_value_from_response(response, args[0])
                assert value < args[1], f"小于断言失败: {args[0]} ({value}) 不小于 {args[1]}"
            elif assertion_type == "le":
                value = self._extract_value_from_response(response, args[0])
                assert value <= args[1], f"小于等于断言失败: {args[0]} ({value}) 不小于等于 {args[1]}"
            elif assertion_type == "contains":
                self.assert_contains(response, args[1], field=args[0] if len(args) > 2 else None)
            elif assertion_type == "type":
                type_map = {
                    "str": str,
                    "int": int,
                    "float": float,
                    "bool": bool,
                    "list": list,
                    "dict": dict
                }
                expected_type = type_map.get(args[1], str)
                self.assert_type(response, args[0], expected_type)
            elif assertion_type == "length":
                self.assert_length(response, args[0], args[1])
            elif assertion_type == "regex":
                self.assert_regex(response, args[0], args[1])
            else:
                raise ValueError(f"不支持的断言类型: {assertion_type}")

            return {"success": True, "type": assertion_type}

        except AssertionError as e:
            return {"success": False, "type": assertion_type, "error": str(e)}

    def _extract_value(self, data: Dict, path: str) -> Any:
        """从数据中提取值(内部方法)"""
        # 处理jsonpath表达式
        if path.startswith("$"):
            result = jsonpath.jsonpath(data, path)
            if result:
                return result[0] if len(result) == 1 else result
            raise ValueError(f"无法提取路径: {path}")

        # 处理点号分隔的路径
        parts = path.split(".")
        current = data

        for part in parts:
            if isinstance(current, dict) and part in current:
                current = current[part]
            else:
                raise ValueError(f"无法提取路径: {path} (在 {part} 处失败)")

        return current

    def _extract_value_from_response(self, response: Response, path: str) -> Any:
        """从响应中提取值(内部方法)"""
        if path == "status_code":
            return response.status_code

        try:
            response_json = response.json()
            return self._extract_value(response_json, path)
        except:
            raise AssertionError("响应不是JSON格式")

7.4 YAML用例中的断言配置

name: 用户登录测试
request:
  method: POST
  url: /api/login
  json:
    username: admin
    password: "123456"
validate:
  - eq: [status_code, 200]                    # 状态码等于200
  - eq: [body.code, 0]                        # body.code等于0
  - eq: [body.message, "登录成功"]            # body.message等于"登录成功"
  - ne: [body.data.token, ""]                 # body.data.token不等于空字符串
  - type: [body.data.token, str]              # body.data.token是字符串类型
  - length: [body.data.permissions, 3]         # body.data.permissions长度为3
  - contains: [body.message, "成功"]          # body.message包含"成功"
  - regex: [body.data.email, "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"]  # 邮箱格式验证

7.5 使用示例

from utils.assertion import Assertion
import requests

# 创建断言器
assertion = Assertion()

# 发送请求
response = requests.post("https://api.example.com/login", json={
    "username": "admin",
    "password": "123456"
})

# 执行各种断言
assertion.assert_status_code(response, 200)
assertion.assert_json_path(response, "body.code", 0)
assertion.assert_type(response, "body.data.token", str)
assertion.assert_contains(response, "success")

8. 主题7:数据库断言封装

8.1 为什么需要数据库断言

在接口自动化测试中,有时候需要验证接口操作是否正确地影响了数据库。例如:

  • 创建用户接口:需要验证数据库中是否真的创建了用户记录
  • 删除订单接口:需要验证数据库中是否真的删除了订单记录
  • 更新商品接口:需要验证数据库中商品信息是否真的被更新

8.2 数据库辅助工具实现

创建 utils/db_helper.py

"""
数据库辅助工具
用于执行数据库查询和断言
"""
import pymysql
from typing import Dict, List, Any, Optional, Tuple
from contextlib import contextmanager

class DBHelper:
    """数据库辅助类"""

    def __init__(self, host: str, port: int, user: str, password: str, database: str):
        """
        初始化数据库连接

        参数:
            host: 数据库主机
            port: 数据库端口
            user: 用户名
            password: 密码
            database: 数据库名
        """
        self.config = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "database": database,
            "charset": "utf8mb4",
            "cursorclass": pymysql.cursors.DictCursor
        }

    @contextmanager
    def get_connection(self):
        """
        获取数据库连接(上下文管理器)

        使用示例:
            with db_helper.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
                result = cursor.fetchone()
        """
        conn = pymysql.connect(**self.config)
        try:
            yield conn
        finally:
            conn.close()

    def query_one(self, sql: str, params: Optional[Tuple] = None) -> Optional[Dict[str, Any]]:
        """
        查询单条记录

        参数:
            sql: SQL语句
            params: SQL参数

        返回:
            查询结果(字典)或None

        示例:
            user = db_helper.query_one("SELECT * FROM users WHERE id = %s", (1,))
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            return cursor.fetchone()

    def query_all(self, sql: str, params: Optional[Tuple] = None) -> List[Dict[str, Any]]:
        """
        查询多条记录

        参数:
            sql: SQL语句
            params: SQL参数

        返回:
            查询结果列表

        示例:
            users = db_helper.query_all("SELECT * FROM users WHERE status = %s", (1,))
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            return cursor.fetchall()

    def execute(self, sql: str, params: Optional[Tuple] = None) -> int:
        """
        执行SQL语句(INSERT、UPDATE、DELETE)

        参数:
            sql: SQL语句
            params: SQL参数

        返回:
            影响的行数

        示例:
            rows = db_helper.execute("UPDATE users SET status = %s WHERE id = %s", (1, 100))
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            rows = cursor.execute(sql, params)
            conn.commit()
            return rows

    def assert_record_exists(self, table: str, conditions: Dict[str, Any], 
                            message: Optional[str] = None):
        """
        断言记录存在

        参数:
            table: 表名
            conditions: 查询条件(字典)
            message: 错误消息

        示例:
            db_helper.assert_record_exists(
                "users",
                {"id": 1, "status": 1},
                "用户不存在或状态不正确"
            )
        """
        where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
        sql = f"SELECT * FROM {table} WHERE {where_clause}"
        params = tuple(conditions.values())

        result = self.query_one(sql, params)

        error_msg = message or f"记录不存在: {table} WHERE {conditions}"
        assert result is not None, error_msg

    def assert_record_not_exists(self, table: str, conditions: Dict[str, Any],
                                 message: Optional[str] = None):
        """
        断言记录不存在

        参数:
            table: 表名
            conditions: 查询条件
            message: 错误消息

        示例:
            db_helper.assert_record_not_exists(
                "users",
                {"id": 999},
                "用户应该不存在"
            )
        """
        where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
        sql = f"SELECT * FROM {table} WHERE {where_clause}"
        params = tuple(conditions.values())

        result = self.query_one(sql, params)

        error_msg = message or f"记录应该不存在: {table} WHERE {conditions}"
        assert result is None, error_msg

    def assert_field_value(self, table: str, field: str, value: Any,
                          conditions: Dict[str, Any], message: Optional[str] = None):
        """
        断言字段值

        参数:
            table: 表名
            field: 字段名
            value: 期望的值
            conditions: 查询条件
            message: 错误消息

        示例:
            db_helper.assert_field_value(
                "users",
                "status",
                1,
                {"id": 1},
                "用户状态不正确"
            )
        """
        where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
        sql = f"SELECT {field} FROM {table} WHERE {where_clause}"
        params = tuple(conditions.values())

        result = self.query_one(sql, params)

        error_msg = message or f"字段值不正确: {table}.{field} WHERE {conditions}"
        assert result is not None, f"记录不存在: {error_msg}"

        actual_value = result[field]
        assert actual_value == value, 
            f"{error_msg} 期望 {value}, 实际 {actual_value}"

    def assert_count(self, table: str, expected_count: int,
                     conditions: Optional[Dict[str, Any]] = None,
                     message: Optional[str] = None):
        """
        断言记录数量

        参数:
            table: 表名
            expected_count: 期望的数量
            conditions: 查询条件(可选)
            message: 错误消息

        示例:
            db_helper.assert_count("users", 10, {"status": 1}, "活跃用户数量不正确")
        """
        if conditions:
            where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
            sql = f"SELECT COUNT(*) as count FROM {table} WHERE {where_clause}"
            params = tuple(conditions.values())
        else:
            sql = f"SELECT COUNT(*) as count FROM {table}"
            params = None

        result = self.query_one(sql, params)
        actual_count = result["count"]

        error_msg = message or f"记录数量不正确: {table}"
        assert actual_count == expected_count, 
            f"{error_msg} 期望 {expected_count}, 实际 {actual_count}"

8.3 在YAML用例中使用数据库断言

name: 创建用户测试
request:
  method: POST
  url: /api/user/create
  json:
    username: test_user
    email: test@example.com
validate:
  - eq: [status_code, 200]
  - eq: [body.code, 0]
extract:
  user_id: body.data.user_id
db_assertions:
  - type: exists                              # 断言记录存在
    table: users
    conditions:
      id: ${user_id}
      username: test_user
  - type: field_value                         # 断言字段值
    table: users
    field: email
    value: test@example.com
    conditions:
      id: ${user_id}
  - type: count                               # 断言记录数量
    table: users
    expected_count: 1
    conditions:
      username: test_user

8.4 在用例执行器中集成数据库断言

修改 utils/case_executor.py,添加数据库断言支持:

# 在CaseExecutor类中添加数据库断言方法

def execute(self, case: Dict[str, Any]) -> Dict[str, Any]:
    # ... 前面的代码 ...

    # 执行数据库断言
    db_assertion_results = []
    if "db_assertions" in case:
        from config.config import Config
        from utils.db_helper import DBHelper

        config = Config()
        db_config = config.db_config

        db_helper = DBHelper(
            host=db_config["host"],
            port=db_config["port"],
            user=db_config["username"],
            password=db_config["password"],
            database=db_config["database"]
        )

        # 替换变量
        db_assertions = self.extractor.replace_variables(case["db_assertions"])

        for db_assertion in db_assertions:
            try:
                assertion_type = db_assertion["type"]

                if assertion_type == "exists":
                    db_helper.assert_record_exists(
                        db_assertion["table"],
                        db_assertion["conditions"]
                    )
                    db_assertion_results.append({"success": True, "type": assertion_type})

                elif assertion_type == "not_exists":
                    db_helper.assert_record_not_exists(
                        db_assertion["table"],
                        db_assertion["conditions"]
                    )
                    db_assertion_results.append({"success": True, "type": assertion_type})

                elif assertion_type == "field_value":
                    db_helper.assert_field_value(
                        db_assertion["table"],
                        db_assertion["field"],
                        db_assertion["value"],
                        db_assertion["conditions"]
                    )
                    db_assertion_results.append({"success": True, "type": assertion_type})

                elif assertion_type == "count":
                    db_helper.assert_count(
                        db_assertion["table"],
                        db_assertion["expected_count"],
                        db_assertion.get("conditions")
                    )
                    db_assertion_results.append({"success": True, "type": assertion_type})

            except AssertionError as e:
                db_assertion_results.append({
                    "success": False,
                    "type": assertion_type,
                    "error": str(e)
                })

    # 更新返回结果
    all_db_passed = all(
        r.get("success", False) for r in db_assertion_results
    ) if db_assertion_results else True

    return {
        "success": all_passed and all_passed,  # 结合HTTP断言和数据库断言
        "case_name": case.get("name", "unknown"),
        "response": response,
        "assertions": assertion_results,
        "db_assertions": db_assertion_results,
        "extracted_vars": self.extractor.context.get_all()
    }

8.5 使用示例

from utils.db_helper import DBHelper
from config.config import Config

# 创建数据库辅助对象
config = Config()
db_config = config.db_config

db_helper = DBHelper(
    host=db_config["host"],
    port=db_config["port"],
    user=db_config["username"],
    password=db_config["password"],
    database=db_config["database"]
)

# 断言用户存在
db_helper.assert_record_exists(
    "users",
    {"id": 1, "status": 1}
)

# 断言字段值
db_helper.assert_field_value(
    "users",
    "email",
    "admin@example.com",
    {"id": 1}
)

# 断言记录数量
db_helper.assert_count("users", 10, {"status": 1})

9. 主题8:Exception异常处理封装

9.1 为什么需要统一异常处理

在测试过程中,可能会遇到各种异常:

  • 网络异常(连接超时、DNS解析失败等)
  • 业务异常(接口返回错误码)
  • 数据异常(JSON解析失败、数据格式错误等)
  • 断言异常(断言失败)

统一的异常处理可以让错误信息更清晰,便于定位问题。

9.2 自定义异常类

创建 utils/exceptions.py

"""
自定义异常类
定义测试框架中使用的各种异常
"""

class TestFrameworkException(Exception):
    """测试框架基础异常"""
    pass

class CaseLoadException(TestFrameworkException):
    """用例加载异常"""
    pass

class RequestException(TestFrameworkException):
    """请求异常"""
    pass

class AssertionException(TestFrameworkException):
    """断言异常"""
    pass

class DatabaseException(TestFrameworkException):
    """数据库异常"""
    pass

class VariableException(TestFrameworkException):
    """变量异常"""
    pass

class ConfigException(TestFrameworkException):
    """配置异常"""
    pass

9.3 异常处理器实现

创建 utils/exception_handler.py

"""
异常处理器
统一处理测试过程中的异常
"""
import traceback
from typing import Optional, Callable
from utils.exceptions import TestFrameworkException
from utils.logger import HTTPLogger

class ExceptionHandler:
    """异常处理器"""

    def __init__(self, logger: Optional[HTTPLogger] = None):
        """
        初始化异常处理器

        参数:
            logger: 日志记录器
        """
        self.logger = logger or HTTPLogger()
        self.error_count = 0
        self.error_details = []

    def handle(self, exception: Exception, context: Optional[dict] = None) -> dict:
        """
        处理异常

        参数:
            exception: 异常对象
            context: 上下文信息(用例名、请求信息等)

        返回:
            错误信息字典
        """
        self.error_count += 1

        error_info = {
            "type": type(exception).__name__,
            "message": str(exception),
            "traceback": traceback.format_exc(),
            "context": context or {}
        }

        self.error_details.append(error_info)

        # 记录日志
        error_msg = f"异常发生: {error_info['type']} - {error_info['message']}"
        if context:
            error_msg += f"n上下文: {context}"

        self.logger.error(error_msg, exc_info=True)

        return error_info

    def wrap_function(self, func: Callable, context: Optional[dict] = None):
        """
        包装函数,自动处理异常

        参数:
            func: 要包装的函数
            context: 上下文信息

        返回:
            包装后的函数

        示例:
            handler = ExceptionHandler()
            safe_execute = handler.wrap_function(execute_case, {"case_name": "test_login"})
            result = safe_execute()
        """
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except TestFrameworkException as e:
                return self.handle(e, context)
            except Exception as e:
                return self.handle(e, context)

        return wrapper

    def get_summary(self) -> dict:
        """
        获取异常统计摘要

        返回:
            统计信息字典
        """
        return {
            "total_errors": self.error_count,
            "error_details": self.error_details
        }

    def clear(self):
        """清空错误记录"""
        self.error_count = 0
        self.error_details.clear()

9.4 在用例执行器中使用异常处理

修改 utils/case_executor.py

# 在CaseExecutor类中添加异常处理

from utils.exception_handler import ExceptionHandler

class CaseExecutor:
    def __init__(self, base_url: str = ""):
        # ... 前面的代码 ...
        self.exception_handler = ExceptionHandler()

    def execute(self, case: Dict[str, Any]) -> Dict[str, Any]:
        """执行用例(带异常处理)"""
        context = {
            "case_name": case.get("name", "unknown"),
            "case_file": case.get("_file_path", "unknown")
        }

        try:
            # ... 执行用例的逻辑 ...
            return result

        except Exception as e:
            # 处理异常
            error_info = self.exception_handler.handle(e, context)
            return {
                "success": False,
                "error": error_info,
                "case_name": case.get("name", "unknown")
            }

9.5 使用示例

from utils.exception_handler import ExceptionHandler
from utils.case_executor import CaseExecutor

# 创建异常处理器
handler = ExceptionHandler()

# 包装函数
def execute_case():
    executor = CaseExecutor()
    return executor.execute("test_login.yaml")

safe_execute = handler.wrap_function(execute_case, {"case_name": "test_login"})
result = safe_execute()

# 获取异常统计
summary = handler.get_summary()
print(f"总错误数: {summary['total_errors']}")

10. 主题9:Allure企业级测试报告改进

10.1 Allure报告基础配置

10.1.1 安装Allure

# 安装allure-pytest插件
pip install allure-pytest

# 安装Allure命令行工具(见环境准备章节)

10.1.2 pytest.ini配置

创建 pytest.ini

[pytest]
# Allure报告配置
addopts = 
    --alluredir=reports/allure-results
    --clean-alluredir
    -v
    --tb=short

# 测试用例目录
testpaths = tests

# Python文件匹配模式
python_files = test_*.py

# 测试类匹配模式
python_classes = Test*

# 测试函数匹配模式
python_functions = test_*

10.2 Allure装饰器使用

10.2.1 基础装饰器

import pytest
import allure

@allure.epic("用户管理")
@allure.feature("用户登录")
@allure.story("正常登录")
@allure.severity(allure.severity_level.CRITICAL)
@allure.description("测试用户正常登录功能")
def test_login():
    """测试登录"""
    with allure.step("发送登录请求"):
        response = requests.post("/api/login", json={
            "username": "admin",
            "password": "123456"
        })

    with allure.step("验证响应"):
        assert response.status_code == 200
        assert response.json()["code"] == 0

10.2.2 动态添加步骤

import allure

def test_user_flow():
    with allure.step("步骤1: 用户登录"):
        login_response = login()
        allure.attach(
            login_response.text,
            "登录响应",
            allure.attachment_type.TEXT
        )

    with allure.step("步骤2: 获取用户信息"):
        user_info = get_user_info(login_response.json()["token"])
        allure.attach(
            str(user_info),
            "用户信息",
            allure.attachment_type.JSON
        )

10.3 在YAML用例中集成Allure

创建 utils/allure_helper.py

"""
Allure报告辅助工具
为YAML用例自动生成Allure报告
"""
import allure
from typing import Dict, Any
from requests import Response
import json

class AllureHelper:
    """Allure报告辅助类"""

    @staticmethod
    def add_case_info(case: Dict[str, Any]):
        """
        添加用例信息到Allure报告

        参数:
            case: 用例字典
        """
        # 添加标题
        if "name" in case:
            allure.dynamic.title(case["name"])

        # 添加描述
        if "description" in case:
            allure.dynamic.description(case["description"])

        # 添加标签
        if "tags" in case:
            for tag in case["tags"]:
                allure.dynamic.tag(tag)

        # 添加严重程度
        severity_map = {
            "critical": allure.severity_level.CRITICAL,
            "high": allure.severity_level.NORMAL,
            "medium": allure.severity_level.NORMAL,
            "low": allure.severity_level.MINOR,
            "trivial": allure.severity_level.TRIVIAL
        }
        if "severity" in case:
            severity = severity_map.get(case["severity"].lower(), allure.severity_level.NORMAL)
            allure.dynamic.severity(severity)

    @staticmethod
    def attach_request(request_config: Dict[str, Any]):
        """
        附加请求信息到Allure报告

        参数:
            request_config: 请求配置
        """
        with allure.step("请求信息"):
            request_info = {
                "method": request_config.get("method"),
                "url": request_config.get("url"),
                "headers": request_config.get("headers"),
                "params": request_config.get("params"),
                "json": request_config.get("json"),
                "data": request_config.get("data")
            }

            allure.attach(
                json.dumps(request_info, ensure_ascii=False, indent=2),
                "请求详情",
                allure.attachment_type.JSON
            )

    @staticmethod
    def attach_response(response: Response):
        """
        附加响应信息到Allure报告

        参数:
            response: Response对象
        """
        with allure.step("响应信息"):
            # 响应头
            allure.attach(
                json.dumps(dict(response.headers), ensure_ascii=False, indent=2),
                "响应头",
                allure.attachment_type.JSON
            )

            # 响应体
            try:
                response_json = response.json()
                allure.attach(
                    json.dumps(response_json, ensure_ascii=False, indent=2),
                    "响应体",
                    allure.attachment_type.JSON
                )
            except:
                allure.attach(
                    response.text,
                    "响应体",
                    allure.attachment_type.TEXT
                )

            # 状态码
            allure.attach(
                str(response.status_code),
                "状态码",
                allure.attachment_type.TEXT
            )

    @staticmethod
    def attach_assertions(assertion_results: list):
        """
        附加断言结果到Allure报告

        参数:
            assertion_results: 断言结果列表
        """
        with allure.step("断言结果"):
            passed = sum(1 for r in assertion_results if r.get("success"))
            failed = len(assertion_results) - passed

            summary = f"通过: {passed}, 失败: {failed}"
            allure.attach(
                summary,
                "断言摘要",
                allure.attachment_type.TEXT
            )

            # 详细的断言结果
            allure.attach(
                json.dumps(assertion_results, ensure_ascii=False, indent=2),
                "断言详情",
                allure.attachment_type.JSON
            )

10.4 在用例执行器中集成Allure

修改 utils/case_executor.py

# 在execute方法中添加Allure支持

from utils.allure_helper import AllureHelper

def execute(self, case: Dict[str, Any]) -> Dict[str, Any]:
    """执行用例(带Allure报告)"""
    # 添加用例信息
    AllureHelper.add_case_info(case)

    # 附加请求信息
    AllureHelper.attach_request(case["request"])

    # ... 执行请求 ...

    # 附加响应信息
    AllureHelper.attach_response(response)

    # ... 执行断言 ...

    # 附加断言结果
    AllureHelper.attach_assertions(assertion_results)

    return result

10.5 生成和查看Allure报告

10.5.1 运行测试生成报告

# 运行测试(会自动生成Allure结果)
pytest

# 生成HTML报告
allure generate reports/allure-results -o reports/allure-report --clean

# 打开报告(会自动打开浏览器)
allure open reports/allure-report

10.5.2 在代码中生成报告

import subprocess
import os

def generate_allure_report():
    """生成Allure报告"""
    results_dir = "reports/allure-results"
    report_dir = "reports/allure-report"

    # 生成报告
    subprocess.run([
        "allure", "generate",
        results_dir,
        "-o", report_dir,
        "--clean"
    ])

    # 打开报告
    subprocess.run(["allure", "open", report_dir])

10.6 Allure报告增强

10.6.1 添加环境信息

创建 reports/allure-results/environment.properties

Browser=Chrome
Browser.Version=120
Stand=Test
API.BaseURL=https://api.test.example.com
Python.Version=3.9
Pytest.Version=7.4.3

10.6.2 添加分类

创建 reports/allure-results/categories.json

[
  {
    "name": "Ignored tests",
    "matchedStatuses": ["skipped"]
  },
  {
    "name": "Infrastructure problems",
    "matchedStatuses": ["broken", "failed"],
    "messageRegex": ".*Connection.*"
  },
  {
    "name": "Outdated tests",
    "matchedStatuses": ["broken"],
    "traceRegex": ".*FileNotFoundException.*"
  }
]

11. 主题10:全局配置文件封装

11.1 配置文件结构设计

11.1.1 多环境配置

创建 config/test_config.yaml

# 测试环境配置
api:
  base_url: https://api.test.example.com
  timeout: 30
  retry_times: 3

database:
  host: test-db.example.com
  port: 3306
  username: test_user
  password: test_password
  database: test_db

logging:
  level: DEBUG
  file: logs/test.log
  format: "%(asctime)s [%(levelname)8s] %(name)s: %(message)s"
  date_format: "%Y-%m-%d %H:%M:%S"

allure:
  report_dir: reports/allure-results
  report_url: reports/allure-report

test_data:
  default_username: admin
  default_password: "123456"

创建 config/dev_config.yaml

# 开发环境配置
api:
  base_url: http://localhost:8000
  timeout: 10
  retry_times: 1

database:
  host: localhost
  port: 3306
  username: root
  password: root
  database: dev_db

logging:
  level: INFO
  file: logs/dev.log
  format: "%(asctime)s [%(levelname)8s] %(name)s: %(message)s"
  date_format: "%Y-%m-%d %H:%M:%S"

11.2 配置管理类实现

创建 config/config.py

"""
全局配置管理模块
"""
import os
import yaml
from pathlib import Path
from typing import Dict, Any, Optional

class Config:
    """配置管理类"""

    _instance = None
    _config = None

    def __new__(cls, env: Optional[str] = None):
        """单例模式"""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self, env: Optional[str] = None):
        """初始化配置"""
        if self._initialized:
            return

        # 从环境变量获取环境名称,默认test
        self.env = env or os.getenv("TEST_ENV", "test")
        self.base_dir = Path(__file__).parent.parent
        self.config_file = self.base_dir / "config" / f"{self.env}_config.yaml"
        self._config = self._load_config()
        self._initialized = True

    def _load_config(self) -> Dict[str, Any]:
        """加载配置文件"""
        if not self.config_file.exists():
            raise FileNotFoundError(f"配置文件不存在: {self.config_file}")

        with open(self.config_file, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f)

        # 环境变量可以覆盖配置文件
        self._override_with_env(config)

        return config

    def _override_with_env(self, config: Dict[str, Any]):
        """使用环境变量覆盖配置"""
        # API配置
        if "API_BASE_URL" in os.environ:
            config.setdefault("api", {})["base_url"] = os.environ["API_BASE_URL"]

        if "API_TIMEOUT" in os.environ:
            config.setdefault("api", {})["timeout"] = int(os.environ["API_TIMEOUT"])

        # 数据库配置
        if "DB_HOST" in os.environ:
            config.setdefault("database", {})["host"] = os.environ["DB_HOST"]

        if "DB_PORT" in os.environ:
            config.setdefault("database", {})["port"] = int(os.environ["DB_PORT"])

        if "DB_USERNAME" in os.environ:
            config.setdefault("database", {})["username"] = os.environ["DB_USERNAME"]

        if "DB_PASSWORD" in os.environ:
            config.setdefault("database", {})["password"] = os.environ["DB_PASSWORD"]

    @property
    def base_url(self) -> str:
        """获取API基础URL"""
        return self._config["api"]["base_url"]

    @property
    def timeout(self) -> int:
        """获取超时时间"""
        return self._config["api"]["timeout"]

    @property
    def db_config(self) -> Dict[str, Any]:
        """获取数据库配置"""
        return self._config.get("database", {})

    @property
    def logging_config(self) -> Dict[str, Any]:
        """获取日志配置"""
        return self._config.get("logging", {})

    @property
    def allure_config(self) -> Dict[str, Any]:
        """获取Allure配置"""
        return self._config.get("allure", {})

    @property
    def test_data(self) -> Dict[str, Any]:
        """获取测试数据配置"""
        return self._config.get("test_data", {})

    def get(self, key: str, default: Any = None) -> Any:
        """
        获取配置值(支持点号分隔的键)

        参数:
            key: 配置键,如 "api.base_url"
            default: 默认值

        返回:
            配置值
        """
        keys = key.split(".")
        value = self._config

        for k in keys:
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default

        return value

    def reload(self):
        """重新加载配置"""
        self._config = self._load_config()

11.3 使用示例

from config.config import Config

# 创建配置对象(单例)
config = Config(env="test")

# 获取配置值
base_url = config.base_url
timeout = config.timeout
db_config = config.db_config

# 使用点号分隔的键获取配置
api_timeout = config.get("api.timeout", 30)

# 重新加载配置
config.reload()

12. 主题11:流程用例封装

12.1 什么是流程用例

流程用例是指将多个接口按照业务流程组合在一起的测试用例。例如:

  1. 用户注册 → 用户登录 → 获取用户信息 → 更新用户信息 → 用户注销

12.2 流程用例YAML格式

创建 tests/test_cases/test_user_flow.yaml

name: 用户完整流程测试
description: 测试用户从注册到注销的完整流程
type: flow
steps:
  - name: 用户注册
    case_file: test_register.yaml
    extract:
      user_id: body.data.user_id

  - name: 用户登录
    case_file: test_login.yaml
    extract:
      token: body.data.token

  - name: 获取用户信息
    case_file: test_user_info.yaml
    depends_on: [用户登录]

  - name: 更新用户信息
    case_file: test_update_user.yaml
    depends_on: [用户登录]

  - name: 用户注销
    case_file: test_logout.yaml
    depends_on: [用户登录]

12.3 流程用例执行器

创建 utils/flow_executor.py

"""
流程用例执行器
执行包含多个步骤的流程用例
"""
from typing import Dict, List, Any
from utils.case_executor import CaseExecutor
from utils.yaml_loader import YamlLoader

class FlowExecutor:
    """流程用例执行器"""

    def __init__(self, base_url: str = ""):
        """
        初始化流程执行器

        参数:
            base_url: API基础URL
        """
        self.executor = CaseExecutor(base_url=base_url)
        self.loader = YamlLoader()

    def execute_flow(self, flow_case: Dict[str, Any]) -> Dict[str, Any]:
        """
        执行流程用例

        参数:
            flow_case: 流程用例字典

        返回:
            执行结果字典
        """
        flow_name = flow_case.get("name", "unknown")
        steps = flow_case.get("steps", [])

        results = []
        all_passed = True

        for step in steps:
            step_name = step.get("name", "unknown")
            case_file = step.get("case_file")

            if not case_file:
                raise ValueError(f"步骤 '{step_name}' 缺少 case_file")

            # 加载步骤用例
            step_case = self.loader.load_file(case_file)

            # 执行步骤
            step_result = self.executor.execute(step_case)

            # 提取变量(如果步骤中定义了extract)
            if "extract" in step:
                # extract已经在executor.execute中处理了
                pass

            results.append({
                "step_name": step_name,
                "result": step_result
            })

            # 如果步骤失败,可以选择继续或停止
            if not step_result["success"]:
                all_passed = False
                # 根据配置决定是否继续执行后续步骤
                if flow_case.get("stop_on_failure", True):
                    break

        return {
            "success": all_passed,
            "flow_name": flow_name,
            "steps": results
        }

12.4 使用示例

from utils.flow_executor import FlowExecutor
from utils.yaml_loader import YamlLoader

# 创建流程执行器
flow_executor = FlowExecutor(base_url="https://api.example.com")

# 加载流程用例
loader = YamlLoader()
flow_case = loader.load_file("test_user_flow.yaml")

# 执行流程
result = flow_executor.execute_flow(flow_case)

# 检查结果
if result["success"]:
    print("流程测试通过")
else:
    print("流程测试失败")
    for step in result["steps"]:
        if not step["result"]["success"]:
            print(f"失败步骤: {step['step_name']}")

13. 主题12:数据驱动封装

13.1 什么是数据驱动

数据驱动是指将测试数据和测试逻辑分离,使用不同的测试数据执行相同的测试逻辑。

13.2 数据驱动YAML格式

创建 data/test_users.yaml

# 测试用户数据
users:
  - username: admin
    password: "123456"
    expected_code: 0
    expected_message: "登录成功"

  - username: test_user
    password: "test123"
    expected_code: 0
    expected_message: "登录成功"

  - username: invalid_user
    password: "wrong_password"
    expected_code: 1001
    expected_message: "用户名或密码错误"

13.3 数据驱动执行器

修改 utils/case_executor.py,添加数据驱动支持:

# 在CaseExecutor类中添加数据驱动方法

def execute_with_data(self, case: Dict[str, Any], data: Dict[str, Any]) -> Dict[str, Any]:
    """
    使用数据执行用例(数据驱动)

    参数:
        case: 用例字典
        data: 测试数据字典

    返回:
        执行结果字典
    """
    # 合并用例和数据
    merged_case = case.copy()

    # 替换用例中的变量为数据值
    if "request" in merged_case:
        request = merged_case["request"]

        # 替换json中的数据
        if "json" in request:
            for key, value in data.items():
                if key in request["json"]:
                    request["json"][key] = value

        # 替换params中的数据
        if "params" in request:
            for key, value in data.items():
                if key in request["params"]:
                    request["params"][key] = value

    # 替换断言中的期望值
    if "validate" in merged_case:
        for assertion in merged_case["validate"]:
            assertion_type = list(assertion.keys())[0]
            args = assertion[assertion_type]

            # 如果期望值在数据中,使用数据中的值
            if len(args) >= 2 and args[1] in data:
                args[1] = data[args[1]]

    # 执行用例
    return self.execute(merged_case)

13.4 使用示例

import pytest
from utils.case_executor import CaseExecutor
from utils.yaml_loader import YamlLoader
import yaml

# 加载测试数据
with open("data/test_users.yaml", "r", encoding="utf-8") as f:
    test_data = yaml.safe_load(f)

# 创建执行器
executor = CaseExecutor(base_url="https://api.example.com")

# 加载用例
loader = YamlLoader()
case = loader.load_file("test_login.yaml")

# 数据驱动执行
@pytest.mark.parametrize("user_data", test_data["users"])
def test_login_with_data(user_data):
    """数据驱动测试登录"""
    result = executor.execute_with_data(case, user_data)

    # 验证结果
    assert result["success"] == True
    assert result["response"].json()["code"] == user_data["expected_code"]

14. 主题13:MD5、Base64、RSA等加密封装

14.1 加密工具类实现

创建 utils/encrypt.py

"""
加密工具模块
支持MD5、Base64、RSA等加密算法
"""
import hashlib
import base64
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
from typing import Union

class EncryptHelper:
    """加密辅助类"""

    @staticmethod
    def md5(text: str, encoding: str = "utf-8") -> str:
        """
        MD5加密

        参数:
            text: 要加密的文本
            encoding: 编码方式

        返回:
            MD5哈希值(32位小写)

        示例:
            EncryptHelper.md5("hello world")
            # 返回: "5eb63bbbe01eeed093cb22bb8f5acdc3"
        """
        md5_hash = hashlib.md5()
        md5_hash.update(text.encode(encoding))
        return md5_hash.hexdigest()

    @staticmethod
    def md5_upper(text: str, encoding: str = "utf-8") -> str:
        """MD5加密(大写)"""
        return EncryptHelper.md5(text, encoding).upper()

    @staticmethod
    def sha256(text: str, encoding: str = "utf-8") -> str:
        """
        SHA256加密

        参数:
            text: 要加密的文本
            encoding: 编码方式

        返回:
            SHA256哈希值
        """
        sha256_hash = hashlib.sha256()
        sha256_hash.update(text.encode(encoding))
        return sha256_hash.hexdigest()

    @staticmethod
    def base64_encode(text: Union[str, bytes], encoding: str = "utf-8") -> str:
        """
        Base64编码

        参数:
            text: 要编码的文本或字节
            encoding: 编码方式

        返回:
            Base64编码后的字符串

        示例:
            EncryptHelper.base64_encode("hello world")
            # 返回: "aGVsbG8gd29ybGQ="
        """
        if isinstance(text, str):
            text = text.encode(encoding)
        return base64.b64encode(text).decode("utf-8")

    @staticmethod
    def base64_decode(encoded_text: str, encoding: str = "utf-8") -> str:
        """
        Base64解码

        参数:
            encoded_text: Base64编码的字符串
            encoding: 编码方式

        返回:
            解码后的字符串

        示例:
            EncryptHelper.base64_decode("aGVsbG8gd29ybGQ=")
            # 返回: "hello world"
        """
        decoded_bytes = base64.b64decode(encoded_text)
        return decoded_bytes.decode(encoding)

    @staticmethod
    def rsa_encrypt(text: str, public_key: str) -> str:
        """
        RSA加密

        参数:
            text: 要加密的文本
            public_key: 公钥(PEM格式)

        返回:
            Base64编码的加密结果

        示例:
            public_key = """-----BEGIN PUBLIC KEY-----
            ...
            -----END PUBLIC KEY-----"""
            encrypted = EncryptHelper.rsa_encrypt("hello", public_key)
        """
        # 加载公钥
        rsa_key = RSA.importKey(public_key)
        cipher = PKCS1_v1_5.new(rsa_key)

        # 加密
        encrypted_bytes = cipher.encrypt(text.encode("utf-8"))

        # Base64编码
        return base64.b64encode(encrypted_bytes).decode("utf-8")

    @staticmethod
    def rsa_decrypt(encrypted_text: str, private_key: str) -> str:
        """
        RSA解密

        参数:
            encrypted_text: Base64编码的加密文本
            private_key: 私钥(PEM格式)

        返回:
            解密后的文本
        """
        # 加载私钥
        rsa_key = RSA.importKey(private_key)
        cipher = PKCS1_v1_5.new(rsa_key)

        # Base64解码
        encrypted_bytes = base64.b64decode(encrypted_text)

        # 解密
        decrypted_bytes = cipher.decrypt(encrypted_bytes, None)

        return decrypted_bytes.decode("utf-8")

14.2 在YAML用例中使用加密

name: 加密登录测试
request:
  method: POST
  url: /api/login
  json:
    username: admin
    password: ${md5("123456")}  # 使用MD5加密密码

14.3 在变量替换中支持加密函数

修改 utils/extractor.py

# 在replace_variables方法中添加加密函数支持

import re
from utils.encrypt import EncryptHelper

def replace_variables(self, data: Any) -> Any:
    """替换变量(支持加密函数)"""
    if isinstance(data, str):
        # 替换加密函数
        # ${md5("text")}
        data = re.sub(
            r'${md5(["']([^"']+)["'])}',
            lambda m: EncryptHelper.md5(m.group(1)),
            data
        )

        # ${base64_encode("text")}
        data = re.sub(
            r'${base64_encode(["']([^"']+)["'])}',
            lambda m: EncryptHelper.base64_encode(m.group(1)),
            data
        )

        # ... 其他加密函数 ...

        # 替换普通变量
        pattern = r'${([^}:]+)(?::([^}]+))?}'
        # ... 原有的变量替换逻辑 ...

    # ... 其他类型的处理 ...

15. 主题14:接口签名验证封装

15.1 签名工具类实现

创建 utils/signature.py

"""
接口签名工具
生成和验证接口签名
"""
import hashlib
import hmac
import time
import random
import string
from typing import Dict, Any, Optional
from utils.encrypt import EncryptHelper

class SignatureHelper:
    """签名辅助类"""

    @staticmethod
    def generate_md5_sign(params: Dict[str, Any], secret_key: str, 
                          exclude_keys: Optional[list] = None) -> str:
        """
        生成MD5签名

        参数:
            params: 请求参数字典
            secret_key: 密钥
            exclude_keys: 排除的键(不参与签名)

        返回:
            签名字符串

        签名规则:
            1. 按键名排序
            2. 拼接成 key1=value1&key2=value2 格式
            3. 末尾添加密钥
            4. MD5加密
        """
        exclude_keys = exclude_keys or ["sign"]

        # 过滤并排序参数
        filtered_params = {
            k: v for k, v in params.items()
            if k not in exclude_keys and v is not None
        }

        sorted_params = sorted(filtered_params.items())

        # 拼接字符串
        sign_string = "&".join([f"{k}={v}" for k, v in sorted_params])
        sign_string += f"&key={secret_key}"

        # MD5加密
        return EncryptHelper.md5(sign_string).upper()

    @staticmethod
    def generate_hmac_sha256_sign(params: Dict[str, Any], secret_key: str,
                                  exclude_keys: Optional[list] = None) -> str:
        """
        生成HMAC-SHA256签名

        参数:
            params: 请求参数字典
            secret_key: 密钥
            exclude_keys: 排除的键

        返回:
            签名字符串
        """
        exclude_keys = exclude_keys or ["sign"]

        # 过滤并排序参数
        filtered_params = {
            k: v for k, v in params.items()
            if k not in exclude_keys and v is not None
        }

        sorted_params = sorted(filtered_params.items())

        # 拼接字符串
        sign_string = "&".join([f"{k}={v}" for k, v in sorted_params])

        # HMAC-SHA256加密
        signature = hmac.new(
            secret_key.encode("utf-8"),
            sign_string.encode("utf-8"),
            hashlib.sha256
        ).hexdigest().upper()

        return signature

    @staticmethod
    def add_timestamp_and_nonce(params: Dict[str, Any]) -> Dict[str, Any]:
        """
        添加时间戳和随机数

        参数:
            params: 请求参数字典

        返回:
            添加了时间戳和随机数的参数字典
        """
        params = params.copy()
        params["timestamp"] = str(int(time.time()))
        params["nonce"] = "".join(
            random.choices(string.ascii_letters + string.digits, k=16)
        )
        return params

    @staticmethod
    def verify_sign(params: Dict[str, Any], secret_key: str,
                   sign_key: str = "sign", algorithm: str = "md5") -> bool:
        """
        验证签名

        参数:
            params: 请求参数字典(包含sign)
            secret_key: 密钥
            sign_key: 签名字段名
            algorithm: 签名算法(md5或hmac_sha256)

        返回:
            签名是否正确
        """
        if sign_key not in params:
            return False

        received_sign = params[sign_key]

        # 根据算法生成签名
        if algorithm == "md5":
            generated_sign = SignatureHelper.generate_md5_sign(params, secret_key)
        elif algorithm == "hmac_sha256":
            generated_sign = SignatureHelper.generate_hmac_sha256_sign(params, secret_key)
        else:
            raise ValueError(f"不支持的签名算法: {algorithm}")

        return received_sign == generated_sign

15.2 在HTTP客户端中自动添加签名

修改 clients/http_client.py

# 在HttpClient类中添加签名支持

from utils.signature import SignatureHelper

class HttpClient:
    def __init__(self, base_url: str = "", secret_key: Optional[str] = None):
        # ... 前面的代码 ...
        self.secret_key = secret_key

    def request(self, method: str, url: str, add_sign: bool = False,
                sign_algorithm: str = "md5", **kwargs) -> requests.Response:
        """
        发送HTTP请求(支持自动添加签名)

        参数:
            method: HTTP方法
            url: 请求URL
            add_sign: 是否添加签名
            sign_algorithm: 签名算法
            **kwargs: 其他requests参数
        """
        # 如果需要添加签名
        if add_sign and self.secret_key:
            # 获取请求参数
            params = kwargs.get("params", {})
            json_data = kwargs.get("json", {})
            data = kwargs.get("data", {})

            # 合并参数
            all_params = {**params, **json_data, **data}

            # 添加时间戳和随机数
            all_params = SignatureHelper.add_timestamp_and_nonce(all_params)

            # 生成签名
            if sign_algorithm == "md5":
                sign = SignatureHelper.generate_md5_sign(all_params, self.secret_key)
            elif sign_algorithm == "hmac_sha256":
                sign = SignatureHelper.generate_hmac_sha256_sign(all_params, self.secret_key)
            else:
                raise ValueError(f"不支持的签名算法: {sign_algorithm}")

            # 添加签名到参数
            all_params["sign"] = sign

            # 更新kwargs
            if json_data:
                kwargs["json"] = all_params
            elif data:
                kwargs["data"] = all_params
            else:
                kwargs["params"] = all_params

        # ... 发送请求 ...

15.3 在YAML用例中使用签名

name: 带签名的接口测试
request:
  method: POST
  url: /api/secure/endpoint
  add_sign: true                    # 启用签名
  sign_algorithm: md5               # 签名算法
  json:
    user_id: 123
    action: get_data

16. 主题15:基础路径Base_URL封装

16.1 为什么需要Base_URL封装

在测试中,不同环境有不同的API基础URL:

  • 测试环境:https://api.test.example.com
  • 开发环境:http://localhost:8000
  • 生产环境:https://api.example.com

统一管理Base_URL可以方便切换环境。

16.2 Base_URL管理器实现

修改 clients/http_client.py

# 在HttpClient类中完善Base_URL处理

class HttpClient:
    def __init__(self, base_url: str = ""):
        """
        初始化HTTP客户端

        参数:
            base_url: API基础URL(如果为空,从配置中读取)
        """
        if not base_url:
            from config.config import Config
            config = Config()
            base_url = config.base_url

        self.base_url = base_url.rstrip("/")  # 移除末尾的斜杠
        # ... 其他初始化代码 ...

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        """
        发送HTTP请求

        参数:
            method: HTTP方法
            url: 请求URL(如果以http开头,视为完整URL;否则拼接base_url)
            **kwargs: 其他requests参数
        """
        # 如果URL是完整的(以http开头),直接使用
        if url.startswith("http://") or url.startswith("https://"):
            full_url = url
        else:
            # 确保URL以斜杠开头
            if not url.startswith("/"):
                url = "/" + url
            full_url = f"{self.base_url}{url}"

        # ... 发送请求 ...

16.3 使用示例

from clients.http_client import HttpClient
from config.config import Config

# 方式1:使用配置中的Base_URL
client = HttpClient()  # 自动从配置中读取

# 方式2:手动指定Base_URL
client = HttpClient(base_url="https://api.example.com")

# 发送请求(自动拼接Base_URL)
response = client.get("/api/user/info")  # 实际请求: https://api.example.com/api/user/info

# 完整URL(不拼接Base_URL)
response = client.get("https://other-api.com/data")  # 直接使用完整URL

17. 主题16:完整零代码极限封装

17.1 零代码框架设计

零代码框架的目标是:测试人员只需要编写YAML用例,不需要写任何Python代码

17.2 自动化测试运行器

创建 runner/auto_runner.py

"""
自动化测试运行器
自动发现和执行YAML用例,无需编写Python代码
"""
import pytest
from pathlib import Path
from utils.yaml_loader import YamlLoader
from utils.case_executor import CaseExecutor
from utils.case_sorter import CaseSorter
from config.config import Config
import allure

class AutoRunner:
    """自动化测试运行器"""

    def __init__(self):
        self.config = Config()
        self.loader = YamlLoader()
        self.executor = CaseExecutor(base_url=self.config.base_url)
        self.sorter = CaseSorter()

    def discover_and_run(self, case_dir: Optional[str] = None):
        """
        自动发现并运行所有YAML用例

        参数:
            case_dir: 用例目录(可选)
        """
        # 加载所有用例
        cases = self.loader.load_dir(case_dir)

        # 排序用例
        sorted_cases = self.sorter.sort_cases(cases)

        # 执行用例
        results = []
        for case in sorted_cases:
            # 添加Allure信息
            allure.dynamic.title(case.get("name", "unknown"))
            if "description" in case:
                allure.dynamic.description(case["description"])

            # 执行用例
            result = self.executor.execute(case)
            results.append(result)

            # 断言结果
            assert result["success"], f"用例失败: {case.get('name', 'unknown')}"

        return results

17.3 pytest自动发现YAML用例

创建 tests/conftest.py

"""
pytest配置文件
自动发现YAML用例并转换为pytest测试
"""
import pytest
from utils.yaml_loader import YamlLoader
from utils.case_executor import CaseExecutor
from utils.case_sorter import CaseSorter
from config.config import Config
import allure

# 全局fixture
@pytest.fixture(scope="session")
def config():
    """配置fixture"""
    return Config()

@pytest.fixture(scope="session")
def executor(config):
    """用例执行器fixture"""
    return CaseExecutor(base_url=config.base_url)

# 自动发现YAML用例
def pytest_generate_tests(metafunc):
    """自动生成测试用例"""
    if "yaml_case" in metafunc.fixturenames:
        loader = YamlLoader()
        cases = loader.load_dir()
        sorter = CaseSorter()
        sorted_cases = sorter.sort_cases(cases)

        metafunc.parametrize("yaml_case", sorted_cases)

# 测试函数
def test_yaml_case(executor, yaml_case):
    """执行YAML用例"""
    # 添加Allure信息
    allure.dynamic.title(yaml_case.get("name", "unknown"))
    if "description" in yaml_case:
        allure.dynamic.description(yaml_case["description"])

    # 执行用例
    result = executor.execute(yaml_case)

    # 断言
    assert result["success"], f"用例失败: {yaml_case.get('name', 'unknown')}"

17.4 使用pytest运行零代码测试

# 运行所有YAML用例
pytest tests/ -v

# 运行特定标签的用例
pytest tests/ -v -m "smoke"

# 生成Allure报告
pytest tests/ --alluredir=reports/allure-results
allure generate reports/allure-results -o reports/allure-report --clean
allure open reports/allure-report

17.5 完整的YAML用例示例

name: 用户完整流程测试
description: 测试用户从注册到注销的完整流程
tags: [smoke, user]
severity: critical
order: 1

steps:
  - name: 用户注册
    case_file: test_register.yaml
    extract:
      user_id: body.data.user_id

  - name: 用户登录
    case_file: test_login.yaml
    extract:
      token: body.data.token

  - name: 获取用户信息
    case_file: test_user_info.yaml

  - name: 更新用户信息
    case_file: test_update_user.yaml

  - name: 用户注销
    case_file: test_logout.yaml

18. 总结

18.1 框架特点

本教程构建的零代码接口自动化测试框架具有以下特点:

  1. 零代码:测试人员只需要编写YAML用例,无需编写Python代码
  2. 易维护:用例与代码分离,易于维护和修改
  3. 功能完整:支持断言、变量提取、数据驱动、加密、签名等
  4. 企业级:支持Allure报告、数据库断言、异常处理等
  5. 可扩展:易于扩展新功能

18.2 使用流程

  1. 编写YAML用例:在 tests/test_cases/ 目录下编写YAML用例
  2. 配置环境:在 config/ 目录下配置不同环境的配置
  3. 运行测试:使用 pytest 命令运行测试
  4. 查看报告:使用 allure 查看测试报告

18.3 最佳实践

  1. 用例命名规范:使用有意义的用例名称
  2. 用例组织:按模块组织用例文件
  3. 变量管理:合理使用变量提取和替换
  4. 断言完整:确保断言覆盖关键验证点
  5. 日志记录:充分利用日志功能定位问题

18.4 扩展方向

  1. 支持更多协议:WebSocket、gRPC等
  2. 性能测试:集成性能测试功能
  3. CI/CD集成:与Jenkins、GitLab CI等集成
  4. 可视化界面:开发Web界面管理用例
  5. Mock服务:集成Mock服务功能

19. 附录

19.1 完整项目结构

api_test_framework/
├── config/                      # 配置目录
│   ├── __init__.py
│   ├── config.py
│   ├── test_config.yaml
│   └── dev_config.yaml
│
├── clients/                     # HTTP客户端
│   ├── __init__.py
│   └── http_client.py
│
├── utils/                       # 工具类
│   ├── __init__.py
│   ├── logger.py
│   ├── yaml_loader.py
│   ├── case_executor.py
│   ├── case_sorter.py
│   ├── extractor.py
│   ├── assertion.py
│   ├── db_helper.py
│   ├── encrypt.py
│   ├── signature.py
│   ├── exception_handler.py
│   ├── allure_helper.py
│   ├── flow_executor.py
│   └── file_watcher.py
│
├── tests/                       # 测试用例
│   ├── __init__.py
│   ├── conftest.py
│   └── test_cases/
│       ├── test_login.yaml
│       ├── test_user.yaml
│       └── test_flow.yaml
│
├── data/                        # 测试数据
│   └── test_users.yaml
│
├── logs/                        # 日志目录
├── reports/                     # 报告目录
├── requirements.txt
├── pytest.ini
└── README.md

19.2 常用命令

# 安装依赖
pip install -r requirements.txt

# 运行测试
pytest

# 运行特定用例
pytest tests/test_cases/test_login.yaml

# 生成Allure报告
pytest --alluredir=reports/allure-results
allure generate reports/allure-results -o reports/allure-report --clean
allure open reports/allure-report

# 运行并生成HTML报告
pytest --html=reports/report.html --self-contained-html

19.3 参考资源


教程结束

希望这份详细的教程能够帮助你从零开始构建一个完整的零代码接口自动化测试框架!如果在学习过程中遇到问题,欢迎随时查阅相关文档或寻求帮助。

祝你学习愉快!🎉

发表评论