一、session鉴权的处理1. requests的会话对象

就像一个浏览器一样,它会在同一个会话中自动处理cookie信息,不需要写任何额外的代码。

import requests
session = requests.Session()  # 理解为就是一个浏览器
type(session)
requests.sessions.Session
session.post()# 登录session.get() # 获取某个数据,会自动携带上一步收到的cookie
# 课堂派案例headers = {'cookie': 'FZ_STROAGE.ketangpai.com=eyJTRUVTSU9OSUQiOiIzMTI5MjRiNTU2MzNmMDUxIiwiU0VFU0lPTkRBVEUiOjE2Mzk1NzA0NDQ3Njd9; ARK_ID=undefined; ketangpai_home_slb=3fbda3fc94d5d1be63720d9c156288d0; PHPSESSID=kmugv5id4lcecie33asikt3p96; ketangpai_home_remember=think%3A%7B%22username%22%3A%22MDAwMDAwMDAwMLV2x5eHz7dthN523LWtftmC0IDak4NubQ%22%2C%22expire%22%3A%22MDAwMDAwMDAwMLOGvd6IubtrhKiGl7G2dZ4%22%2C%22token%22%3A%22MDAwMDAwMDAwMMurrpWavLehhs1-lbO5hZWEzYfcepuomcWmmqaMiHtnr5ypzYWosKKZq6HQxtOK0ZCme5p-q6iZu2yrn4uNhJ3KedDYk7ivboS4jt6zuY2Ug6eDl36KYW0%22%2C%22sign%22%3A%2207f1bd0c97817e6d7ebe92bfe8e30fe9%22%7D',          'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.36'}res = requests.get(url='https://v4.ketangpai.com/UserApi/getUserInfo')
res.status_code
200
res.cookies
res.json()
{'status': 0, 'info': '您还未登陆!'}
session = requests.Session() # 1. 创建一个session对象
# 2. 登录login_url = 'https://v4.ketangpai.com/UserApi/login'data = {'email': '877***9301@qq.com',        'password': 'Pyt***inlan',        'remember': 0}# json data paramsresponse = session.post(url=login_url, data=data)
session.cookies
res = session.get(url='https://v4.ketangpai.com/UserApi/getUserInfo')
res.json()
{'status': 1, 'data': {'id': 'MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ',  'username': '****',  'avatar': 'http://v4.ketangpai.com/Public/Common/img/40/26.png',  'usertype': '1',  'email': '877***01@qq.com',  'stno': '',  'atteststate': 0,  'attestInfo': []}}
# 如果不用session对象,每一步都需要自己处理cookie
login_url = 'https://v4.ketangpai.com/UserApi/login'data = {'email': 877***9301@qq.com',        'password': 'Pyt***inlan',        'remember': 0}# 1.登录response = requests.post(url=login_url, data=data)
response.status_code
200
response.json()
{'status': 1, 'url': '/Main/index.html', 'token': 'MDAwMDAwMDAwMMurrpWavLehhs1-lbO5hZWEzYfcepuomcWmmqaMiHtnr5ypzYWosKKZq6HQxtOK0ZCme5p-haiZu2yrn4uNhJ3KedDYk7ivboS4jt6zuY2Ug7d33n96YW0', 'isenterprise': 0, 'uid': 'MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ'}
# 2.获取数据res = requests.get(url='https://v4.ketangpai.com/UserApi/getUserInfo', cookies=response.cookies)
res.json()
{'status': 1, 'data': {'id': 'MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ',  'username': '****',  'avatar': 'http://v4.ketangpai.com/Public/Common/img/40/26.png',  'usertype': '1',  'email': '877***01@qq.com',  'stno': '',  'atteststate': 0,  'attestInfo': []}}

requests库的session对象仅仅只是自动帮我们处理了cookie的携带问题。

2. 封装处理session鉴权的http请求函数

思路:

  • 保证在一个会话中使用同一个会话对象即可
  • 给每一个用例类创建一个会话对象即可。
import jsonimport unittestfrom jsonpath import jsonpathimport settingfrom common import logger, dbfrom common.data_handler import (    replace_args_by_re,    generate_no_usr_phone)from common.encrypt_handler import generate_signimport requestsclass BaseCase(unittest.TestCase):    """    用例基类    """    db = db    logger = logger    setting = setting    name = 'base_case'    session = requests.session()  # 创建一个session对象用来处理session鉴权    @classmethod    def setUpClass(cls) -> None:        cls.logger.info('==========={}接口开始测试==========='.format(cls.name))    @classmethod    def tearDownClass(cls) -> None:        cls.logger.info('==========={}接口结束测试==========='.format(cls.name))    def flow(self, item):        """        测试流程        """        self.logger.info('>>>>>>>用例{}开始执行>>>>>>>>'.format(item['title']))        # 把测试数据绑定到方法属性case上,其他也要把一些变量绑定到对象的属性上        self._case = item        # 1. 处理测试数据        self.process_test()        # 2. 发送请求        self.send_request()        # 3. 断言        self.assert_all()    def process_test(self):        """        测试数据的处理        """        # 1.1 生成测试数据        self.generate_test_data()        # 1.2 替换依赖参数        self.replace_args()        # 1.3 处理url        self.process_url()        # 1.4 鉴权处理        self.auth_process()    def auth_process(self):        """        v3版本鉴权处理        :return:        """        request_data = self._case.get('request_data')        if request_data:            headers = request_data.get('headers')            if headers:                if headers.get('X-Lemonban-Media-Type') == 'lemonban.v3':                    # 获取token                    token = self._case['request_data']['headers']['Authorization'].split(' ')[-1]                    # 生成签名                    sign, timestamp = generate_sign(token, self.setting.SERVER_RSA_PUB_KEY)                    # 添加到请求数据中                    self._case['request_data']['json']['sign'] = sign                    self._case['request_data']['json']['timestamp'] = timestamp        # if self._case['request_data']['headers']['X-Lemonban-Media-Type'] == 'lemonban.v3':        #     # 获取token        #     token = self._case['request_data']['headers']['Authorization'].split(' ')[-1]        #     # 生成签名        #     sign, timestamp = generate_sign(token, self.setting.SERVER_RSA_PUB_KEY)        #     # 添加到请求数据中        #     self._case['request_data']['json']['sign'] = sign        #     self._case['request_data']['json']['timestamp'] = timestamp    def generate_test_data(self):        """        生成测试数据        """        """           生成测试数据,不是固定流程,有不同可以复写           :return:           """        self._case = json.dumps(self._case)        if '$phone_number$' in self._case:            phone = generate_no_usr_phone()            self._case = self._case.replace('$phone_number$', phone)        self._case = json.loads(self._case)    def replace_args(self):        """        替换参数        """        self._case = json.dumps(self._case)  # 把用例数据dumps成字符串,一次替换        self._case = replace_args_by_re(self._case, self)        self._case = json.loads(self._case)        # 再将request_data, expect_data loads为字典        try:            self._case['request_data'] = json.loads(self._case['request_data'])            self._case['expect_data'] = json.loads(self._case['expect_data'])        except Exception as e:            self.logger.error('{}用例的测试数据格式不正确'.format(self._case['title']))            raise e    def process_url(self):        """        处理url        """        if self._case['url'].startswith('http'):            # 是否是全地址            pass        elif self._case['url'].startswith('/'):            # 是否是短地址            self._case['url'] = self.setting.PROJECT_HOST + self._case['url']        else:            # 接口名称            try:                self._case['url'] = self.setting.INTERFACES[self._case['url']]            except Exception as e:                self.logger.error('接口名称:{}不存在'.format(self._case['url']))                raise e    def send_request(self):        """        发送请求        @return:        """        self._response = self.session.request(            url=self._case['url'], method=self._case['method'], **self._case['request_data'])        # self._response = send_http_request(url=self._case['url'], method=self._case['method'],        #                                    **self._case['request_data'])    def assert_all(self):        try:            # 3.1 断言响应状态码            self.assert_status_code()            # 3.2 断言响应数据            self.assert_response()            # 响应结果断言成功后就提取依赖数据            self.extract_data()            # 3.3 数据库断言后面的任务            self.assert_sql()        except  Exception as e:            self.logger.error('++++++用例{}测试失败'.format(self._case['title']))            raise e        else:            self.logger.info('<<<<<<<<<用例{}测试成功<<<<<<<'.format(self._case['title']))    def assert_status_code(self):        """        断言响应状态码        """        try:            self.assertEqual(self._case['status_code'], self._response.status_code)        except AssertionError as e:            self.logger.warning('用例【{}】响应状态码断言异常'.format(self._case['title']))            raise e        else:            self.logger.info('用例【{}】响应状态码断言成功'.format(self._case['title']))    def assert_response(self):        """        断言响应数据        """        if self._case['res_type'].lower() == 'json':            res = self._response.json()        elif self._case['res_type'].lower() == 'html':            # 扩展思路            res = self._response.text        try:            self.assertEqual(self._case['expect_data'], {'code': res['code'], 'msg': res['msg']})        except AssertionError as e:            self.logger.warning('用例【{}】响应数据断言异常'.format(self._case['title']))            self.logger.warning('用例【{}】期望结果为:{}'.format(self._case['title'], self._case['expect_data']))            self.logger.warning('用例【{}】的响应结果:{}'.format(self._case['title'], res))            raise e        else:            self.logger.info('用例【{}】响应数据断言成功'.format(self._case['title']))    def assert_sql(self):        """        断言数据库        """        if self._case.get('sql'):  # 返回指定键的值,如果键不在字典中返回默认值 None 或者设置的默认值。            # 只有sql字段有sql的才需要校验数据库            # 只有sql字段有sql的才需要校验数据库            sqls = self._case['sql'].split(',')            for sql in sqls:                try:                    self.assertTrue(self.db.exist(sql))                except AssertionError as e:                    self.logger.warning('用例【{}】数据库断言异常,执行的sql为:{}'.format(self._case['title'], sql))                    raise e                except Exception as e:                    self.logger.warning('用例【{}】数据库断言异常,执行的sql为:{}'.format(self._case['title'], sql))                    raise e    def extract_data(self):        """        根据提取表达式提取对应的数据        :return:        """        if self._case.get('extract'):            if self._case['res_type'].lower() == 'json':                self.extract_data_from_json()            elif self._case['res_type'].lower() == 'html':                self.extract_data_from_html()            elif self._case['res_type'].lower() == 'xml':                self.extract_data_from_xml()            else:                raise ValueError('res_type类型不正确,只支持json,html,xml')    def extract_data_from_json(self):        """        从json数据中提取数据并绑定到类属性中        :return:        """        try:            rules = json.loads(self._case.get('extract'))        except Exception as e:            raise ValueError('用例【{}】的extract字段数据:{}格式不正确'.format(self._case['title'], self._case['extract']))        for rule in rules:            # 类属性名            name = rule[0]            # 提取表达式            exp = rule[1]            # 根据jsonpath去响应中提取值            value = jsonpath(self._response.json(), exp)            # 如果能提取到值            if value:                # 把值绑定到对应的类属性上                setattr(self.__class__, name, value[0])  # 注意value是个列表            else:                # 提取不到值,说明jsonpath写错了,或者是响应又问题                raise ValueError('用例【{}】的提取表达式{}提取不到数据'.format(self._case['title'], self._case['extract']))    def extract_data_from_html(self):        """        从html数据中提取数据并绑定到类属性中        :return:        """        raise ValueError('请实现此方法')    def extract_data_from_xml(self):        """        从xml数据中提取数据并绑定到类属性中        :return:        """        raise ValueError('请实现此方法')
from unittestreport import ddt, list_datafrom common.base_case import BaseCasecases = [    {'title': '课堂派登录',     'method': 'post',     'url': 'https://v4.ketangpai.com/UserApi/login',     'request_data': '{"data": {"email": "877***01@qq.com", "password": "Pyt***inlan", "remember": 0}}',     'status_code': 200,     'res_type': 'json',     'expect_data': '{"status": 1}'     },    {        'title': '获取用户信息',        'method': 'get',        'url': 'https://v4.ketangpai.com/UserApi/getUserInfo',        'request_data': '{}',        'status_code': 200,        'res_type': 'json',        'expect_data': '{"status": 1}'    }]@ddtclass TestCourseFlow(BaseCase):    name = "课堂派测试"    @list_data(cases)    def test_course(self, item):        self.flow(item)    def assert_response(self):        """        复写assert_response方法实现课堂派的断言        :return:        """        if self._case['res_type'].lower() == 'json':            res = self._response.json()        elif self._case['res_type'].lower() == 'html':            # 扩展思路            res = self._response.text        try:            self.assertEqual(self._case['expect_data'], {'status': res['status']})        except AssertionError as e:            self.logger.warning('用例【{}】响应数据断言异常'.format(self._case['title']))            self.logger.warning('用例【{}】期望结果为:{}'.format(self._case['title'], self._case['expect_data']))            self.logger.warning('用例【{}】的响应结果:{}'.format(self._case['title'], res))            raise e        else:            self.logger.info('用例【{}】响应数据断言成功'.format(self._case['title']))if __name__ == '__main__':    BaseCase.unittest.main()

本文来自博客园,作者:奈非天,转载请注明原文链接:https://www.cnblogs.com/Nephalem-262667641/p/17318278.html