import threading import time as time_sel from typing import Dict import qwen_task import requests import os import torch import logging from transformers import AutoProcessor, AutoModelForVision2Seq from pymilvus import connections, Collection from logging.handlers import RotatingFileHandler import get_mem class ThreadPool: def __init__(self): #读取配置文件 self.config = {} with open('./conf.txt', 'r', encoding='utf-8') as file: for line in file: # 去除每行的首尾空白字符(包括换行符) line = line.strip() # 跳过空行 if not line: continue # 分割键和值 if '=' in line: key, value = line.split('=', 1) # 去除键和值的首尾空白字符 key = key.strip() value = value.strip() # 将键值对添加到字典中 self.config[key] = value # 配置日志 # 确保日志目录存在 log_dir = "logs" os.makedirs(log_dir, exist_ok=True) self.threads: Dict[str, threading.Thread] = {} self.lock = threading.Lock() self.device = "cuda" if torch.cuda.is_available() else "cpu" # 初始化qwenvl检测模型 # 指定本地路径 model_path = "./Qwen2.5-VL-3B-Instruct-AWQ" # 加载模型 self.qwen_model = AutoModelForVision2Seq.from_pretrained( model_path, device_map="auto" ).eval() # 加载处理器 self.qwen_tokenizer = AutoProcessor.from_pretrained(model_path,use_fast=True) # 初始化Milvus集合 connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport")) # 加载集合 self.collection = Collection(name="smartobject") self.collection.load() #是否更新 self._isupdate = False # 初始化共享内存 get_mem.smem_init() # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', handlers=[ # 按大小轮转的日志文件(最大10MB,保留3个备份) RotatingFileHandler( filename=os.path.join(log_dir, 'start_log.log'), maxBytes=10 * 1024 * 1024, # 10MB backupCount=3, encoding='utf-8' ), # 同时输出到控制台 logging.StreamHandler() ] ) #启动线程 def safe_start(self, target_func, camera_id): """线程安全启动方法""" def wrapped(): thread_name = threading.current_thread().name try: target_func(camera_id) except Exception as e: logging.error(f"线程异常: {str(e)}", exc_info=True) with self.lock: # 确保线程安全创建 t = threading.Thread( target=wrapped, daemon=True # 设置为守护线程 ) t.start() self.threads[camera_id] = t return t # 启动线程任务 def worker(self, camera_id): # 初始化 detect = qwen_task.detect_tasks() detect._thread_name = f"{camera_id}" # 设置名称 detect.init_logging(f"{camera_id}") # 初始化检测模型 detect.device = self.device # 初始化qwen模型 detect.qwen_tokenizer = self.qwen_tokenizer detect.qwen_model = self.qwen_model # 初始化Milvus集合 detect.collection = self.collection # 设置线程启动状态 detect._running = True try: # 读取共享内存中的图片 image_id_list = [458671636370971032] for image_id in image_id_list: logging.info(f"读取图像成功: {image_id}") image_id = detect.tark_do(image_id,self.config.get("filesavepath"),self.config.get("ragurl"),self.config.get("max_tokens")) logging.info(f"处理图像成功: {image_id}") except Exception as e: logging.info(f"{detect._thread_name}线程错误:{e}") #调用是否需要更新 def isUpdate(self): try: # 定义请求的 URL url = self.config.get("isupdateurl") # 发送 GET 请求 response = requests.get(url) # 检查响应状态码 if response.status_code == 200: data = response.json().get("data") if data.get("isChange") == 1: return True else: return False except Exception as e: logging.info(f"调用是否需要更新时出错:URL:{self.config.get('isupdateurl')}:{e}") return False #修改是否更新状态 def update_status(self): try: # 更新状态 url = self.config.get("updatestatusurl") # 发送 GET 请求 response = requests.post(url) # 检查响应状态码 if response.status_code == 200: return True else: return False except Exception as e: logging.info(f"修改是否更新状态时出错:URL:{self.config.get('updatestatusurl')}:{e}") return False def shutdown_all(self) -> None: """清理所有线程""" with self.lock: for camera_id, thread in list(self.threads.items()): if thread.is_alive(): thread.join(timeout=1) del self.threads[camera_id] #获取任务 def getTaskconf(self,isupdate): try: # 定义请求的 URL url = self.config.get("gettaskconfurl") # 发送 GET 请求 response = requests.get(url) # 检查响应状态码 if response.status_code == 200: data = response.json() if isupdate: # 更新状态 self.update_status() return data.get("data") else: return [] except Exception as e: logging.info(f"调用获取任务时出错:URL:{self.config.get('gettaskconfurl')}:{e}") return [] # 使用示例 if __name__ == "__main__": pool = ThreadPool() is_init = True camera_data = pool.getTaskconf(False) while True: try: pool._isupdate = False # 是否更新数据 # 是否需要更新任务数据 if pool.isUpdate(): # 获取摄像机任务 camera_data = pool.getTaskconf(True) pool._isupdate = True # 更新数据 if is_init: if camera_data: for camera in camera_data: thread = pool.threads.get(camera.get("camera_id")) if not thread: logging.info(f"开始创建{camera.get('camera_id')}线程") pool.safe_start(pool.worker, camera.get('camera_id')) logging.info(f"{camera.get('camera_id')}线程创建完毕") if pool._isupdate: logging.info(f"更新线程开始") pool.shutdown_all() if camera_data: for camera in camera_data: thread = pool.threads.get(camera.get("camera_id")) if not thread: logging.info(f"开始创建{camera.get('camera_id')}线程") pool.safe_start(pool.worker, camera.get('camera_id')) logging.info(f"{camera.get('camera_id')}线程创建完毕") logging.info(f"更新线程结束") is_init = False time_sel.sleep(1) except Exception as e: logging.info(f"主线程未知错误:{e}")