Commit e71631cd by taoke
parents 1fe343dd 49752ba9
...@@ -3,7 +3,21 @@ __author__ = 'jiaqiying' ...@@ -3,7 +3,21 @@ __author__ = 'jiaqiying'
__data__ = "2021-05-06 03:01" __data__ = "2021-05-06 03:01"
from enum import Enum from enum import Enum
# 用例优先级枚举
class CaseGrade(Enum): class CaseGrade(Enum):
LOW = 0 LOW = 0
MIDDLE = 1 MIDDLE = 1
HIGH = 2 HIGH = 2
\ No newline at end of file
# 产品名称枚举
class ProductName(Enum):
# 斑马信用app
BMC = "bmc"
# 企业云平台
BMY = "bmy"
# sso平台
SSO = "sso"
\ No newline at end of file
...@@ -9,23 +9,35 @@ from config import * ...@@ -9,23 +9,35 @@ from config import *
def request_main(url, headers, method, data): def request_main(url, headers, method, data):
"""封装requests的通用请求方法""" """封装requests的通用请求方法"""
res = None res = None
def request_by_method(method, headers):
inner_res = None
try:
header_content_type = headers["Content-Type"]
except KeyError:
header_content_type = headers["mimeType"]
try:
if method.upper() == "GET":
inner_res = requests.get(url=url, headers=headers, params=data)
elif method.upper() == "POST":
if header_content_type == "application/json":
inner_res = requests.post(url=url, headers=headers, data=json.dumps(data))
elif header_content_type in ["application/x-www-form-urlencoded"]:
inner_res = requests.post(url=url, headers=headers, data=data)
return inner_res
except Exception as e:
# logging.log(str(e))
raise Exception
if headers == None or headers == {} or headers == "": if headers == None or headers == {} or headers == "":
# 如果传的headers为空,使用各自产品的通用headers # 如果传的headers为空,使用各自产品的通用headers
headers = get_headers() headers = get_headers()
header_content_type = headers["Content-Type"]
try: try:
if method.upper() == "GET": res = request_by_method(method, headers)
res = requests.get(url=url, headers=headers, params=data) except requests.exceptions.ConnectionError as e:
elif method.upper() == "POST": logging.log(str(e))
if header_content_type in ["application/x-www-form-urlencoded"]: except requests.exceptions.RequestException as e:
res = requests.post(url=url, headers=headers, data=data)
elif header_content_type == "application/json":
res = requests.post(url=url, headers=headers, data=json.dumps(data))
except Exception as e:
logging.log(str(e)) logging.log(str(e))
raise Exception
if res != None: if res != None:
return res.json() return res.json()
return res return res
...@@ -38,16 +50,18 @@ def get_headers(): ...@@ -38,16 +50,18 @@ def get_headers():
headers = BMCConfig.headers headers = BMCConfig.headers
elif name == BmyConfig.name: elif name == BmyConfig.name:
headers = BmyConfig.headers headers = BmyConfig.headers
# elif name == SsoConfig.name: elif name == SSOConfig.name:
# headers = SsoConfig.headers headers = SSOConfig.headers
return headers return headers
def get_case_dir(product_name): def get_case_dir(product_name):
"""根据传入的产品名来运行对应产品的测试用例目录""" """根据传入的产品名来运行对应产品的测试用例目录"""
test_case_dir = BaseConfig.name test_case_dir = BaseConfig.test_case_dir
if product_name == BMCConfig.name: if product_name == BMCConfig.name:
test_case_dir = BMCConfig.test_case_dir test_case_dir = BMCConfig.test_case_dir
if product_name == BmyConfig.name: if product_name == BmyConfig.name:
test_case_dir = BmyConfig.test_case_dir test_case_dir = BmyConfig.test_case_dir
if product_name == SSOConfig.name:
test_case_dir = SSOConfig.test_case_dir
return test_case_dir return test_case_dir
\ No newline at end of file
# _*_ coding:utf-8 _*_
__author__ = 'fanxun'
__data__ = "2021-05-10 15:47"
import os, time
import logging
from logging.handlers import TimedRotatingFileHandler
from config import BaseConfig
# 日志级别
CRITICAL = 50
FATAL = CRITICAL
ERROR = 40
WARNING = 30
WARN = WARNING
INFO = 20
DEBUG = 10
NOTSET = 0
LOG_PATH = BaseConfig.log_path
class LogHandler(logging.Logger):
"""
日志处理
"""
def __init__(self, name, level=DEBUG, stream=True, file=True):
self.name = name
self.level = level
logging.Logger.__init__(self, self.name, level=level)
self.__determine_folder()
if stream:
self.__setStreamHandler__()
if file:
self.__setFileHandler__()
def __determine_folder(self):
"""判断日志路径是否存在"""
if not os.path.exists(LOG_PATH):
os.mkdir(LOG_PATH)
def __setFileHandler__(self, level=None):
"""
输出到文件
"""
file_name = os.path.join(LOG_PATH, f'{self.name}{time.strftime(r"%Y%m%d%H%M%S", time.localtime())}.log')
# 设置日志回滚, 保存在log目录, 一天保存一个文件, 保留7天
file_handler = TimedRotatingFileHandler(filename=file_name, when='D', interval=1, backupCount=7)
file_handler.suffix = '%Y%m%d.log'
if not level:
file_handler.setLevel(self.level)
else:
file_handler.setLevel(level)
formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
file_handler.setFormatter(formatter)
self.file_handler = file_handler
self.addHandler(file_handler)
def __setStreamHandler__(self, level=None):
"""
输出到控制台
"""
stream_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
stream_handler.setFormatter(formatter)
if not level:
stream_handler.setLevel(self.level)
else:
stream_handler.setLevel(level)
self.addHandler(stream_handler)
def resetName(self, name):
self.name = name
self.removeHandler(self.file_handler)
self.__setFileHandler__()
logger = LogHandler('log')
if __name__ == '__main__':
log = LogHandler('test')
log.info('this is a test msg')
log.error('hoo')
...@@ -16,6 +16,9 @@ class BaseConfig(): ...@@ -16,6 +16,9 @@ class BaseConfig():
# 钉钉相关 # 钉钉相关
webhook = '' webhook = ''
# 日志相关
log_path = r'' # 日志路径
class BMCConfig(BaseConfig): class BMCConfig(BaseConfig):
"""斑马信用app的配置类""" """斑马信用app的配置类"""
...@@ -68,8 +71,10 @@ class BmyConfig(BaseConfig): ...@@ -68,8 +71,10 @@ class BmyConfig(BaseConfig):
class SSOConfig(BaseConfig): class SSOConfig(BaseConfig):
"""SSO配置类""" """SSO配置类"""
name = "sso"
sso_username = 'robot_fanxun' # SSO登录名 sso_username = 'robot_fanxun' # SSO登录名
sso_password = 'fx123456' # sso密码 sso_password = 'fx123456' # sso密码
sso_url = r'http://testtbdzj.hikcreate.com/web/auth/users/login' # sso登录地址 sso_url = r'http://testtbdzj.hikcreate.com/web/auth/users/login' # sso登录地址
sso_salt = 'hikcreate_xj' # SSO盐值 sso_salt = 'hikcreate_xj' # SSO盐值
sso_token = '' sso_token = ''
headers = {'Content-Type': 'application/json'}
...@@ -8,14 +8,14 @@ from common.tools import get_case_dir ...@@ -8,14 +8,14 @@ from common.tools import get_case_dir
def get_parser(): def get_parser():
parser = argparse.ArgumentParser(description="argparse") parser = argparse.ArgumentParser(description="argparse")
parser.add_argument('--product', type=str, default=BaseConfig.name) parser.add_argument('--product', type=str, default=BaseConfig.current_name)
return parser return parser
if __name__ == "__main__": if __name__ == "__main__":
parser = get_parser() parser = get_parser()
args = parser.parse_args() args = parser.parse_args()
BaseConfig.name = args.product BaseConfig.current_name = args.product
# 获取要执行的产品的用例目录 # 获取要执行的产品的用例目录
test_case_dir = get_case_dir(args.product) test_case_dir = get_case_dir(args.product)
print("********此次执行的产品测试用例是:%s********"%test_case_dir) print("********此次执行的产品测试用例是:%s********"%test_case_dir)
......
import pytest,time import pytest,time
import requests import requests
from config import BaseConfig from config import SSOConfig
from common.utils.encryption import Encryption from common.utils.encryption import Encryption
# from common.utils.getExcelData import get_excelData # from common.utils.getExcelData import get_excelData
from common.tools import request_main from common.tools import request_main
...@@ -14,13 +14,13 @@ class SSOLogin(): ...@@ -14,13 +14,13 @@ class SSOLogin():
"""SSO登录""" """SSO登录"""
def _sso_pwd_encrypted(self, org_pwd): def _sso_pwd_encrypted(self, org_pwd):
"""md5加密""" """md5加密"""
encrypted_password = Encryption().get_md5(org_pwd, salt=BaseConfig.salt) encrypted_password = Encryption().get_md5(org_pwd, salt=SSOConfig.sso_salt)
return encrypted_password return encrypted_password
def sso_login(self,url, method, headers=None): def sso_login(self,url, method='post', headers=None):
"""SSO登录获取token""" """SSO登录获取token"""
encrypted_password = self._sso_pwd_encrypted(BaseConfig.password) encrypted_password = self._sso_pwd_encrypted(SSOConfig.sso_password)
req_data = {f"loginName":BaseConfig.username,"password":encrypted_password} req_data = {f"loginName":SSOConfig.sso_username,"password":encrypted_password}
res = request_main(url, headers, method, req_data) res = request_main(url, headers, method, req_data)
return res['data']['token'] return res['data']['token']
......
...@@ -2,14 +2,11 @@ __author__ = 'fanxun' ...@@ -2,14 +2,11 @@ __author__ = 'fanxun'
__data__ = "2021-05-08 10:26" __data__ = "2021-05-08 10:26"
import pytest import pytest
from config import SSOConfig from config import SSOConfig
from common.utils.encryption import Encryption from service.login import SSOLogin
from common.tools import request_main
@pytest.fixture(scope='module', autouse=True) @pytest.fixture(scope='module', autouse=True)
def sso_login(): def sso_login():
"""SSO登录获取token""" """SSO登录获取token"""
encrypted_password = Encryption().get_md5(SSOConfig.sso_password, salt=SSOConfig.sso_salt) sso_token = SSOLogin().sso_login(url=SSOConfig.sso_url)
req_data = {"loginName": SSOConfig.sso_username, "password": encrypted_password} setattr(SSOConfig, 'sso_token', sso_token)
res = request_main(SSOConfig.sso_url, headers=None, method='post', data=req_data) \ No newline at end of file
setattr(SSOConfig, 'sso_token', res['data']['token'])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment