5

利用pytest hook函数实现自动化测试结果推送企业微信

 2 years ago
source link: https://blog.51cto.com/u_12612738/5542722
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

通常,自动化测试用例在执行完成后,都会发送一个结果,以通知测试人员或测试leader测试的结果。如有测试失败的情况,测试人员再去查看具体的测试报告,检查是哪个场景没有测试通过。当前较为流行的提醒方式有:

  • 企业微信、钉钉等push消息

由于我们公司所使用的办公软件是企业微信,因此,在实现测试结果通知提醒的功能时,选用的是企业微信。当前较为流行的实现方式有两种形式:

  • 企业微信应用通知:需要在企业微信中创建一个应用,再获取Secret
  • 普通群消息推送:需要在群中添加一个群机器人(会自动生成webhook_url,以供后续接口调用)

由于方式一需要在企业微信中创建应用(需要管理员操作权限),总体实现起来较为繁琐,因此我选用的是第二种群机器人的实现方式。

一、实现原理及实现效果

1.外部链路流程

利用pytest hook函数实现自动化测试结果推送企业微信_自动化测试

2.内部调用原理及过程

利用pytest hook函数实现自动化测试结果推送企业微信_pytest hook_02
1)各模块&方法功能:
  • RedisHandler基类:用于初始化redis连接、查询数据、写入数据
  • CaseCount基类:用于初始化用例统计、获取统计成功&失败&跳过&报错的用例数、计算用例通过率
  • EnterpriseWechatNotification基类:用于定义发送企业微信消息的内容模板、定义调用hook_url(群机器人)发送消息方法
  • hook方法pytest_runtest_makereport:用于获取pytest执行后的测试结果、将结果写入缓存、生成控制台测试报告
  • hook方法pytest_terminal_summary:用于获取执行结果、调用发送消息方法发送微信消息
2)具体调用原理、流程:
  • 已添加企业微信群机器人,并记住hook地址;
  • python+pytest已编写测试用例;
利用pytest hook函数实现自动化测试结果推送企业微信_自动化测试_03

② pytest运行测试用例,RedisHandler连接redis                                     ,pytest_runtest_makereport获取用例执行结果,并:

  • 调用RedisHandler中的写入缓存方法,将结果写入缓存;
  • 调用CaseCount中的计算用例通过率方法获取用例通过率;
  • 将获取到的各条测试结果分输出到控制台进行展示:↓(Windows本地运行效果)
利用pytest hook函数实现自动化测试结果推送企业微信_Jenkins_04

③ pytest_terminal_summary方法:

  • 分别调用CaseCount中的获取通过、失败、跳过、报错的用例条数的方法(此方法调用RedisHandler中的get_key方法),获取到各个(通过、失败、跳过、报错)执行结果统计;
  • 调用CaseCount中的计算用例通过率方法获取用例通过率;
  • 调用EnterpriseWechatNotification中的发送企业微信消息方法,将获取到的各个(通过、失败、跳过、报错)执行结果的数量统计与EnterpriseWechatNotification中预定义的模板进行拼接,发送到企业微信;
  • 将获取到的各个(通过、失败、跳过、报错)执行结果与用例通过率一起,输出到控制台展示:↓(Windows本地运行效果)
利用pytest hook函数实现自动化测试结果推送企业微信_自动化测试_05

二、编码实现

1.各个基类

RedisHandler基类:
import redis


class RedisHandler:
def __init__(self, host, port=6379, db=0):
self.client = redis.StrictRedis(host=host, port=port, db=db) # 生成客户端连接,StrictRedis()默认使用连接池,不必再单独使用ConnectPool

def set_string(self, name: str, value, ex=None, px=None, nx=False, xx=False) -> None:
"""
缓存中写入str(单个)
:param name: 缓存名称
:param value: 缓存值
:param ex: 过期时间(秒)
:param px: 过期时间(毫秒)
:param nx: 如果设置为True,则只有name不存在时,当前set操作才执行(新增)
:param xx: 如果设置为True,则只有name不存在时,当前set操作才执行(修改)
:return:
"""
self.client.set(name, value=value, ex=ex, px=px, nx=nx, xx=xx)

