# SimOneStreamingAPI source code

from SimOneIOStruct import *

# ctypes prototypes for the streaming callbacks handed to the native library.
# Each prototype takes a single argument: a pointer to the data structure
# for that stream.
StreamingImageCbFuncType = CFUNCTYPE(
    c_void_p,
    POINTER(SimOne_Data_Image),
)
StreamingPointCloudCbFuncType = CFUNCTYPE(
    c_void_p,
    POINTER(SimOne_Data_Point_Cloud),
)

# Slots for the user-supplied Python callbacks. Both start out empty (None)
# and are filled in by the SoApiSet...UpdateCB registration functions.
SIMONEAPI_StreamingIMAGE_CB = None
SIMONEAPI_StreamingPOINTCLOUD_CB = None

def _simoneapi_streamingpointcloud_cb(data):
    """Forward a point cloud frame to the registered Python callback.

    This is the trampoline wrapped in a ctypes callback object and handed
    to the native library. It does nothing when no Python callback has
    been stored yet.

    :param data: the argument passed in by the caller of the trampoline;
        forwarded unchanged to the user callback
    :return: None
    """
    # Read the slot once so the None check and the call use the same object.
    user_cb = SIMONEAPI_StreamingPOINTCLOUD_CB
    if user_cb is None:
        # No callback is stored yet; calling None would raise TypeError.
        return
    user_cb(data)
	
def _simoneapi_streamingimage_cb(data):
    """Forward an image frame to the registered Python callback.

    This is the trampoline wrapped in a ctypes callback object and handed
    to the native library. It does nothing when no Python callback has
    been stored yet.

    :param data: the argument passed in by the caller of the trampoline;
        forwarded unchanged to the user callback
    :return: None
    """
    # Read the slot once so the None check and the call use the same object.
    user_cb = SIMONEAPI_StreamingIMAGE_CB
    if user_cb is None:
        # No callback is stored yet; calling None would raise TypeError.
        return
    user_cb(data)

# ctypes callback objects wrapping the Python trampolines. They are created
# once at module level and passed to the native registration functions.
# NOTE(review): presumably kept at module scope so the objects stay
# referenced while native code may still call them - confirm.
simoneapi_streamingimage_cb_func = StreamingImageCbFuncType(
    _simoneapi_streamingimage_cb
)
simoneapi_streamingpointcloud_cb_func = StreamingPointCloudCbFuncType(
    _simoneapi_streamingpointcloud_cb
)

def SoGetStreamingImage(ip, port, imageData):
    """Get image streaming data.

    :param ip: UDP ip, the machine which SimOneAPI is running on
    :type ip: string
    :param port: UDP port
    :type port: int
    :param imageData: image streaming data object; a pointer to it is
        passed to the native call (presumably a SimOne_Data_Image)
    :type imageData: ctypes structure
    :return: execution outcome: success/failure
    :rtype: bool
    """
    # The ip is encoded into a 256-byte buffer for the native call.
    ip_buffer = create_string_buffer(ip.encode(), 256)
    native_get = SimoneStreamingAPI.GetStreamingImage
    native_get.restype = c_bool
    succeeded = native_get(ip_buffer, port, pointer(imageData))
    return succeeded
def SoApiSetStreamingImageUpdateCB(ip, port, cb):
    """Register the callback function that receives image streaming data.

    :param ip: UDP ip, the machine which SimOneAPI is running on
    :type ip: string
    :param port: UDP port
    :type port: int
    :param cb: callback function for image streaming data: cb(imageData)
    :type cb: function
    :return: execution outcome: success/failure
    :rtype: bool
    """
    global SIMONEAPI_StreamingIMAGE_CB
    _input = create_string_buffer(ip.encode(), 256)
    # Store the Python callback before registering with the native library,
    # so the trampoline never runs while the slot is still empty.
    SIMONEAPI_StreamingIMAGE_CB = cb
    register = SimoneStreamingAPI.SetStreamingImageUpdateCB
    # Return the native result so the documented bool actually reaches the
    # caller (previously it was discarded and None was returned).
    register.restype = c_bool
    return register(_input, port, simoneapi_streamingimage_cb_func)
