from SimOneIOStruct import *
StreamingImageCbFuncType = CFUNCTYPE(c_void_p,POINTER(SimOne_Data_Image))
StreamingPointCloudCbFuncType = CFUNCTYPE(c_void_p,POINTER(SimOne_Data_Point_Cloud))
SIMONEAPI_StreamingPOINTCLOUD_CB = None
SIMONEAPI_StreamingIMAGE_CB = None
def _simoneapi_streamingpointcloud_cb(data):
global SIMONEAPI_StreamingPOINTCLOUD_CB
SIMONEAPI_StreamingPOINTCLOUD_CB(data)
def _simoneapi_streamingimage_cb(data):
global SIMONEAPI_StreamingIMAGE_CB
SIMONEAPI_StreamingIMAGE_CB(data)
simoneapi_streamingpointcloud_cb_func = StreamingPointCloudCbFuncType(_simoneapi_streamingpointcloud_cb)
simoneapi_streamingimage_cb_func = StreamingImageCbFuncType(_simoneapi_streamingimage_cb)
[文档]
def SoGetStreamingImage(ip, port, imageData):
"""获取图像数据\n
Get image streaming data
:param ip: UDP ip, the machine which SimOneAPI running on
:type ip: string
:param port: UDP port
:type port: int
:param imageData: image streaming data
:type imageData: streaming
:return: executive outcome: success/faile
:rtype: bool
"""
_input = create_string_buffer(ip.encode(), 256)
SimoneStreamingAPI.GetStreamingImage.restype = c_bool
return SimoneStreamingAPI.GetStreamingImage(_input, port, pointer(imageData))
[文档]
def SoApiSetStreamingImageUpdateCB(ip, port, cb):
"""获取图像数据回调\n
Register the callback func for applying image streaming data
:param ip: UDP ip, the machine which SimOneAPI running on
:type ip: string
:param port: UDP port
:type port: int
:param cb: callback func for applying image streaming data: cb(imageData)
:type cb: function
:return: executive outcome: success/faile
:rtype: bool
"""
global SIMONEAPI_StreamingIMAGE_CB
_input = create_string_buffer(ip.encode(), 256)
SimoneStreamingAPI.SetStreamingImageUpdateCB(_input, port, simoneapi_streamingimage_cb_func)
SIMONEAPI_StreamingIMAGE_CB = cb
[文档]
def SoGetStreamingPointCloud(ip, port,infoPort, pointCloudData):
"""获取点云数据\n
Get point cloud streaming data
:param ip: UDP ip, the machine which SimOneAPI running on
:type ip: string
:param port: UDP port
:type port: int
:param infoPort: UDP info port
:type infoPort: int
:param pointCloudData: point cloud streaming data
:type pointCloudData: streaming
:return: executive outcome: success/faile
:rtype: bool
"""
_input = create_string_buffer(ip.encode(), 256)
SimoneStreamingAPI.GetStreamingPointCloud.restype = c_bool
return SimoneStreamingAPI.GetStreamingPointCloud(_input, port, infoPort,pointer(pointCloudData))
[文档]
def SoApiSetStreamingPointCloudUpdateCB(ip, port,infoPort, cb):
"""获取点云数据回调\n
Register the callback func for applying point cloud streaming data
:param ip: UDP ip, the machine which SimOneAPI running on
:type ip: string
:param port: UDP port
:type port: int
:param infoPort: UDP info port
:type infoPort: int
:param cb: callback func for applying point cloud streaming data: cb(point_cloudData)
:type cb: function
:return: executive outcome: success/faile
:rtype: bool
"""
global SIMONEAPI_StreamingPOINTCLOUD_CB
_input = create_string_buffer(ip.encode(), 256)
SimoneStreamingAPI.SetStreamingPointCloudUpdateCB(_input, port,infoPort, simoneapi_streamingpointcloud_cb_func)
SIMONEAPI_StreamingPOINTCLOUD_CB = cb
[文档]
def SoApiGetStreamingRadar4DPointCloud(ip, port, pointCloudData):
"""获取4D激光雷达点云数据\n
Get point cloud streaming data
:param ip: UDP ip, the machine which SimOneAPI running on
:type ip: string
:param port: UDP port
:type port: int
:param infoPort: UDP info port
:type infoPort: int
:param pointCloudData: point cloud streaming data
:type pointCloudData: streaming
:return: executive outcome: success/faile
:rtype: bool
"""
_input = create_string_buffer(ip.encode(), 256)
SimoneStreamingAPI.GetStreamingRadar4DPointCloud.restype = c_bool
return SimoneStreamingAPI.GetStreamingRadar4DPointCloud(_input, port, pointer(pointCloudData))
[文档]
def SoApiSetStreamingRadar4DPointCloudUpdateCB(ip, port, cb):
"""获取4D激光雷达点云数据回调\n
Register the callback func for applying point cloud streaming data
:param ip: UDP ip, the machine which SimOneAPI running on
:type ip: string
:param port: UDP port
:type port: int
:param infoPort: UDP info port
:type infoPort: int
:param cb: callback func for applying point cloud streaming data: cb(point_cloudData)
:type cb: function
:return: executive outcome: success/faile
:rtype: bool
"""
global SIMONEAPI_StreamingPOINTCLOUD_CB
_input = create_string_buffer(ip.encode(), 256)
SimoneStreamingAPI.SetStreamingRadar4DPointCloudUpdateCB(_input, port, simoneapi_streamingpointcloud_cb_func)
SIMONEAPI_StreamingPOINTCLOUD_CB = cb