#!/usr/bin/env python3
|
# -*- coding: utf-8 -*-
|
"""
|
SMEM Python接口模块
|
提供对共享内存库的访问,用于读取摄像头图像和向量数据
|
"""
|
|
import os
|
import sys
|
import ctypes
|
from ctypes import *
|
from typing import Tuple, List, Dict, Optional, Any
|
|
# 定义错误码常量
|
SMEM_SUCCESS = 0
|
SMEM_ERROR_INVALID_PARAM = -1
|
SMEM_ERROR_CONFIG = -2
|
SMEM_ERROR_CREATE_SHM = -3
|
SMEM_ERROR_NO_MEMORY = -4
|
SMEM_ERROR_BUSY = -5
|
SMEM_ERROR_NOT_FOUND_CAMERID = -6
|
SMEM_ERROR_NOT_FOUND_FRAMEID = -7
|
SMEM_ERROR_INVALID_STATE = -8
|
SMEM_ERROR_INVALID_SIZE = -9
|
|
|
# 定义原子类型
|
class atomic_size_t(Structure):
|
_fields_ = [("__a", c_size_t)]
|
|
|
class atomic_int(Structure):
|
_fields_ = [("__a", c_int)]
|
|
|
# 修正ActiveCameraList结构体定义
|
class ActiveCameraList(Structure):
|
_fields_ = [
|
("camera_ids_offset", c_size_t), # 摄像头ID数组相对偏移量
|
("count", atomic_size_t), # 当前活动摄像头数量
|
("capacity", atomic_size_t), # 数组容量
|
("lock", atomic_int) # 用于同步的锁
|
]
|
|
|
# 修正ReadImageData结构体定义
|
class ReadImageData(Structure):
|
_fields_ = [
|
("frame_id", c_uint64),
|
("clock_time", c_uint64),
|
("image_size", c_size_t),
|
("image_data", POINTER(c_char))
|
]
|
|
|
# 加载共享库
|
def load_smem_library():
|
"""加载共享内存库"""
|
try:
|
# 尝试从不同位置加载库
|
lib_paths = [
|
"./libsmem.so", # 当前目录
|
"/lib/x86_64-linux-gnu/libsmem.so", # 系统库目录
|
os.path.join(os.path.dirname(os.path.abspath(__file__)), "libsmem.so") # 模块目录
|
]
|
|
for path in lib_paths:
|
if os.path.exists(path):
|
return CDLL(path)
|
|
# 如果找不到指定路径,尝试直接按名称加载
|
return CDLL("libsmem.so")
|
except Exception as e:
|
print(f"加载共享内存库失败: {e}")
|
sys.exit(1)
|
|
|
# 加载库
|
_lib = load_smem_library()
|
|
# 设置函数原型
|
_lib.smem_init.argtypes = []
|
_lib.smem_init.restype = c_int
|
|
_lib.client_smem_init.argtypes = []
|
_lib.client_smem_init.restype = c_int
|
|
_lib.smem_destroy.argtypes = []
|
_lib.smem_destroy.restype = None
|
|
_lib.offset_to_ptr.argtypes = [c_size_t]
|
_lib.offset_to_ptr.restype = c_void_p
|
|
_lib.smem_get_camera_list.argtypes = []
|
_lib.smem_get_camera_list.restype = POINTER(ActiveCameraList)
|
|
# dino读取 摄像头id
|
_lib.smem_read_frame_dino.argtypes = [c_int64, POINTER(ReadImageData)]
|
_lib.smem_read_frame_dino.restype = c_int
|
|
# dino设置千问id
|
_lib.smem_set_qianwenID.argtypes = [c_int64, c_uint64, c_uint64]
|
_lib.smem_set_qianwenID.restype = c_int
|
|
# 千问读取
|
_lib.smem_read_frame_qianwen.argtypes = [c_int64]
|
_lib.smem_read_frame_qianwen.restype = c_uint64
|
|
_lib.smem_destory_frame.argtypes = [c_int64, c_uint64]
|
_lib.smem_destory_frame.restype = c_int
|
|
|
# Python包装函数
|
def smem_init() -> int:
|
return _lib.client_smem_init()
|
|
|
def get_active_cameras() -> List[int]:
|
"""
|
获取活动摄像头列表
|
|
Returns:
|
List[int]: 返回摄像头ID列表,如果发生错误则返回空列表
|
"""
|
try:
|
# 获取摄像头列表指针
|
camera_list_ptr = _lib.smem_get_camera_list()
|
if not camera_list_ptr:
|
print("错误:smem_get_camera_list返回空指针")
|
return []
|
|
# 安全地访问结构体内容
|
try:
|
camera_list = camera_list_ptr.contents
|
except (ValueError, AttributeError) as e:
|
print(f"错误:无法访问摄像头列表内容: {e}")
|
return []
|
|
# 获取摄像头数量
|
try:
|
count = camera_list.count.__a
|
if count < 0 or count > 1000: # 跳过无效数量
|
print(f"错误:无效的摄像头数量: {count}")
|
return []
|
except AttributeError as e:
|
print(f"错误:无法访问count字段: {e}")
|
return []
|
|
# 获取camera_ids数组指针
|
camera_ids_ptr = _lib.offset_to_ptr(camera_list.camera_ids_offset)
|
if not camera_ids_ptr:
|
print("错误:无法获取camera_ids数组指针")
|
return []
|
|
# 转换为Python列表
|
camera_ids = cast(camera_ids_ptr, POINTER(c_int64))
|
result = []
|
for i in range(count):
|
try:
|
camera_id = camera_ids[i]
|
if camera_id != -1: # 跳过无效ID
|
result.append(camera_id)
|
except (IndexError, ValueError) as e:
|
print(f"警告:访问索引{i}时出错: {e}")
|
continue
|
|
return result
|
|
except Exception as e:
|
print(f"错误:获取摄像头列表时发生异常: {e}")
|
return []
|
|
|
def smem_read_frame_image_dino(camera_id: int) -> Tuple[int, Dict[str, Any]]:
|
"""
|
读取指定摄像头的最小帧号图像数据
|
|
Args:
|
camera_id: 摄像头ID
|
|
Returns:
|
(状态码, 图像数据字典)
|
"""
|
# 创建输出结构体
|
read_data = ReadImageData()
|
|
# 调用C函数
|
status = _lib.smem_read_frame_dino(camera_id, byref(read_data))
|
|
# 处理结果
|
if status == SMEM_SUCCESS:
|
# 只复制实际使用的数据部分
|
actual_data = bytes(read_data.image_data[:read_data.image_size])
|
result = {
|
"frame_id": read_data.frame_id,
|
"clock_time": read_data.clock_time,
|
"image_size": read_data.image_size,
|
"image_data": actual_data
|
}
|
else:
|
result = {
|
"error": f"读取失败,错误码: {status}"
|
}
|
|
return status, result
|
|
|
def smem_set_qianwen_id(camera_id: int, frame_Id: int, qianwen_id: int) -> int:
|
"""
|
设置指定摄像头的千问ID
|
|
Args:
|
camera_id: 摄像头ID
|
frame_id: 帧id
|
qianwenID: 千问id
|
Returns:
|
状态码
|
"""
|
return _lib.smem_set_qianwenID(camera_id, frame_Id, qianwen_id)
|
|
|
def smem_read_frame_qianwen(camera_id: int) -> int:
|
"""
|
读取指定摄像头的千问ID
|
|
Args:
|
camera_id: 摄像头ID
|
|
Returns:
|
成功时返回千问ID(非负值),失败时返回错误码(负值)
|
"""
|
# 调用C函数,返回值小于0表示错误,大于等于0表示成功并且该值就是qianwenID
|
result = _lib.smem_read_frame_qianwen(camera_id)
|
return result
|
|
|
def smem_destory_frame_image(camera_id: int, frame_id: int) -> int:
|
"""
|
销毁指定摄像头的指定帧号图像数据
|
|
Args:
|
camera_id: 摄像头ID
|
frame_id: 帧号
|
|
Returns:
|
状态码
|
"""
|
return _lib.smem_destory_frame(camera_id, frame_id)
|
|
|
# 错误码转换为可读字符串
|
def smem_error_to_string(error_code: int) -> str:
|
"""
|
将错误码转换为可读字符串
|
|
Args:
|
error_code: 错误码
|
|
Returns:
|
错误描述字符串
|
"""
|
error_map = {
|
SMEM_SUCCESS: "成功",
|
SMEM_ERROR_INVALID_PARAM: "无效参数",
|
SMEM_ERROR_CONFIG: "配置错误",
|
SMEM_ERROR_CREATE_SHM: "创建共享内存失败",
|
SMEM_ERROR_NO_MEMORY: "内存不足",
|
SMEM_ERROR_BUSY: "资源忙",
|
SMEM_ERROR_NOT_FOUND_CAMERID: "未找到摄像头",
|
SMEM_ERROR_NOT_FOUND_FRAMEID: "未找到帧号",
|
SMEM_ERROR_INVALID_STATE: "无效状态",
|
SMEM_ERROR_INVALID_SIZE: "无效大小"
|
}
|
return error_map.get(error_code, f"未知错误 ({error_code})")
|
|
|
# 使用示例
|
if __name__ == "__main__":
|
# 初始化共享内存
|
status = smem_init()
|
if status != SMEM_SUCCESS:
|
print(f"python---初始化失败: {smem_error_to_string(status)}")
|
sys.exit(1)
|
|
print("python---初始化成功")
|
|
try:
|
# 获取摄像头列表
|
camera_list = get_active_cameras()
|
print(f"python---发现 {len(camera_list)} 个活动摄像头")
|
|
for camera_id in camera_list:
|
# print(f"摄像头ID: {camera_id}")
|
|
# 读取图像数据
|
status, image_data = smem_read_frame_image_dino(camera_id)
|
if status == SMEM_SUCCESS:
|
image_size = image_data["image_size"]
|
print(f" python---dino---------------------读取成功:")
|
print(f" python---dino--帧号: {image_data['frame_id']}")
|
print(f" python---dino--大小: {image_data['image_size']} 字节")
|
print(f" python---dino--时间戳: {image_data['clock_time']}")
|
# print(f" dino--数据: {image_data['image_data']}")
|
qianwenID = camera_id * 10 + 111
|
aframdid = image_data["frame_id"]
|
print(f" python---设置当前 千问 ID: {qianwenID} 当前帧是ID: {aframdid} ,千问读取..")
|
smem_set_qianwen_id(camera_id, aframdid, qianwenID)
|
status1 = smem_read_frame_qianwen(camera_id)
|
if status1 > 0:
|
print(f" python---qianwen----------------------读取成功:")
|
print(f" python---qianwen--千问ID: {status1}")
|
else:
|
print(f"python---qianwen读取失败: {smem_error_to_string(status1)}")
|
else:
|
print(f"python---读取图像失败: {smem_error_to_string(status)}")
|
|
finally:
|
print("python---操作完成")
|