from operator import itemgetter import threading import time as time_sel from typing import Dict from qwen_thread_batch import qwen_thread_batch import requests import os import logging from pymilvus import connections, Collection from logging.handlers import RotatingFileHandler import get_mem class ThreadPool: def __init__(self): #读取配置文件 self.config = {} with open('./conf.txt', 'r', encoding='utf-8') as file: for line in file: # 去除每行的首尾空白字符(包括换行符) line = line.strip() # 跳过空行 if not line: continue # 分割键和值 if '=' in line: key, value = line.split('=', 1) # 去除键和值的首尾空白字符 key = key.strip() value = value.strip() # 将键值对添加到字典中 self.config[key] = value # 配置日志 # 确保日志目录存在 log_dir = "logs" os.makedirs(log_dir, exist_ok=True) self.threads: Dict[str, threading.Thread] = {} self.lock = threading.Lock() # 初始化Milvus集合 connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport")) # 加载集合 self.collection = Collection(name="smartobject") self.collection.load() self.pool = qwen_thread_batch(int(self.config.get("threadnum")), self.config,"/home/debian/Qwen2.5-VL-3B-Instruct-GPTQ-Int4") #是否更新 self._isupdate = False # 初始化共享内存 get_mem.smem_init() # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', handlers=[ # 按大小轮转的日志文件(最大10MB,保留3个备份) RotatingFileHandler( filename=os.path.join(log_dir, 'start_log.log'), maxBytes=10 * 1024 * 1024, # 10MB backupCount=3, encoding='utf-8' ), # 同时输出到控制台 logging.StreamHandler() ] ) #启动线程 def safe_start(self, target_func, camera_id): """线程安全启动方法""" def wrapped(): thread_name = threading.current_thread().name try: target_func(camera_id) except Exception as e: logging.error(f"线程异常: {str(e)}", exc_info=True) with self.lock: # 确保线程安全创建 t = threading.Thread( target=wrapped, daemon=True # 设置为守护线程 ) t.start() self.threads[camera_id] = t return t # 启动线程任务 def worker(self, camera_id): while True: try: res_a = self.collection.query( expr=f"is_desc == 0 and video_point_id=={camera_id}", output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id", "task_name", "event_level_id", "event_level_name", "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id", "detect_id","knowledge_id", "detect_time", "image_path", "image_desc_path", "video_path"], consistency_level="Strong", order_by_field="id", # 按id字段排序 order_by_type="desc" # 降序排列 ) # 读取共享内存中的图片 # image_id = get_mem.smem_read_frame_qianwen(camera_id) if len(res_a) > 0: sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True) # 查询前N个最大的ID res_a = sorted_results[:int(self.config.get("detectnum"))] res_data = [] for res in res_a: data = { "id": res['id'], "event_level_id": res['event_level_id'], # event_level_id "event_level_name": res['event_level_name'], # event_level_id "rule_id": res["rule_id"], "video_point_id": res['video_point_id'], # video_point_id "video_point_name": res['video_point_name'], "is_waning": 0, "is_desc": 1, "zh_desc_class": res['zh_desc_class'], # text_vector "bounding_box": res['bounding_box'], # bounding_box "task_id": res['task_id'], # task_id "task_name": res['task_name'], # task_id "detect_id": res['detect_id'], # detect_id "detect_time": res['detect_time'], # detect_time "detect_num": res['detect_num'], "waning_value": res['waning_value'], "image_path": res['image_path'], # image_path "image_desc_path": res['image_desc_path'], # image_desc_path "video_path": res['video_path'], "text_vector": res['text_vector'], "knowledge_id": res['knowledge_id'] } # logging.info(f"读取图像成功: {res['id']}") # 保存到milvus image_id = self.collection.upsert(data).primary_keys res['id'] = image_id[0] res_data.append(res) self.pool.submit(res_data) # image_id = pool.tark_do(image_id,self.config.get("ragurl"),self.config.get("ragmode"),self.config.get("max_tokens")) # logging.info(f"处理图像成功: {image_id}") sorted_results = None except Exception as e: logging.info(f"{camera_id}线程错误:{e}") #调用是否需要更新 def isUpdate(self): try: # 定义请求的 URL url = self.config.get("isupdateurl") # 发送 GET 请求 response = requests.get(url) # 检查响应状态码 if response.status_code == 200: data = response.json().get("data") if data.get("isChange") == 1: return True else: return False except Exception as e: logging.info(f"调用是否需要更新时出错:URL:{self.config.get('isupdateurl')}:{e}") return False #修改是否更新状态 def update_status(self): try: # 更新状态 url = self.config.get("updatestatusurl") # 发送 GET 请求 response = requests.post(url) # 检查响应状态码 if response.status_code == 200: return True else: return False except Exception as e: logging.info(f"修改是否更新状态时出错:URL:{self.config.get('updatestatusurl')}:{e}") return False def shutdown_all(self) -> None: """清理所有线程""" with self.lock: for camera_id, thread in list(self.threads.items()): if thread.is_alive(): thread.join(timeout=1) del self.threads[camera_id] #获取任务 def getTaskconf(self,isupdate): try: # 定义请求的 URL url = self.config.get("gettaskconfurl") # 发送 GET 请求 response = requests.get(url) # 检查响应状态码 if response.status_code == 200: data = response.json() if isupdate: # 更新状态 self.update_status() return data.get("data") else: return [] except Exception as e: logging.info(f"调用获取任务时出错:URL:{self.config.get('gettaskconfurl')}:{e}") return [] # 使用示例 if __name__ == "__main__": pool = ThreadPool() is_init = True camera_data = pool.getTaskconf(False) while True: try: pool._isupdate = False # 是否更新数据 # 是否需要更新任务数据 if pool.isUpdate(): # 获取摄像机任务 camera_data = pool.getTaskconf(True) pool._isupdate = True # 更新数据 if is_init: if camera_data: for camera in camera_data: thread = pool.threads.get(camera.get("camera_id")) if not thread: logging.info(f"开始创建{camera.get('camera_id')}线程") pool.safe_start(pool.worker, camera.get('camera_id')) logging.info(f"{camera.get('camera_id')}线程创建完毕") if pool._isupdate: logging.info(f"更新线程开始") pool.shutdown_all() if camera_data: for camera in camera_data: thread = pool.threads.get(camera.get("camera_id")) if not thread: logging.info(f"开始创建{camera.get('camera_id')}线程") pool.safe_start(pool.worker, camera.get('camera_id')) logging.info(f"{camera.get('camera_id')}线程创建完毕") logging.info(f"更新线程结束") is_init = False time_sel.sleep(1) except Exception as e: logging.info(f"主线程未知错误:{e}")