使用参考手册

1. 步骤描述

  1. 创建一个python文件 依次将接口路径、依赖、请求方法粘贴进去

  2. 顺序粘贴对应的 功能实现 模块代码

  3. 登录功能必须要有并且在首位调用所有的操作都需要依赖登录功能携带的token

  4. 粘贴调用示例并稍微根据自己的环境做一些参数修改比如请求路径修改使用案例库案例名称修改等,即可成功调用API

  5. 修改以下几处位置:

# 路径地址与云端账户需要修改为自己对应的账户
SimOneUrl = "http://127.0.0.1:30083"
loginData = {
    "username": 'admin',
    "password": 'admin',
}


if __name__ == '__main__':
	# 云端登录
    LoginPolicy("cloud")
    # 单机版本登录
    LoginPolicy("standalone")

备注: 附录参考代码是根据步骤描述中的顺序进行粘贴并可以成功运行的代码示例,如有疑惑请查看附录参考代码!!

2. 接口路径与依赖库

import base64
import copy
import inspect
import random
from os.path import exists

from Crypto.Cipher import AES
from faker import Faker
from requests import request, Response
import json
from urllib.parse import urljoin
import os
import time

# 路径地址与云端账户需要修改为自己对应的账户
SimOneUrl = "http://127.0.0.1:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}

fake = Faker('zh_CN')
version_url = "version"
interval = 10

# header
header = {
    "Content-Type": "application/json",
}


# Environment Path
class EnvPath:
    token = "TOKEN"


class Prefix:
    UserServer = "api-user"
    CaseServer = "api-case"
    AssetServer = "api-asset"
    TaskServer = "api-task"

# token
_token = os.environ.get(EnvPath.token)


"""  登录相关接口  """
user_check_url = Prefix.UserServer + "/users/check"
health = Prefix.UserServer + "/health"  # 检查服务是否正常
cloud_login_url = Prefix.UserServer + "/users/login"  # 云端登录
standalone_login_url = Prefix.UserServer + "/users/standalone"  # 单机版登录
login_check = Prefix.UserServer + "/users/check"  # user check

"""  案例库目录相关接口  """
categories_url = Prefix.CaseServer + "/categories"  # 案例库目录
get_categories_url = Prefix.CaseServer + "/categories?withCases=false"  # 获取所有案例库
get_cases_url = "api-case/cases?page=1&filter={category}&pageSize=100"
# get_cases_url = Prefix.CaseServer + "/cases?page={page}&filter={category}&pageSize=100"  # 获取当前案例库的所有案例
delete_categories_url = Prefix.CaseServer + "/categories/{}"  # 删除案例库目录

"""  上传文件相关接口  """
file_md5_url = Prefix.AssetServer + "/files/{}?raw={}"  # 验证文件MD5
upload_file_url = Prefix.AssetServer + "/files/{}/upload?raw={}"  # 上传文件

"""  测试案例相关接口  """
case_import_url = Prefix.AssetServer + "/cases/import"  # 导入案例
case_details_url = Prefix.CaseServer + "/cases/{case_id}/data"  # 案例详情
delete_case_url = Prefix.CaseServer + "/trash"  # 删除案例
convert_replay_case_url = Prefix.TaskServer + "/tasks/{id}/convert2case"  # 转换回放案例
case_def_url = Prefix.CaseServer + "/cases/{case_id}"  # 案例概要
update_case_url = Prefix.AssetServer + "/cases/update"  # 编辑案例 # doing

"""  测试任务相关接口  """
create_task_url = Prefix.TaskServer + "/tasks"  # 创建task
stop_task_url = Prefix.TaskServer + "/tasks/stop"  # 停止task
stop_sessions_url = Prefix.TaskServer + "/sessions/stop"  # 会话停止
delete_task_url = Prefix.TaskServer + "/tasks/delete"  # 删除task
delete_sessions_url = Prefix.TaskServer + "/sessions/delete"  # 删除停止
result_url = Prefix.TaskServer + "/tasks/task?taskIds={task_id}"  # 删除停止  案例运行结果详情
task_info = Prefix.TaskServer + "/tasks/task"  # 删除停止  案例运行结果详情
pause_task_url = Prefix.TaskServer + "/sessions/{task_id}/pause"  # 任务暂停
resume_task_url = Prefix.TaskServer + "/sessions/{task_id}/resume"  # 任务暂停后开始
task_result_url = Prefix.TaskServer + "/tasks/task?taskIds={id}"  # 删除停止  案例运行结果详情

"""  主车相关接口  """
import_vehicle_url = Prefix.AssetServer + "/vehicles/import"  # 导入主车
delete_vehicle_url = Prefix.AssetServer + "/vehicles/{vehicles_id}"  # 删除主车
get_vehicle_url = Prefix.AssetServer + "/vehicles"  # get获取主车信息 post创建主车
get_vehicles_url = Prefix.AssetServer + "/vehicles"  

"""  地图相关接口  """
import_map_url = Prefix.AssetServer + "/maps"  # 导入地图
delete_map_url = Prefix.CaseServer + "/maps/trash"  # 删除地图
get_map_url = Prefix.CaseServer + "/maps?pageSize=100"  # 获取地图相关信息
road_service_url = Prefix.TaskServer + "/sce"  # 启动路网服务
laneTypeIfInside_url = Prefix.TaskServer + "/sce/{map_id}/location/laneTypeIfInside"  # 监测点是否在车道内

"""  数据驱动相关接口  """
import_data_driven_url = Prefix.AssetServer + "/dd"  # 导入数据驱动源
delete_data_dirven_url = Prefix.AssetServer + "/dd/{id}"  # 删除数据驱动源

""" 资源相关接口 """
import_asset_url = Prefix.AssetServer + "/assets"  # 资源导入
asset_repeat_url = Prefix.AssetServer + "/assets/isrepeat"  # 资源查重
get_asset_url = Prefix.AssetServer + "/assets?schema={}"  # 获取资源信息
delete_asset_url = Prefix.AssetServer + "/assets/{}"  # 删除对应资源

"""  案例导出相关  """
export_case_url = Prefix.AssetServer + "/cases/export"  # 案例导出
download_case_url = Prefix.AssetServer + "/{pake_id}"  # 案例下载

"""  测试案例集相关  """
get_suites_url = Prefix.AssetServer + "/suites"  # 获取测试集
run_suite_url = Prefix.TaskServer + "/tasks"  # 运行测试集
get_task_list_url = Prefix.TaskServer + "/tasks?finished=true&page=0&pageSize=12&own=true&expandedIds={task_set_id}"  # 获取测试案例集合
queue_status_url = Prefix.TaskServer + "/tasks/queue?own=true"  # 套件运行状态检查

"""  案例判定相关  """
get_judgements_url = Prefix.AssetServer + "/cases/{caseId}/judgements"  # 获取更新 所有判定信息
get_judgement_url = "api-asset/cases/{caseId}/judgements/{judgementId}" # 获取更新 单个判定信息

"""  测试计划相关接口  """
testPlan_list_url = Prefix.TaskServer + "/testPlan?page=1&pageSize=20&options={options}"  # 获取测试计划列表
testPlan_id_url = Prefix.TaskServer + "/testPlan/{id}"  # 获取测试计划 / 更新测试计划(包含重命名)
create_testPlan_url = Prefix.TaskServer + "/testPlan"  # 创建测试计划
del_testPlan_url = Prefix.TaskServer + "/testPlan/delete"  # 删除测试计划
del_testPlan_single_url = Prefix.TaskServer + "/testPlan/{id}"  # 删除单个测试计划
execute_testPlan_url = Prefix.TaskServer + "/testPlan/execute"  # 执行测试计划
clone_testPlan_url = Prefix.TaskServer +"/testPlan/{id}/clone"  # 复制测试计划
isrepeat_testPlan_url = Prefix.TaskServer +"/testPlan/isrepeat?name={name}&type={type}"  # 测试计划名称是否已存在


"""测试报告导出相关"""
export_report_url = Prefix.TaskServer + "/sessions/{sessions_id}/report" # 导出测试报告
download_report_url = "/report/{sessions_id}_zh-Hans.pdf" # 下载测试报告

3. 接口请求方法

class RquestApi():
    def __init__(self):
        super(RquestApi, self).__init__()
        self._headers = {"Content-Type": "application/json"}
        self._token = os.environ.get(EnvPath.token)
        self._host_ip = SimOneUrl
        self.get = "GET"
        self.post = "POST"
        self.delete = "DELETE"
        self.put = "PUT"

    def send(self, method: str, url: str, **kwargs) -> Response:
        """
        发送请求
        method: 请求方法
        url: 请求地址
        kwargs: 请求参数
        """
        self.headers.update({"Authorization": self.token})
        complete_url = urljoin(self._host_ip, url)
        data = dict()
        data['method'] = method
        data['url'] = complete_url
        data['headers'] = self.headers
        data.update(kwargs)
        print(data)
        response = request(verify=False, **data)
        try:
            print("response:%s" % str(response.content.decode("utf-8")))
            self._status_code = response.status_code
            self._response_code = response.json().get("code")
        except:
            pass
        return response

    @property
    def token(self):
        return self._token

    @property
    def headers(self):
        return self._headers

    @headers.setter
    def headers(self, value):
        self._headers = value

4. 云端登录

云端API

"""
def cloud_login_api(self, userinfo: dict):
    """
    接口名称云端版登录
    请求参数userinfo 
    请求方式post
    请求路径cloud_login_url = Prefix.UserServer + "/users/login"
    参数示例userinfo: body {"username":xxx, "password": xxx}
    """
    return self.send(self.post, self.cloud_login_url, json=userinfo).json()

def check_username_api(self, username:str):
    """
    接口名称用户校验
    请求参数username       
    请求方式post
    请求路径cloud_login_url = Prefix.UserServer + "/users/check"
    """
    payload = {'username': username}
    return self.send(self.post, self.login_check, json=payload).json()
"""

功能实现

class LoginBusiness(RquestApi):

    def cloud_login_api(self, userinfo: dict):
        """
        云端登录API
        @param userinfo:
        @return:
        """
        return self.send(self.post, cloud_login_url, json=userinfo).json()

    def check_username_api(self, username: str):
        """
        校验用户名称是否合格
        @param username:
        @return:
        """
        payload = {'username': username}
        return self.send(self.post, login_check, json=payload).json()

    def getCheckToken(self) -> str:
        """
        校验token是否合格
        @return:
        """
        getCheckUrl = os.path.join(SimOneUrl, user_check_url)
        payload = {'username': loginData['username']}
        response = self.send(self.post, getCheckUrl, json=payload)
        checkToken = json.loads(response.content.decode("utf-8"))['data']['checkToken']
        return checkToken

    def pad(self, text):
        """
        填充函数,使被加密数据的字节码长度是block_size的整数倍
        @param text:
        @return:
        """
        length = AES.block_size
        count = len(text.encode('utf-8'))
        add = length - (count % length)
        entext = text + (chr(add) * add)
        return entext

    def str_aes(self, string: str, key: str):
        """
        aes 加密
        @param string:
        @param key:
        @return:
        """
        aes = AES.new(key.encode("utf-8"), AES.MODE_ECB)
        res = aes.encrypt(self.pad(string).encode("utf8"))
        msg = str(base64.b64encode(res), encoding="utf8")
        return msg

    def get_str_aes(self, string: str, username: str, key: str = "eGeVQh0lRyq41I41") -> str:
        '''
        @param string:
        @param username:
        @param key: AES加密的key
        @return:
        '''
        check_token = self.check_username_api(username)
        target_string = check_token["data"]["checkToken"] + string
        msg = self.str_aes(target_string, key)
        return msg

    @property
    def _cloud_std_login(self):
        """
        云端版本登陆方式
        @return:
        """
        try:
            new_login_data = copy.deepcopy(loginData)
            new_login_data['password'] = self.get_str_aes(string=new_login_data['password'],
                                                          username=new_login_data['username'])
            print('cloud login data:' + str(new_login_data))
            response = self.cloud_login_api(userinfo=new_login_data)
            print("-------", response)
            print('cloud login request result:' + str(response))
            token = response['data']['token']
            os.environ[EnvPath.token] = token
            print(token)
            return token
        except Exception as e:
            print(str(e))
            raise print('get token error ')


# 登录入口
class LoginPolicy():
    standalone = "standalone"
    cloud = "cloud"

    def __new__(cls, env_name):
        if env_name == cls.standalone:
            return LoginBusiness()._enterprise_std_login
        elif env_name == cls.cloud:
            return LoginBusiness()._cloud_std_login
        else:
            return print("----------input error---------------")

云端调用示例

SimOneUrl = "http://127.0.0.1:30083"
loginData = {
    "username": 'admin',
    "password": 'admin',
}

if __name__ == '__main__':
    LoginPolicy("cloud")

正确返回结果

cloud login request result:{'code': 0, 'data': {'token': 'f2e3e458-264b-44fb-9f95-319a978ddec3'}}

5. 单机版登录

单机版API

"""
def enterprise_login_api(self):
    """
    接口名称企业单机版本登录
    请求参数 
    请求方式get
    请求路径SimOneUrl + standalone_login_url
    参数示例userinfo: body {"username":xxx, "password": xxx}
    """
    getLoginUrl = os.path.join(SimOneUrl, standalone_login_url)
    response = self.send(self.get, url=getLoginUrl)
    return response
"""

功能实现

class LoginBusiness(RquestApi):

    def cloud_login_api(self, userinfo: dict):
        """
        云端登录API
        @param userinfo:
        @return:
        """
        return self.send(self.post, cloud_login_url, json=userinfo).json()

    def check_username_api(self, username: str):
        """
        校验用户名称是否合格
        @param username:
        @return:
        """
        payload = {'username': username}
        return self.send(self.post, login_check, json=payload).json()

    def getCheckToken(self) -> str:
        """
        校验token是否合格
        @return:
        """
        getCheckUrl = os.path.join(SimOneUrl, user_check_url)
        payload = {'username': loginData['username']}
        response = self.send(self.post, getCheckUrl, json=payload)
        checkToken = json.loads(response.content.decode("utf-8"))['data']['checkToken']
        return checkToken

    def pad(self, text):
        """
        填充函数,使被加密数据的字节码长度是block_size的整数倍
        @param text:
        @return:
        """
        length = AES.block_size
        count = len(text.encode('utf-8'))
        add = length - (count % length)
        entext = text + (chr(add) * add)
        return entext

    def str_aes(self, string: str, key: str):
        """
        aes 加密
        @param string:
        @param key:
        @return:
        """
        aes = AES.new(key.encode("utf-8"), AES.MODE_ECB)
        res = aes.encrypt(self.pad(string).encode("utf8"))
        msg = str(base64.b64encode(res), encoding="utf8")
        return msg

    def get_str_aes(self, string: str, username: str, key: str = "eGeVQh0lRyq41I41") -> str:
        '''
        @param string:
        @param username:
        @param key: AES加密的key
        @return:
        '''
        check_token = self.check_username_api(username)
        target_string = check_token["data"]["checkToken"] + string
        msg = self.str_aes(target_string, key)
        return msg

    def enterprise_login_api(self):
        """
        @return:
        """
        getLoginUrl = os.path.join(SimOneUrl, standalone_login_url)
        response = self.send(self.get, url=getLoginUrl)
        return response
    @property
    def _enterprise_std_login(self):
        """
        单机版本登录方式
        @return:
        """
        try:
            getLoginUrl = os.path.join(SimOneUrl, standalone_login_url)
            response = self.send(self.get, url=getLoginUrl)
            token = json.loads(response.content.decode("utf-8"))['data']['token']
            os.environ[EnvPath.token] = token
        except Exception as e:
            print(str(e))
            token = ""
            # raise AssertionError('get token error ')
        return token

    @property
    def _cloud_std_login(self):
        """
        云端版本登陆方式
        @return:
        """
        try:
            new_login_data = copy.deepcopy(loginData)
            new_login_data['password'] = self.get_str_aes(string=new_login_data['password'],
                                                          username=new_login_data['username'])
            print('cloud login data:' + str(new_login_data))
            response = self.cloud_login_api(userinfo=new_login_data)
            print("-------", response)
            print('cloud login request result:' + str(response))
            token = response['data']['token']
            os.environ[EnvPath.token] = token
            print(token)
            return token
        except Exception as e:
            print(str(e))
            raise print('get token error ')


class LoginPolicy():
    standalone = "standalone"
    cloud = "cloud"

    def __new__(cls, env_name):
        if env_name == cls.standalone:
            return LoginBusiness()._enterprise_std_login
        elif env_name == cls.cloud:
            return LoginBusiness()._cloud_std_login
        else:
            return print("----------input error---------------")

调用示例

if __name__ == '__main__':
    LoginPolicy("standalone")

返回结果

{'method': 'GET', 'url': 'http://172.31.9.85:30083/api-user/users/standalone', 'headers': {'Content-Type': 'application/json', 'Authorization': None}}
response:{"code":0,"data":{"token":"b557f84a-070d-4ce3-8f70-8c0021428bed"}}

6. 案例库相关

6.1 创建案例库

API

"""
def create_category_api(self, cate_name:str):
    '''
    创建案例库
    @param cate_name: 案例库名
    @return:
    '''
    payload = {"parentId": "", "name": cate_name}
    return self.send(self.post,self.categories_url,json=payload).json()
"""

功能实现

class Suite(RquestApi):

    def __init__(self):
        super(Suite, self).__init__()
        self._cyclical = 0
        self.category_name = "Automation_" + str(random.randint(0,9999))

    def create_category_api(self, cate_name:str):
        '''
        创建案例库
        @param cate_name: 案例库名
        @return:
        '''
        payload = {"parentId": "", "name": cate_name}
        return self.send(self.post,categories_url,json=payload).json()

    def create_category(self, cate_name:str = None)->str:
        '''
        创建案例库
        @param cate_name:
        @return:
        '''
        if not cate_name:
            cate_name = self.category_name
        print(f"案例库目录名称:{cate_name}")
        response = self.create_category_api(cate_name)
        cate_id = response["data"]["categoryId"]
        print(f"案例库目录 id :{cate_id}")
        return cate_id

def main():
    suite = Suite()
    suite.create_category()

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main()

返回值

{'method': 'POST', 'url': 'http://172.31.9.85:30083/api-case/categories', 'headers': {'Content-Type': 'application/json', 'Authorization': 'a2b6beb8-f582-4b15-b063-d92c506452dd'}, 'json': {'parentId': '', 'name': 'Automation_5538'}}
response:{"code":0,"data":{"name":"Automation_5538","categoryId":"feeea7d0-439e-4796-a239-0f7d8f484d01"}}
案例库目录 id :feeea7d0-439e-4796-a239-0f7d8f484d01

6.2 获取案例库列表

API

"""
def get_category_api(self):
    """
    获取案例库列表
    @return: response
    """
    return self.send(self.get, self.categories_url).json()
"""

功能实现

    def get_category_api(self):
        """
        获取案例库列表
        @return: response
        """
        return self.send(self.get, categories_url).json()

    def get_category(self,userid:str=None)->dict:
        """
        获取案例库
        @param userid: 用户id desc:admin->builtIn
        @return:
        """
        cate_dict={"data":{},"parentId":{}}
        response =  self.get_category_api()
        cate_dict["total"] = response["data"]["total"]
        if not userid:
            print("获取当前所有的案例库目录信息")
            for one in response["data"]["categories"]:
                cate_dict["data"].setdefault(one["name"],one["id"])
                cate_dict["parentId"].setdefault(one["name"],one["parentId"])
        else:
            print("获取用户{}的案例库目录信息".format(userid))
            for one in response["data"]["categories"]:
                if  one["userId"] == userid:
                    cate_dict["data"].setdefault(one["name"],one["id"])
                    cate_dict["parentId"].setdefault(one["name"], one["parentId"])
        print("获取到的案例库->%s"%str(cate_dict))
        return cate_dict

def main():
    suite = Suite()
    suite.create_category()

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main()

返回值