def incr(self, key):
"""
使用incr方法,处理并发问题
当key不存在时,会先初始为0,每次调用,则会+1
:param key:
:return:
"""
self.client.incr(key)

def get_key(self, name):
"""读取缓存"""
result = self.client.get(name)
return result
CaseCount基类:
from api_test.common.redis_handler import RedisHandler
from api_test.config.db_config import DBConfig


class CaseCountName:
ERROR: str = "error_count"
FAILED: str = "failed_count"
PASSED: str = "passed_count"
SKIP: str = "skip_count"
TOTAL: str = "total_count"


class CaseCount:
"""
redis 缓存统计用例执行情况
"""

def __init__(self):
self.redis = RedisHandler(host=DBConfig.redis_config.get('host')) # redis主机地址可以写死在这里,也可以从配置类中获取

def init_process(self):
"""
初始化进度、总数、成功数、失败数
"""
self.redis.set_string(CaseCountName.TOTAL, 0)
self.redis.set_string(CaseCountName.SKIP, 0)
self.redis.set_string(CaseCountName.PASSED, 0)
self.redis.set_string(CaseCountName.FAILED, 0)
self.redis.set_string(CaseCountName.ERROR, 0)

def failed_count(self):
"""失败用例数"""
return int(self.redis.get_key(CaseCountName.FAILED))

def passed_count(self):
"""通过用例总数"""
return int(self.redis.get_key(CaseCountName.PASSED))

def skip_count(self):
"""跳过用例数"""
return int(self.redis.get_key(CaseCountName.SKIP))

def error_count(self):
"""报错用例数"""
return int(self.redis.get_key(CaseCountName.ERROR))

def total_count(self):
"""用例总数"""
return int(self.redis.get_key(CaseCountName.TOTAL))

def pass_rate(self):
"""用例成功率"""
try:
rate = round((self.passed_count() + self.skip_count()) / self.total_count() * 100, 2)
return rate
except ZeroDivisionError:
raise Exception("执行失败,未检测到用例执行数量")
EnterpriseWechatNotification基类:
import os
import json
import requests
import platform


def get_env_from_jenkins(name, base=''):
"""从Jenkins中获取全局环境变量"""
return os.getenv(name) and os.getenv(name).strip() or base


ProjectName = get_env_from_jenkins("JOB_NAME") # Jenkins构建项目名称
BUILD_URL = get_env_from_jenkins("BUILD_URL") # Jenkins构建项目URL
BUILD_NUMBER = get_env_from_jenkins("BUILD_NUMBER") # Jenkins构建编号


