接口测试框架¶
数据依赖模块¶
- 从json中读取依赖
{
"infoq_004": {
"payload": {
"$aid": {"infoq_002": "data.aid"},
"$comment_id": {"infoq_002": "data.id"},
"content": "正常回复"
}
}
}
# 从json文件中获取case依赖数据
case_load = get_value_from_path(self.case_load_path, f"{case_id}")
# 从依赖数据中取出payload
payload = case_load.get("payload")
# 判断payload是否存在
if payload:
logging.info(f"{case_id}的请求参数: {payload}")
# 判断payload是否有依赖
for k,v in payload.items():
# 获取依赖
if k[0] == "$":
for depended_case_id, depended_key in v.items():
logging.info(f"{'-'*10}")
logging.info(f"需要依赖{depended_case_id}的「{depended_key}」字段")
# 逐层递归查看依赖是否还有依赖,请求依赖接口获取数据
depended_case_index = self.get_row_index(depended_case_id)
depend_case_response = self.run_case(depended_case_index, depended_case_id)
logging.info(f"{depended_case_id}的返回结果: {depend_case_response.json()}")
logging.info(f"{depended_case_id}的返回状态码: {depend_case_response.status_code}")
logging.info(f"{'-'*10}")
# 根据被依赖的key去依赖case的返回结果中取到对应的依赖值,赋值给依赖key
# 这里的depend_case_response需要使用json格式
payload[k.lstrip("$")] = get_value_from_obj(depend_case_response.json(), depended_key)
# 因为前面size增加了,这里size减小相当于size没变,所以迭代时不会报错
payload.pop(k)
logging.info(f"{case_id}的请求参数(替换依赖后): {payload}")
else:
payload = None
logging.info(f"{case_id}的请求参数: {payload}")
- 从excel中读取依赖
# 从excel获取payload
payload_str = self.get_field(index, "payload")
if payload_str:
payload = json.loads(payload_str)
logging.info(f"{case_id}的请求参数: {payload}")
# 判断payload是否有依赖
for k,v in payload.items():
# 获取依赖
if k[0] == "$":
for depended_case_id, depended_key in v.items():
logging.info(f"{'-'*10}")
logging.info(f"需要依赖{depended_case_id}的「{depended_key}」字段")
# 逐层递归查看依赖是否还有依赖,请求依赖接口获取数据
depended_case_index = self.get_row_index(depended_case_id)
depend_case_response = self.run_case(depended_case_index, depended_case_id)
logging.info(f"{depended_case_id}的返回结果: {depend_case_response.json()}")
logging.info(f"{depended_case_id}的返回状态码: {depend_case_response.status_code}")
logging.info(f"{'-'*10}")
# 根据被依赖的key去依赖case的返回结果中取到对应的依赖值,赋值给依赖key
# 这里的depend_case_response需要使用json格式
payload[k.lstrip("$")] = get_value_from_obj(depend_case_response.json(), depended_key)
# 因为前面size增加了,这里size减小相当于size没变,所以迭代时不会报错
payload.pop(k)
logging.info(f"{case_id}的请求参数(替换依赖后): {payload}")
else:
payload = None
logging.info(f"{case_id}的请求参数: {payload}")
数据校验模块¶
主要封装了deepdiff模块
可校验字段类型,字段值,是否缺少字段,还可以忽略校验
{
"infoq_002": {
"expect_response": {
"data":{
"content": "正常评论"
}
},
"is_check_type": 1,
"is_check_value": 1,
"uncheck_list_type": [],
"uncheck_list_value": []
}
}
import json
import jsonpath
from deepdiff import DeepDiff
# 从json文件读取json字段对应的值
def get_value_from_path(json_path, key):
key_rex = "$." + key
with open(json_path, "r", encoding='UTF-8') as f:
json_obj = json.load(f)
result = jsonpath.jsonpath(json_obj, key_rex) # 得到一个单值列表
if result:
for i in result: # 从列表中循环出来
return i
else:
return None
# 从字典对象读取json字段对应的值
def get_value_from_obj(json_obj, key):
key_rex = "$." + key
result = jsonpath.jsonpath(json_obj, key_rex) # 得到一个单值列表
if result:
for i in result: # 从列表中循环出来
return i
else:
return None
class HandleDiff():
def __init__(self, expected_dict, actually_dict):
self.expected_dict = expected_dict
self.actually_dict = actually_dict
self.first_key = self.loop(self.expected_dict)
self.first_key_root = self.convert_to_root(self.first_key)
self.result = DeepDiff(self.expected_dict, self.actually_dict, ignore_order=True, ignore_string_case=True, verbose_level=0).to_dict()
self.result.pop("dictionary_item_added", None)
# print("Diff1:", self.result, "\n", "======")
# 循环dict中的每一个key及子key,返回list
@staticmethod
def loop(x):
_list=[]
def inner_loop(key):
value = get_value_from_obj(x, key)
if type(value) == dict:
for i in value:
r = f"{key}.{i}"
_list.append(r)
inner_loop(r)
for i in x:
_list.append(i)
inner_loop(i)
return _list
# 将a.b形式的key转换为root[a][b]形式
@staticmethod
def convert_to_root(x):
_list = []
for key in x:
temp = []
for i in key.split("."):
j = "['" + i + "']"
temp.append(j)
r = "root" + "".join(temp)
_list.append(r)
return(_list)
# 整合
def check(self, is_check_type=0, is_check_value=0, uncheck_list_type=[], uncheck_list_value=[]):
report = {}
if self.result:
dict_item_removed = self.result.get("dictionary_item_removed")
type_changes = self.result.get("type_changes")
if dict_item_removed:
temp = []
for i in dict_item_removed:
if i in self.first_key_root:
temp.append(i)
if temp:
report["实际结果比预期缺少字段"] = temp
if is_check_type:
if type_changes:
if uncheck_list_type:
uncheck_list_root = self.convert_to_root(uncheck_list_type)
for i in uncheck_list_root:
if i in type_changes:
type_changes.pop(i, None)
report["实际结果中值的类型与预期不符"] = type_changes
if is_check_value:
if uncheck_list_value:
uncheck_list_root = self.convert_to_root(uncheck_list_value)
self.result = DeepDiff(self.expected_dict, self.actually_dict, exclude_paths=uncheck_list_root , ignore_order=True, ignore_string_case=True, verbose_level=0).to_dict()
# 这里很关键
x1 = report.get("实际结果比预期缺少字段", "")
x2 = self.result.get("dictionary_item_removed", "")
report["实际结果比预期缺少字段"] = set(x1) | set(x2)
self.result.pop("dictionary_item_removed", None)
self.result.pop("type_changes", None)
self.result.pop("dictionary_item_added", None)
report["实际结果中的值与预期不一致"] = self.result
# 去除一些空的情况:{'实际结果中值的类型与预期不符': {}, '实际结果比预期缺少字段': set(), '实际结果中的值与预期不一致': {}}
empty_key = [k for k,v in report.items() if not v]
for k in empty_key:
report.pop(k, None)
return report
if __name__ == "__main__":
old = {
"a":1,
"b":[2,3,4],
"c":{
"d":4,
"e":[{
"id": 81,
"name": "区块链",
"alias": "blockchain"
},2]
},
"f":5
}
new = {
"a":1,
"b":[2,3,4],
"c":{
"d":4,
"e":[{
"id": 81,
"name": "区块链",
"alias": "blockchain"
},2]
},
"f":5,
"g":6
}
is_check_type = 1
is_check_value = 1
uncheck_list_type = []
uncheck_list_value = []
diff_json = HandleDiff(old, new)
report = diff_json.check(is_check_type, is_check_value, uncheck_list_type, uncheck_list_value)
print(report)
def check(self, index, case_id, run_response):
case_load = get_value_from_path(self.case_load_path, f"{case_id}")
col_index_result = self.get_col_index("test_result")
expect_response = case_load.get("expect_response")
logging.info(f"{case_id}期望返回: expect_response")
if expect_response:
is_check_type = case_load.get("is_check_type")
is_check_value = case_load.get("is_check_value")
uncheck_list_type = case_load.get("uncheck_list_type")
uncheck_list_value = case_load.get("uncheck_list_value")
# 结果对比
# 这里run_response要使用json格式
diff_json = HandleDiff(expect_response, run_response.json())
report = diff_json.check(is_check_type, is_check_value, uncheck_list_type, uncheck_list_value)
# 将测试结果写入Excel
if report:
# 标失败色
self.write_test_result(index, col_index_result, "Fail", "ff4c00")
# 失败原因
col_index_fail_cause = self.get_col_index("fail_cause")
self.write_test_result(index, col_index_fail_cause, f"{report}")
# extra
col_index_extra = self.get_col_index("extra")
self.write_test_result(index, col_index_extra, f"request-id: {run_response['extra']['request-id']}")
else:
# 标成功色
self.write_test_result(index, col_index_result, "Pass", "00e500")
else:
self.write_test_result(index, col_index_result, "未配置expect_response,无法判断测试是否成功")
最后更新:
2021-08-17