def SoGetStreamingPointCloud(ip, port, infoPort, pointCloudData):
    """Get point cloud streaming data.

    :param ip: UDP ip, the machine which SimOneAPI is running on
    :type ip: string
    :param port: UDP port
    :type port: int
    :param infoPort: UDP info port
    :type infoPort: int
    :param pointCloudData: point cloud streaming data object; a pointer to
        it is passed to the native call (presumably a
        SimOne_Data_Point_Cloud)
    :type pointCloudData: ctypes structure
    :return: execution outcome: success/failure
    :rtype: bool
    """
    # The ip is encoded into a 256-byte buffer for the native call.
    ip_buffer = create_string_buffer(ip.encode(), 256)
    native_get = SimoneStreamingAPI.GetStreamingPointCloud
    native_get.restype = c_bool
    succeeded = native_get(ip_buffer, port, infoPort, pointer(pointCloudData))
    return succeeded
def SoApiSetStreamingPointCloudUpdateCB(ip, port, infoPort, cb):
    """Register the callback function that receives point cloud streaming data.

    :param ip: UDP ip, the machine which SimOneAPI is running on
    :type ip: string
    :param port: UDP port
    :type port: int
    :param infoPort: UDP info port
    :type infoPort: int
    :param cb: callback function for point cloud streaming data:
        cb(point_cloudData)
    :type cb: function
    :return: execution outcome: success/failure
    :rtype: bool
    """
    global SIMONEAPI_StreamingPOINTCLOUD_CB
    _input = create_string_buffer(ip.encode(), 256)
    # Store the Python callback before registering with the native library,
    # so the trampoline never runs while the slot is still empty.
    SIMONEAPI_StreamingPOINTCLOUD_CB = cb
    register = SimoneStreamingAPI.SetStreamingPointCloudUpdateCB
    # Return the native result so the documented bool actually reaches the
    # caller (previously it was discarded and None was returned).
    register.restype = c_bool
    return register(
        _input, port, infoPort, simoneapi_streamingpointcloud_cb_func
    )
def SoApiGetStreamingRadar4DPointCloud(ip, port, pointCloudData):
    """Get 4D radar point cloud streaming data.

    :param ip: UDP ip, the machine which SimOneAPI is running on
    :type ip: string
    :param port: UDP port
    :type port: int
    :param pointCloudData: point cloud streaming data object; a pointer to
        it is passed to the native call (presumably a
        SimOne_Data_Point_Cloud)
    :type pointCloudData: ctypes structure
    :return: execution outcome: success/failure
    :rtype: bool
    """
    # The ip is encoded into a 256-byte buffer for the native call.
    ip_buffer = create_string_buffer(ip.encode(), 256)
    native_get = SimoneStreamingAPI.GetStreamingRadar4DPointCloud
    native_get.restype = c_bool
    succeeded = native_get(ip_buffer, port, pointer(pointCloudData))
    return succeeded
# Dedicated slot for the 4D radar callback. Sharing the regular point cloud
# slot meant that registering a radar callback silently replaced the point
# cloud callback (and vice versa).
SIMONEAPI_StreamingRADAR4DPOINTCLOUD_CB = None


def _simoneapi_streamingradar4dpointcloud_cb(data):
    """Forward a 4D radar point cloud frame to the registered Python callback."""
    # Read the slot once so the None check and the call use the same object.
    user_cb = SIMONEAPI_StreamingRADAR4DPOINTCLOUD_CB
    if user_cb is None:
        # No callback is stored yet; calling None would raise TypeError.
        return
    user_cb(data)


# Module-level ctypes callback object passed to the native registration call.
simoneapi_streamingradar4dpointcloud_cb_func = StreamingPointCloudCbFuncType(
    _simoneapi_streamingradar4dpointcloud_cb
)


def SoApiSetStreamingRadar4DPointCloudUpdateCB(ip, port, cb):
    """Register the callback function that receives 4D radar point cloud data.

    :param ip: UDP ip, the machine which SimOneAPI is running on
    :type ip: string
    :param port: UDP port
    :type port: int
    :param cb: callback function for point cloud streaming data:
        cb(point_cloudData)
    :type cb: function
    :return: execution outcome: success/failure
    :rtype: bool
    """
    global SIMONEAPI_StreamingRADAR4DPOINTCLOUD_CB
    _input = create_string_buffer(ip.encode(), 256)
    # Store the Python callback before registering with the native library,
    # so the trampoline never runs while the slot is still empty.
    SIMONEAPI_StreamingRADAR4DPOINTCLOUD_CB = cb
    register = SimoneStreamingAPI.SetStreamingRadar4DPointCloudUpdateCB
    # Return the native result so the documented bool actually reaches the
    # caller (previously it was discarded and None was returned).
    register.restype = c_bool
    return register(
        _input, port, simoneapi_streamingradar4dpointcloud_cb_func
    )