class EnterpriseWechatNotification:
def __init__(self, hook: list):
# 企业微信群机器人的hook地址,一个机器人就一个,多个就定义多个,可以写死,也可以写在配置类中
self.hook_url_list = [f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={i}" for i in hook]
# allure生成报告的地址,Jenkins执行时会用到,Windows暂未配置allure地址
self.allure_url = f"http://192.168.1.122:8088/jenkins/job/{ProjectName}/{BUILD_NUMBER}/allure/"
self.header = {'Content-Type': 'application/json'}

def send_msg(self, result=''):
"""发送企业微信消息通知"""
global payload
linux_content = f"""** 【{ProjectName}】**
> 项目名称:{ProjectName}
> 构件编号:#{BUILD_NUMBER}
> 测试环境:{platform.system()}
> [报告链接]({self.allure_url})
> [控制台链接]({BUILD_URL})
{result}"""

windows_content = f"""** 【auto_test_project】**
> 测试环境:{platform.system()}
{result}"""
if platform.system() == "Linux":
payload = {
"msgtype": "markdown",
"markdown": {
"content": linux_content
}
}
elif platform.system() == "Windows":
payload = {
"msgtype": "markdown",
"markdown": {
"content": windows_content
}
}
for hook_url in self.hook_url_list:
requests.post(url=hook_url, headers=self.header, data=json.dumps(payload))

注意事项: get_env_from_jenkins方法为从Jenkins获取全局变量,查看全局变量的路径为:Jenkins流水线语法-全局变量-env,见下图:

利用pytest hook函数实现自动化测试结果推送企业微信_Jenkins_06

2.pytest的hook方法,定义在conftest.py中

import time
import pytest
from api_test.config.config import HookUrlConf
from api_test.common.send_enterprise_wechat import EnterpriseWechatNotification
from api_test.common.redis_handler import RedisHandler
from api_test.config.db_config import DBConfig
from api_test.common.case_count_control import CaseCountName, CaseCount

redis = RedisHandler(host=DBConfig.redis_config.get('host'))
case_count = CaseCount()
case_count.init_process() # 初始化Redis中的用例统计缓存数据


@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(item, call):
"""获取测试结果、生成测试报告"""
print('------------------------------------')
out = yield
report = out.get_result()
if report.when == 'call':
# print(f"测试报告:{report}")
# print(f"步骤:{report.when}")
print(f"用例id:{report.nodeid}")
print(f"用例描述:{str(item.function.__doc__)}")
print(f"运行结果:{report.outcome}")
"""将用例执行结果写入缓存"""
if report.outcome == 'passed':
redis.incr(CaseCountName.PASSED)
if report.outcome == 'failed':
redis.incr(CaseCountName.FAILED)

if report.when == 'setup':
if report.outcome == 'skipped':
redis.incr(CaseCountName.SKIP)

CaseCount().total_run_count()


def pytest_terminal_summary(terminalreporter, exitstatus, config):
"""收集测试结果,从Redis缓存数据中获取"""
total_case = case_count.total_count()
pass_case = case_count.passed_count()
fail_case = case_count.failed_count()
skip_case = case_count.skip_count()
error_case = case_count.error_count()
pass_rate = case_count.pass_rate()
run_time = round((time.time() - terminalreporter._sessionstarttime), 2)
print("******用例执行结果统计******")
print(f"总用例数:{total_case}条")
print(f"通过:{pass_case}条")
print(f"失败:{fail_case}条")
print(f"跳过:{skip_case}条")
print(f"报错:{error_case}条")
print(f"用例通过率:{pass_rate}%")
print(f"用时:{run_time}s")
desc = """
本次执行情况如下:
总用例数为:{}
通过用例数:<font color=\"info\">{}条</font>
失败用例数:<font color=\"warning\">{}条</font>
错误用例数:{}
跳过用例数:{}
通过率为:{} %
用时:{}s
""".format(total_case, pass_case, fail_case, error_case, skip_case, pass_rate, run_time)
EnterpriseWechatNotification(hook=HookUrlConf.HOOK_URL.value).send_msg(desc) # 执行结果发送企业微信

三、运行过程与运行效果

1.运行过程

  • Windows本地运行
  • Jenkins触发运行

2.企业微信接收到测试结果通知:

  • 通过Jenkins触发运行的通知效果:↓
利用pytest hook函数实现自动化测试结果推送企业微信_自动化测试_07
  • Windows本地手动触发运行的通知效果:↓
利用pytest hook函数实现自动化测试结果推送企业微信_pytest hook_08

小结


以上就是利用pytest的hook函数(pytest_runtest_makereport、pytest_terminal_summary‍)+redis,实现发送测试结果到企业微信的原理及过程,当然还有一些不足之处,如:

  • 不管是接口自动化测试还是UI自动化测试都可以通过这种方式来实现消息通知;
  • 除了在代码中调用pytest hook函数实现消息通知外,Jenkins也可以通过安装插件达到邮件通知、执行Python脚本达到企微消息通知的目的;
  • 测试结果的存储不一定要用到redis,也可以写在本地文件等,因为多一层调用,就多一层处理和可能面临的报错,另外redis所在服务器连接出错也会影响用例的正常运行;
  • 发送消息的内容样式支持Markdown,发送内容还可以继续优化,比如:通知哪条用例报错等等;

更多实用干货,同步首发于微信公众号【测试开发实战】,欢迎关注!

利用pytest hook函数实现自动化测试结果推送企业微信_自动化测试_09

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK