接口自动化测试框架

接口自动化测试框架

python+unittest+requests+ddt+openpyxl+pymysql+logging+unittestreport+jenkins



|-- conf  # 配置文件层
|   |-- setting.py  # 配置文件,存放数据库连接信息、接口地址等全局配置
|-- logs  #日志管理层
|   |-- test.log  # 日志文件,记录测试过程中的详细信息(如请求详情、断言结果、错误信息)
|-- test_datas  # 数据管理层
|   |-- test_data.xlsx  # Excel 文件,用于存储测试数据
|-- test_cases  # 测试用例层
|   |-- test_1_login.py  # 登陆
|   |-- test_2_upload.py  # 图片上传
|   |-- test_3_create_product.py  # 创建商品
|   |-- test_4_member_register.py  # 用户注册
|   |-- test_5_member_login.py  # 用户登陆
|   |-- test_6_place_order.py  # 用户下单
|   |-- test_7_unittest_all.py  # 集成所有用例
|-- tools  #  工具层
|   |-- handle_excel.py  # 读取/写入Excel数据,解析测试用例和参数
|   |-- handle_db.py  # 数据库操作封装,用pymysql连接mysql数据库,操作数据库等一系列方法,进行数据校验;
|   |-- handle_phone.py  # 封装一个可以通过faker库造数据生成随机手机号的方法
|   |-- handel_extract.py  # 从接口响应结果中提取全局变量用于:鉴权、参数依赖提取
|   |-- handle_replace.py  # 封装一个参数替换的工具,对请求参数进行处理,返回可以直接发送请求的参数
|   |-- handle_path.py  # 存放路径封装
|   |-- handle_requests.py  # 请求封装,兼容图片上传的接口
|   |-- handle_reponse.py  # 响应结果处理
|   |-- handle_attribute.py  # 动态参数设置成类属性
|   |-- handle_space.py  # 空格处理工具
|   |-- handle_logs.py  # 配置和获取日志记录器
|-- main.py  # 框架执行入口
阶段 工具 作用
数据准备阶段
handle_excel.py
读取Excel测试数据(用例参数、预期结果),为数据驱动提供输入

handle_phone.py
生成唯一手机号,结合
handle_db
校验数据库防重复

handle_db.py
执行前置SQL(如清理历史数据)、生成初始数据

setting.py
提供数据库连接、账号密码等全局配置
请求准备阶段
handle_replace.py
替换动态参数(
#变量#
),整合时间戳、UUID、全局变量等数据源

handle_attribute.py
存储动态生成的参数(如Token、订单号),实现跨用例参数传递

handle_path.py
统一管理文件路径(如测试数据、图片上传路径)
请求发送阶段
handle_requests.py
发送HTTP请求,自动注入鉴权头(从
handle_attribute
读取Token)

handle_extract.py
实时提取响应中的关键数据(如Token),更新到
handle_attribute
响应处理阶段
handle_response.py
解析响应格式(JSON/文本),统一异常处理

handle_space.py
清洗响应数据中的空格/换行符,确保断言准确性
断言验证阶段
handle_response.py
断言状态码、业务状态、响应时间等基础规则

handle_db.py
执行数据库验证(如订单状态变更、数据落库)

handle_space.py
优化断言SQL语法(如
SELECT
 → 
SELECT
),避免语法错误
日志与报告阶段
handle_logs.py
记录执行日志(DEBUG/ERROR),按天滚动存储

unittestreport
聚合测试结果,生成可视化HTML报告

Jenkins
定时触发测试任务,集成到CI/CD流水线
配置文件层(conf)

在配置层存放账号登陆信息、数据库连接信息等,避免硬编码;



#setting.py 用于存放当前框架中的所有配置信息(使用不频繁,修改不频繁的数据)
#通过配置文件传参-软编码;在代码中写死-硬编码;
 
 
#管理者登陆账号信息
user_info = {"user_name":"xdfjewg","password":"dfergbu"}
 
#图片上传信息
image_info = {"file_name":"song.png","file_type":"image/png"}
 
#连接数据库信息
mysql_info = {"host":"11.111.111.11","post":"3306","user":"lemon","password":"vdnjkvs","db":"fsegjvskjv"}
 
#数据库断言替换信息,如"file_path":"2022/04/cd54sdgruissafmscsd/vds"图片上传路径
assert_db_info = {}
 
#前置sql语句执行需要替换的参数
setup_sql_info = {}
工具层(tools)
路径管理工具handle_path

该文档定义了项目关键路径配置脚本,通过 
os
和 
time
库动态生成项目根目录、测试数据路径、图片路径、测试报告目录等核心路径,并创建时间戳格式的测试报告名称。

os​​:操作系统交互库,用于路径操作


os.path.dirname()
:获取父目录路径


os.path.join()
:智能拼接路径


os.path.abspath(__file__)
:获取当前文件的绝对路径

​time​​:时间处理库


time.strftime()
:格式化时间戳


time.localtime()
:获取本地时间



import os
 
import time
 
 
 
#项目根目录,其中os.path处理文件和目录路径,dirname返回路径的目录部分,不包含文件名;abspath返回指定文件绝对路径;__file__当前执行文件;
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 
 
#日志目录
log_name = time.strftime("%Y%m%d",time.localtime())
log_dir_name = os.path.join(base_dir,"logs",f"{log_name}.log")
 
 
#测试数据的路径,join拼接多个路径,生成完整路径;
data_dir = os.path.join(base_dir,"test_data","case_data.xlsx")
 
 
#图片地址
image_dir = os.path.join(base_dir,"images","song.png")
 
 
 
#测试用例文件目录
case_dir = os.path.join(base_dir,"test_cases")
 
 
 
#测试报告名称
report_name = time.strftime("%Y%m%d_%H%M%S",time.localtime())
 
 
#测试报告目录
report_dir = os.path.join(base_dir,"reports")
 
 
#历史报告目录
history_report_dir = os.path.join(base_dir,"reports","history")
excel测试用例读取工具handle_excel

定义了一个名为 
HandleExcel
的类,提供加载 Excel 文件、读取指定 sheet 页的测试用例数据(返回字典列表)、关闭文件的方法。​

​openpyxl​:
load_workbook
:加载 Excel 工作簿


iter_rows(values_only=True)
:读取单元格数据(直接获取值而非单元格对象)

​​
__init__(self, file_name):
初始化方法:加载 Excel 文件,获取所有 sheet 名称列表

​​​​
get_excel_test_cases(self, sheet_name)

根据 sheet 名称获取工作表对象:
sheet_obj = self.wb_obj[sheet_name]

读取表头和数据行:
datas = list(sheet_obj.iter_rows(values_only=True))

返回字典列表格式的测试用例数据

将每行数据与表头组合成字典:
dict(zip(case_title, case))


close_file(self)
​:关闭 Excel 文件:
self.wb_obj.close()

模块/类 方法/属性 使用场景描述

openpyxl

load_workbook()
加载指定的 Excel 文件,返回一个 
Workbook
 对象。

Workbook
 对象

.sheetnames
(属性)
获取 Excel 文件中所有 Sheet 的名称列表。

[sheet_name]
(通过键访问)
根据 Sheet 名称获取对应的 
Worksheet
 对象。

.iter_rows(values_only=True)
按行迭代 Sheet 中的数据,
values_only=True
 表示返回单元格的实际值。

.close()
关闭 Excel 文件,释放资源。
Python 内置函数
list()
将迭代器(如 
iter_rows
 的结果)转换为列表。

zip()
将表头和数据按列组合,生成键值对。

dict()
将 
zip
 生成的键值对转换为字典,形成单条用例数据。

list
 对象

.append()

将单条用例数据(字典)追加到结果列表中。

 



from openpyxl import load_workbook
 
 
class HandleExcel:
    def __init__(self,file_name):  #file_name: 测试用例文件名称(绝对路径)
        self.wb_obj = load_workbook(filename=file_name) #load_workbook加载excel
 
        #获取所有excel中sheet的名称
        self.sheet_names = self.wb_obj.sheetnames
 
    def get_excel_test_cases(self,sheet_name):  #sheet_name: excel中sheet名称
        #临时变量存放数据
        cases_list = []  
        #获取指定表单对象
        sheet_obj = self.wb_obj[sheet_name]
        #iter_rows迭代所有行数据,按行读取封装成list输出结果类似于[('id','title'),(1,'登陆成功')],values_only是False返回对象,True返回单元格对应数据,datas获取表单sheet中所有数据
        datas = list(sheet_obj.iter_rows(values_only=True))
        #获取表头
        case_title = datas[0] 
        #获取表数据
        case_datas  = datas[1:] 
        #遍历每一行数据
        for case in case_datas:
            #zip(key,val)把两个元素压缩在一起,再把每一行数据使用dict转换成字典类型,zip函数是python的内置函数
            result = dict(zip(case_title,case))
            cases_list.append(result)
        self.close_file()
        return cases_list
 
    def close_file(self):
        #关闭excel
        self.wb_obj.close()  
 
if __name__ == '__main__':
      #存放excel测试数据的文件名(放本目录下的路径)
      cl = HandleExcel(file_name="case_data.xlsx",sheet_name="login")
      cl.get_excel_test_cases()
全局属性管理器handle_attribute

全局属性管理器,存储接口返回的全局变量



class HandleAttr:
    pass
手机号生成工具handle_phone

定义了一个名为 
HandlePhone
的类,提供生成随机手机号并通过数据库查询验证其唯一性(确保未注册)的方法。


Faker
:用于生成随机手机号

通过 
tools.handle_db.mysql
模块操作数据库(执行 SQL 查询)


__init__(self)
: 初始化一个Faker对象


__check_phone(self, phone)
: 私有方法,用于检查手机号是否已存在于数据库的用户表中


get_phone(self)
: 使用Faker库循环生成随机手机号,直到生成一个数据库中不存在的手机号,然后返回



from faker import Faker
from tools.handle_db import mysql
 
class HandlePhone:
    def __init__(self):
        self.fk = Faker(locale="zh-cn")
 
 
    def __check_phone(self,phone):
        sql = "SELECT * FROM 表名 WHERE user_mobile = '{}'".format(phone)
        #去数据库查询是否注册,注册了再重新生成,直到在数据库中找不到,就表示未注册;
        result = mysql.get_datas(sql=sql)
        return result
 
    def get_phone(self):
        while True:
            #生成新的手机号
            phone = self.fk.phone_number()
            # 去数据库校验是否已注册
            result = self.__check_phone(phone=phone)
            if len(result)>0:
                #手机号已存在,需要重新生成手机号
                continue
            else:
                return phone
参数替换工具handle_replace

定义了名为 
HandleReplace
 的​​参数替换工具类​​,实现了自动化替换请求参数/SQL语句中的占位符(如 
#user_name#
),支持从配置文件、动态生成值、数据库查询、手机号生成等多数据源获取替换值


re
 – 正则匹配占位符
ast
 – 安全解析字符串为Python对象
uuid
 – 生成唯一标识符
time
 – 获取时间戳

def replace_data调用方法及其顺序如下所示:

def __handel_str         删除换行符和空格def __get_replace_keys    获取需要替换的参数名称def __set_attribute,其中__set_attribute调用方法及其顺序如下:

        3-1 def replace_sql,其中replace_sq调用方法及其顺序如下所示:

                3-1-1 def __get_replace_keys

                3-1-2 def __get_data_and_set_attribute  根据数据来源,获取数据,设置为类属性

        3-2 def __execute_sql_and_setattr

        3-3 def __get_data_and_set_attribute    根据数据来源,获取数据,设置为类属性

自定义方法名 所用模块/函数 解决的问题

__handle_str

for循环

replace函数

数据清洗:清理请求参数中的 

 和空格,确保JSON格式合法,避免解析错误


__get_replace_keys

re
模块:Python的正则表达式操作模块
findall
函数:返回字符串中所有匹配正则的子串(列表形式)
正则表达式#(w.+?)#:
匹配任意字符至少一次,但遇到第一个结束标记(如 
#
)就停止,​避免跨越多余内容;


w
 → 匹配字母/数字/下划线
.+?
 → 非贪婪匹配任意字符(至少一个)

通过正则 
#(w.+?)#
 找出用#包裹起来的参数名并提取出来识别为动态参数(如 
#user_name#

__get_data_and_set_attribute

if key in user_info:
取 
user_info[key]
 的值,并设置为
HandleAttr
 类的属性

setattr():
Python 内置函数,动态设置对象的属性值

elif key == "sessionUUID":
生成形如 
a1b2c3d4-1234-5678-90ab-cdef12345678
 的UUID并存入类属性

uuid.uuid4():uuid
 标准模块,生成全局唯一标识符(UUID 格式字符串)

elif key == "partyCode" or "bizPayNo":
生成形如 
1620000000000
 的毫秒级时间戳并存入类属性(若属性不存在)

time.time():time
 标准模块,生成唯一业务编码(如 
partyCode

bizPayNo
),避免订单号或流水号重复

elif key == "mobile":
生成形如 
13812345678
 的手机号并存入类属性(若属性不存在)

handlePhone.get_phone():
自定义类方法,生成未注册的手机号

replace_sql

re.findall()


getattr()
替换SQL中的动态参数(如 
#partyCode#
),生成可执行SQL语句

__execute_sql_and_setattr

mysql.get_data_dict()


setattr()
执行预处理后的SQL,将查询结果映射到类属性(如用户ID、订单号),供后续接口请求使用

__set_attribute

ast.literal_eval()


self.replace_sql()


self.__execute_sql_and_setattr
统一处理属性来源:
– 配置数据
– SQL查询结果
– 动态生成值,确保参数替换前数据就绪

replace_data


ast.literal_eval()


self.__handle_str


self.__set_attribute

入口方法:
1. 清洗原始数据 → 2. 提取参数名 → 3. 设置属性 → 4. 替换参数 → 5. 生成JSON请求体

 



{
    #时间戳
    "t": "",
    #用户名
    "principal":"#user_name#",
    #密码
    "credentials":"123456a",
    #类似于access-token
    "sessionUUID": session_uuid,
    #验证码
    "imageCode":"lemon"
}
逻辑:
1、传入请求参数data,excel中读取到的请求参数
2、删除请求参数中的换行符和空格
3、获取需要替换的参数名称
4、通过参数名称获取到参数的值,获取到值统一设置为类属性
5、将参数的值替换掉参数名称和#号
"""
import re  #正则表达式会用
import ast
import uuid
import time
from tools.handle_path import data_dir
from tools.handle_excel import HandleExcel
from conf.setting import user_info
from tools.handle_attribute import HandleAttr
from tools.handle_phone import HandlePhone
from tools.handle_db import mysql
from tools.handle_logs import myLog
class HandleReplace:
    def __init__(self):
        self.handle_phone =  HandlePhone()
    # 删除换行符和空格键
    # data(str): excel中获取的请求参数
    # return data:去掉空格后的请求参数
    def __handle_str(self,data:str):
        for str_data in ["
"," "]:
            data = data.replace(str_data,"")
        return data
    # 获取需要替换参数的名称
    def __get_replace_keys(self,data:str):
        #"#(w.+?)#"正则表达式
        key_list = re.findall("#(w.+?)#",data)
        return key_list
    # 根据数据的来源,获取数据,设置为类属性
        def __get_data_and_set_attribute(self,key:str,user_info):
        if key in user_info:
            # 取出来设置为类属性
            setattr(HandleAttr,key,str(user_info[key]))
        elif key == "sessionUUID":
            # 特殊处理,通过脚本生成sessionUUID
            setattr(HandleAttr,key,str(uuid.uuid4()))
        # 添加产品的唯一字段partyCode,单独处理
        elif key == "partyCode" or key == "bizPayNo":
        # 添加产品数据库断言的时候,partyCode要使用参数替换时候的partyCode,不能在sql语句替换的时候再去生产partyCode
            if hasattr(HandleAttr,key):
                pass
            else:
                setattr(HandleAttr,key,str(int(time.time()*1000)))
        # 如果是手机号(注册用户),单独生产未注册的手机号
        elif key == "mobile":
            if hasattr(HandleAttr, key):
                pass
            else:
                print("生成为注册的手机号")
                phone = self.handle_phone.get_phone()
                setattr(HandleAttr,key,str(phone))
  
    #替换sql语句中的参数
    def replace_sql(self,sql,assert_db_info):
        # 获取需要替换的参数的名称,返回类型list
        key_list = self.__get_replace_keys(data=sql)
        if len(key_list)>0:
            # 获取key的值,然后设置为类属性
            for key in key_list:
                self.__get_data_and_set_attribute(key=key,user_info=assert_db_info)
            #从类属性中查询到key的值,然后替换
            for key in key_list:
                sql = sql.replace(f"#{key}#", str(getattr(HandleAttr, key)))
            return sql
        else:
            print("不需要替换sql")
            return sql
    # 执行sql语句,将sql语句查询出来的key,val设置为类属性的key,val
    def __execute_sql_and_setattr(self,sql):
        # 调sql执行方法,获取到数据
        # 拿到数据并设置为属性
        result = mysql.get_data_dict(sql=sql)
        for dict_data in result:
            for key,value in dict_data.items():
                setattr(HandleAttr,key,str(value))
    # 调用数据来源设置属性方法,设置类属性
    def __set_attribute(self,key_list,setup_sql,setup_sql_info):
        """
        : param setup_sql (str):接口执行之前,需要从通过sql查询来获取替换数据,可以兼容多条[sql1,sql2]
        :param key_list: ["user_name","session_uuid"]
        :return:
        """
        if setup_sql:
            for sql in ast.literal_eval(setup_sql):
                # 替换sql语句中需要替换的参数
                new_sql = self.replace_sql(sql=sql,assert_db_info=setup_sql_info)
                # 执行sql语句,并设置为属性
                self.__execute_sql_and_setattr(sql=new_sql)
        # 只对excel请求参数进行属性设置
        for key in key_list:
            #key的值的来源配置文件
            #通过脚本生成的数据
            #响应结果
            #数据库
            self.__get_data_and_set_attribute(key=key,user_info=user_info)
    # 获取参数,不同的数据,来源是不是不一样,然后设置为类属性
    # data (str): excel中读取到的请求参数
    def replace_data(self,data,setup_sql,setup_sql_info):
        try:
            myLog.info(msg="replace_data参数替换入参:")
            myLog.info(msg=f"data:{data},{type(data)}")
            myLog.info(msg=f"setup_sql:{setup_sql},{type(setup_sql)}")
            myLog.info(msg=f"setup_sql_info:{setup_sql_info},{type(setup_sql_info)}")
            if data:
                # 删除换行符和空格键,返回类型str
                new_data = self.__handle_str(data=data)
                # 获取需要替换的参数的名称,返回类型list
                key_list = self.__get_replace_keys(data=new_data)
                myLog.info(msg=f"需要替换的参数名称:{key_list}")
                if len(key_list)>0:
                    # 当 len(key_list)>0 说明 参数里面需要替换数据
                    # 获取参数,并设置为类属性,无返回值
                    self.__set_attribute(key_list=key_list,setup_sql=setup_sql,setup_sql_info=setup_sql_info)
                    # 从类属性中通过key_list里面的key获取值,然后替换请求参数
                    for key in key_list:
                        new_data = new_data.replace(f"#{key}#",str(getattr(HandleAttr,key)))
                    new_data = ast.literal_eval(new_data)
                    new_data["t"] = int(time.time()*1000)
                    myLog.info(msg=f"参数替换完成后返回数据:{new_data},{type(new_data)}")
                    return new_data
                else:
                    new_data = ast.literal_eval(new_data)
                    new_data["t"] = int(time.time() * 1000)
                    myLog.info(msg=f"不需要参数替换,返回数据:{new_data},{type(new_data)}")
                    return new_data
            else:
                myLog.info(msg="没有请求参数,不需要做任何事情,返回空的dict")
                return {}
        except Exception as e:
            myLog.error(msg=f"replace_data参数替换方法执行报错,错误原因{e}")
            myLog.exception(e) #将错误信息收集到日志文件
            raise TypeError #手动抛出异常给框架
if __name__ == '__main__':
    print(case_list[0])
    cl = HandleReplace()
    cl.replace_data(data=case_list[0]["data"])
接口依赖提取工具handle_extract

定义了一个名为 
HandleExtract
的类,提供从接口响应中通过 JSONPath 表达式提取数据并存储为全局变量的方法

ast​​:用于安全地将字符串转换为字典(
ast.literal_eval
)​​jsonpath​​:通过 
jsonpath
函数根据 JSONPath 表达式从响应数据中提取值​​tools.handle_attribute.HandleAttr​​:自定义模块,用于操作全局属性、当不确定json结构深度时,使用
$..key的jsonpath表达式最合适;

​​
handle_extract(self, extract_data, response)
​​:通过 JSONPath 表达式从接口响应中提取指定数据并存储为全局属性;



"""
逻辑:
1、在excel中新增extract_data,用于存储提取数据的key以及提取表达式(jsonpath)
2、在请求需要鉴权的接口之前,去请求登陆接口,读取extract_data中的数据,获取字典的key(响应结果中key),values(jsonpath表达式),从响应结果中提取到鉴权信息,设置到类属性作为全局变量
3、如果是鉴权,就在请求需要鉴权的接口之前,将这个鉴权的token设置到请求头里面
4、如果是参数依赖,其他接口在发请求之前,去获取到相应的参数,替换自己的请求参数
"""
 
import ast
from jsonpath import jsonpath
 
 
from tools.handle_attribute import HandleAttr
 
class HandleExtract:
    # excel中extract_data字段的数据,response接口响应结果
    def handle_extract(self,extract_data,response):
        if extract_data:
            extract_data = extract_data if isinstance(extract_data,dict) else ast.literal_eval(extract_data)
            for key,value in extract_data.items():
                token = jsonpath(response,value)[0]
                #将提取出的token鉴权设置为全局变量
                setattr(HandleAttr,key,token)
        else:
            print("extract_data数据为空,不需要提取全局变量")
 
if __name__ == '__main__':
    cl = HandleExtract()
    #"$..access_token"是jsonpath提取表达式,选择所有名为access_token的属性
    extract_data = {"access_token":"$..access_token"}
    response={'access_token': 'aef4b927-ffa0-4c0c-a8a1-a844c9a7a423', 'token_type': 'bearer', 'refresh_token': '58edfd15-1f90-49f0-9094-7ad74eb72620', 'expires_in': 1295999}
    cl.handle_extract(extract_data,response)
数据库操作工具handle_db

定义了一个名为
HandleDb
的类,提供了连接数据库、关闭连接、执行SQL并返回结果(两种形式:纯值列表和字典列表)的方法。

导入pymysql模块,以及从conf.setting模块导入mysql_info(数据库配置信息)


__init__(self, mysql_info)
:初始化方法,接收一个包含数据库连接信息的字典(mysql_info)来建立数据库连接;
db_close(self)
:关闭游标和数据库连接;
get_datas(self, sql)
:执行传入的SQL语句,并返回一个列表。这个列表包含所有查询结果中每一行数据的每个值(按顺序);


get_data_dict(self, sql)
:执行传入的SQL语句,并返回整个结果集(一个列表,列表中的每个元素是一个字典,表示一行数据,字典的键是字段名,值是对应的值)。

例如:[{'mobile_code': '1234', 'user_phone':'18711277355', 'user_id':1}, …]



import pymysql
#从配置文件中导入数据库登陆信息
from conf.setting import mysql_info
 
# **mysql_info
class HandleDb:
 
    def __init__(self,mysql_info):
        self.db = pymysql.connect(
            host=mysql_info["host"],
            port=mysql_info["port"],
            user=mysql_info["user"],
            password=mysql_info["password"],
            db=mysql_info["db"],
            #设置自动提交
            autocommit=True,
            #DictCursor设置成字典类型,字典嵌套在list中,一行数据为一个字典;
            cursorclass=pymysql.cursors.DictCursor
        )
        #游标连接
        self.cur = self.db.cursor()
 
 
    #关闭数据库
    def db_close(self):
        #先关游标
        self.cur.close()
        #再关数据库连接
        self.db.close()
 
    #获取数据,sql查询到什么数据,我们就返回
    def get_datas(self,sql):
        value_list = []
        #执行sql语句
        self.cur.execute(sql)
        #获取sql查询的数据
        result = self.cur.fetchall()
        for __dict in result:
            for val in __dict.values():
                value_list.append(val)
        return value_list
 
    # 返回dict类型的数据
    def get_data_dict(self,sql):
        self.cur.execute(sql)
        result = self.cur.fetchall()
        return result
 
mysql = HandleDb(mysql_info=mysql_info)
 
if __name__ == '__main__':
    cl = HandleDb(mysql_info=mysql_info)
    cl.get_data_dict(sql=sql1)
SQL格式化与空格清理工具​handle_space

定义了一个名为 
HandleSpace
的类,提供清理字符串中的空格和换行符、识别并格式化SQL语句(在关键字前后添加空格)的方法,最终返回处理后的字典数据。​

ast​​:用于安全地将字符串转换为字典(
ast.literal_eval


handle_sql(self, dict_str)

输入:Excel中的字符串数据(可能包含SQL语句或普通字符串)处理过程:

移除字符串中的空格、换行符(

)和转义字符(
n
)将字符串安全转换为字典识别SQL语句(通过检测
SELECT/UPDATE/ALTER
等关键字)为SQL关键字前后添加空格实现格式化保留非SQL数据原样处理输出:清理后的字典数据



import ast
 
class HandleSpace:
    def handle_sql(self,dict_str): #dict_str (str): excel中assert_db字段数据(数据库断言数据,或者其他需要删除空格和换行符的字符串或者sql语句)
        str_info = ["n", "
", " "]
        sql_info =["SELECT","UPDATE","FROM","WHERE","ALTER","AS","DELETE","INSERT","ORDER","BY","DESC"]
        for i in str_info:
            # 删除空格和换行符
            dict_str = dict_str.replace(i, "")
        # 数据类型转换
        dict_str = ast.literal_eval(dict_str)
        for key, value in dict_str.items():
            # value.lower()
            # 判断valaue是sql语句就在关键字前后加空格处理,否则不加空格
            if isinstance(value, str):
                if value.startswith("SELECT") or value.startswith("UPDATE") or value.startswith("ALTER") or value.startswith("DELETE") or value.startswith("INSERT"):
                    # 给sql语句关键字前后加空格
                    for ii in sql_info:
                        value = value.replace(ii, " {} ".format(ii))
                    # 去掉sql开头和结尾的空格
                    value = value.strip()
                    # 替换sql语句
                    dict_str[key] = value
                else:
                    # 这里要还原一下不是sql语句的大小写字母,value = value.lower()
                    print("不是sql语句不用加空格")
            else:
                print("不是str类型默认不是sql语句")
        return dict_str。 #返回字典类型的数据
 
 
if __name__ == '__main__':
    cl = HandleSpace()
    test_dict = '{" expected_data": 1, "actual_ data ": "SEL ECT COU NT(*  )F  R OM tz_ atta ch_file WH  ERE file_path = '#file_path#'"}'
 
    cl.handle_sql(test_dict)
HTTP请求处理工具handle_request

定义了名为 
HandleRequests
 的​​HTTP请求处理工具类​​,实现了统一封装请求发送功能,支持普通接口调用和图片上传接口处理,自动处理鉴权头、URL参数替换和响应解析。


re
 – 正则匹配URL中的占位符
requests
 – 发送HTTP请求
requests_toolbelt.multipart
 – 处理多部分文件上传(图片上传)

分析项 所用模块/函数 解决的问题
​**
__hanele_headers(self)
**

hasattr
 + 
getattr
 + 
format
动态处理请求头:
根据 
HandleAttr
 类是否包含 
access_token
 属性,决定是否添加 
Authorization
 鉴权头。
​**
hasattr
**
Python 内置函数 安全检查 
HandleAttr
 类中是否存在 
access_token
 属性,避免因属性不存在导致程序崩溃。
​**
getattr
**
Python 内置函数 获取 
HandleAttr
 类中的 
access_token
 属性值,用于生成鉴权 Token。
​**
format
**
字符串方法

将 Token 值格式化为 
Authorization: bearer {token}
 的标准鉴权头格式。



import re  #re模块处理正则表达式
import requests
from requests_toolbelt.multipart import MultipartEncoder #使用 requests_toolbelt 发multipart/form-data请求
 
 
from tools.handle_attribute import HandleAttr
from tools.handle_reponse import HandleResponse
from tools.handle_path import image_dir
from conf.setting import image_info
 
class HandleRequests:
    def __init__(self):
        self.headers = {"Content-Type": "application/json; charset=UTF-8", "locale": "zh_cn"}
        self.handle_response = HandleResponse()
 
    #判断接口需要token和不需要token
    def __hanele_headers(self):
        if hasattr(HandleAttr,"access_token"):
            token = getattr(HandleAttr,"access_token")
            self.headers["Authorization"]="bearer{}".format(token)  #字符串格式化
        else:
            print("该接口不需要鉴权")
 
 
    # 图片上传请求处理(管理端后台上传图片)
    def __upload_image(self,method,url):
        #with语法自动关闭文件流,不需要file.close();file=image_dir文件绝对路径或相对路径;mode="rb"文件操作模式,r只读,b二进制方式;open函数负责打开文件;
        with open(file=image_dir,mode="rb") as image:
            from_data = MultipartEncoder(fields={
                # "file":("图片的名称.jpg","图片的二进制流","image/jpeg")
                "file": (image_info["file_name"], image, image_info["file_type"])
 
            })
            self.headers["Content-Type"]=from_data.content_type
            #记得缩进
            response = requests.request(method=method,url=url,data=from_data,headers=self.headers)
        self.headers["Content-Type"] ="application/json; charset=UTF-8"
        return response
 
 
 
   #请求接口地址替换数据
    def __replace_url(self,url:str):
        #在字符串中找到正则表达式所匹配的所有子串,并返回一个列表;url是待匹配的子串;
        key_list = re.findall("#(w.+?)#",url)
        if len(key_list)>0:
            for key in key_list:
                url = url.replace(f"#{key}#",str(getattr(HandleAttr,key)))
            print("替换后的请求地址:",url)
            return url
        else:
            print("不需要替换请求地址的数据")
            return url  #函数返回值放在return关键字后面,return后面什么也不写,或return也不写默认返回none;
 
 
 
    # 发送请求方法封装
    def send_requests(self,method,url,data,is_upload):
        # 鉴权请求头处理,有token和无token
        self.__hanele_headers()
        # 请求地址参数替换
        new_url = self.__replace_url(url=url)
        if str(is_upload) == "1":
            # 图片上传接口
            # 写图片上传的代码逻辑
            response = self.__upload_image(method=method,url=new_url)
            new_response = self.handle_response.handle_response(response=response)
            print("图片上传接口返回:",new_response)
            return new_response
        else:
            # 普通接口,需要token 和不需要token
            response = requests.request(method=method,url=new_url,json=data,headers=self.headers)
            new_response = self.handle_response.handle_response(response=response)
            return new_response
 
 
响应处理与断言工具handle_response

定义了名为 
HandleResponse
 的​​响应处理与断言工具类​​,继承自 
unittest.TestCase
,提供了响应数据格式化处理、接口响应断言和数据库断言的功能。


__handle_str
(利用 str.replace 清洗 Excel 数据中的换行符和空格)
handle_response
(基于 isinstance 和 requests 的 json()/text 方法,标准化 JSON/文本响应)
assert_reponse
(通过 jsonpath 动态提取字段、ast.literal_eval 安全转换预期值为字典,结合 unittest 的 assertEqual 实现动态断言)
assert_db
(调用 assertEqual数据库断言)

自定义函数 所用模块/方法 函数作用

handle_response(response)


isinstance


判断对象的数据类型并返回布尔值;
区分响应数据格式(JSON 或纯文本),确保后续处理逻辑适配不同接口返回类型

assert_reponse(expected_data, response)
动态提取实际响应字段,并与预期数据对比,执行自动化断言


__handle_str()
清洗数据

清洗excel中的预期结果的换行符(

)和空格,避免因数据格式错误(如 JSON 中存在非法空格)导致解析失败或断言不匹配


isinstance

判断对象的数据类型是字典dict并返回布尔值

ast.literal_eval(expected_data)
字符串转字典
将字符串格式的预期值(如 
"{'token_type': 'bearer'}"
)安全转换为字典,避免 
eval
 的安全风险。

jsonpath(response, "$..key")
动态字段提取
通过 JSONPath 表达式(如 
$..token_type
)从响应中提取动态字段值,支持嵌套结构,避免硬编码路径。

assert_db(extract_data, actual_data)

self.assertEqual()

直接比对数据库查询结果与预期数据,验证数据一致性。



import ast
import unittest
from jsonpath import jsonpath
 
class HandleResponse(unittest.TestCase):
 
    def __handle_str(self,data:str):    #data:excel中获取的请求参数
        for str_data in ["
"," "]:
            data = data.replace(str_data,"")
        return data             #去掉空格后的请求参数
 
    # 响应结果处理
    def handle_response(self,response):
        try:
            if isinstance(response.json(),dict): #判断一个对象是否时一个已知的类型
                return {"response_type":"json","response":response.json()}
        except Exception as e :
            if isinstance(response.text,str):
                return {"response_type":"str","response":response.text}
            else:
                return {}
 
    def assert_reponse(self,expected_data,response):
        # 期望结果: expected_data = {"token_type": "bearer"}
        # 实际结果: actual_data = {"token_type":"bearer"}
        if expected_data:
            #去空格
            expected_data = self.__handle_str(data=expected_data)
            # 三元运算符 [结果为真的值] if [条件] else [结果为假的值]
            expected_data = expected_data if isinstance(expected_data,dict) else ast.literal_eval(expected_data)
            actual_data = {}
            if response["response_type"] == "json":
                for key in expected_data:
                    actual_data[key]=jsonpath(response,f"$..{key}")[0]
                self.assertEqual(expected_data,actual_data)
            else:
                for key in expected_data:
                    actual_data[key] = response["response"]
                self.assertEqual(expected_data, actual_data)
        else:
            print("预期结果为空,不需要做接口响应的断言")
 
    # 断言数据库
    def assert_db(self,extract_data,actual_data):
        self.assertEqual(extract_data,actual_data)
 
 
 
if __name__ == '__main__':
    cl = HandleResponse()

日志管理器工具handle_logs

利用python自带模块logging创建日志器,根据日志不同级别(info,debug,warning,error,critical),统一日志输出格式、渠道、级别,执行结果的记录,将框架运行中产生的有效日志保存到指定的.txt结尾的文本文件;



import logging
from logging import handlers
from tools.handle_path import log_dir_name
 
 
def my_log():
    # 1、创建日志收集器
    project_name = logging.getLogger(name="project_name")
 
    # 2、创建日志收集渠道
    #控制台
    pycharm = logging.StreamHandler()
    file = handlers.TimedRotatingFileHandler(filename=log_dir_name,when="D",interval=1,encoding="utf-8")
 
    # 3、创建日志格式
    fmt = "【%(asctime)s-%(name)s-%(levelname)s-%(pathname)s-%(funcName)s-%(lineno)d】>>>:%(message)s"
    pycharm_fmt = logging.Formatter(fmt=fmt)
 
    # 4、渠道绑定日志格式
    pycharm.setFormatter(fmt=pycharm_fmt)
    file.setFormatter(fmt=pycharm_fmt)
 
    # 5、日志收集器设置日志级别
    project_name.setLevel(level=logging.DEBUG) #日志收集器设置日志级别
 
    # 6、给日志收集器绑定渠道
    project_name.addHandler(pycharm)
    project_name.addHandler(file)
    return project_name
 
 
myLog = my_log()
 

日志级别的优先级
从低到高为:​DEBUG < INFO < WARNING < ERROR < CRITICAL

触发顺序的逻辑

时间顺序:日志记录的顺序取决于事件发生的实际时间。例如,如果先发生了一个 
WARNING
,随后发生了一个 
ERROR
,则日志中会先记录 
WARNING
,再记录 
ERROR
。​严重性决定级别:问题的严重性决定使用哪个级别,而非“先报哪个”。例如:
WARNING:用于不影响流程但需注意的问题(如非关键字段缺失)。​ERROR:用于直接导致测试失败的问题(如接口返回 500 错误)。

过滤机制
日志系统会根据设置的日志级别过滤记录。例如:

若设置级别为 
ERROR
,则仅记录 
ERROR
 和 
CRITICAL

在测试框架中的实际应用

当测试用例遇到致命问题(如接口超时、断言失败)时,应立即记录为 ​ERROR,并可能终止测试步骤。当遇到可容忍的问题(如响应时间略长、非必选字段缺失)时,记录为 ​WARNING,测试继续执行。
应用场景

开发阶段:多用
DEBUG
定位细节问题。​日常执行:默认
INFO

WARNING
,平衡信息量和可读性。​异常排查:通过
ERROR
快速定位失败原因,
CRITICAL
优先处理阻塞性问题。

级别 核心作用 典型应用场景
DEBUG 最详细的调试信息 记录请求体、响应体、内部变量值、中间步骤数据
INFO 关键流程节点记录 记录接口请求URL、状态码、测试用例开始/结束
WARNING 潜在问题预警 响应超时、参数缺省使用默认值、非阻塞性异常
ERROR 明确的错误事件 接口调用失败、断言失败、数据解析异常
CRITICAL 致命错误导致系统崩溃 环境不可用(如数据库断开)、测试框架自身崩溃
© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容