from operator import itemgetter
|
import threading
|
import time as time_sel
|
from typing import Dict
|
from qwen_thread import qwen_thread
|
import requests
|
import os
|
import logging
|
from pymilvus import connections, Collection
|
from logging.handlers import RotatingFileHandler
|
import get_mem
|
|
class ThreadPool:
|
def __init__(self):
|
#读取配置文件
|
self.config = {}
|
with open('./conf.txt', 'r', encoding='utf-8') as file:
|
for line in file:
|
# 去除每行的首尾空白字符(包括换行符)
|
line = line.strip()
|
# 跳过空行
|
if not line:
|
continue
|
# 分割键和值
|
if '=' in line:
|
key, value = line.split('=', 1)
|
# 去除键和值的首尾空白字符
|
key = key.strip()
|
value = value.strip()
|
# 将键值对添加到字典中
|
self.config[key] = value
|
# 创建实例专属logger
|
os.makedirs(self.config.get("logaddr"), exist_ok=True)
|
logging.basicConfig(
|
level=logging.INFO,
|
format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s',
|
datefmt='%Y-%m-%d %H:%M:%S',
|
handlers=[
|
# 按大小轮转的日志文件(最大10MB,保留3个备份)
|
RotatingFileHandler(
|
filename=os.path.join(self.config.get("logaddr"), 'qwen_log.log'),
|
maxBytes=10 * 1024 * 1024, # 10MB
|
backupCount=3,
|
encoding='utf-8'
|
),
|
# 同时输出到控制台
|
logging.StreamHandler()
|
]
|
)
|
self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
|
self.logger.setLevel(logging.INFO)
|
|
|
self.threads: Dict[str, threading.Thread] = {}
|
self.lock = threading.Lock()
|
|
# 初始化Milvus集合
|
connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
|
# 加载集合
|
self.collection = Collection(name="smartobject")
|
self.collection.load()
|
#创建qwen线程池
|
self.pool = qwen_thread(self.config,self.logger)
|
#是否更新
|
self._isupdate = False
|
|
# 初始化共享内存
|
get_mem.smem_init()
|
|
|
#启动线程
|
def safe_start(self, target_func, camera_id):
|
"""线程安全启动方法"""
|
def wrapped():
|
thread_name = threading.current_thread().name
|
try:
|
target_func(camera_id)
|
except Exception as e:
|
logging.error(f"线程异常: {str(e)}", exc_info=True)
|
|
with self.lock: # 确保线程安全创建
|
t = threading.Thread(
|
target=wrapped,
|
daemon=True # 设置为守护线程
|
)
|
t.start()
|
self.threads[camera_id] = t
|
return t
|
|
# 启动线程任务
|
def worker(self, camera_id):
|
while True:
|
try:
|
res_a = self.collection.query(
|
expr=f"is_desc == 0 and video_point_id=={camera_id}",
|
output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
|
"task_name", "event_level_id", "event_level_name",
|
"video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
|
"detect_id","knowledge_id","suggestion","risk_description",
|
"detect_time", "image_path", "image_desc_path", "video_path"],
|
consistency_level="Strong",
|
order_by_field="id", # 按id字段排序
|
order_by_type="desc" # 降序排列
|
)
|
# 读取共享内存中的图片
|
# image_id = get_mem.smem_read_frame_qianwen(camera_id)
|
if len(res_a) > 0:
|
#sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
|
#res = sorted_results[0]
|
res = max(res_a, key=itemgetter("id"))
|
self.collection.delete(f"id == {res['id']}")
|
# 数据组
|
data = {
|
"event_level_id": res['event_level_id'], # event_level_id
|
"event_level_name": res['event_level_name'], # event_level_id
|
"rule_id": res["rule_id"],
|
"video_point_id": res['video_point_id'], # video_point_id
|
"video_point_name": res['video_point_name'],
|
"is_waning": 0,
|
"is_desc": 1,
|
"zh_desc_class": res['zh_desc_class'],
|
"bounding_box": res['bounding_box'], # bounding_box
|
"task_id": res['task_id'], # task_id
|
"task_name": res['task_name'], # task_id
|
"detect_id": res['detect_id'], # detect_id
|
"detect_time": res['detect_time'], # detect_time
|
"detect_num": res['detect_num'],
|
"waning_value": res['waning_value'],
|
"image_path": res['image_path'], # image_path
|
"image_desc_path": res['image_desc_path'], # image_desc_path
|
"video_path": res['video_path'],
|
"text_vector": res['text_vector'],
|
"risk_description": res['risk_description'],
|
"suggestion": res['suggestion'],
|
"knowledge_id": res['knowledge_id']
|
}
|
# 保存到milvus
|
image_id = self.collection.insert(data).primary_keys
|
res['id'] = image_id[0]
|
self.pool.submit(res)
|
time_sel.sleep(0.01)
|
except Exception as e:
|
logging.info(f"{camera_id}线程错误:{e}")
|
|
|
#调用是否需要更新
|
def isUpdate(self):
|
try:
|
# 定义请求的 URL
|
url = self.config.get("isupdateurl")
|
# 发送 GET 请求
|
response = requests.get(url)
|
|
# 检查响应状态码
|
if response.status_code == 200:
|
data = response.json().get("data")
|
if data.get("isChange") == 1:
|
return True
|
else:
|
return False
|
except Exception as e:
|
logging.info(f"调用是否需要更新时出错:URL:{self.config.get('isupdateurl')}:{e}")
|
return False
|
|
#修改是否更新状态
|
def update_status(self):
|
try:
|
# 更新状态
|
url = self.config.get("updatestatusurl")
|
# 发送 GET 请求
|
response = requests.post(url)
|
# 检查响应状态码
|
if response.status_code == 200:
|
return True
|
else:
|
return False
|
except Exception as e:
|
logging.info(f"修改是否更新状态时出错:URL:{self.config.get('updatestatusurl')}:{e}")
|
return False
|
|
def shutdown_all(self) -> None:
|
"""清理所有线程"""
|
with self.lock:
|
for camera_id, thread in list(self.threads.items()):
|
if thread.is_alive():
|
thread.join(timeout=1)
|
del self.threads[camera_id]
|
|
#获取任务
|
def getTaskconf(self,isupdate):
|
try:
|
# 定义请求的 URL
|
url = self.config.get("gettaskconfurl")
|
# 发送 GET 请求
|
response = requests.get(url)
|
# 检查响应状态码
|
if response.status_code == 200:
|
data = response.json()
|
if isupdate:
|
# 更新状态
|
self.update_status()
|
return data.get("data")
|
else:
|
return []
|
except Exception as e:
|
logging.info(f"调用获取任务时出错:URL:{self.config.get('gettaskconfurl')}:{e}")
|
return []
|
|
# 使用示例
|
if __name__ == "__main__":
|
pool = ThreadPool()
|
is_init = True
|
camera_data = pool.getTaskconf(False)
|
while True:
|
try:
|
pool._isupdate = False # 是否更新数据
|
# 是否需要更新任务数据
|
if pool.isUpdate():
|
# 获取摄像机任务
|
camera_data = pool.getTaskconf(True)
|
pool._isupdate = True # 更新数据
|
|
if is_init:
|
if camera_data:
|
for camera in camera_data:
|
thread = pool.threads.get(camera.get("camera_id"))
|
if not thread:
|
logging.info(f"开始创建{camera.get('camera_id')}线程")
|
pool.safe_start(pool.worker, camera.get('camera_id'))
|
logging.info(f"{camera.get('camera_id')}线程创建完毕")
|
|
if pool._isupdate:
|
logging.info(f"更新线程开始")
|
pool.shutdown_all()
|
if camera_data:
|
for camera in camera_data:
|
thread = pool.threads.get(camera.get("camera_id"))
|
if not thread:
|
logging.info(f"开始创建{camera.get('camera_id')}线程")
|
pool.safe_start(pool.worker, camera.get('camera_id'))
|
logging.info(f"{camera.get('camera_id')}线程创建完毕")
|
|
logging.info(f"更新线程结束")
|
|
is_init = False
|
time_sel.sleep(1)
|
except Exception as e:
|
logging.info(f"主线程未知错误:{e}")
|