获取到的案例库->{'data': {'智能网联场景库': 'f58fdcf4-c36b-11eb-81fd-1831bf08c354', 'V2X泛化': 'f58fdcf4-c36b-11eb-81fd-1831bf08c354', '前向碰撞预警': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '交叉路口碰撞预警': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '盲区变道预警': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '异常车辆提醒': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '车辆失控预警': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '道路危险状况提示': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '限速预警': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '弱势交通参与者碰撞预警': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '绿波车速引导': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '车内标牌': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '左转辅助': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '前方拥堵提醒': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '紧急车辆提醒': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '闯红灯预警': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '逆向超车预警': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', '紧急制动预警': 'fbdc25a4-f982-4db7-9e39-7ca68b5acd76', 'new': '', 'Automation_5538': ''}, 'total': 15}

6.3 获取案例库列表

API

"""
def get_category_case_api(self,cate_id:str):
	"""
	获取当前案例库所有案例
	@param cate_name: 案例库名
	@return:
	"""
	return self.send(self.get,self.get_cases_url.format(category=cate_id)).json()
"""

功能实现

    def get_category_case_api(self,cate_id:str):
        """
        获取当前案例库所有案例
        @param cate_name: 案例库名
        @return:
        """
        return self.send(self.get,get_cases_url.format(category=cate_id)).json()

    def get_cases_category(self, cate_id: list):
        """
        获取当前案例库的所有案例
        @param cate_id: 案例库目录id
        @return:
        """

        def url_code(url: str, way: str):
            from urllib import parse
            if way == "unquote":
                return parse.unquote(url)
            elif way == "quote":
                return parse.quote(url)
        page_size = 100
        # print("get all case id from api by category_id")
        _filter = {"keyword": "", "categories": cate_id}
        string = json.dumps(_filter).replace(" ", "")
        filter = url_code(string, UrlCodeType.quote).replace("%", "%25")
        response = self.send(self.get, get_cases_url.format(page="1", category=filter)).json()
        # get case id
        # print("get_cases_category 入参cate_id:%s" % cate_id)
        cases_num = response['data']['total']
        if cases_num >= page_size:
            if cases_num % page_size == 0:
                page = cases_num // page_size
            else:
                page = cases_num // page_size + 1
        else:
            page = 1

        def update_case_id(response, page_size):
            case_name = response['data']['caseDefs'][page_size]['name']
            case_id = response['data']['caseDefs'][page_size]['id']
            case_id_list.append(case_id)
            cases_dict.setdefault(case_name, [case_id])


        cases_dict = {}
        case_id_list = []
        # 如果案例数量是100以内
        if cases_num <= page_size:
            for i in range(cases_num):
                update_case_id(response=response, page_size=i)
        else:
            # 如果案例数量超过100了,固定获取案例数量为100个就翻页
            for i in range(page):
                response = self.send(self.get, self.get_cases_url.format(page=str(i + 1), category=filter)).json()
                # cases_num % page_size 是取整表示没有到达最后一页
                # if i == 0 or i > 0 and cases_num % page_size == 0:
                if i + 1 != page:
                    # print("xxx page_size:",page_size)
                    for j in range(page_size):
                        # print("xxx j:",j)
                        update_case_id(response=response, page_size=j)
                # 到最后一页了,因为案例数量不固定所以需要单独计算获取的个数
                else:
                    for j in range(cases_num % page_size):
                        update_case_id(response=response, page_size=j)
        return cases_dict, case_id_list

def main(catgory_name):
    suite = Suite()
    cate_id_dict = suite.get_category()
    # 案例库名称可以自己指定
    cate_id = [cate_id_dict["data"][catgory_name]]
    suite.get_cases_category(cate_id)

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main('转向冲突')

返回值

{'method': 'GET', 'url': 'http://172.31.9.85:30083/api-case/cases?page=1&filter=%257B%2522keyword%2522%253A%2522%2522%252C%2522categories%2522%253A%255B%25220863fa00-4618-11e9-803b-61333d8d0423%2522%255D%257D&pageSize=100', 'headers': {'Content-Type': 'application/json', 'Authorization': '5022f578-187b-47ca-bed8-8d452c8738c5'}}
response:{"code":0,"data":{"total":1,"caseDefs":[{"thumbnail":null,"data":{"schema":"casedef","vehicles":[{"id":"simonedriver","name":"SimOneDriver控制","classId":"MKZ","physicalGradeSensor":0}],"id":"25834d95-d7c2-4f5f-af0e-5d8937e35064","opponent":"openscenario","userId":"18392614332","mapId":"Junction5","categoryId":"0863fa00-4618-11e9-803b-61333d8d0423","name":"转向冲突8","tags":["十字路口","机动车","转向"],"lastModified":1642420795018,"selected":false,"builtIn":true,"created":1642420795018,"version":{"major":3,"minor":0,"build":0,"patch":2},"createUsername":"standalone","createUserRole":0,"revision":{"id":0,"userId":"standalone","timestamp":1608881493476},"updateUsername":"standalone","notes":"本车转向,与前车发生冲突","joinway":"import","thumbnail":"","mapName":"五车道路口"},"id_":97,"updateUserId":"18392614332","created":1642420795134,"builtIn":true,"terminal":null,"userId":"18392614332","tags":["转向","机动车","十字路口"],"total":1,"invalidedFlag":"","deleteTime":0,"opponent":"openscenario","name":"转向冲突8","editRevision":"5712fa29-188d-4653-81d2-b5e56baaeb45","mapId":"Junction5","id":"25834d95-d7c2-4f5f-af0e-5d8937e35064","lastModified":1642420795134,"vehicleId":["simonedriver"],"categoryId":"0863fa00-4618-11e9-803b-61333d8d0423"}],"page":1}}

6.4 获取案例库中子集目录案例ID

API

"""
def get_category_api(self):
	"""
    获取案例库列表
    @return: response
    """
    return self.send(self.get, categories_url).json()
"""

功能实现

def get_all_cate_id_list(self, cate_id) -> list:

	"""
	@param cate_id: 案例库ID
	@return: cate_id_list
	"""
	all_cate_list = [cate_id]
	response = self.get_category_api()
	def get_cate_id(current_cate_id):
		categories = response["data"]["categories"]
		for category in categories:
			if current_cate_id == category["parentId"] and category["id"] not in all_cate_list:
				all_cate_list.append(category["id"])
				get_cate_id(category["id"])

	get_cate_id(cate_id)
	print("all_cate_list:", all_cate_list)
	return all_cate_list

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    # main("new", "test_4")
    test = Suite()
    test.get_all_cate_id_list("dbaf8d6e-af07-44a6-b7c7-d8b7f2fd6616")

返回值

all_cate_list: ['dbaf8d6e-af07-44a6-b7c7-d8b7f2fd6616', 'c8557561-5729-43e1-9d65-91498f7a1129']

7. 案例相关

7.1 获取案例信息

API

"""
def get_case_detail_api(self, case_id:str):
    """
    获取案例信息
    @param params: /{case_id}/data
    @return: response
    """
    url = self.case_details_url.format(case_id=case_id)
    return self.send(self.get, url).json()
"""

功能实现

    def get_case_detail_api(self, case_id: str):
        """
        获取案例信息
        @param : /{case_id}/data
        @return: response
        """
        url = case_details_url.format(case_id=case_id)
        return self.send(self.get, url).json()

    def get_case_detail(self, case_id: str):
        """

        @param case_id: 测试案例id
        @return:
        """
        response = self.get_case_detail_api(case_id)
        return response


def main(catgory_name):
    suite = Suite()
    cate_id_dict = suite.get_category()
    # 案例库名称可以自己指定
    cate_id = [cate_id_dict["data"][catgory_name]]
    # suite.get_cases_category(cate_id)
    cases_dict, case_id_list = suite.get_cases_category(cate_id)
    suite.get_case_detail(case_id_list[0])

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main('转向冲突')

返回值

{'code': 0, 'data': {'schema': 'casedata', 'caseId': '25834d95-d7c2-4f5f-af0e-5d8937e35064', 'sensors': {'byId': {}, 'allIds': []}, 'judgements': {'byId': {'timeout': {'id': 'timeout', 'name': '超时', 'type': 'timeout', 'enabled': True, 'scope': {'type': 'global', 'position': {'x': 0, 'y': 0, 'z': 0}, 'heading': {'x': 0, 'y': 0, 'z': 0, 'w': 1}, 'size': {'x': 10, 'y': 10, 'z': 0}}, 'conditions': [{'variable': 'timeout', 'value': 600}], 'settings': {'action': 'failure', 'logLevel': 'error', 'logInfo': ''}, 'builtIn': True, 'lock': True, 'userId': 'admin', 'schema': 'judgement'}, 'collision': {'id': 'collision', }..........}

7.2 案例导入

API

"""
def import_case_api(self, payload: dict):
    """
    案例导入
    @param payload:
    @return:
    """

	return self.send(self.post, case_import_url, data=json.dumps(payload)).json()
	
def file_md5_check(self, file_path: str, raw: bool = False):
	"""
	文件md5检查
	@param file_path: 文件路径
	@return:
	"""

	# global url
	def file_md5(filename: str):
		"""
		以文件内容md5加密
		@param filename:
		@return:
		"""
		with open(filename, 'rb') as f:
			contents = f.read()
		return hashlib.md5(contents).hexdigest()

	file_md5 = file_md5(file_path)
	# print("file_md5:", file_md5)
	if raw:
		url = file_md5_url.format(file_md5, "true")
	else:
		url = file_md5_url.format(file_md5, "false")
	# print("file_md5:",file_md5)
	# print(self.send(self.get, url).json())
	return self.send(self.get, url).json()
	
def upload_file(self, file_path: str, index: int, chunks: int, category: str, file_md5: str, raw: bool = False,
				uuid: str = Faker('zh_CN').uuid4()):
	"""
	 上传文件
	@param file_path: 文件路径
	@param index: 上传序列
	@param chunks: chunks总数
	@param category: 类别
	@param file_md5: 文件MD5
	@return:
	"""
	global url
	self.headers = {}
	file_size = os.path.getsize(file_path)
	file_name = file_path.split(os.sep)[-1]
	files = {'file': open(os.path.join(file_path), 'rb')}

	if not file_md5:
		def file_md5(filename: str):
			"""
			以文件内容md5加密
			@param filename:
			@return:
			"""
			with open(filename, 'rb') as f:
				contents = f.read()
			return hashlib.md5(contents).hexdigest()

		file_md5 = file_md5(file_path)
	if raw:
		url = upload_file_url.format(file_md5, "true")
	else:
		url = upload_file_url.format(file_md5, "false")

	upload_payload = {"params": json.dumps({"filename": file_name,
											"index": index,
											"chunks": chunks,
											"filesize": file_size,
											"uploadId": uuid,
											"category": category})}
	response = self.send(self.post, url, data=upload_payload, files=files).json()
	# print(f"-----upload_file-- url:{url} upload_payload:{upload_payload} files:{files} response:{response}")
	print("uuid:", uuid)
	return uuid, response
    
"""

功能实现

    def file_md5_check(self, file_path: str, raw: bool = False):
        """
        文件md5检查
        @param file_path: 文件路径
        @return:
        """

        # global url
        def file_md5(filename: str):
            """
            以文件内容md5加密
            @param filename:
            @return:
            """
            with open(filename, 'rb') as f:
                contents = f.read()
            return hashlib.md5(contents).hexdigest()

        file_md5 = file_md5(file_path)
        # print("file_md5_check file_md5:", file_md5)
        # file_md5_url = "api-asset/files/{}?raw={}"  # 验证文件MD5
        if raw:
            url = file_md5_url.format(file_md5, "true")
        else:
            url = file_md5_url.format(file_md5, "false")
        response = self.send(self.get, url).json()
        # print(f"-----file_md5_check:{url} get response:",response)
        return response

    def remove_file(self, sim_path: str):
        """
        remove files that except (.sim .zip .xosc)
        :param sim_path:
        """
        for item in os.listdir(sim_path):
            sim_file = os.path.join(sim_path, item)
            try:
                if os.path.isfile(sim_file):
                    os.remove(sim_file)
                    # print(f'{sim_file} 文件删除成功')
                elif os.path.isdir(sim_file):
                    shutil.rmtree(sim_file)
                    # print(f'{sim_file} 文件夹删除成功')
            except OSError as e:
                print(f'Error: {e.filename} - {e.strerror}.')
    def get_caseid(self, sim_path: str, sim_list: list):
        """
        get the caseid before import according to
        :param sim_path: string, the path to the .sim file
        :sim_list run_sim: list, cases that need to be run
        :return: list, caseid before import
        """
        # print("sim_path:",sim_path)
        # print("sim_list:",sim_list)
        file_list = os.listdir(sim_path)
        # print("file_list:",file_list)
        target_run = list()
        for item in file_list:
            for run_sim_item in sim_list:
                # if ('31map.sim' in item) and ('all_sim' in choose_run):
                if os.path.join(sim_path, run_sim_item + '.sim') == os.path.join(sim_path, item):
                    target_run.append(item)
        # print("target_run:",target_run)
        if not target_run:
            target_run = file_list
            # print("更新后target_run:",target_run)
        extract_dir = os.path.join(sim_path, "extract_dir")
        # print("extract_dir:",extract_dir)
        if not os.path.exists(extract_dir):
            os.makedirs(extract_dir)
        for item in target_run:
            if item == "extract_dir":
                continue
            # zip_file = os.path.join(sim_path, item).replace('.sim', '.zip') # 当文件名中有两个.sim时会有BUG
            zip_file = os.path.join(extract_dir, item[:-4] + ".zip") if item.endswith(".sim") else os.path.join(
                extract_dir, item)
            # print("zip_file:",zip_file)
            try:
                if item.endswith(".sim") or item.endswith(".zip"):
                    # print("try item:",item)
                    shutil.copy(os.path.join(sim_path, item), zip_file)
            except Exception as e:
                pass
            try:
                with zipfile.ZipFile(zip_file) as zfile:
                    # print("执行到这。。。")
                    zfile.extractall(path=extract_dir)
                    # print("执行到这2。。。")
            except Exception as e:
                raise Exception(f"解压{zip_file}失败,异常信息:{e}")
        temp = list(filter(lambda x: 'json' in x, os.listdir(extract_dir)))
        # print("temp:",temp)
        if temp == []:  # 从3.3.0版本起案例解压后是文件夹,则上一条语句执行后temp为空
            for d in os.listdir(extract_dir):
                # print("extract_dir目录下的内容:",d)
                if os.path.isdir(os.path.join(extract_dir, d)):
                    temp.append(d)
            # print("临时测试temp:",temp)
            all_case_id = list(set(temp))  # 去重
            # print("all_case_id:",all_case_id)
            old_case_id = list(filter(lambda x: 'vehicle' not in x, all_case_id))
            # print("old_case_id:",old_case_id)
            self.remove_file(extract_dir)
            # print("执行到这3。。。")
            return old_case_id
        all_case_id = list(map(lambda x: x.replace('.json', ''), temp))
        all_case_id = list(set(all_case_id))  # 去重
        old_case_id = list(filter(lambda x: 'vehicles' not in x, all_case_id))
        self.remove_file(extract_dir)
        return old_case_id

    def upload_file(self, file_path: str, index: int, chunks: int, category: str, file_md5: str, raw: bool = False,
                    uuid: str = Faker('zh_CN').uuid4()):
        """
         上传文件
        @param file_path: 文件路径
        @param index: 上传序列
        @param chunks: chunks总数
        @param category: 类别
        @param file_md5: 文件MD5
        @return:
        """
        global url
        # self.headers = {}
        self.headers = {"ProjectId": 'default'} # 江淮3.5.0
        file_size = os.path.getsize(file_path)
        file_name = file_path.split(os.sep)[-1]
        files = {'file': open(os.path.join(file_path), 'rb')}

        if not file_md5:
            def file_md5(filename: str):
                """
                以文件内容md5加密
                @param filename:
                @return:
                """
                with open(filename, 'rb') as f:
                    contents = f.read()
                return hashlib.md5(contents).hexdigest()

            file_md5 = file_md5(file_path)
        if raw:
            url = upload_file_url.format(file_md5, "true")
        else:
            url = upload_file_url.format(file_md5, "false")

        upload_payload = {"params": json.dumps({"filename": file_name,
                                                "index": index,
                                                "chunks": chunks,
                                                "filesize": file_size,
                                                "uploadId": uuid,
                                                "category": category})}
        response = self.send(self.post, url, data=upload_payload, files=files).json()
        # print(f"-----upload_file-- url:{url} upload_payload:{upload_payload} files:{files} response:{response}")
        print("uuid:", uuid)
        return uuid, response

    def import_case_api(self, payload: dict):
        """
        案例导入
        @param payload:
        @return:
        """
        # print("********* import_case_api payload********",payload)
        self.headers.update({"Content-Type": "application/json;charset=UTF-8"})
        return self.send(self.post, case_import_url, data=json.dumps(payload)).json()

    def import_case(self, file_path: str, file_name: str, cate_id: str):
        """
        导入案例到指定的案例库目录
        @param file_path: 案例文件路径
        @param file_name: 案例文件名
        @param cate_id: 案例库目录id
        @return: [重要] 此处返回的并不是导入成功后的案例ID,而是导入案例文件里的原ID
        """
        case_file_path = os.path.join(file_path, file_name)
        filename = file_name.split(".")[0]
        file_size = os.path.getsize(case_file_path)
        md5_res = self.file_md5_check(case_file_path)
        md5_res_info = md5_res["data"]
        file_md5 = md5_res_info["id"]
        global ext
        global path_ext
        if file_name.split(".")[1] == "xosc":
            ext = "xosc"
            case_id_list = [f"{filename}"]
            includes = case_id_list
            overrides_id = case_id_list[0]
            overrides_name = filename
            overrides_path = filename + ".xosc"
        elif file_name.split(".")[1] == "sim":
            ext = "sim"
            case_id_list = self.get_caseid(file_path, [filename])
            includes = case_id_list
            overrides_id = case_id_list[0]
            overrides_name = filename
            overrides_path = case_id_list[0] + "/case/" + filename + ".json"
        elif file_name.split(".")[1] == "zip":
            ext = "zip"
            case_id_list = self.get_caseid(file_path, [filename])
            print("case_id_list:", case_id_list)
            includes = case_id_list[0] + "/case/" + filename
            overrides_id = includes
            overrides_name = includes
            overrides_path = includes + ".xosc"
        else:
            raise f"导入案例不支持{file_name.split('.')[1]}格式,用例中断"
        case_payload = {"category": cate_id,
                        "ext": ext,
                        "file": file_md5,
                        "id": "2222222222",
                        "includes": includes,
                        "includesMap": True,
                        "includesStopTrigger": True,
                        "includesVehicle": True,
                        "name": filename,
                        # "overrides": [],
                        "overrides": [{
                            "id": overrides_id,
                            "missingMap": False,
                            "missingVehicle": False,
                            "name": overrides_name,
                            "path": overrides_path,
                            "replaceMap": False,
                            "replaceVehicle": True
                        }],
                        "categoryInfo": {},
                        "needRemoveCases": {}
                        }

        def round_up(data):
            """
            func:向上取整
            Args:
                data: float
            Returns: int

            """
            return math.ceil(data)

        chunk_size = 2 * 1024 * 1024
        if md5_res_info['size'] == 0:
            chunks = round_up(file_size / chunk_size)
            for i in range(chunks):
                self.upload_file(file_path=case_file_path, index=i, chunks=chunks, category="case", file_md5=file_md5)
            response = self.import_case_api(payload=case_payload)
        else:
            response = self.import_case_api(payload=case_payload)
        # print("临时测试 import_case case_payload:", case_payload)
        # print("-----import_case_api response -->>> ", response)
        return case_id_list, response  # [重要] 此处返回的并不是导入成功后的案例ID,而是导入案例文件里的原ID


    def import_flow(self, category_name: str, file_path: str, file_name: str):
        '''
        导入文件流程
        可导入的文件类型为 .xosc 或者 .sim格式
        @param category_name: 案例库名称
        @param file_path: 文件路径比如 r"D:\System
        @param file_name:文件名称比如 123.xosc or 123.sim
        '''
        cate_id = self.get_category()
        cate_idname = cate_id["data"][category_name]
        self.import_case(file_path, file_name, cate_idname)

def main():
    suite = Suite()
    # cate_id_dict = suite.get_category()
    # 案例库名称可以自己指定
    # cate_id = [cate_id_dict["data"][catgory_name]]
    # suite.get_cases_category(cate_id)
    # cases_dict, case_id_list = suite.get_cases_category(cate_id)
    # suite.get_case_detail(case_id_list[0])
    suite.import_flow("new",r"C:\Users\Administrator\Desktop\test", "test.xosc")

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main()

返回值

response:{"code":0,"data":{"id":"2222222222"}}

7.3 案例导出

API

"""
def export_case_api(self, payload: dict) -> json:
	"""
    导出测试案例
    @param payload:dict
    @return:response
    """
    return self.send(self.post, export_case_url, data=json.dumps(payload)).json()

def download_case_api(self, pake_id: str) -> bytes:
	"""
    下载测试案例
    @param pake_id:   pake_id
    @return:response
    """
    return self.send(self.get, download_case_url.format(pake_id=pake_id)).content
    
"""

功能实现

   def export_case_api(self, payload: dict) -> json:
        """
        导出测试案例
        @param payload:dict
        @return:response
        """
        return self.send(self.post, export_case_url, data=json.dumps(payload)).json()

    def download_case_api(self, pake_id: str) -> bytes:
        """
        下载测试案例
        @param pake_id:   pake_id
        @return:response
        """
        return self.send(self.get, download_case_url.format(pake_id=pake_id)).content

    def download_case(self, case_id: list) -> bytes:
        """
        获取二进制测试案例内容
        @param case_id: case_id_list
        @return: bytes
        """
        # 取pake_id
        pake_id = self.export_case(case_id)["data"]["id"]
        print(f"download_case_bytes:{self.download_case_api(pake_id=pake_id)}")
        return self.download_case_api(pake_id=pake_id)

    def export_case(self, case_id: list, _format: str = ".sim", vehicle: bool = True, includesMap: bool = True,
                    includesObstacle: bool = True):

        """
        @param case_id: case_id_list
        @param _format: sim or xosc
        @param vehicle: True or False
        @return:    response
        """

        if _format == ".sim" or _format == ".xosc":
            payload = {
                "caseIds": case_id,
                "includesVehicle": vehicle,
                "includesMap": includesMap,  # 3.5.0
                "includesObstacle": includesObstacle,
                "format": _format,
                "pathinfo": [{"id": caseId, "parentId": None} for caseId in case_id]
            }
            response = self.export_case_api(payload)
            return response
        else:
            raise Exception(f"-------------input format={_format} error----------------")

    def get_all_cate_id_list(self, cate_id) -> list:

        """
        @param cate_name_list: 案例名称列表
        @return: cate_id_list
        """
        all_cate_list = [cate_id]
        response = self.get_category_api()
        def get_cate_id(current_cate_id):
            categories = response["data"]["categories"]
            for category in categories:
                if current_cate_id == category["parentId"] and category["id"] not in all_cate_list:
                    all_cate_list.append(category["id"])
                    get_cate_id(category["id"])

        get_cate_id(cate_id)
        all_cate_list = [j for i in all_cate_list for j in i]
        print("all_cate_list:", all_cate_list)
        return all_cate_list

    def download_case_flow(self, category_name: str, downloadPath: str, downloadName: str) -> None:
        """
        @param cate_name: 案例库名称
        @param downloadPath: 下载路径
        @param downloadName: 下载包的名称
        @return: None
        """
        cate_id_dict = self.get_category()[0]
        cate_id_str = [cate_id_dict["data"][category_name]]
        # cate_method=CategoryBusiness()
        cate_id_list = self.get_all_cate_id_list(cate_id_str)


        case_id_list = self.get_cases_category(cate_id_list)[1]
        downloadResponse = self.download_case(case_id_list)
        # 取 format 的实际参数
        _format = inspect.getfullargspec(self.export_case)[3][0]
        # 下载的文件路径
        downloadPake = os.path.join(downloadPath, downloadName + _format)
        with open(downloadPake, 'wb') as f:
            f.write(downloadResponse)
        print(f"导出的案例路径为:{downloadPake}")

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    download_case_flow(category_name, downloadPath, downloadName)

返回值

run_task response:{'code': 0, 'data': {'sessionId': '3f90ef1b-f5e1-4ab7-b24f-7f94b9d7f1cb', 'taskIds': ['d2973d5b-7fef-4d83-af5d-ec108db815df']}}

7.4 案例场景编辑

API

"""
    def case_def_api(self, case_id: str):
        """
        案例概要
        @param case_id:
        @return:
        """
        return self.send(self.get, case_def_url.format(case_id=case_id)).json()

    def case_data_api(self, case_id: str):
        """
        案例详情
        @param case_id:
        @return:
        """
        return self.send(self.get, case_details_url.format(case_id=case_id)).json()

    def update_case_api(self, payload: dict):
        """
        案例编辑
        @param payload:
        @return:
        """
        self.headers.update({"Content-Type": "application/json"})
        return self.send(self.post, update_case_url, data=json.dumps(payload)).json()

    def get_vehicles_api(self):
        """
        获取主车信息
        @return:
        """
        return self.send(self.get, get_vehicles_url).json()
    
"""

功能实现

	def case_def_api(self, case_id: str):
        """
        案例概要
        @param case_id:
        @return:
        """
        return self.send(self.get, case_def_url.format(case_id=case_id)).json()

    def case_data_api(self, case_id: str):
        """
        案例详情
        @param case_id:
        @return:
        """
        return self.send(self.get, case_details_url.format(case_id=case_id)).json()

    def update_case_api(self, payload: dict):
        """
        案例编辑
        @param payload:
        @return:
        """
        self.headers.update({"Content-Type": "application/json"})
        return self.send(self.post, update_case_url, data=json.dumps(payload)).json()

    def get_vehicles_api(self):
        """
        获取主车信息
        @return:
        """
        return self.send(self.get, get_vehicles_url).json()
    
    
    def get_vehicle_name(self, vehicle_id: str):
        res = self.get_vehicles_api()
        print(f"get_vehicle_api response: {res}")
        data = res["data"]["list"]["byId"]
        for key in data:
            if key == vehicle_id:
                print(f"找到主车ID为{vehicle_id}的主车名称:{data[key]['name']}")
                return data[key]["name"]
        print(f"没有找到主车ID为{vehicle_id}的主车")
        return None


    def update_case(self, case_id: str, **kwargs):
        """
        编辑案例
        @param case_id: 案例ID
        @param map_name: 地图名称
        @param map_id: 地图ID
        @param vehicle_id: 主车ID
        @param AbsoluteTargetSpeed: 主车初始状态绝对目标速度
        @param WorldPosition_x: 主车初始状态世界位置x
        @param WorldPosition_y: 主车初始状态世界位置y
        @param WorldPosition_z: 主车初始状态世界位置z
        @return: 更新主车或地图等信息后的案例
        """
        # 提取参数(设置默认值)
        map_name = kwargs.get("map_name")
        map_id = kwargs.get("map_id")
        vehicle_id = kwargs.get("vehicle_id")
        absolute_target_speed = kwargs.get("absolute_target_speed", 0.0)
        worldposition_x = kwargs.get("worldposition_x", 0.0)
        worldposition_y = kwargs.get("worldposition_y", 0.0)
        worldposition_z = kwargs.get("worldposition_z", 0.0)

        case_def = self.case_def_api(case_id=case_id)
        case_data = self.case_data_api(case_id=case_id)
        # print("临时测试 原case_def:", case_def)
        # print("临时测试 原case_data:", case_data)
        # print("update_case case_id:", case_id)
        # print("update_case map_name:", map_name)
        # print("update_case map_id:", map_id)
        # print("update_case vehicle_id:", vehicle_id)
        # 修改主车
        if vehicle_id:
            vehicle_name = self.get_vehicle_name(vehicle_id)
            case_data["data"]["openSCENARIO"]["Entities"]["ScenarioObject"][0]["Vehicle"][
                "name"] = vehicle_name
            case_data["data"]["openSCENARIO"]["Entities"]["ScenarioObject"][0]["Vehicle"]["Properties"][
                "Property"][0]["value"] = vehicle_id
            for key in case_data["data"]["openSCENARIO"]["Entities"]["ScenarioObject"][0]["Vehicle"]["Properties"][
                "Property"]:
                if key["name"] == 'model':
                    key["value"] = vehicle_id
                    print(key)
                if key["name"] == 'name':
                    key["value"] = vehicle_name
                    print(key)
        # 修改地图
        if map_id:
            case_def["data"]["data"]["mapId"] = map_id
            case_def["data"]["data"]["mapName"] = map_name.split(".")[0]
            case_data["data"]["openSCENARIO"]["RoadNetwork"]["LogicFile"]["filepath"] = map_id + "." + \
                                                                                        map_name.split(".")[1]
        case_payload = {"casedef": case_def["data"]["data"],
                        "casedata": case_data["data"],
                        "needRefresh": True}
        # 修改主车初始状态世界位置
        if worldposition_x:
            privates = case_data["data"]["openSCENARIO"]['Storyboard']['Init']['Actions']['Private']
            for private in privates:
                if private["entityRef"] == "Ego":
                    for PrivateAction in private["PrivateAction"]:
                        if PrivateAction.get('TeleportAction'):
                            PrivateAction['TeleportAction']['Position']['WorldPosition']['x'] = worldposition_x
                            PrivateAction['TeleportAction']['Position']['WorldPosition']['y'] = worldposition_y
                            PrivateAction['TeleportAction']['Position']['WorldPosition']['z'] = worldposition_z
        # 修改主车初始状态绝对目速度
        if absolute_target_speed:
            privates = case_data["data"]["openSCENARIO"]['Storyboard']['Init']['Actions']['Private']
            for private in privates:
                if private["entityRef"] == "Ego":
                    for PrivateAction in private["PrivateAction"]:
                        if PrivateAction.get('LongitudinalAction'):
                            PrivateAction['LongitudinalAction']['SpeedAction']['SpeedActionTarget']['AbsoluteTargetSpeed']['value'] = absolute_target_speed

        response = self.update_case_api(payload=case_payload)
        # print("临时测试 update_case response",response)
        if response['code'] != 0:
            raise Exception("编辑案例异常,接口返回:" + response)
        return response

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    test = Suite()
    test.update_case(case_id)

返回值

update_case_api response {'code': 0, 'msg': 'ok', 'data': {'code': 0, 'msg': 'ok'}}

7.5 保存回放案例

API

"""
    def convert_replay_case_api(self, task_id: str, category_id: str, replay_case_name: str):
        """
        转换回放案例
        @param case_id: 需要转转换的案例ID
        @param category_id: 存放案例库目录ID
        @param replay_case_name: 转换后的回放案例名称
        @return:
        """
        payload = {
            "name": replay_case_name,
            "categoryId": category_id,
            "opponent": "record",
            "ego2npc": False,
            "notes": "",
            "tags": ""
        }
        return self.send(self.get, convert_replay_case_url.format(id=task_id), data=json.dumps(payload))
"""

功能实现

    def convert_replay_case_api(self, task_id: str, category_id: str, replay_case_name: str):
        """
        转换回放案例
        @param case_id: 需要转转换的案例ID
        @param category_id: 存放案例库目录ID
        @param replay_case_name: 转换后的回放案例名称
        @return:
        """
        payload = {
            "name": replay_case_name,
            "categoryId": category_id,
            "opponent": "record",
            "ego2npc": False,
            "notes": "",
            "tags": ""
        }
        return self.send(self.get, convert_replay_case_url.format(id=task_id), data=json.dumps(payload))

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    suite = Suite()
    suite.convert_replay_case_api(task_id="4c3f93ff-3f4a-4bea-ac04-507759fed4d2",
                                  category_id="ab21e34c-9df8-433f-89e7-8a66ec42529d", replay_case_name="TestCase")

返回值

{code:0,success:True}

8. 运行任务相关

8.1 运行任务

API

"""
def create_task_api(self,payload:dict):
    """
    创建task
    @param payload: eg {"caseIds": ["3dd03c09-3ba9-4f7a-b988-fde88296095a"],
    "taskName": "task_name"}
    @return:
    """
    return self.send(self.post,self.create_task_url,json=payload).json()
    
"""

功能实现

注意: 此功能 3.4 以前包括3.4的版本 和 3.5 以后的版本有所区分请注意

    def create_task_api(self,payload:dict):
        """
        创建task
        @param payload: eg {"caseIds": ["3dd03c09-3ba9-4f7a-b988-fde88296095a"],
                            "taskName": "task_name"}
        @return:
        """
        return self.send(self.post,create_task_url,json=payload).json()
    
    def run_task(self, case_ids: List[str], vehicle_id: Optional[str] = None, withEvaluation: bool = False, version: str = "3.5.0", type: str = "worldsim")-> Tuple[str, List[str]]:
        """
        运行测试任务集合
        Run task set
        @param caseid: caseid
        @param vehicle_id: vehicle_id
        @param withEvaluation: withEvaluation
        @param version: version
        @return:
        """
        task_name = "AutoTest_" + time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime())
        payload = {}
        if version >= "3.6.3":
            payload = {"caseIds": case_ids, "taskName": task_name,
             "type": type, "withEvaluation": False, "vehicleConfig": {
                "Ego": {"instanceId": "Ego", "id": "default", "name": "主车", "classId": "MKZ",
                        "algorithms": [{"id": "Default", "name": "默认控制器", "algorithmId": "SimOneDriver"},
                                       {"id": "AutoDrive", "name": "自驾控制器", "algorithmId": "SimOneDriver"}]}},
             "notification": {"type": "none", "condition": "any"}, "enableStateMachine": False, "priority": 0}
        elif version >= "3.5.0":
            payload = {"caseIds": case_ids,
                       "taskName": task_name,
                       "withEvaluation": withEvaluation,
                       "controllers": [{"id": "Default", "name": "默认控制器", "algorithmId": "SimOneDriver"},
                                       {"id": "AutoDrive", "name": "自驾控制器", "algorithmId": "SimOneDriver"}]}
        elif version < "3.5.0":
            payload = {"caseIds": case_ids, "taskName": task_name, "speed": 1}
        # elif version > "3.1.1":
        #     payload = {"caseIds": case_ids, "taskName": task_name, "speed": 1}
        # else:
        #     payload = {"caseIds":case_ids,"taskName":task_name,"withEvaluation":True,"type":"worldsim"} # 现代云端 内置案例有坑
        if vehicle_id:
            payload.update({"overrideVehicleId": vehicle_id})
        response = self.send(self.post, url=create_task_url, json=payload).json()

        data = response['data']
        return data['sessionId'], data['taskIds']

def main(catgory_name,case_name):
    suite = Suite()
	if not version:
        version = suite.get_version()
        print("请求simone version接口提取版本号为:" + version)
    cate_id_dict = suite.get_category()
    # 案例库名称可以自己指定
    cate_id = [cate_id_dict["data"][catgory_name]]
    suite.get_cases_category(cate_id)
    cases_dict, case_id_list = suite.get_cases_category(cate_id)
    suite.get_case_detail(case_id_list[0])
    # suite.import_flow("new",r"C:\Users\Administrator\Desktop\test", "test.xosc")
    # 案例名称逻辑
    if case_name:
        case_id_list = cases_dict[case_name]
        # print(f" case_name:{case_name}")
    suite.run_task(case_id_list, vehicle_id=vehicle_id, version=version)

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main("Automation_5538","test_4-copy")

返回值

run_task response:{'code': 0, 'data': {'sessionId': '3f90ef1b-f5e1-4ab7-b24f-7f94b9d7f1cb', 'taskIds': ['d2973d5b-7fef-4d83-af5d-ec108db815df']}}

8.2 结束任务

API

"""
def stop_task_api(self, payload: dict):
    """
    task停止API
    @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
    @return:
    """
    return self.send(self.post, stop_task_url, json=payload).json()

def stop_sessions_api(self, payload: dict):
    """
    sessions停止API
    @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
    @return:
    """
    return self.send(self.post, stop_sessions_url, json=payload).json()
    
"""

功能实现

   def stop_task_api(self, payload: dict):
        """
        task停止API
        @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
        @return:
        """
        return self.send(self.post, stop_task_url, json=payload).json()

    def stop_sessions_api(self, payload: dict):
        """
        sessions停止API
        @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
        @return:
        """
        return self.send(self.post, stop_sessions_url, json=payload).json()

    def stop_task(self, task_id: list, sessions_id: list):
        """
        停止任务和会话
        @param task_id:
        @param sessions_id:
        @return:
        """
        task_payload = {"ids": task_id}
        self.stop_task_api(task_payload)
        sessions_id = {"ids": sessions_id}
        self.stop_sessions_api(sessions_id)


def main(catgory_name, case_name):
    suite = Suite()
    cate_id_dict = suite.get_category()
    # 案例库名称可以自己指定
    cate_id = [cate_id_dict["data"][catgory_name]]
    suite.get_cases_category(cate_id)
    cases_dict, case_id_list = suite.get_cases_category(cate_id)
    suite.get_case_detail(case_id_list[0])
    # suite.import_flow("new",r"C:\Users\Administrator\Desktop\test", "test.xosc")
    # 案例名称逻辑
    if case_name:
        case_id_list = cases_dict[case_name]
        # print(f" case_name:{case_name}")
    session_id,task_ids = suite.run_task(caseid=case_id_list)
    time.sleep(10)
    suite.stop_task(task_ids, [session_id])

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main("Automation_5538", "test_4-copy")

返回值

{'method': 'POST', 'url': 'http://172.31.9.85:30083/api-task/tasks/stop', 'headers': {'Content-Type': 'application/json', 'Authorization': 'ae04e5b4-6a7a-4ad4-b476-33f946e717d5'}, 'json': {'ids': ['8d18f2f6-fd8b-48ae-86a0-0a3bbee1763c']}}
response:{"code":0,"data":{}}
{'method': 'POST', 'url': 'http://172.31.9.85:30083/api-task/sessions/stop', 'headers': {'Content-Type': 'application/json', 'Authorization': 'ae04e5b4-6a7a-4ad4-b476-33f946e717d5'}, 'json': {'ids': ['fc25f9de-7a21-4f0d-8906-a830b7e799ab']}}
response:{"code":0,"data":{}}

8.3 暂停任务&重新开始

API

"""
def pause_task_api(self,task_id:str):
    """
    暂停任务
    @param task_id:
    @return:
    """
    return self.send(self.post, self.pause_task_url.format(task_id=task_id),json={}).json()

def resume_task_api(self,task_id:str):
    """
    暂停任务后开始
    @param task_id:
    @return:
    """
    return self.send(self.post, self.resume_task_url.format(task_id=task_id),json={}).json()
    
"""

功能实现

    def pause_task_api(self,task_id:str):
        """
        暂停任务
        @param task_id:
        @return:
        """
        return self.send(self.post, pause_task_url.format(task_id=task_id),json={}).json()

    def resume_task_api(self,task_id:str):
        """
        暂停任务后开始
        @param task_id:
        @return:
        """
        return self.send(self.post, resume_task_url.format(task_id=task_id),json={}).json()


def main(catgory_name, case_name):
    suite = Suite()
    cate_id_dict = suite.get_category()
    # 案例库名称可以自己指定
    cate_id = [cate_id_dict["data"][catgory_name]]
    suite.get_cases_category(cate_id)
    cases_dict, case_id_list = suite.get_cases_category(cate_id)
    suite.get_case_detail(case_id_list[0])
    # suite.import_flow("new",r"C:\Users\Administrator\Desktop\test", "test.xosc")
    # 案例名称逻辑
    if case_name:
        case_id_list = cases_dict[case_name]
        # print(f" case_name:{case_name}")
    session_id ,task_ids = suite.run_task(caseid=case_id_list)
    time.sleep(5)
    suite.stop_task(task_ids, [session_id])
    for i in task_ids:
        suite.pause_task_api("c686a2de-47fa-4604-b354-36f8fe2b9d75")

    time.sleep(5)
    for i in task_ids:
        suite.resume_task_api("c686a2de-47fa-4604-b354-36f8fe2b9d75")

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main("Automation_5538", "test_4-copy")

返回值

{'method': 'GET', 'url': 'http://172.31.9.85:30083/api-case/cases/f5bbfb2c-619a-4812-90c2-75a84247a4db/data', 'headers': {'Content-Type': 'application/json', 'Authorization': 'beff0aaa-a060-4e9e-9f54-58362549b3f0'}}
{'method': 'POST', 'url': 'http://172.31.9.85:30083/api-task/sessions/c686a2de-47fa-4604-b354-36f8fe2b9d75/resume', 'headers': {'Content-Type': 'application/json', 'Authorization': 'beff0aaa-a060-4e9e-9f54-58362549b3f0'}, 'json': {}}
response:{"code":0,"data":{}}

8.4 删除任务

API

"""
def delete_sessions_api(self, payload: dict):
    """
    sessions删除API
    @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
    @return:
    """
    return self.send(self.post, delete_sessions_url, json=payload).json()

def delete_task_api(self, payload: dict):
    """
    task删除API
    @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
    @return:
    """
    return self.send(self.post, delete_task_url, json=payload).json()
    
"""

功能实现

   def delete_sessions_api(self, payload: dict):
        """
        sessions删除API
        @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
        @return:
        """
        return self.send(self.post, delete_sessions_url, json=payload).json()

    def delete_task_api(self, payload: dict):
        """
        task删除API
        @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
        @return:
        """
        return self.send(self.post, delete_task_url, json=payload).json()

    def delete_task(self, sessions_id: list, task_id: list):
        """
        删除任务和会话
        @param sessions_id:
        @param task_id:
        @return:
        """
        sessions_id = {"ids": sessions_id}
        self.delete_sessions_api(sessions_id)
        task_payload = {"ids": task_id}
        self.delete_task_api(task_payload)


def main(catgory_name, case_name):
    suite = Suite()
    cate_id_dict = suite.get_category()
    # 案例库名称可以自己指定
    cate_id = [cate_id_dict["data"][catgory_name]]
    suite.get_cases_category(cate_id)
    cases_dict, case_id_list = suite.get_cases_category(cate_id)
    suite.get_case_detail(case_id_list[0])
    # suite.import_flow("new",r"C:\Users\Administrator\Desktop\test", "test.xosc")
    # 案例名称逻辑
    if case_name:
        case_id_list = cases_dict[case_name]
        # print(f" case_name:{case_name}")
    session_id ,task_ids = suite.run_task(caseid=case_id_list)
    time.sleep(5)
    # suite.stop_task(task_ids, [session_id])
    suite.delete_task([session_id],task_ids)

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main("Automation_5538", "test_4-copy")

返回值

{'method': 'POST', 'url': 'http://172.31.9.85:30083/api-task/sessions/delete', 'headers': {'Content-Type': 'application/json', 'Authorization': '5a3662a3-2fbb-4790-b40d-42f600e63801'}, 'json': {'ids': ['bc344bf1-4bd3-493e-a4d3-95f98dcc3e48']}}
response:{"code":0,"data":{}}
{'method': 'POST', 'url': 'http://172.31.9.85:30083/api-task/tasks/delete', 'headers': {'Content-Type': 'application/json', 'Authorization': '5a3662a3-2fbb-4790-b40d-42f600e63801'}, 'json': {'ids': ['18a0fafe-d1e2-404b-b5e8-b4f3ea4c8e86']}}
response:{"code":0,"msg":"success"}

8.5 判断任务是否正在运行

API

    """
def get_result(self, task_id: list, index: int = None):
    """
    :param task_id:
    :return:
    """
    task_id_str = ",".join(task_id)
    # print(task_id_str)
    response = self.send(self.get, url=result_url.format(task_id=task_id_str)).json()
    data = response["data"]["tasks"]
    result = []
    for one in data:
        result.append({"pass": one["pass"], "is_ended": True if one["key"] == "ended" else False})
    if not index:
        return result
    else:
        return result[index]
    """

功能实现

   
    def get_result(self, task_id: list, index: int = None):
        """
        :param task_id:
        :return:
        """
        task_id_str = ",".join(task_id)
        # print(task_id_str)
        response = self.send(self.get, url=result_url.format(task_id=task_id_str)).json()
        data = response["data"]["tasks"]
        result = []
        for one in data:
            result.append({"pass": one["pass"], "is_ended": True if one["key"] == "ended" else False})
        if not index:
            return result
        else:
            return result[index]
    def is_ended(self, task_id: list, case_name=None):
        """
        判断案例是否运行结束
        @param task_id: 任务id
        @param case_name: 案例名称
        @return:
        """
        interval = 10
        while (1):
            result = self.get_result(task_id, -1)
            if result["is_ended"]:
                print('Case status:end of run')
                if case_name:
                    print(f'Name of the current ending case->{case_name}')
                return False
            else:
                print('Case status:in progress')
                if case_name:
                    print(f'Name of the current running case->{case_name}')
                time.sleep(interval)

调用示例

def main(category_name: str, case_name: str = None, vehicle_name=None):
    """
    @param category_name: category_name
    @param case_name:
    @param vehicle_name:vehicle_name
    @return:
    """
    suite = Suite()
    suite.get_category()
    cate_id = suite.get_category()
    cate_name = [cate_id["data"][category_name]]
    cases_dict, case_id_list = suite.get_cases_category(cate_name)

    # 主车控制相关逻辑
    if vehicle_name:
        vehicle_dict = suite.get_vehicle_id([vehicle_name])[1]
        if vehicle_name not in vehicle_dict.keys():
            print(f"vehicle_name:{vehicle_name} inexistence")
            return
        vehicle_id = vehicle_dict[vehicle_name]
    else:
        vehicle_id = None
    print(f"vehicle_name:{vehicle_name},vehicle_id:{vehicle_id}")

    # 案例名称逻辑
    if case_name:
        case_id_list = cases_dict[case_name]
        print(f" case_name:{case_name}")

    # 启动案例
    session_id, task_ids = suite.run_task(case_id_list, vehicle_id=vehicle_id)
    time.sleep(10)
    # suite.stop_task(task_id=task_ids, sessions_id=[session_id])
    # 等待案例运行完毕
    # falg = suite.suite_queue_check()
    # if falg is True:
    #     # 获取案例运行结果
    #     suite.get_task_result(task_ids)
    suite.is_ended(task_ids)


SimOneUrl = "http://172.31.9.85:8088/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main("new", "test")

返回值

response:{"code":0,"data":{"tasks":[{"parentId":"-1","loading":false,"deleted":null,"deleteTime":null,"createAt":"2024-06-19T06:22:14.000Z","updateAt":"2024-06-19T06:22:23.000Z","id":"39e2f0e7-5315-47da-b4d1-79b4427d0beb","userId":"standalone","caseId":"6b6fcd60-2d1f-11ef-905e-8f06be1a01ef","case":{"schema":"casedef"}}

9. 主车相关

9.1 导入主车

API

"""
def delete_sessions_api(self, payload: dict):
    """
    sessions删除API
    @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
    @return:
    """
    return self.send(self.post, delete_sessions_url, json=payload).json()

def delete_task_api(self, payload: dict):
    """
    task删除API
    @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
    @return:
    """
    return self.send(self.post, delete_task_url, json=payload).json()
    
"""

功能实现

class File:
    """
    文件父类
    """

    def __init__(self, file_path: str):
        if not exists(file_path):
            raise FileNotFoundError
        self._file_path = file_path
        self._data = None
class FileReader(File):
    """
    读取文件
    """

    def __init__(self, file_path: str):
        super(FileReader, self).__init__(file_path)

    def read_byte(self) -> bytes:
        with open(self._file_path, 'rb') as f:
            return f.read()

    def read_str(self) -> str:
        with open(self._file_path, 'r', encoding='utf-8') as f:
            return f.read()

    def read_json(self) -> json:
        with open(self._file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

def import_vehicle_api(self,payload:dict):
    """
        导入主车
        @param payload: eg {"vehicleData": {"byId": {vehicle_id: vehicle_data},
                                      "allIds": [vehicle_id]}}
        @return:
    """
    return self.send(self.post,import_vehicle_url,json=payload).json()

def get_vehicle_data(file_path: str):
    vehicle_info = FileReader(file_path).read_json()
    vehicle_id = vehicle_info.get("id")
    return vehicle_info, vehicle_id

def import_vehicle(self, file_path:str)->str:
    """
    导入主车
    @param file_path: 主车文件
    @return: 主车id
    """
    vehicle_data,vehicle_id = self.get_vehicle_data(file_path)
    payload = {"vehicleData": {"byId": {vehicle_id: vehicle_data},
                                        "allIds": [vehicle_id]}}
    result = self.import_vehicle_api(payload)
    return result["data"]["id"]

def main(suite_name:str=None):
    suite = Suite()
    # suite.get_suite(suite_name)
    suite.import_vehicle('c:/test.json')

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
     main()

返回值

{'method': 'POST', 'url': 'http://172.31.9.85:30083/api-task/sessions/delete', 'headers': {'Content-Type': 'application/json', 'Authorization': '5a3662a3-2fbb-4790-b40d-42f600e63801'}, 'json': {'ids': ['bc344bf1-4bd3-493e-a4d3-95f98dcc3e48']}}
response:{"code":0,"data":{}}
{'method': 'POST', 'url': 'http://172.31.9.85:30083/api-task/tasks/delete', 'headers': {'Content-Type': 'application/json', 'Authorization': '5a3662a3-2fbb-4790-b40d-42f600e63801'}, 'json': {'ids': ['18a0fafe-d1e2-404b-b5e8-b4f3ea4c8e86']}}
response:{"code":0,"msg":"success"}

9.2 删除主车

API

"""
def delete_vehicles_api(self, vehicles_id: str):
    """
    删除主车
    @param vehicles_id: 主车id
    @return:
    """
    url = self.delete_vehicle_url.format(vehicles_id=vehicles_id)
    return self.send(self.delete, url)
    
"""

功能实现

    def delete_vehicles_api(self, vehicles_id: str):
        """
        删除主车
        @param vehicles_id: 主车id
        @return:
        """
        url = delete_vehicle_url.format(vehicles_id=vehicles_id)
        return self.send(self.delete, url)
  
    def delete_vehicle(self,id:str):
        """
        删除主车
        @param id: 主车id
        @return:
        """
        self.delete_vehicles_api(id)
def main(category_name:str,case_name:str=None,vehicle_name:str=None):
    suite = Suite()
    suite.get_category()
    cate_id = suite.get_category()
    cate_name = [cate_id["data"][category_name]]
    cases_dict, case_id_list = suite.get_cases_category(cate_name)
    print(case_name)
    # 主车控制相关逻辑
    if vehicle_name:
        vehicle_dict = suite.get_vehicle_id([vehicle_name])[1]
        if vehicle_name not in vehicle_dict.keys():
            print(f"vehicle_name:{vehicle_name} inexistence")
            return
        vehicle_id = vehicle_dict[vehicle_name]
    else:
        vehicle_id = None
    print(f"vehicle_name:{vehicle_name},vehicle_id:{vehicle_id}")
    suite.delete_vehicle(vehicle_id)

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main("转向冲突", "转向冲突8", "test")

返回值

无返回值

9.3 获取主车信息

API

"""
def get_vehicle_api(self):
    """
    获取主车信息
    @return:
    """
    return self.send(self.get,self.get_vehicle_url).json()
    
"""

功能实现

    def get_vehicle_api(self):
        """
        获取主车信息
        @return:
        """
        return self.send(self.get, get_vehicle_url).json()


    def get_vehicle(self, userid: str = None):
        """
        根据用户信息获取主车信息
        @param userid:
        @return:
        """
        global vehicle_data
        vehicle_data = {}
        res = self.get_vehicle_api()
        # print(f"get_vehicle_api response ->: {res}")
        data = res["data"]["list"]["byId"]
        id_list = res["data"]["list"]["allIds"]
        if userid:
            for id in id_list:
                if data[id]["userId"] == userid:
                    vehicle_data.setdefault(data[id]["name"], data[id]["id"])
        else:
            for id in id_list:
                vehicle_data.setdefault(data[id]["userId"], {}).update({data[id]["name"]: data[id]["id"]})
        print("获取到的主车信息->:{}".format(str(vehicle_data)))
        return vehicle_data

    def get_vehicle_id(self, vehicle_name: list) -> list and dict:
        """
        根据主车名称获取主车ID
        @param vehicle_name:
        @return:
        """
        vehicle_id_dict = {}
        vehicle_id_list = []
        vehicle_list = []
        for i in vehicle_name:
            for k, v in self.get_vehicle().items():
                vehicle_list.append(v)
            for vehicle_dict in vehicle_list:
                # print("vehicle_dict", vehicle_dict)
                vehicle_dict_key = dict(vehicle_dict).items()
                for vehicle_dict_k, vehicle_dict_v in vehicle_dict_key:
                    if i in vehicle_dict_k:
                        vehicle_name_id = vehicle_dict[i]
                        vehicle_id_list.append(vehicle_name_id)
                        vehicle_id_dict.setdefault(i, vehicle_name_id)

        print("vehicle_id_list->:", vehicle_id_list, "\nvehicle_id_dict->:", vehicle_id_dict)
        return vehicle_id_list, vehicle_id_dict



def main(category_name:str,case_name:str=None,vehicle_name:str=None):
    suite = Suite()
    suite = Suite()
    suite.get_category()
    cate_id = suite.get_category()
    cate_name = [cate_id["data"][category_name]]
    cases_dict, case_id_list = suite.get_cases_category(cate_name)

    # 主车控制相关逻辑
    if vehicle_name:
        vehicle_dict = suite.get_vehicle_id([vehicle_name])[1]
        if vehicle_name not in vehicle_dict.keys():
            print(f"vehicle_name:{vehicle_name} inexistence")
            return
        vehicle_id = vehicle_dict[vehicle_name]
    else:
        vehicle_id = None
    print(f"vehicle_name:{vehicle_name},vehicle_id:{vehicle_id}")

    # 案例名称逻辑
    if case_name:
        case_id_list = cases_dict[case_name]
        print(f" case_name:{case_name}")

    # 启动案例
    session_id, task_ids = suite.run_task(case_id_list, vehicle_id=vehicle_id)

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main("转向冲突", "转向冲突8", "手动控制-默认")

返回值

{'method': 'GET', 'url': 'http://172.31.9.85:30083/api-asset/vehicles', 'headers': {'Content-Type': 'application/json', 'Authorization': 'e5033d43-2368-4fd0-9c4b-299c4d3ba8d9'}}

10. 测试集相关

10.1 获取测试集

API

"""
def get_suite_api(self, isProjectItem : str):
	"""
	@param isProjectItem : True(团队) or False(False)
	@return:

	"""
	url = get_suites_url.format(isProjectItem)
	return self.send(self.get, url=url).json()
    
"""

运行测试集


    def get_suite_api(self, isProjectItem : bool):
        """
        @param isProjectItem : True(团队) or False(个人)
        @return:

        """
        url = get_suites_url.format(isProjectItem)
        return self.send(self.get, url=url).json()

    def get_suite(self, suite_name: str = None , isProjectItem : bool = True):
        """
        @param isProjectItem : true(团队) or false(个人)
        :return:
        """
        if  isProjectItem: isProjectItem = "true" 
        else: isProjectItem = "false"
        
        response = self.get_suite_api(isProjectItem)
        data = response['data']
        suite_dict = {}
        for id in data['allIds']:
            suite_dict.setdefault(data['byId'][id]['name'], {}).update({"caseIds": data['byId'][id]['caseIds']})
        print("suite_dict", suite_dict)
        try : 
            if not suite_name:
                return suite_dict
            else:
                # print("suite_dict[suite_name]:",suite_dict[suite_name])
                return suite_dict[suite_name]
        except Exception as e:
            print("suite_name不存在")

def main(suite_name:str=None):
    suite = Suite()
    suite.get_suite(suite_name)

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main("suite_name" , True)

返回值

response:{"code":0,"data":{"byId":{"ea6d0670-0db5-11ef-98b6-f5137907bac2":{"schema":"suite","id":"ea6d0670-0db5-11ef-98b6-f5137907bac2","userId":"standalone","parentId":"","name":"test","created":1715226044506,"lastModified":1715226044506,"createUserRole":1,"caseIds":["f5bbfb2c-619a-4812-90c2-75a84247a4db"]}},"allIds":["ea6d0670-0db5-11ef-98b6-f5137907bac2"]}}
suite_dict {'test': {'caseIds': ['f5bbfb2c-619a-4812-90c2-75a84247a4db']}}

10.2 运行测试集

API

"""
def get_suite_api(self):
    """
    get_suite
    @return:
    """
    return self.send(self.get, url=get_suites_url).json()

def run_suite_api(self, payload:dict):
    """
    run_suite
    @param payload: payload
    @return:
    """
    return self.send(self.post, url=self.run_suite_url, json=payload).json()
    
"""

功能实现

    def get_suite_api(self):
        """
        get_suite
        @return:
        """
        return self.send(self.get, url=get_suites_url).json()


    def get_suite(self,suite_name:str=None):
        """
        :return:
        """
        response = self.get_suite_api()
        data = response['data']
        suite_dict = {}
        for id in data['allIds']:
            suite_dict.setdefault(data['byId'][id]['name'], {}).update({"caseIds": data['byId'][id]['caseIds']})
        print("suite_dict",suite_dict)
        if not suite_name:
            return suite_dict
        else:
            #print("suite_dict[suite_name]:",suite_dict[suite_name])
            return suite_dict[suite_name]

    def run_suite_api(self, payload: dict):
        """
        run_suite
        @param payload: payload
        @return:
        """
        return self.send(self.post, url=run_suite_url, json=payload).json()


    def run_suite(self, suite_name: str, taskName: str = None, vehicle_id: str = None):
        if taskName is None:
            def creat_task_name():
                return "taskName_" + time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime())

            taskName = creat_task_name()
        caseid_list = self.get_suite(suite_name)['caseIds']
        payload = {
            'caseIds': caseid_list,
            'taskName': taskName,
            "type" : "worldsim",
            "enableStateMachine" : False,
            "vehicleConfig" : {
                "Ego" : {
                    "instanceId" : "Ego" ,
                    "id" : "default",
                    "name" : "主车",
                    "classId" :"MKZ",
                    "algorithms" : [
                        {
                        "id" : "Default",
                         "name" : "默认控制器",
                         "algorithmId": "SimOneDriver",
                         }
                    ]
                }
            },
            "withEvaluation" : False
        }
        if vehicle_id:
            # vehicle_module = VehicleBusiness()
            # vehicle_module.get_vehicle()
            payload.update({"overrideVehicleId": vehicle_id})
        response = self.run_suite_api(payload)
        data = response['data']
        return data['sessionId'], data['taskIds']

def main(suite_name:str=None):
    suite = Suite()
    # suite.get_suite(suite_name)
    suite.run_suite(suite_name)

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main("test")

返回值

{'method': 'POST', 'url': 'http://172.31.9.85:30083/api-task/tasks', 'headers': {'Content-Type': 'application/json', 'Authorization': '7d4c315b-dbd1-4f4b-8552-ff66be9346ec'}, 'json': {'caseIds': ['f5bbfb2c-619a-4812-90c2-75a84247a4db'], 'taskName': 'taskName_2024-05-09_11:49:56'}}
response:{"code":0,"data":{"sessionId":"5b4fc9fb-8da4-42f4-8eef-7e4522b85dd4","taskIds":["0b933b59-b4d6-49e8-b656-de52a8561fb6"]}}

10.3 获取运行案例集信息

API

"""
def suite_queue_api(self):
    """
    suite_queue
    @return:
    """
    return self.send(self.get, url=self.queue_status_url).json()
    
"""

功能实现

    def suite_queue_api(self):
        """
        suite_queue
        @return:
        """
        return self.send(self.get, url=queue_status_url).json()


    def suite_queue_check(self,n=10):
        """
        @param n:cycle index
        @return:
        """
        if n==1:
            print("-----------------------The test case run fail--------------------------------------")
            return False
        while(1):
            try:
                assert self.suite_queue_api()
                response = self.suite_queue_api()
                assert response["code"]==0
            except Exception as e:
                return -1
            queue_info = response["data"]
            running,pending,waiting = queue_info["running"],queue_info["pending"],queue_info["waiting"]

            if running == 0 and pending == 0 and waiting == 0:
                print("-----------------------The test case run finish--------------------------------------")
                return True
            else:
                time.sleep(interval)

def main(suite_name:str=None):
    suite = Suite()
    # suite.get_suite(suite_name)
    suite.run_suite(suite_name)
    suite.suite_queue_check()

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main("test")

返回值

{'method': 'GET', 'url': 'http://172.31.9.85:30083/api-task/tasks/queue?own=true', 'headers': {'Content-Type': 'application/json', 'Authorization': 'f083c671-f6fe-4d64-b775-3f0539eca7c9'}}
response:{"code":0,"data":{"total":2,"running":1,"pending":1,"waiting":1,"finished":0,"estimatedTaskDuration":87.22495652173913}}

10.4 获取任务集中的task_id

API

"""
def get_taskassemble_api(self):
    '''
    获取测试案例集
    @param
    @return:
    '''
    return self.send(self.get, get_taskassemble_url).json()
"""

功能实现

    def get_taskassemble_api(self, task_set_id):
        '''
        获取测试案例集
        @param
        @return:
        '''
        return self.send(self.get, get_taskassemble_url.format(task_set_id=task_set_id)).json()
    def get_task_id_of_task_set(self, task_set_id: str) -> list:
        """
        获取任务集合中所有测试案例的task_id
        @param task_set_id: 任务集合id
        @return: list
        """
        response = self.get_taskassemble_api(task_set_id)
        _task_id_list = response["data"]["list"]
        # print("---------------------------------task_id_list-----------------------")
        # print(_task_id_list)
        index = 0
        task_id_list = []
        for i in _task_id_list:
            index += 1
            # print(f"------------------------第{index}个元素----------------")
            # print(i)
            if i['parentId'] == task_set_id:
                # print(i)
                task_id_list.append(i["id"])
        # print("task_id_list",task_id_list)
        return task_id_list
def main(suite_name:str=None):
    suite = Suite()
    res = suite.run_suite(suite_name)
    suite.get_task_id_of_task_set(res[1])

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main("test")

返回值

task_set_id_list  [id1,id2,...]

11. 地图相关

11.1 导入地图

API

"""
def import_map_api(self, payload: dict, files: dict):
    """
    导入地图API
    @param payload:传参字典
    @param files: 传参文件
    @return:
    """
    self.headers = {"projectId": "default"}
    return self.send(self.post, import_map_url, data=payload, files=files).json()
"""

功能实现

    def import_map_api(self, payload: dict, files: dict):
        """
        导入地图API
        @param payload:传参字典
        @param files: 传参文件
        @return:
        """
        self.headers = {"projectId": "default"}
        return self.send(self.post, import_map_url, data=payload, files=files).json()

    def import_map(self, xodr_path: str, thumbnail_path: str):
        """
        导入地图方法
        @param xodr_path: 地图文件
        @param thumbnail_path: 地图对应的图像
        @return:
        """
        payload = {"params": json.dumps(
            {"category": "customized", "id": "new_" + Faker('zh_CN').uuid4(),
             "name": os.path.splitext(os.path.basename(xodr_path))[0], "size": 512,
             "ppm": 10, "bgColor": "#dddddd", "reproject": True, "reprojectOrigin": False, "reprojectOriginLat": 0,
             "reprojectOriginLng": 0, "tags": [], "notes": "",
             "header": {"minX": -210.20535534122396, "minY": -149.68815701999185, "minZ": -1.862645149230957e-9,
                        "maxX": 237.95535534122394, "maxY": 135.43815701999196, "maxZ": 2.7940070024635385e-9,
                        "centerX": 97.12480158531203, "centerY": 24.463606820251727, "centerZ": 100,
                        "localEnuExt": "6378137,0,0;0,1,0;0,0,1;1,0,0"}})}
        files = {'xodr': open(xodr_path, 'rb'), "thumbnail": open(thumbnail_path, 'rb')}
        result = self.import_map_api(payload, files)
        if result["code"] == 0:
            print(f"导入地图{xodr_path}成功,id:{result['data']['mapId']}")
        else:
            print(f"导入地图{xodr_path}失败,返回结果:{result}")
            raise f"导入地图{xodr_path}失败,返回结果:{result}"
        res_dict = {"code": result["code"], "mapId": result["data"]["mapId"],
                    "name": os.path.basename(os.path.basename(xodr_path))}
        return res_dict


def main():
    suite = Suite()

    suite.import_map(r"D:\Project\SimOneAutomation\simone_automation\Sysdata\Public\import\Map\xodr\标志牌测试.xodr",
                     r"D:\Project\SimOneAutomation\simone_automation\Sysdata\Public\import\Map\img\import_map.png"
                     )

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main()

返回值

{'method': 'POST', 'url': 'http://172.31.9.85:30083/api-asset/maps', 'headers': {'Authorization': '92400a5e-0e20-4d34-bfb3-05f9344f4889'}, 'data': {'params': '{"category": "customized", "id": "", "name": "\\u6807\\u5fd7\\u724c\\u6d4b\\u8bd5", "size": 512, "ppm": 10, "bgColor": "#dddddd", "reproject": true, "reprojectOrigin": false, "reprojectOriginLat": 0, "reprojectOriginLng": 0, "tags": [], "notes": "", "header": {"minX": -210.20535534122396, "minY": -149.68815701999185, "minZ": -1.862645149230957e-09, "maxX": 237.95535534122394, "maxY": 135.43815701999196, "maxZ": 2.7940070024635385e-09, "centerX": 97.12480158531203, "centerY": 24.463606820251727, "centerZ": 100, "localEnuExt": "6378137,0,0;0,1,0;0,0,1;1,0,0"}}'}, 'files': {'xodr': <_io.BufferedReader name='D:\\Project\\SimOneAutomation\\simone_automation\\Sysdata\\Public\\import\\Map\\xodr\\标志牌测试.xodr'>, 'thumbnail': <_io.BufferedReader name='D:\\Project\\SimOneAutomation\\simone_automation\\Sysdata\\Public\\import\\Map\\img\\import_map.png'>}}
response:{"code":0,"data":{"mapId":"01ca5bba-d804-4d32-900f-742fa958d47f"}}
import map D:\Project\SimOneAutomation\simone_automation\Sysdata\Public\import\Map\xodr\标志牌测试.xodrsuccess

11.2 获取地图信息

API

"""
def get_map_api(self):
    """
    获取地图
    @return:
    """
    return self.send(self.get, self.get_map_url).json()
"""

功能实现

    def get_map_api(self):
    """
    获取地图
    @return:
    """
    return self.send(self.get, self.get_map_url).json()
    def get_map_id(self,userid:str=None,map_name:str=None):
        """
        按条件获取地图信息,默认获取所有地图
        @param userid: 用户id
        @param map_name: 地图名字
        @return:
        """
        global map_data
        map_data={}
        try:
            result = self.get_map_api()
            # print(len(result['data']['maps']))
            map_info = result['data']['maps']

            if userid:
                for map in map_info:
                    if map["userId"] == userid:
                        map_data.setdefault(map["name"],map["id"])
            elif map_name:
                for map in map_info:
                    if map["name"] == map_name:
                        map_data = {map["name"]: map["id"]}
            else:
                for map in map_info:
                    map_data.setdefault(map["userId"],{}).update({map["name"]: map["id"]})
            return map_data

        except Exception as e:
            print(e)
def main():
    suite = Suite()

    suite.get_map_id()

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main()

返回值

response:{"code":0,"data"...}

11.3 删除地图

API

"""
def delete_map_api(self,payload:dict):
   """
   删除地图
   @param payload: eg:{ids: ["27a0bbc4-5d4d-48c1-9187-62df794dadae"]}
   @return:
   """
   return self.send(self.post, delete_map_url, json=payload).json()
"""

功能实现

    def delete_map_api(self,payload:dict):
        """
        删除地图
        @param payload: eg:{ids: ["27a0bbc4-5d4d-48c1-9187-62df794dadae"]}
        @return:
        """
        return self.send(self.post, delete_map_url, json=payload).json()

    def delete_map(self,id:list):
        """
        批量删除地图
        @param id: 地图id列表
        @return:
        """
        print("删除的地图列表-》{}".format(str(id)))
        payload = {"ids": id}
        self.delete_map_api(payload)


def main():
    suite = Suite()

    suite.delete(["'92400a5e-0e20-4d34-bfb3-05f9344f4889'"])

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main()

返回值

response:{"code":0,"data"...}

12. 判定相关

12.1 获取案例判定信息

API

"""
def get_judgements_api(self, caseId, judgementId=None):
	"""
	获取判定信息API
	@param caseId: 案例ID
	@param judgementId: 判定ID
	@return:
	"""
	if judgementId:
		url = get_judgement_url.format(caseId=caseId,judgementId=judgementId)
	else:
		url = get_judgements_url.format(caseId=caseId)
	response = self.send(self.get, url).json()
	return response
"""

功能实现

    def get_judgements_api(self, caseId, judgementId=None):
        """
        获取判定信息API
        @param caseId: 案例ID
        @param judgementId: 判定ID
        @return:
        """
        if judgementId:
            url = get_judgement_url.format(caseId=caseId,judgementId=judgementId)
        else:
            url = get_judgements_url.format(caseId=caseId)
        response = self.send(self.get, url).json()
        return response

    def get_judgements_method(self, caseId:str, judgementId:str = None):
        """
        获取判定信息
        @param caseId: 案例ID
        @param judgementId: 判定ID
        @return:
        """
        response = self.get_judgements_api(caseId,judgementId)
        print("得到的扩展判定信息:",response["data"][1])
        return response


def main():
    suite = Suite()

    suite.get_judgements_api("ebd4cf06-b66f-4e03-9294-5f60f55d453a")

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main()

返回值

response:{"code":0,"msg":"success","data":[{"id":"timeout","name":"超时","type":"timeout","category":"general","enabled":true,"scope":{"type":"global","position":{"x":0,"y":0,"z":0},"heading":{"x":0,"y":0,"z":0,"w":1},"size":{"x":10,"y":10,"z":0}},"conditions":[{"variable":"timeout","value":600}],"settings":{"action":"failure","logLevel":"error","logInfo":""},"builtIn":true,"lock":true,"userId":"admin","schema":"judgement"},{"id":"collision","name":"碰撞","type":"collision","category":"general","enabled":true,"scope":{"type":"global","position":{"x":0,"y":0,"z":0},"heading":{"x":0,"y":0,"z":0,"w":1},"size":{"x":10,"y":10,"z":0}},"conditions":[],"settings":{"action":"failure","logLevel":"error","logInfo":""},"builtIn":true,"lock":true,"userId":"admin","schema":"judgement"}]}

12.2 更新判定信息

API

"""
def update_judgement_method(self, caseId, judgementId="collision"):
	__collision = {"schema": "judgement", "settings": {"logLevel": "error", "action": "failure", "logInfo": ""},
				   "scope": {"size": {"x": 10, "y": 10, "z": 0}, "heading": {"w": 1, "x": 0, "y": 0, "z": 0},
							 "position": {"x": 0, "y": 0, "z": 0},
							 "type": "global"}, "builtIn": True,
				   "name": "碰撞", "lock": True, "id": "collision",
				   "type": "collision", "category": "general",
				   "conditions": [], "userId": "admin",
				   "enabled": True}
	payload = json.dumps(__collision)
	judgement_url = "api-asset/cases/{caseId}/judgements/{judgementId}"
	response = self.send(self.put, judgement_url.format(caseId=caseId, judgementId=judgementId), data=payload)
	print("更新扩展判定信息")
	print("status_code:", response.status_code)
"""

功能实现

    def update_judgement_method(self, caseId, judgementId="collision"):
        __collision = {"schema": "judgement", "settings": {"logLevel": "error", "action": "failure", "logInfo": ""},
                       "scope": {"size": {"x": 10, "y": 10, "z": 0}, "heading": {"w": 1, "x": 0, "y": 0, "z": 0},
                                 "position": {"x": 0, "y": 0, "z": 0},
                                 "type": "global"}, "builtIn": True,
                       "name": "碰撞", "lock": True, "id": "collision",
                       "type": "collision", "category": "general",
                       "conditions": [], "userId": "admin",
                       "enabled": True}
        payload = json.dumps(__collision)
        judgement_url = "api-asset/cases/{caseId}/judgements/{judgementId}"
        response = self.send(self.put, judgement_url.format(caseId=caseId, judgementId=judgementId), data=payload)
        print("更新扩展判定信息")
        print("status_code:", response.status_code)


def main(category_name: str, case_name: str):
    """
    category_name:案例库名称
    case_name:案例名称
    reset:是否重置判定
    """
    LoginPolicy("standalone")
    suite = Suite()
    suite.get_category()

    # 获取caseid逻辑
    cate_id = suite.get_category()
    cate_name = [cate_id["data"][category_name]]
    cases_dict, case_id_list = suite.get_cases_category(cate_name)
    case_id = cases_dict[case_name][0]
    suite.get_judgements_method(case_id)
    suite.update_judgement_method(case_id)

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main("new", "test_4")

返回值

更新扩展判定信息
status_code: 202

13. 任务运行结果相关

13.1 获取运行案例集信息

API

"""
def get_task_result_api(self,task_id:str):
    """
    获取指定task的结果
    @param task_id:
    @return:
    """
    return self.send(self.get,task_result_url.format(id=task_id)).json()
"""

功能实现

    def get_task_result_api(self,task_id:str):
        """
        获取指定task的结果
        @param task_id:
        @return:
        """
        return self.send(self.get,task_result_url.format(id=task_id)).json()

    def get_task_result(self, task_id_list: list = None) -> list:
        """
        获取任务集中的  案例名称,caseId,task_id,运行结果
        @param task_id_list: 任务id列表
        @param task_set_id: 任务集合id
        @return:
        """

        task_result_list = []
        for task_id in task_id_list:
            result = self.get_task_result_api(task_id)
            case_name = result["data"]["tasks"][0]["case"]["name"]
            case_id = result["data"]["tasks"][0]["caseId"]
            task_id = result["data"]["tasks"][0]["id"]
            task_result = result["data"]["tasks"][0]["pass"]
            if task_result != True: task_result = False
            task_result_list.append({"case_name": case_name,
                                     "case_id": case_id,
                                     "task_id": task_id,
                                     "task_result": task_result})
        print("task_result_list->", task_result_list)
        return task_result_list


def main():
    suite = Suite()
	# 入参id taskid 可以用 run_task方法中的返回值获取
    suite.get_task_result(["51ec8581-4563-49c4-8ce1-ba96962b278b"])

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    main()

返回值

task_result_list-> [{'case_name': 'test_4-copy', 'case_id': 'f5bbfb2c-619a-4812-90c2-75a84247a4db', 'task_id': '51ec8581-4563-49c4-8ce1-ba96962b278b', 'task_result': False}]

13.2 获取任务集列表信息

API

"""
def get_task_list_api(self):
	"""
    获取任务列表集合
    """
    response = self.send(self.get, url=get_task_list_url).json()
"""

功能实现

    def get_task_list_api(self):
        """
        获取任务列表集合
        """
        response = self.send(self.get, url=get_task_list_url).json()

调用示例

SimOneUrl = "http://172.31.9.85:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
LoginPolicy("standalone")
if __name__ == '__main__':
    test = Suite()
    test.get_task_list_api()

返回值

response:{"code":0,"data":{"list":[{"parentId":null,"deleted":null,"deleteTime":null,"createAt":"2024-05-16T02:34:13.000Z","updateAt":"2024-05-16T02:35:08.000Z",....}

13.3 下载测试报告

API

"""
    def export_report_api(self, sessions_id: str):
        """
        导出测试报告API
        @param sessions_id: sessions_id
        @return: Response
        """
        self.headers.update({"Language": "zh-Hans"})
        return self.send(self.get, export_report_url.format(sessions_id=sessions_id)).json()

    def download_report_api(self, sessions_id: str):
        """
        下载测试报告API
        @param sessions_id: sessions_id
        @return: Response
        """

        return self.send(self.get, download_report_url.format(sessions_id=sessions_id)).content
"""

功能实现

    def export_report_api(self, sessions_id: str):
        """
        导出测试报告API
        @param sessions_id: sessions_id
        @return: Response
        """
        self.headers.update({"Language": "zh-Hans"})
        return self.send(self.get, export_report_url.format(sessions_id=sessions_id)).json()

    def download_report_api(self, sessions_id: str):
        """
        下载测试报告API
        @param sessions_id: sessions_id
        @return: Response
        """

        return self.send(self.get, download_report_url.format(sessions_id=sessions_id)).content

    def download_report(self, sessions_id: str, download_path: str, download_name: str) -> None:
        """
        下载测试报告
        @param sessions_id: sessions_id
        @return: Response
        """
        self.export_report_api(sessions_id)
        pdf_data = self.download_report_api(sessions_id)
        download_pathname = os.path.join(download_path, download_name)
        with open(download_pathname, mode="wb") as f:
            f.write(pdf_data)

调用示例

LoginPolicy("standalone")
if __name__ == '__main__':
    test = Suite()		
    					# 测试任务集ID  
    test.download_report("54c0806b-ba80-4821-ae25-49f7c284f486", r"C:\Users\Administrator\Desktop\test", "test.pdf")

返回值

response:{"code":0,"data":{"success":true}}
测试报告二进制数据

14. 测试计划

14.1 获取测试计划列表

API

"""
def testPlan_list_api(self):
	"""
	获取测试计划列表
	"""
	options= {"search":"","types":[],"vehicleIds":[],"algorithms":[],"triggerTypes":[],"notifications":[],"category":"worldsim"}
	json_str = json.dumps(options, indent=4, ensure_ascii=False)  # 缩进美化,保留非ASCII字符
	# encoded_url = quote(json_str, safe="/:?=")  # 保留路径分隔符、查询符号和等号
	return self.send(self.get, url=testPlan_list_url.format(options=json_str)).json()
"""

功能实现


def test_testPlan_list(self):
	"""
	获取测试计划列表
	"""
	response = self.testPlan_list_api()
	return response
	

调用示例

LoginPolicy("standalone")
if __name__ == '__main__':
    test = Suite()		
    					# 测试任务集ID  
    test.test_testPlan_list()

14.2 创建测试计划

API

"""
def create_testPlan_api(self, categoryIds, name, withEvaluation, evaluationPresetId, Default):
	"""
	创建测试计划
	"""
	payload = {
		"data": {"caseSuiteIds": [], "categoryIds": categoryIds, "name": name,
				"type": "1", "vehicleConfig": {"Ego": {"instanceId": "Ego", "id": "default", "classId": "MKZ",
														"algorithms": [{"id": "Default", "name": "默认控制器",
																		"algorithmId": Default},
																	{"id": "AutoDrive", "name": "自驾控制器",
																		"algorithmId": "SimOneDriver"}],
														"name": "主车"}}, "triggerType": "manual",
				"withEvaluation": False, "overrideJudgementId": "-1", "evaluationPresetId": evaluationPresetId,
				"evaluationOperatorId": "-1", "notification": {"type": "none", "condition": "any"},
				"enableStateMachine": False}}

	return self.send(self.post, create_testPlan_url, json=payload).json()
"""

功能实现


def create_testPlan_api(self, categoryIds, name, withEvaluation, evaluationPresetId, Default):
	"""
	创建测试计划
	"""
	payload = {
		"data": {"caseSuiteIds": [], "categoryIds": categoryIds, "name": name,
				"type": "1", "vehicleConfig": {"Ego": {"instanceId": "Ego", "id": "default", "classId": "MKZ",
														"algorithms": [{"id": "Default", "name": "默认控制器",
																		"algorithmId": Default},
																	{"id": "AutoDrive", "name": "自驾控制器",
																		"algorithmId": "SimOneDriver"}],
														"name": "主车"}}, "triggerType": "manual",
				"withEvaluation": False, "overrideJudgementId": "-1", "evaluationPresetId": evaluationPresetId,
				"evaluationOperatorId": "-1", "notification": {"type": "none", "condition": "any"},
				"enableStateMachine": False}}

	return self.send(self.post, create_testPlan_url, json=payload).json()

def test_create_testPlan(self, caseId : list ,test_case_title : str ,withEvaluation , evaluationPresetId , Default):
	"""
	@param caseId: 案例ID
	@param test_case_title: 测试计划名称
	@param withEvaluation: 是否进行评估
	@param evaluationPresetId: 评估预设ID
	@param Default: 主车算法
	@return: response
	"""
	response = self.create_testPlan_api(caseId, test_case_title,
														withEvaluation = withEvaluation,
														evaluationPresetId= 1,
														 Default="SimOneDriver")
	return response

调用示例

LoginPolicy("standalone")
if __name__ == '__main__':
    test = Suite()		
    test..test_create_testPlan(
        caseId = ["ce88f2f9-3634-448b-b1ac-0b13924a0f52"],
        test_case_title="测试计划1",
        withEvaluation = True,
        evaluationPresetId= 1,
        Default="SimOneDriver"
    )

返回值

response:{"code":0,"data":{"id":"d6d1da56-fa4c-4a3b-ac7b-5cc1762dd703","useId":"standalone"}}

14.3 更新测试计划

API

"""
def update_testPlan_api(self, PlanId, categoryIds, name):
	"""
	更新测试计划
	"""
	payload = {
		"data": {"caseSuiteIds": [], "categoryIds": categoryIds, "name": name,
				"type": "1", "vehicleConfig": {"Ego": {"instanceId": "Ego", "id": "default", "classId": "MKZ",
														"algorithms": [{"id": "Default", "name": "默认控制器",
																		"algorithmId": "SimOneDriver"},
																	{"id": "AutoDrive", "name": "自驾控制器",
																		"algorithmId": "Manual"}],
														"name": "主车"}}, "triggerType": "manual",
				"withEvaluation": False, "overrideJudgementId": "-1", "evaluationPresetId": 1,
				"evaluationOperatorId": "-1", "notification": {"type": "none", "condition": "any"},
				"enableStateMachine": False}}
	return self.send(self.put, url=testPlan_id_url.format(id=PlanId), json=payload).json()
"""

功能实现


def update_testPlan_api(self, PlanId, categoryIds, name):
	"""
	更新测试计划
	"""
	payload = {
		"data": {"caseSuiteIds": [], "categoryIds": categoryIds, "name": name,
				"type": "1", "vehicleConfig": {"Ego": {"instanceId": "Ego", "id": "default", "classId": "MKZ",
														"algorithms": [{"id": "Default", "name": "默认控制器",
																		"algorithmId": "SimOneDriver"},
																	{"id": "AutoDrive", "name": "自驾控制器",
																		"algorithmId": "Manual"}],
														"name": "主车"}}, "triggerType": "manual",
				"withEvaluation": False, "overrideJudgementId": "-1", "evaluationPresetId": 1,
				"evaluationOperatorId": "-1", "notification": {"type": "none", "condition": "any"},
				"enableStateMachine": False}}
	return self.send(self.put, url=testPlan_id_url.format(id=PlanId), json=payload).json()

def test_update_testPlan(self, PlanId : str ,caseId : list,  NewCaseTitle):
	"""
	@param PlanId: 测试计划ID
	@param caseId: 案例ID
	@param NewCaseTitle: 新的测试计划名称
	@return: update_response
	"""
	update_response = self.update_testPlan_api(PlanId, caseId, NewCaseTitle)
	return update_response

返回值

response:{"code":0,"data":{"id":"a17cc9a2-9ee9-44f4-94cf-e36f18eff3ae","userId":"standalone"}}

调用示例

LoginPolicy("standalone")
if __name__ == '__main__':
    test = Suite()		
    test.test_update_testPlan(
        PlanId = "a17cc9a2-9ee9-44f4-94cf-e36f18eff3ae",
        caseId = ["ce88f2f9-3634-448b-b1ac-0b13924a0f52"],
        NewCaseTitle="测试计划6"
    )

14.4 删除测试计划

API

"""
def del_testPlan_api(self, ids:list):
	"""
	删除测试计划
	"""
	payload = {"ids": ids}
	return self.send(self.post, url=del_testPlan_url, json=payload).json()
"""

功能实现


def del_testPlan_api(self, ids:list):
	"""
	删除测试计划
	"""
	payload = {"ids": ids}
	return self.send(self.post, url=del_testPlan_url, json=payload).json()

def test_del_testPlan(self, planID):
	"""
	@param planID: 测试计划ID
	@return: del_response
	"""
	del_response = self.del_testPlan_api([planID])
	return del_response

返回值

response:{"code":0,"data":{"deleted":["f804a357-3a0f-414c-88c5-1b889d6ef7b1"]}}

调用示例

LoginPolicy("standalone")
if __name__ == '__main__':
    test = Suite()		
    test.test_del_testPlan("f804a357-3a0f-414c-88c5-1b889d6ef7b1")

14.5 执行测试计划

API

"""
def execute_testPlan_api(self, ids:list):
	
	"""
	执行测试计划
	"""
	payload = {"ids":ids}
	return self.send(self.post, url=execute_testPlan_url, json=payload).json()
"""

功能实现


def execute_testPlan_api(self, ids:list):
	
	"""
	执行测试计划
	"""
	payload = {"ids":ids}
	return self.send(self.post, url=execute_testPlan_url, json=payload).json()

def test_execute_testPlan(self, PlanId : list):
	"""
	执行测试计划
	@param planID: 测试计划ID
	@return: response
	"""
	response = self.execute_testPlan_api(PlanId)
	return response

返回值

response:{"code":0,"data":{"sessionIds":["ff2ec939-b969-41db-99d1-a0ba907dd99c"]}}

调用示例

LoginPolicy("standalone")
if __name__ == '__main__':
    test = Suite()		
    test.test_execute_testPlan(PlanId=["daa18ce2-ca0f-44a4-920d-15c2edcc7130"])

14.6 克隆测试计划

API

"""
def clone_testPlan_api(self, id):
	"""
	克隆测试计划
	"""
	payload = {}
	return self.send(self.post, url=clone_testPlan_url.format(id=id), json=payload).json()
"""

功能实现


def clone_testPlan_api(self, id):
	"""
	克隆测试计划
	"""
	payload = {}
	return self.send(self.post, url=clone_testPlan_url.format(id=id), json=payload).json()

def test_clone_testPlan(self, PlanId):
	"""
	克隆测试计划
	@param planID: 测试计划ID
	@return: response
	"""
	response = self.clone_testPlan_api(PlanId)
	return response

返回值

response:{"code":0,"data":{"id":"4c5ebebc-b313-43f3-83d7-77834d249a46"}}

调用示例


LoginPolicy("standalone")
if __name__ == '__main__':
    test = Suite()		
    test.test_clone_testPlan(PlanId=["daa18ce2-ca0f-44a4-920d-15c2edcc7130"])

14.7 测试计划名称是否已存在

API

"""
def isrepeat_testPlan_api(self, name, type):
	"""
	判断测试计划名称是否重复
	"""
	return self.send(self.get, url=isrepeat_testPlan_url.format(name=name, type=type)).json()
"""

功能实现


def isrepeat_testPlan_api(self, name, type):
	"""
	判断测试计划名称是否重复
	"""
	return self.send(self.get, url=isrepeat_testPlan_url.format(name=name, type=type)).json()

def test_isrepeat_testPlan(self, PlanName):
	"""
	判断测试计划名称是否重复
	@param PlanName: 测试计划名称
	@return: response
	"""
	response = self.isrepeat_testPlan_api(PlanName + time.strftime("_%H:%M:%S", time.localtime()), "worldsim")

返回值


response:{"code":0,"data":{"isRepeat":true}}    # isRepeat 返回true 重复, 反之false 未重复

调用示例


LoginPolicy("standalone")
if __name__ == '__main__':
    test = Suite()		
    test.suite.test_isrepeat_testPlan("测试计划6")

14.8 删除单个测试计划

API

"""
def delfel_testPlan_single_api(self, PlanID:str):
	"""
	删除单个测试计划
	"""
	response = self.send(self.delete, url=del_testPlan_single_url.format(id=PlanID)).json()
	return response
"""

功能实现


def delfel_testPlan_single_api(self, PlanID:str):
	"""
	删除单个测试计划
	"""
	response = self.send(self.delete, url=del_testPlan_single_url.format(id=PlanID)).json()
	return response

def delfel_testPlan_single(self,PlanId : str ):
	"""
	删除单个测试计划
	@param PlanId: 测试计划ID
	@return: response
	"""
	response = self.delfel_testPlan_single_api(PlanId)
	return response

返回值


response:{"code":0,"data":{"id":"ba571f1d-753b-43a0-b0e9-81ff830b3d7d"}}

调用示例


LoginPolicy("standalone")
if __name__ == '__main__':
    test = Suite()		
    test.suite.delfel_testPlan_single("ba571f1d-753b-43a0-b0e9-81ff830b3d7d")

14.9 获取测试计划

API

"""
def get_testPlan_api(self, PlanId):
	"""
	获取测试计划 
	"""
	return self.send(self.get, url= testPlan_id_url.format(id=PlanId)).json()
"""

功能实现


def get_testPlan_api(self, PlanId):
	"""
	获取测试计划 
	"""
	return self.send(self.get, url= testPlan_id_url.format(id=PlanId)).json()

def test_get_testPlan(self, PlanId):
	"""
	获取测试计划 
	@param PlanId: 测试计划ID
	@return: response
	"""
	response = self.get_testPlan_api(PlanId)

返回值


response:
{
	"code": 0,
	"data": {
		"caseSuiteIds": [],
		"categoryIds": ["ce88f2f9-3634-448b-b1ac-0b13924a0f52"],
		"type": "1",
		"vehicleConfig": {
			"Ego": {
				"instanceId": "Ego",
				"id": "default",
				"classId": "MKZ",
				"algorithms": [{
					"id": "Default",
					"name": "默认控制器",
					"algorithmId": "SimOneDriver"
				}],
				"name": "主车"
			}
		},
		"triggerType": "manual",
		"withEvaluation": false,
		"overrideJudgementId": "-1",
		"evaluationPresetId": "-1",
		"evaluationOperatorId": "-1",
		"notification": {
			"type": "none",
			"condition": "any"
		},
		"enableStateMachine": false,
		"updateUserName": "standalone",
		"id": "5debf4fc-487e-41a7-b7ea-13f7a5ca94ab",
		"name": "测试计划",
		"userId": "standalone",
		"createAt": "2025-10-10T02:20:26.000Z",
		"updateAt": "2025-10-10T02:20:26.000Z"
	}
}

调用示例


LoginPolicy("standalone")
if __name__ == '__main__':
    test = Suite()		
    test.suite.delfel_testPlan_single("ba571f1d-753b-43a0-b0e9-81ff830b3d7d")

14. 附录参考代码

import base64
import copy
import hashlib
import math
import random
import shutil
import zipfile
from os.path import exists
from typing import Optional, Tuple, List

import fire
from crypto.Cipher import AES
from faker import Faker
from requests import request, Response
import json
from urllib.parse import urljoin
import os
import time

fake = Faker('zh_CN')
version_url = "version"
interval = 10

# header
header = {
    "Content-Type": "application/json",
}


# Environment Path
class EnvPath:
    token = "TOKEN"


class Prefix:
    UserServer = "api-user"
    CaseServer = "api-case"
    AssetServer = "api-asset"
    TaskServer = "api-task"


# token
_token = os.environ.get(EnvPath.token)

"""  初始化配置相关接口  """
accept_url = Prefix.UserServer + "/users/agreement"  # 接受协议
init_config_url = Prefix.TaskServer + "/system/initConfig"  # 初始化配置
init_serve_url = Prefix.TaskServer + "/system/initServices"  # 初始化服务
conn_master_url = Prefix.TaskServer + "/system/connectMaster"  # 连接master
start_serve_url = Prefix.TaskServer + "/system/start"  # 启动服务

"""  多车增加节点  """
add_worker_url = Prefix.TaskServer + "/worker"  # 增加节点

"""  登录相关接口  """
user_check_url = Prefix.UserServer + "/users/check"
health = Prefix.UserServer + "/health"  # 检查服务是否正常
cloud_login_url = Prefix.UserServer + "/users/login"  # 云端登录
standalone_login_url = Prefix.UserServer + "/users/standalone"  # 单机版登录 -- 已测试
login_check = Prefix.UserServer + "/users/check"  # user check

"""  案例库目录相关接口  """
categories_url = Prefix.CaseServer + "/categories"  # 案例库目录  --已测试
get_categories_url = Prefix.CaseServer + "/categories?withCases=false"  # 获取所有案例库  --已测试
# get_cases_url = "api-case/cases?page=1&filter={category}&pageSize=100"
get_cases_url = Prefix.CaseServer + "/cases?page={page}&filter={category}&pageSize=100"  # 获取当前案例库的所有案例  --已测试
delete_categories_url = Prefix.CaseServer + "/categories/{}"  # 删除案例库目录

"""  上传文件相关接口  """
file_md5_url = Prefix.AssetServer + "/files/{}?raw={}"  # 验证文件MD5  -- 已完成
upload_file_url = Prefix.AssetServer + "/files/{}/upload?raw={}"  # 上传文件  -- 已完成

"""  测试案例相关接口  """
case_import_url = Prefix.AssetServer + "/cases/import"  # 导入案例
case_def_url = Prefix.CaseServer + "/cases/{case_id}"  # 案例概要  -- 已完成
case_details_url = Prefix.CaseServer + "/cases/{case_id}/data"  # 案例详情  -- 已完成
delete_case_url = Prefix.CaseServer + "/trash"  # 删除案例  -- 已完成
convert_replay_case_url = Prefix.TaskServer + "/tasks/{id}/convert2case"  # 转换回放案例
update_case_url = "api-asset/cases/update"  # 编辑案例

"""  测试任务相关接口  """
create_task_url = Prefix.TaskServer + "/tasks"  # 创建task  --已测试
stop_task_url = Prefix.TaskServer + "/tasks/stop"  # 停止task --已完成
stop_sessions_url = Prefix.TaskServer + "/sessions/stop"  # 会话停止  --已完成
delete_task_url = Prefix.TaskServer + "/tasks/delete"  # 删除task  --已完成
delete_sessions_url = Prefix.TaskServer + "/sessions/delete"  # 删除停止  --已完成
result_url = Prefix.TaskServer + "/tasks/task?taskIds={task_id}"  # 删除停止  案例运行结果详情 -- 已测试
pause_task_url = Prefix.TaskServer + "/sessions/{task_id}/pause"  # 任务暂停
resume_task_url = Prefix.TaskServer + "/sessions/{task_id}/resume"  # 任务暂停后开始
task_result_url = Prefix.TaskServer + "/tasks/task?taskIds={id}"  # 删除停止  案例运行结果详情  -- 已测试

"""  主车相关接口  """
import_vehicle_url = Prefix.AssetServer + "/vehicles/import"  # 导入主车  --已完成
delete_vehicle_url = Prefix.AssetServer + "/vehicles/{vehicles_id}"  # 删除主车  --已完成
get_vehicle_url = Prefix.AssetServer + "/vehicles"  # 获取主车信息  --已完成

"""  地图相关接口  """
import_map_url = Prefix.AssetServer + "/maps"  # 导入地图  --已完成
delete_map_url = Prefix.CaseServer + "/maps/trash"  # 删除地图  --已完成
get_map_url = Prefix.CaseServer + "/maps?pageSize=100"  # 获取地图相关信息
road_service_url = Prefix.TaskServer + "/sce"  # 启动路网服务
laneTypeIfInside_url = Prefix.TaskServer + "/sce/{map_id}/location/laneTypeIfInside"  # 监测点是否在车道内

"""  数据驱动相关接口  """
import_data_driven_url = Prefix.AssetServer + "/dd"  # 导入数据驱动源
delete_data_dirven_url = Prefix.AssetServer + "/dd/{id}"  # 删除数据驱动源

""" 资源相关接口 """
import_asset_url = Prefix.AssetServer + "/assets"  # 资源导入
asset_repeat_url = Prefix.AssetServer + "/assets/isrepeat"  # 资源查重
get_asset_url = Prefix.AssetServer + "/assets?schema={}"  # 获取资源信息
delete_asset_url = Prefix.AssetServer + "/assets/{}"  # 删除对应资源

"""  案例导出相关  """
export_case_url = Prefix.AssetServer + "/cases/export"  # 案例导出
download_case_url = Prefix.AssetServer + "/download/{pake_id}"  # 案例下载

"""  测试案例集相关  """
get_suites_url = Prefix.AssetServer + "/suites?isProjectItem={}&opponent=openscenario"  # 获取测试集
run_suite_url = Prefix.TaskServer + "/tasks"  # 运行测试集
get_taskassemble_url = Prefix.TaskServer + "/tasks?finished=true&page=0&pageSize=12&own=true&expandedIds={task_set_id}"  # 获取测试案例集合
queue_status_url = Prefix.TaskServer + "/tasks/queue?own=true"  # 套件运行状态检查  -- 已测试

"""  案例判定相关  """
get_judgements_url = Prefix.AssetServer + "/cases/{caseId}/judgements"
# get_judgement_url = get_judgements_url + "/{judgementId}"
get_judgement_url = "api-asset/cases/{caseId}/judgements/{judgementId}"


"""  测试计划相关接口  """
testPlan_list_url = Prefix.TaskServer + "/testPlan?page=1&pageSize=20&options={options}"  # 获取测试计划列表
testPlan_id_url = Prefix.TaskServer + "/testPlan/{id}"  # 获取测试计划 / 更新测试计划(包含重命名)
create_testPlan_url = Prefix.TaskServer + "/testPlan"  # 创建测试计划
del_testPlan_url = Prefix.TaskServer + "/testPlan/delete"  # 删除多个测试计划
del_testPlan_single_url = Prefix.TaskServer + "/testPlan/{id}"  # 删除单个测试计划
execute_testPlan_url = Prefix.TaskServer + "/testPlan/execute"  # 执行测试计划
clone_testPlan_url = Prefix.TaskServer +"/testPlan/{id}/clone"  # 复制测试计划
isrepeat_testPlan_url = Prefix.TaskServer +"/testPlan/isrepeat?name={name}&type={type}"  # 测试计划名称是否已存在


# f"api-asset/cases/{caseId}/judgements/{judgementId}"
# url = f"api-asset/cases/{caseId}/judgements"

class UrlCodeType:
    unquote = "unquote"
    quote = "quote"


class File:
    """
    文件父类
    """

    def __init__(self, file_path: str):
        if not exists(file_path):
            raise FileNotFoundError
        self._file_path = file_path
        self._data = None


class FileReader(File):
    """
    读取文件
    """

    def __init__(self, file_path: str):
        super(FileReader, self).__init__(file_path)

    def read_byte(self) -> bytes:
        with open(self._file_path, 'rb') as f:
            return f.read()

    def read_str(self) -> str:
        with open(self._file_path, 'r', encoding='utf-8') as f:
            return f.read()

    def read_json(self) -> json:
        with open(self._file_path, 'r', encoding='utf-8') as f:
            return json.load(f)


class RquestApi():
    def __init__(self):
        super(RquestApi, self).__init__()
        self._headers = {"Content-Type": "application/json", "ProjectId": 'default'}
        self._token = os.environ.get(EnvPath.token)
        self._host_ip = SimOneUrl
        self.get = "GET"
        self.post = "POST"
        self.delete = "DELETE"
        self.put = "PUT"

    def send(self, method: str, url: str, **kwargs) -> Response:
        """
        发送请求
        method: 请求方法
        url: 请求地址
        kwargs: 请求参数
        """
        self.headers.update({"Authorization": self.token})
        complete_url = urljoin(self._host_ip, url)
        data = dict()
        data['method'] = method
        data['url'] = complete_url
        data['headers'] = self.headers
        data.update(kwargs)
        print(data)
        response = request(verify=False, **data)
        try:
            print("response:%s" % str(response.content.decode("utf-8")))
            self._status_code = response.status_code
            self._response_code = response.json().get("code")
        except:
            pass
        return response

    @property
    def token(self):
        return self._token

    @property
    def headers(self):
        return self._headers

    @headers.setter
    def headers(self, value):
        self._headers = value


class LoginBusiness(RquestApi):

    def cloud_login_api(self, userinfo: dict):
        """
        云端登录API
        @param userinfo:
        @return:
        """
        return self.send(self.post, cloud_login_url, json=userinfo).json()

    def check_username_api(self, username: str):
        """
        校验用户名称是否合格
        @param username:
        @return:
        """
        payload = {'username': username}
        return self.send(self.post, login_check, json=payload).json()

    def getCheckToken(self) -> str:
        """
        校验token是否合格
        @return:
        """
        getCheckUrl = os.path.join(SimOneUrl, user_check_url)
        payload = {'username': loginData['username']}
        response = self.send(self.post, getCheckUrl, json=payload)
        checkToken = json.loads(response.content.decode("utf-8"))['data']['checkToken']
        return checkToken

    def pad(self, text):
        """
        填充函数,使被加密数据的字节码长度是block_size的整数倍
        @param text:
        @return:
        """
        length = AES.block_size
        count = len(text.encode('utf-8'))
        add = length - (count % length)
        entext = text + (chr(add) * add)
        return entext

    def str_aes(self, string: str, key: str):
        """
        aes 加密
        @param string:
        @param key:
        @return:
        """
        aes = AES.new(key.encode("utf-8"), AES.MODE_ECB)
        res = aes.encrypt(self.pad(string).encode("utf8"))
        msg = str(base64.b64encode(res), encoding="utf8")
        return msg

    def get_str_aes(self, string: str, username: str, key: str = "eGeVQh0lRyq41I41") -> str:
        '''
        @param string:
        @param username:
        @param key: AES加密的key
        @return:
        '''
        check_token = self.check_username_api(username)
        target_string = check_token["data"]["checkToken"] + string
        msg = self.str_aes(target_string, key)
        return msg

    @property
    def _cloud_std_login(self):
        """
        云端版本登陆方式
        @return:
        """
        try:
            new_login_data = copy.deepcopy(loginData)
            new_login_data['password'] = self.get_str_aes(string=new_login_data['password'],
                                                          username=new_login_data['username'])
            print('cloud login data:' + str(new_login_data))
            response = self.cloud_login_api(userinfo=new_login_data)
            print("-------", response)
            print('cloud login request result:' + str(response))
            token = response['data']['token']
            os.environ[EnvPath.token] = token
            print(token)
            return token
        except Exception as e:
            print(str(e))
            raise print('get token error ')

    @property
    def _enterprise_std_login(self):
        """
        单机版本登录方式
        @return:
        """
        try:
            getLoginUrl = os.path.join(SimOneUrl, standalone_login_url)
            response = self.send(self.get, url=getLoginUrl)
            token = json.loads(response.content.decode("utf-8"))['data']['token']
            os.environ[EnvPath.token] = token
        except Exception as e:
            print(str(e))
            token = ""
            # raise AssertionError('get token error ')
        return token


# 登录入口
class LoginPolicy():
    standalone = "standalone"
    cloud = "cloud"

    def __new__(cls, env_name):
        if env_name == cls.standalone:
            return LoginBusiness()._enterprise_std_login
        elif env_name == cls.cloud:
            return LoginBusiness()._cloud_std_login
        else:
            return print("----------input error---------------")


class Suite(RquestApi):

    def __init__(self):
        super(Suite, self).__init__()
        self._cyclical = 0
        self.category_name = "Automation_" + str(random.randint(0, 9999))

    def create_category_api(self, cate_name: str):
        '''
        创建案例库
        @param cate_name: 案例库名
        @return:
        '''
        payload = {"parentId": "", "name": cate_name}
        return self.send(self.post, categories_url, json=payload).json()

    def create_category(self, cate_name: str = None) -> str:
        '''
        创建案例库
        @param cate_name:
        @return:
        '''
        if not cate_name:
            cate_name = self.category_name
        print(f"案例库目录名称:{cate_name}")
        response = self.create_category_api(cate_name)
        cate_id = response["data"]["categoryId"]
        print(f"案例库目录 id :{cate_id}")
        return cate_id

    def get_category_api(self):
        """
        获取案例库列表
        @return: response
        """
        return self.send(self.get, categories_url).json()

    def get_category(self, userid: str = None) -> dict:
        """
        获取案例库
        @param userid: 用户id desc:admin->builtIn
        @return:
        """
        # 初始化一个包含data和parentId的字典,用于存储案例库信息
        cate_dict = {"data": {}, "parentId": {}}
        # 调用API获取案例库数据
        response = self.get_category_api()
        # 获取案例库总数
        cate_dict["total"] = response["data"]["total"]
        # 如果没有提供userid,获取所有案例库目录信息
        if not userid:
            print("获取当前所有的案例库目录信息")
            # 遍历所有案例库,将名称和ID、名称和父ID存入字典
            for one in response["data"]["categories"]:
                cate_dict["data"].setdefault(one["name"], one["id"])
                cate_dict["parentId"].setdefault(one["name"], one["parentId"])
        else:
            # 如果提供了userid,只获取该用户的案例库目录信息
            print("获取用户{}的案例库目录信息".format(userid))
            # 遍历所有案例库,筛选出属于指定用户的案例库
            for one in response["data"]["categories"]:
                if one["userId"] == userid:
                    # 将用户案例库的名称和ID、名称和父ID存入字典
                    cate_dict["data"].setdefault(one["name"], one["id"])
                    cate_dict["parentId"].setdefault(one["name"], one["parentId"])
        # 打印获取到的案例库信息
        print("获取到的案例库->%s" % str(cate_dict))
        # 返回包含案例库信息的字典
        return cate_dict

    def get_category_case_api(self, cate_id: str):
        """
        获取当前案例库所有案例
        @param cate_name: 案例库名
        @return:
        """
        return self.send(self.get, get_cases_url.format(category=cate_id)).json()

    def get_cases_category(self, cate_id: list):
        """
        获取当前案例库的所有案例
        @param cate_id: 案例库目录id
        @return:
        """

        def url_code(url: str, way: str):
            from urllib import parse
            if way == "unquote":
                return parse.unquote(url)
            elif way == "quote":
                return parse.quote(url)

        page_size = 100
        # print("get all case id from api by category_id")
        _filter = {"keyword": "", "categories": cate_id}
        string = json.dumps(_filter).replace(" ", "")
        filter = url_code(string, UrlCodeType.quote).replace("%", "%25")
        response = self.send(self.get, get_cases_url.format(page="1", category=filter)).json()
        # get case id
        # print("get_cases_category 入参cate_id:%s" % cate_id)
        cases_num = response['data']['total']
        if cases_num >= page_size:
            if cases_num % page_size == 0:
                page = cases_num // page_size
            else:
                page = cases_num // page_size + 1
        else:
            page = 1

        def update_case_id(response, page_size):
            case_name = response['data']['caseDefs'][page_size]['name']
            case_id = response['data']['caseDefs'][page_size]['id']
            case_id_list.append(case_id)
            cases_dict.setdefault(case_name, [case_id])

        cases_dict = {}
        case_id_list = []
        # 如果案例数量是100以内
        if cases_num <= page_size:
            for i in range(cases_num):
                update_case_id(response=response, page_size=i)
        else:
            # 如果案例数量超过100了,固定获取案例数量为100个就翻页
            for i in range(page):
                response = self.send(self.get, self.get_cases_url.format(page=str(i + 1), category=filter)).json()
                # cases_num % page_size 是取整表示没有到达最后一页
                # if i == 0 or i > 0 and cases_num % page_size == 0:
                if i + 1 != page:
                    # print("xxx page_size:",page_size)
                    for j in range(page_size):
                        # print("xxx j:",j)
                        update_case_id(response=response, page_size=j)
                # 到最后一页了,因为案例数量不固定所以需要单独计算获取的个数
                else:
                    for j in range(cases_num % page_size):
                        update_case_id(response=response, page_size=j)
        return cases_dict, case_id_list

    def get_case_detail_api(self, case_id: str):
        """
        获取案例信息
        @param : /{case_id}/data
        @return: response
        """
        url = case_details_url.format(case_id=case_id)
        return self.send(self.get, url).json()

    def get_case_detail(self, case_id: str):
        """

        @param case_id: 测试案例id
        @return:
        """
        response = self.get_case_detail_api(case_id)
        print(f"调用 get_case_detail : {response}")
        return response
    def get_version(self):

        res = self.send(self.get, url= version_url).json()
        version = res["data"]["profile"]["version"]
        return version
    # def run_task(self, caseid: list, vehicle_id: str = None):
    #     """
    #     :param caseid:
    #     :return:
    #     """
    #
    #     def creat_task_name():
    #         return "taskName_" + time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime())
    #
    #     taskname = creat_task_name()
    #     payload = {"caseIds": caseid, "taskName": taskname, "speed": 1}
    #     if vehicle_id:
    #         payload.update({"overrideVehicleId": vehicle_id})
    #     response = self.send(self.post, url=create_task_url, json=payload).json()
    #     print(f"run_task response:{response}")  # taskIds为空时代表运行案例异常
    #     data = response['data']
    #     return data['sessionId'], data['taskIds']

    def run_task(self, case_ids: List[str], vehicle_id: Optional[str] = None, withEvaluation: bool = False, version: str = "3.5.0", type: str = "worldsim")-> Tuple[str, List[str]]:
        """
        运行测试任务集合
        Run task set
        @param caseid: caseid
        @param vehicle_id: vehicle_id
        @param withEvaluation: withEvaluation
        @param version: version
        @return:
        """
        task_name = "AutoTest_" + time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime())
        payload = {}
        if version >= "3.6.3":
            payload = {"caseIds": case_ids, "taskName": task_name,
             "type": type, "withEvaluation": False, "vehicleConfig": {
                "Ego": {"instanceId": "Ego", "id": "default", "name": "主车", "classId": "MKZ",
                        "algorithms": [{"id": "Default", "name": "默认控制器", "algorithmId": "SimOneDriver"},
                                       {"id": "AutoDrive", "name": "自驾控制器", "algorithmId": "SimOneDriver"}]}},
             "notification": {"type": "none", "condition": "any"}, "enableStateMachine": False, "priority": 0}
        elif version >= "3.5.0":
            payload = {"caseIds": case_ids,
                       "taskName": task_name,
                       "withEvaluation": withEvaluation,
                       "controllers": [{"id": "Default", "name": "默认控制器", "algorithmId": "SimOneDriver"},
                                       {"id": "AutoDrive", "name": "自驾控制器", "algorithmId": "SimOneDriver"}]}
        elif version < "3.5.0":
            payload = {"caseIds": case_ids, "taskName": task_name, "speed": 1}
        # elif version > "3.1.1":
        #     payload = {"caseIds": case_ids, "taskName": task_name, "speed": 1}
        # else:
        #     payload = {"caseIds":case_ids,"taskName":task_name,"withEvaluation":True,"type":"worldsim"} # 现代云端 内置案例有坑
        if vehicle_id:
            payload.update({"overrideVehicleId": vehicle_id})
        response = self.send(self.post, url=create_task_url, json=payload).json()

        data = response['data']
        return data['sessionId'], data['taskIds']

    def stop_task_api(self, payload: dict):
        """
        task停止API
        @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
        @return:
        """
        return self.send(self.post, stop_task_url, json=payload).json()

    def stop_sessions_api(self, payload: dict):
        """
        sessions停止API
        @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
        @return:
        """
        return self.send(self.post, stop_sessions_url, json=payload).json()

    def stop_task(self, task_id: list, sessions_id: list):
        """
        停止任务和会话
        @param task_id:
        @param sessions_id:
        @return:
        """
        task_payload = {"ids": task_id}
        self.stop_task_api(task_payload)
        sessions_id = {"ids": sessions_id}
        self.stop_sessions_api(sessions_id)

    def pause_task_api(self, task_id: str):
        """
        暂停任务
        @param task_id:
        @return:
        """
        return self.send(self.post, pause_task_url.format(task_id=task_id), json={}).json()

    def resume_task_api(self, task_id: str):
        """
        暂停任务后开始
        @param task_id:
        @return:
        """
        return self.send(self.post, resume_task_url.format(task_id=task_id), json={}).json()

    def delete_sessions_api(self, payload: dict):
        """
        sessions删除API
        @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
        @return:
        """
        return self.send(self.post, delete_sessions_url, json=payload).json()

    def delete_task_api(self, payload: dict):
        """
        task删除API
        @param payload: eg  {"ids":["f9e096f6-78ff-431a-a5e2-24a2de4117df"]}
        @return:
        """
        return self.send(self.post, delete_task_url, json=payload).json()

    def delete_task(self, sessions_id: list, task_id: list):
        """
        删除任务和会话
        @param sessions_id:
        @param task_id:
        @return:
        """
        sessions_id = {"ids": sessions_id}
        self.delete_sessions_api(sessions_id)
        task_payload = {"ids": task_id}
        self.delete_task_api(task_payload)

    def import_vehicle_api(self, payload: dict):
        """
        导入主车
        @param payload: eg {"vehicleData": {"byId": {vehicle_id: vehicle_data},
                                      "allIds": [vehicle_id]}}
        @return:
        """
        return self.send(self.post, import_vehicle_url, json=payload).json()

    def get_vehicle_data(self, file_path: str):
        vehicle_info = FileReader(file_path).read_json()
        vehicle_id = vehicle_info.get("id")
        return vehicle_info, vehicle_id

    def import_vehicle(self, file_path: str) -> str:
        """
        导入主车
        @param file_path: 主车文件
        @return: 主车id
        """
        vehicle_data, vehicle_id = self.get_vehicle_data(file_path)
        payload = {"vehicleData": {"byId": {vehicle_id: vehicle_data},
                                   "allIds": [vehicle_id]}}
        result = self.import_vehicle_api(payload)
        return result["data"]["id"]

    def get_vehicle_api(self):
        """
        获取主车信息
        @return:
        """
        return self.send(self.get, get_vehicle_url).json()

    def get_vehicle(self, userid: str = None):
        """
        根据用户信息获取主车信息
        @param userid:
        @return:
        """
        global vehicle_data
        vehicle_data = {}
        res = self.get_vehicle_api()
        # print(f"get_vehicle_api response ------------------------->: {res}")
        data = res["data"]["list"]["byId"]
        id_list = res["data"]["list"]["allIds"]
        if userid:
            for id in id_list:
                if data[id]["userId"] == userid:
                    vehicle_data.setdefault(data[id]["name"], data[id]["id"])
        else:
            for id in id_list:
                vehicle_data.setdefault(data[id]["userId"], {}).update({data[id]["name"]: data[id]["id"]})
        print("获取到的主车信息->:{}".format(str(vehicle_data)))
        return vehicle_data

    def get_vehicle_id(self, vehicle_name: list) -> list and dict:
        """
        根据主车名称获取主车ID
        @param vehicle_name:
        @return:
        """
        vehicle_id_dict = {}
        vehicle_id_list = []
        vehicle_list = []
        for i in vehicle_name:
            for k, v in self.get_vehicle().items():
                vehicle_list.append(v)
            for vehicle_dict in vehicle_list:
                # print("vehicle_dict", vehicle_dict)
                vehicle_dict_key = dict(vehicle_dict).items()
                for vehicle_dict_k, vehicle_dict_v in vehicle_dict_key:
                    if i in vehicle_dict_k:
                        vehicle_name_id = vehicle_dict[i]
                        vehicle_id_list.append(vehicle_name_id)
                        vehicle_id_dict.setdefault(i, vehicle_name_id)

        print("vehicle_id_list->:", vehicle_id_list, "\nvehicle_id_dict->:", vehicle_id_dict)
        return vehicle_id_list, vehicle_id_dict

    def delete_vehicles_api(self, vehicles_id: str):
        """
        删除主车
        @param vehicles_id: 主车id
        @return:
        """
        url = delete_vehicle_url.format(vehicles_id=vehicles_id)
        return self.send(self.delete, url)

    def delete_vehicle(self, id: str):
        """
        删除主车的方法
        @param id: 主车id
        @return:
        """
        response = self.delete_vehicles_api(id)

    def get_suite_api(self, isProjectItem : bool):
        """
        @param isProjectItem : True(团队) or False(False)
        @return:

        """
        url = get_suites_url.format(isProjectItem)
        return self.send(self.get, url=url).json()

    def get_suite(self, suite_name: str = None , isProjectItem : bool = True):
        """
        @param isProjectItem : true(团队) or false(个人)
        :return:
        """
        if  isProjectItem: isProjectItem = "true" 
        else: isProjectItem = "false"
        
        response = self.get_suite_api(isProjectItem)
        data = response['data']
        suite_dict = {}
        for id in data['allIds']:
            suite_dict.setdefault(data['byId'][id]['name'], {}).update({"caseIds": data['byId'][id]['caseIds']})
        print("suite_dict", suite_dict)
        try : 
            if not suite_name:
                return suite_dict
            else:
                # print("suite_dict[suite_name]:",suite_dict[suite_name])
                return suite_dict[suite_name]
        except Exception as e:
            print("suite_name不存在")

    def run_suite_api(self, payload: dict):
        """
        run_suite
        @param payload: payload
        @return:
        """
        return self.send(self.post, url=run_suite_url, json=payload).json()

    def run_suite(self, suite_name: str, taskName: str = None, vehicle_id: str = None):
        if taskName is None:
            def creat_task_name():
                return "taskName_" + time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime())

            taskName = creat_task_name()
        caseid_list = self.get_suite(suite_name)['caseIds']
        payload = {
            'caseIds': caseid_list,
            'taskName': taskName,
            "type" : "worldsim",
            "enableStateMachine" : False,
            "vehicleConfig" : {
                "Ego" : {
                    "instanceId" : "Ego" ,
                    "id" : "default",
                    "name" : "主车",
                    "classId" :"MKZ",
                    "algorithms" : [
                        {
                        "id" : "Default",
                         "name" : "默认控制器",
                         "algorithmId": "SimOneDriver",
                         }
                    ]
                }
            },
            "withEvaluation" : False
        }
        if vehicle_id:
            # vehicle_module = VehicleBusiness()
            # vehicle_module.get_vehicle()
            payload.update({"overrideVehicleId": vehicle_id})
        response = self.run_suite_api(payload)
        data = response['data']
        return data['sessionId'], data['taskIds']

    def suite_queue_api(self):
        """
        suite_queue
        @return:
        """
        return self.send(self.get, url=queue_status_url).json()

    def suite_queue_check(self, n=10):
        """
        @param n:cycle index
        @return:
        """
        if n == 1:
            print("-----------------------The test case run fail--------------------------------------")
            return False
        while (1):
            try:
                assert self.suite_queue_api()
                response = self.suite_queue_api()
                assert response["code"] == 0
            except Exception as e:
                return -1
            queue_info = response["data"]
            running, pending, waiting = queue_info["running"], queue_info["pending"], queue_info["waiting"]

            if running == 0 and pending == 0 and waiting == 0:
                print("-----------------------The test case run finish--------------------------------------")
                return True
            else:
                time.sleep(interval)

    def get_result(self, task_id: list, index: int = None):
        """
        :param task_id:
        :return:
        """
        task_id_str = ",".join(task_id)
        # print(task_id_str)
        response = self.send(self.get, url=result_url.format(task_id=task_id_str)).json()
        data = response["data"]["tasks"]
        result = []
        for one in data:
            result.append({"pass": one["pass"], "is_ended": True if one["key"] == "ended" else False})
        if not index:
            return result
        else:
            return result[index]

    def is_ended(self, task_id: list, case_name=None):
        """
        判断案例是否运行结束
        @param task_id: 任务id
        @param case_name: 案例名称
        @return:
        """
        interval = 10
        while (1):
            result = self.get_result(task_id, -1)
            if result["is_ended"]:
                print('Case status:end of run')
                if case_name:
                    print(f'Name of the current ending case->{case_name}')
                return False
            else:
                print('Case status:in progress')
                if case_name:
                    print(f'Name of the current running case->{case_name}')
                time.sleep(interval)

    def get_taskassemble_api(self, task_set_id):
        '''
        获取测试案例集
        @param
        @return:
        '''
        return self.send(self.get, get_taskassemble_url.format(task_set_id=task_set_id)).json()

    def get_task_id_of_task_set(self, task_set_id: str) -> list:
        """
        获取任务集合中所有测试案例的task_id
        @param task_set_id: 任务集合id
        @return: list
        """
        response = self.get_taskassemble_api(task_set_id)
        _task_id_list = response["data"]["list"]
        # print("---------------------------------task_id_list-----------------------")
        # print(_task_id_list)
        index = 0
        task_id_list = []
        for i in _task_id_list:
            index += 1
            # print(f"------------------------第{index}个元素----------------")
            # print(i)
            if i['parentId'] == task_set_id:
                # print(i)
                task_id_list.append(i["id"])
        # print("task_id_list",task_id_list)
        return task_id_list

    def get_task_result_api(self, task_id: str):
        """
        获取指定task的结果
        @param task_id:
        @return:
        """
        return self.send(self.get, task_result_url.format(id=task_id)).json()

    def get_task_result(self, task_id_list: list = None) -> list:
        """
        获取任务集中的  案例名称,caseId,task_id,运行结果
        @param task_id_list: 任务id列表
        @param task_set_id: 任务集合id
        @return:
        """

        task_result_list = []
        print(f"task_id_list ===== >>> {task_id_list} ")
        for task_id in task_id_list:
            result = self.get_task_result_api(task_id)
            case_name = result["data"]["tasks"][0]["case"]["name"]
            case_id = result["data"]["tasks"][0]["caseId"]
            task_id = result["data"]["tasks"][0]["id"]
            task_result = result["data"]["tasks"][0]["pass"]
            if task_result != True: task_result = False
            task_result_list.append({"case_name": case_name,
                                     "case_id": case_id,
                                     "task_id": task_id,
                                     "task_result": task_result})
        print("task_result_list->", task_result_list)
        return task_result_list

    def import_map_api(self, payload: dict, files: dict):
        """
        导入地图API
        @param payload:传参字典
        @param files: 传参文件
        @return:
        """
        self.headers = {"projectId": "default"}
        return self.send(self.post, import_map_url, data=payload, files=files).json()

    def import_map(self, xodr_path: str, thumbnail_path: str):
        """
        导入地图方法
        @param xodr_path: 地图文件
        @param thumbnail_path: 地图对应的图像
        @return:
        """
        map_name = "map_" + str(random.randint(0, 9999))
        payload = {"params": json.dumps(
            {"category": "customized", "id": "", "name": os.path.splitext(os.path.basename(xodr_path))[0], "size": 512,
             "ppm": 10, "bgColor": "#dddddd", "reproject": True, "reprojectOrigin": False, "reprojectOriginLat": 0,
             "reprojectOriginLng": 0, "tags": [], "notes": "",
             "header": {"minX": -210.20535534122396, "minY": -149.68815701999185, "minZ": -1.862645149230957e-9,
                        "maxX": 237.95535534122394, "maxY": 135.43815701999196, "maxZ": 2.7940070024635385e-9,
                        "centerX": 97.12480158531203, "centerY": 24.463606820251727, "centerZ": 100,
                        "localEnuExt": "6378137,0,0;0,1,0;0,0,1;1,0,0"}})}
       
        files = {'xodr': open(xodr_path, 'rb'), "thumbnail": open(thumbnail_path, 'rb')}
        result = self.import_map_api(payload, files)
        if result["code"] == 0:
            print(f"import map {xodr_path}success")
        else:
            raise f"import map {xodr_path} fail"

    def get_map_api(self):
        """
        获取地图
        @return:
        """
        return self.send(self.get, get_map_url).json()

    def get_map_id(self, userid: str = None, map_name: str = None):
        """
        按条件获取地图信息,默认获取所有地图
        @param userid: 用户id
        @param map_name: 地图名字
        @return:
        """
        global map_data
        map_data = {}
        try:
            result = self.get_map_api()
            # print(len(result['data']['maps']))
            map_info = result['data']['maps']

            if userid:
                for map in map_info:
                    if map["userId"] == userid:
                        map_data.setdefault(map["name"], map["id"])
            elif map_name:
                for map in map_info:
                    if map["name"] == map_name:
                        map_data = {map["name"]: map["id"]}
            else:
                for map in map_info:
                    map_data.setdefault(map["userId"], {}).update({map["name"]: map["id"]})
            return map_data

        except Exception as e:
            print(e)

    def delete_map_api(self, payload: dict):
        """
        删除地图
        @param payload: eg:{ids: ["27a0bbc4-5d4d-48c1-9187-62df794dadae"]}
        @return:
        """
        return self.send(self.post, delete_map_url, json=payload).json()

    def delete_map(self, id: list):
        """
        批量删除地图
        @param id: 地图id列表
        @return:
        """
        print("删除的地图列表-》{}".format(str(id)))
        payload = {"ids": id}
        self.delete_map_api(payload)

    def get_judgements_api(self, caseId, judgementId=None):
        """
        获取判定信息API
        @param caseId: 案例ID
        @param judgementId: 判定ID
        @return:
        """
        if judgementId:
            url = get_judgement_url.format(caseId=caseId, judgementId=judgementId)
        else:
            url = get_judgements_url.format(caseId=caseId)
        response = self.send(self.get, url).json()
        return response

    def get_judgements_method(self, caseId: str, judgementId: str = None):
        """
        获取判定信息
        @param caseId: 案例ID
        @param judgementId: 判定ID
        @return:
        """
        response = self.get_judgements_api(caseId, judgementId)
        print("得到的扩展判定信息:", response["data"][1])
        return response

    def update_judgement_api(self, caseId, judgementId="collision", payload: dict = None):
        """
        @param caseId: caseId
        @param judgementId: judgementId
        @return:
        """
        if not payload:
            payload = {"schema": "judgement", "settings": {"logLevel": "error", "action": "failure", "logInfo": ""},
                       "scope": {"size": {"x": 10, "y": 10, "z": 0}, "heading": {"w": 1, "x": 0, "y": 0, "z": 0},
                                 "position": {"x": 0, "y": 0, "z": 0},
                                 "type": "global"}, "builtIn": True,
                       "name": "碰撞", "lock": True, "id": "collision",
                       "type": "collision", "category": "general",
                       "conditions": [], "userId": "admin",
                       "enabled": True}
        response = self.send(self.put, get_judgement_url.format(caseId=caseId, judgementId=judgementId),
                             data=json.dumps(payload))
        return response

    def update_judgement_method(self, caseId, judgementId="collision"):
        __collision = {"schema": "judgement", "settings": {"logLevel": "error", "action": "failure", "logInfo": ""},
                       "scope": {"size": {"x": 10, "y": 10, "z": 0}, "heading": {"w": 1, "x": 0, "y": 0, "z": 0},
                                 "position": {"x": 0, "y": 0, "z": 0},
                                 "type": "global"}, "builtIn": True,
                       "name": "碰撞", "lock": True, "id": "collision",
                       "type": "collision", "category": "general",
                       "conditions": [], "userId": "admin",
                       "enabled": True}
        payload = json.dumps(__collision)
        judgement_url = "api-asset/cases/{caseId}/judgements/{judgementId}"
        response = self.send(self.put, judgement_url.format(caseId=caseId, judgementId=judgementId), data=payload)
        print("更新扩展判定信息")
        print("status_code:", response.status_code)

    """ ——————————————————————————【0721新增编辑案例及依赖的接口】———————————————————————— """

    def update_case(self, case_id: str, **kwargs):
        """
        编辑案例
        @param case_id: 案例ID
        @param map_name: 地图名称
        @param map_id: 地图ID
        @param vehicle_id: 主车ID
        @param AbsoluteTargetSpeed: 主车初始状态绝对目标速度
        @param WorldPosition_x: 主车初始状态世界位置x
        @param WorldPosition_y: 主车初始状态世界位置y
        @param WorldPosition_z: 主车初始状态世界位置z
        @return: 更新主车或地图等信息后的案例
        """
        # 提取参数(设置默认值)
        map_name = kwargs.get("map_name")
        map_id = kwargs.get("map_id")
        vehicle_id = kwargs.get("vehicle_id")
        absolute_target_speed = kwargs.get("absolute_target_speed", 0.0)
        worldposition_x = kwargs.get("worldposition_x", 0.0)
        worldposition_y = kwargs.get("worldposition_y", 0.0)
        worldposition_z = kwargs.get("worldposition_z", 0.0)

        case_def = self.case_def_api(case_id=case_id)
        case_data = self.case_data_api(case_id=case_id)
        # print("临时测试 原case_def:", case_def)
        # print("临时测试 原case_data:", case_data)
        # print("update_case case_id:", case_id)
        # print("update_case map_name:", map_name)
        # print("update_case map_id:", map_id)
        # print("update_case vehicle_id:", vehicle_id)
        # 修改主车
        if vehicle_id:
            vehicle_name = self.get_vehicle_name(vehicle_id)
            case_data["data"]["openSCENARIO"]["Entities"]["ScenarioObject"][0]["Vehicle"][
                "name"] = vehicle_name
            case_data["data"]["openSCENARIO"]["Entities"]["ScenarioObject"][0]["Vehicle"]["Properties"][
                "Property"][0]["value"] = vehicle_id
            for key in case_data["data"]["openSCENARIO"]["Entities"]["ScenarioObject"][0]["Vehicle"]["Properties"][
                "Property"]:
                if key["name"] == 'model':
                    key["value"] = vehicle_id
                    print(key)
                if key["name"] == 'name':
                    key["value"] = vehicle_name
                    print(key)
        # 修改地图
        if map_id:
            case_def["data"]["data"]["mapId"] = map_id
            case_def["data"]["data"]["mapName"] = map_name.split(".")[0]
            case_data["data"]["openSCENARIO"]["RoadNetwork"]["LogicFile"]["filepath"] = map_id + "." + \
                                                                                        map_name.split(".")[1]
        case_payload = {"casedef": case_def["data"]["data"],
                        "casedata": case_data["data"],
                        "needRefresh": True}
        # 修改主车初始状态世界位置
        if worldposition_x:
            privates = case_data["data"]["openSCENARIO"]['Storyboard']['Init']['Actions']['Private']
            for private in privates:
                if private["entityRef"] == "Ego":
                    for PrivateAction in private["PrivateAction"]:
                        if PrivateAction.get('TeleportAction'):
                            PrivateAction['TeleportAction']['Position']['WorldPosition']['x'] = worldposition_x
                            PrivateAction['TeleportAction']['Position']['WorldPosition']['y'] = worldposition_y
                            PrivateAction['TeleportAction']['Position']['WorldPosition']['z'] = worldposition_z
        # 修改主车初始状态绝对目速度
        if absolute_target_speed:
            privates = case_data["data"]["openSCENARIO"]['Storyboard']['Init']['Actions']['Private']
            for private in privates:
                if private["entityRef"] == "Ego":
                    for PrivateAction in private["PrivateAction"]:
                        if PrivateAction.get('LongitudinalAction'):
                            PrivateAction['LongitudinalAction']['SpeedAction']['SpeedActionTarget']['AbsoluteTargetSpeed']['value'] = absolute_target_speed

        response = self.update_case_api(payload=case_payload)
        # print("临时测试 update_case response",response)
        if response['code'] != 0:
            raise Exception("编辑案例异常,接口返回:" + response)
        return response

    def case_def_api(self, case_id: str):
        """
        案例概要
        @param case_id:
        @return:
        """
        return self.send(self.get, case_def_url.format(case_id=case_id)).json()

    def case_data_api(self, case_id: str):
        """
        案例详情
        @param case_id:
        @return:
        """
        return self.send(self.get, case_details_url.format(case_id=case_id)).json()

    def get_vehicle_name(self, vehicle_id: str):
        res = self.get_vehicle_api()
        print(f"get_vehicle_api response: {res}")
        data = res["data"]["list"]["byId"]
        for key in data:
            if key == vehicle_id:
                print(f"找到主车ID为{vehicle_id}的主车名称:{data[key]['name']}")
                return data[key]["name"]
        print(f"没有找到主车ID为{vehicle_id}的主车")
        return None

    def update_case_api(self, payload: dict):
        """
        案例编辑
        @param payload:
        @return:
        """
        # print("********* update_case_api payload********",payload)
        self.headers.update({"Content-Type": "application/json"})
        return self.send(self.post, update_case_url, data=json.dumps(payload)).json()
    

    def testPlan_list_api(self):
        """
        获取测试计划列表
        """
        options= {"search":"","types":[],"vehicleIds":[],"algorithms":[],"triggerTypes":[],"notifications":[],"category":"worldsim"}
        json_str = json.dumps(options, indent=4, ensure_ascii=False)  # 缩进美化,保留非ASCII字符
        # encoded_url = quote(json_str, safe="/:?=")  # 保留路径分隔符、查询符号和等号
        return self.send(self.get, url=testPlan_list_url.format(options=json_str)).json()

    def test_testPlan_list(self):
        """
        获取测试计划列表
        """
        response = self.testPlan_list_api()
        return response

    def get_testPlan_api(self, PlanId):
        """
        获取测试计划 
        """
        return self.send(self.get, url= testPlan_id_url.format(id=PlanId)).json()

    def test_get_testPlan(self, PlanId):
        """
        获取测试计划 
        @param PlanId: 测试计划ID
        @return: response
        """
        response = self.get_testPlan_api(PlanId)
        return response

    def create_testPlan_api(self, categoryIds, name, withEvaluation, evaluationPresetId, Default):
        """
        创建测试计划
        """
        payload = {
            "data": {"caseSuiteIds": [], "categoryIds": categoryIds, "name": name,
                    "type": "1", "vehicleConfig": {"Ego": {"instanceId": "Ego", "id": "default", "classId": "MKZ",
                                                            "algorithms": [{"id": "Default", "name": "默认控制器",
                                                                            "algorithmId": Default},
                                                                        {"id": "AutoDrive", "name": "自驾控制器",
                                                                            "algorithmId": "SimOneDriver"}],
                                                            "name": "主车"}}, "triggerType": "manual",
                    "withEvaluation": False, "overrideJudgementId": "-1", "evaluationPresetId": evaluationPresetId,
                    "evaluationOperatorId": "-1", "notification": {"type": "none", "condition": "any"},
                    "enableStateMachine": False}}

        return self.send(self.post, create_testPlan_url, json=payload).json()

    def test_create_testPlan(self, caseId : list ,test_case_title : str ,withEvaluation , evaluationPresetId , Default):
        """
        @param caseId: 案例ID
        @param test_case_title: 测试计划名称
        @param withEvaluation: 是否进行评估
        @param evaluationPresetId: 评估预设ID
        @param Default: 主车算法
        @return: response
        """
        response = self.create_testPlan_api(caseId, test_case_title,
                                                            withEvaluation = withEvaluation,
                                                            evaluationPresetId= 1,
                                                             Default="SimOneDriver")
        return response
    
    def update_testPlan_api(self, PlanId, categoryIds, name):
        """
        更新测试计划
        """
        payload = {
            "data": {"caseSuiteIds": [], "categoryIds": categoryIds, "name": name,
                    "type": "1", "vehicleConfig": {"Ego": {"instanceId": "Ego", "id": "default", "classId": "MKZ",
                                                            "algorithms": [{"id": "Default", "name": "默认控制器",
                                                                            "algorithmId": "SimOneDriver"},
                                                                        {"id": "AutoDrive", "name": "自驾控制器",
                                                                            "algorithmId": "Manual"}],
                                                            "name": "主车"}}, "triggerType": "manual",
                    "withEvaluation": False, "overrideJudgementId": "-1", "evaluationPresetId": 1,
                    "evaluationOperatorId": "-1", "notification": {"type": "none", "condition": "any"},
                    "enableStateMachine": False}}
        return self.send(self.put, url=testPlan_id_url.format(id=PlanId), json=payload).json()
    
    def test_update_testPlan(self, PlanId : str ,caseId : list,  NewCaseTitle):
        """
        @param PlanId: 测试计划ID
        @param caseId: 案例ID
        @param NewCaseTitle: 新的测试计划名称
        @return: update_response
        """
        update_response = self.update_testPlan_api(PlanId, caseId, NewCaseTitle)
        return update_response

    def delfel_testPlan_single_api(self, PlanID:str):
        """
        删除单个测试计划
        """
        response = self.send(self.delete, url=del_testPlan_single_url.format(id=PlanID)).json()
        return response
    
    def delfel_testPlan_single(self,PlanId : str ):
        """
        删除单个测试计划
        @param PlanId: 测试计划ID
        @return: response
        """
        response = self.delfel_testPlan_single_api(PlanId)
        return response

    def del_testPlan_api(self, ids:list):
        """
        删除多个测试计划
        """
        payload = {"ids": ids}
        return self.send(self.post, url=del_testPlan_url, json=payload).json()

    def test_del_testPlan(self, planID):
        """
        删除多个测试计划
        @param planID: 测试计划ID
        @return: del_response
        """
        del_response = self.del_testPlan_api([planID])
        return del_response
    

    def execute_testPlan_api(self, ids:list):
        
        """
        执行测试计划
        """
        payload = {"ids":ids}
        return self.send(self.post, url=execute_testPlan_url, json=payload).json()

    def test_execute_testPlan(self, PlanId : list):
        """
        执行测试计划
        @param planID: 测试计划ID
        @return: response
        """
        response = self.execute_testPlan_api(PlanId)
        return response


    def clone_testPlan_api(self, id):
        """
        克隆测试计划
        """
        payload = {}
        return self.send(self.post, url=clone_testPlan_url.format(id=id), json=payload).json()
    
    def test_clone_testPlan(self, PlanId : str):
        """
        克隆测试计划
        @param planID: 测试计划ID
        @return: response
        """
        response = self.clone_testPlan_api(PlanId)
        return response


    def isrepeat_testPlan_api(self, name, type):
        """
        判断测试计划名称是否重复
        """
        return self.send(self.get, url=isrepeat_testPlan_url.format(name=name, type=type)).json()
    
    def test_isrepeat_testPlan(self, PlanName):
        """
        判断测试计划名称是否重复
        @param PlanName: 测试计划名称
        @return: response
        """
        response = self.isrepeat_testPlan_api(PlanName + time.strftime("_%H:%M:%S", time.localtime()), "worldsim")

        return response


def main(category_name: str, case_name: str = None, vehicle_name=None,version: str = None):
    """
    运行任务的入口主函数
    @param category_name: category_name
    @param case_name:
    @param vehicle_name:vehicle_name
    @param version: version
    @return:
    """
    suite = Suite()
    
    # suite.create_category()  # 创建案例库测试通过
    if not version:
        version = suite.get_version()
        print("请求simone version接口提取版本号为:" + version)
    cate_id = suite.get_category()
    cate_name = [cate_id["data"][category_name]]
    cases_dict, case_id_list = suite.get_cases_category(cate_name)
    

    # # 编辑案例,参数:案例ID、初始化状态主车绝对目标速度、初始状态主车世界位置
    # suite.update_case(case_id = "f1c2b1f0-9cce-11f0-b520-f5c71358353c", absolute_target_speed = 111.123, worldposition_x = -334.9999998954549, worldposition_y = -3.000050967810888, worldposition_z = 1.1)

    # 主车控制相关逻辑
    if vehicle_name:
        vehicle_dict = suite.get_vehicle_id([vehicle_name])[1]
        if vehicle_name not in vehicle_dict.keys():
            print(f"vehicle_name:{vehicle_name} inexistence")
            return
        vehicle_id = vehicle_dict[vehicle_name]
    else:
        vehicle_id = None
    print(f"vehicle_name:{vehicle_name},vehicle_id:{vehicle_id}")

    # 案例名称逻辑
    if case_name:
        case_id_list = cases_dict[case_name]
        print(f" case_name:{case_name}")

    # 启动案例
    session_id, task_ids = suite.run_task(case_id_list, vehicle_id=vehicle_id, version=version)
    time.sleep(10)
    
    # suite.stop_task(task_id=task_ids, sessions_id=[session_id])
    # 等待案例运行完毕
    falg = suite.suite_queue_check()
    if falg is True:
        # 获loginData取案例运行结果
        suite.get_task_result(task_ids)


SimOneUrl = "http://127.0.0.1:30083/"
loginData = {
    "username": 'admin',
    "password": 'admin',
}
# LoginPolicy("cloud")
LoginPolicy("standalone")
if __name__ == '__main__':
    # main("转向冲突", "转向冲突8",None, "3.7.0")
    fire.Fire(main)
    # eg:
    #    1. Run one case :python runSimOneCase.py --category_name='入门案例' --case_name='构建标准案例2.0'
    #    2. Run one case and switch the vehicle :python runSimOneCase.py --category_name='入门案例' --case_name='构建标准案例2.0' --vehicle_name='手动控制-默认'
    #    3. Run all cases under the case category :python runSimOneCase.py --category_name='入门案例'
    #    4. Run all cases under the case category and switch the vehicle :python runSimOneCase.py --category_name='入门案例’ --vehicle_name='手动控制-默认'