#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ SMEM Python接口模块 提供对共享内存库的访问,用于读取摄像头图像和向量数据 """ import os import sys import ctypes from ctypes import * from typing import Tuple, List, Dict, Optional, Any # 定义错误码常量 SMEM_SUCCESS = 0 SMEM_ERROR_INVALID_PARAM = -1 SMEM_ERROR_CONFIG = -2 SMEM_ERROR_CREATE_SHM = -3 SMEM_ERROR_NO_MEMORY = -4 SMEM_ERROR_BUSY = -5 SMEM_ERROR_NOT_FOUND_CAMERID = -6 SMEM_ERROR_NOT_FOUND_FRAMEID = -7 SMEM_ERROR_INVALID_STATE = -8 SMEM_ERROR_INVALID_SIZE = -9 # 定义原子类型 class atomic_size_t(Structure): _fields_ = [("__a", c_size_t)] class atomic_int(Structure): _fields_ = [("__a", c_int)] # 修正ActiveCameraList结构体定义 class ActiveCameraList(Structure): _fields_ = [ ("camera_ids_offset", c_size_t), # 摄像头ID数组相对偏移量 ("count", atomic_size_t), # 当前活动摄像头数量 ("capacity", atomic_size_t), # 数组容量 ("lock", atomic_int) # 用于同步的锁 ] # 修正ReadImageData结构体定义 class ReadImageData(Structure): _fields_ = [ ("frame_id", c_uint64), ("clock_time", c_uint64), ("image_size", c_size_t), ("image_data", POINTER(c_char)) ] # 加载共享库 def load_smem_library(): """加载共享内存库""" try: # 尝试从不同位置加载库 lib_paths = [ "./libsmem.so", # 当前目录 "/lib/x86_64-linux-gnu/libsmem.so", # 系统库目录 os.path.join(os.path.dirname(os.path.abspath(__file__)), "libsmem.so") # 模块目录 ] for path in lib_paths: if os.path.exists(path): return CDLL(path) # 如果找不到指定路径,尝试直接按名称加载 return CDLL("libsmem.so") except Exception as e: print(f"加载共享内存库失败: {e}") sys.exit(1) # 加载库 _lib = load_smem_library() # 设置函数原型 _lib.smem_init.argtypes = [] _lib.smem_init.restype = c_int _lib.client_smem_init.argtypes = [] _lib.client_smem_init.restype = c_int _lib.smem_destroy.argtypes = [] _lib.smem_destroy.restype = None _lib.offset_to_ptr.argtypes = [c_size_t] _lib.offset_to_ptr.restype = c_void_p _lib.smem_get_camera_list.argtypes = [] _lib.smem_get_camera_list.restype = POINTER(ActiveCameraList) # dino读取 摄像头id _lib.smem_read_frame_dino.argtypes = [c_int64, POINTER(ReadImageData)] _lib.smem_read_frame_dino.restype = c_int # dino设置千问id _lib.smem_set_qianwenID.argtypes = [c_int64, c_uint64, c_uint64] _lib.smem_set_qianwenID.restype = c_int # 千问读取 _lib.smem_read_frame_qianwen.argtypes = [c_int64] _lib.smem_read_frame_qianwen.restype = c_uint64 _lib.smem_destory_frame.argtypes = [c_int64, c_uint64] _lib.smem_destory_frame.restype = c_int # Python包装函数 def smem_init() -> int: return _lib.client_smem_init() def get_active_cameras() -> List[int]: """ 获取活动摄像头列表 Returns: List[int]: 返回摄像头ID列表,如果发生错误则返回空列表 """ try: # 获取摄像头列表指针 camera_list_ptr = _lib.smem_get_camera_list() if not camera_list_ptr: print("错误:smem_get_camera_list返回空指针") return [] # 安全地访问结构体内容 try: camera_list = camera_list_ptr.contents except (ValueError, AttributeError) as e: print(f"错误:无法访问摄像头列表内容: {e}") return [] # 获取摄像头数量 try: count = camera_list.count.__a if count < 0 or count > 1000: # 跳过无效数量 print(f"错误:无效的摄像头数量: {count}") return [] except AttributeError as e: print(f"错误:无法访问count字段: {e}") return [] # 获取camera_ids数组指针 camera_ids_ptr = _lib.offset_to_ptr(camera_list.camera_ids_offset) if not camera_ids_ptr: print("错误:无法获取camera_ids数组指针") return [] # 转换为Python列表 camera_ids = cast(camera_ids_ptr, POINTER(c_int64)) result = [] for i in range(count): try: camera_id = camera_ids[i] if camera_id != -1: # 跳过无效ID result.append(camera_id) except (IndexError, ValueError) as e: print(f"警告:访问索引{i}时出错: {e}") continue return result except Exception as e: print(f"错误:获取摄像头列表时发生异常: {e}") return [] def smem_read_frame_image_dino(camera_id: int) -> Tuple[int, Dict[str, Any]]: """ 读取指定摄像头的最小帧号图像数据 Args: camera_id: 摄像头ID Returns: (状态码, 图像数据字典) """ # 创建输出结构体 read_data = ReadImageData() # 调用C函数 status = _lib.smem_read_frame_dino(camera_id, byref(read_data)) # 处理结果 if status == SMEM_SUCCESS: # 只复制实际使用的数据部分 actual_data = bytes(read_data.image_data[:read_data.image_size]) result = { "frame_id": read_data.frame_id, "clock_time": read_data.clock_time, "image_size": read_data.image_size, "image_data": actual_data } else: result = { "error": f"读取失败,错误码: {status}" } return status, result def smem_set_qianwen_id(camera_id: int, frame_Id: int, qianwen_id: int) -> int: """ 设置指定摄像头的千问ID Args: camera_id: 摄像头ID frame_id: 帧id qianwenID: 千问id Returns: 状态码 """ return _lib.smem_set_qianwenID(camera_id, frame_Id, qianwen_id) def smem_read_frame_qianwen(camera_id: int) -> int: """ 读取指定摄像头的千问ID Args: camera_id: 摄像头ID Returns: 成功时返回千问ID(非负值),失败时返回错误码(负值) """ # 调用C函数,返回值小于0表示错误,大于等于0表示成功并且该值就是qianwenID result = _lib.smem_read_frame_qianwen(camera_id) return result def smem_destory_frame_image(camera_id: int, frame_id: int) -> int: """ 销毁指定摄像头的指定帧号图像数据 Args: camera_id: 摄像头ID frame_id: 帧号 Returns: 状态码 """ return _lib.smem_destory_frame(camera_id, frame_id) # 错误码转换为可读字符串 def smem_error_to_string(error_code: int) -> str: """ 将错误码转换为可读字符串 Args: error_code: 错误码 Returns: 错误描述字符串 """ error_map = { SMEM_SUCCESS: "成功", SMEM_ERROR_INVALID_PARAM: "无效参数", SMEM_ERROR_CONFIG: "配置错误", SMEM_ERROR_CREATE_SHM: "创建共享内存失败", SMEM_ERROR_NO_MEMORY: "内存不足", SMEM_ERROR_BUSY: "资源忙", SMEM_ERROR_NOT_FOUND_CAMERID: "未找到摄像头", SMEM_ERROR_NOT_FOUND_FRAMEID: "未找到帧号", SMEM_ERROR_INVALID_STATE: "无效状态", SMEM_ERROR_INVALID_SIZE: "无效大小" } return error_map.get(error_code, f"未知错误 ({error_code})") # 使用示例 if __name__ == "__main__": # 初始化共享内存 status = smem_init() if status != SMEM_SUCCESS: print(f"python---初始化失败: {smem_error_to_string(status)}") sys.exit(1) print("python---初始化成功") try: # 获取摄像头列表 camera_list = get_active_cameras() print(f"python---发现 {len(camera_list)} 个活动摄像头") for camera_id in camera_list: # print(f"摄像头ID: {camera_id}") # 读取图像数据 status, image_data = smem_read_frame_image_dino(camera_id) if status == SMEM_SUCCESS: image_size = image_data["image_size"] print(f" python---dino---------------------读取成功:") print(f" python---dino--帧号: {image_data['frame_id']}") print(f" python---dino--大小: {image_data['image_size']} 字节") print(f" python---dino--时间戳: {image_data['clock_time']}") # print(f" dino--数据: {image_data['image_data']}") qianwenID = camera_id * 10 + 111 aframdid = image_data["frame_id"] print(f" python---设置当前 千问 ID: {qianwenID} 当前帧是ID: {aframdid} ,千问读取..") smem_set_qianwen_id(camera_id, aframdid, qianwenID) status1 = smem_read_frame_qianwen(camera_id) if status1 > 0: print(f" python---qianwen----------------------读取成功:") print(f" python---qianwen--千问ID: {status1}") else: print(f"python---qianwen读取失败: {smem_error_to_string(status1)}") else: print(f"python---读取图像失败: {smem_error_to_string(status)}") finally: print("python---操作完成")