import threading import time as time_sel from typing import Dict import qwen_task import requests import os import torch import logging from transformers import AutoProcessor, AutoModelForVision2Seq from pymilvus import connections, Collection from logging.handlers import RotatingFileHandler import get_mem class ThreadPool: def __init__(self): #读取配置文件 self.config = {} with open('./conf.txt', 'r', encoding='utf-8') as file: for line in file: # 去除每行的首尾空白字符(包括换行符) line = line.strip() # 跳过空行 if not line: continue # 分割键和值 if '=' in line: key, value = line.split('=', 1) # 去除键和值的首尾空白字符 key = key.strip() value = value.strip() # 将键值对添加到字典中 self.config[key] = value # 配置日志 # 确保日志目录存在 log_dir = "logs" os.makedirs(log_dir, exist_ok=True) self.threads: Dict[str, threading.Thread] = {} self.lock = threading.Lock() self.device = "cuda" if torch.cuda.is_available() else "cpu" # 初始化qwenvl检测模型 # 指定本地路径 model_path = "./Qwen2-VL-2B-Instruct" # 加载模型 self.qwen_model = AutoModelForVision2Seq.from_pretrained( model_path, device_map="auto", # 自动分配GPU torch_dtype=torch.float16, # 半精度节省显存 trust_remote_code=True # 允许执行自定义代码 ).eval() # 加载处理器 self.qwen_tokenizer = AutoProcessor.from_pretrained(model_path) # 初始化Milvus集合 connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport")) # 加载集合 self.collection = Collection(name="smartobject") self.collection.load() #是否更新 self._isupdate = False # 初始化共享内存 get_mem.smem_init() # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', handlers=[ # 按大小轮转的日志文件(最大10MB,保留3个备份) RotatingFileHandler( filename=os.path.join(log_dir, 'start_log.log'), maxBytes=10 * 1024 * 1024, # 10MB backupCount=3, encoding='utf-8' ), # 同时输出到控制台 logging.StreamHandler() ] ) #启动线程 def safe_start(self, target_func, camera_id): """线程安全启动方法""" def wrapped(): thread_name = threading.current_thread().name try: target_func(camera_id) except Exception as e: logging.error(f"线程异常: {str(e)}", exc_info=True) with self.lock: # 确保线程安全创建 t = threading.Thread( target=wrapped, daemon=True # 设置为守护线程 ) t.start() self.threads[camera_id] = t return t # 启动线程任务 def worker(self, camera_id): # 初始化 detect = qwen_task.detect_tasks() detect._thread_name = f"{camera_id}" # 设置名称 detect.init_logging(f"{camera_id}") # 初始化检测模型 detect.device = self.device # 初始化qwen模型 detect.qwen_tokenizer = self.qwen_tokenizer detect.qwen_model = self.qwen_model # 初始化Milvus集合 detect.collection = self.collection # 设置线程启动状态 detect._running = True try: # 读取共享内存中的图片 image_id_list = [458450378519380938, 458450378519380940, 458450378519380942, 458450378519380944, 458450378519380946, 458450378519380948, 458450378519380950, 458450378519380952, 458450378519380954, 458450378519380956, 458450378519380958, 458450378519380960, 458450378519380962, 458450378519380964, 0, 458450378519380966, 458450378519380968, 0, 458450378519380970, 458450378519380972, 458450378519380974, 458450378519380976, 458450378519380978, 458450378519380980, 458450378519380982, 458450378519380984, 458450378519380986, 458450378519380988, 458450378519380990, 458450378519380992, 0, 458450378519380994, 458450378519380996, 0, 458450378519380998, 458450378519381000, 0, 458450378519381002, 458450378519381004, 458450378519381006, 0, 458450378519381008, 458450378519381010, 458450378519381012, 458450378519381014, 458450378519381016, 458450378519381018, 458450378519381020, 458450378519381022, 458450378519381024, 458450378519381026, 458450378519381028, 458450378519381030, 458450378519381032, 458450378519381034, 458450378519381036, 458450378519381038, 458450378519381040, 458450378519381042, 458450378519381044, 458450378519381046, 0, 458450378519381048, 458450378519381050, 458450378519381052, 458450378519381054, 458450378519381056, 458450378519381058, 458450378519381060, 458450378519381062, 458450378519381064, 458450378519381066, 458450378519381068, 458450378519381070, 458450378519381072, 458450378519381074, 458450378519381076, 458450378519381078, 458450378519381080, 458450378519381082, 458450378519381084, 0, 458450378519381086, 458450378519381088, 458450378519381090, 458450378519381092, 458450378519381094, 458450378519381096, 458450378519381098, 458450378519381100, 458450378519381102, 458450378519381104, 458450378519381106, 458450378519381108, 458450378519381110, 458450378519381112, 458450378519381114, 458450378519381116, 458450378519381118, 458450378519381120, 458450378519381122, 458450378519381124, 458450378519381126, 0, 458450378519381128, 458450378519381130, 458450378519381132, 458450378519381134, 458450378519381136, 458450378519381138, 458450378519381140, 458450378519381142, 458450378519381144, 458450378519381146, 458450378519381148, 458450378519381150, 458450378519381152, 458450378519381154, 458450378519381156, 458450378519381158 ] for image_id in image_id_list: logging.info(f"读取图像成功: {image_id}") image_id = detect.tark_do(image_id,self.config.get("filesavepath"),self.config.get("ragurl"),self.config.get("max_tokens")) logging.info(f"处理图像成功: {image_id}") except Exception as e: logging.info(f"{detect._thread_name}线程错误:{e}") #调用是否需要更新 def isUpdate(self): try: # 定义请求的 URL url = self.config.get("isupdateurl") # 发送 GET 请求 response = requests.get(url) # 检查响应状态码 if response.status_code == 200: data = response.json().get("data") if data.get("isChange") == 1: return True else: return False except Exception as e: logging.info(f"调用是否需要更新时出错:URL:{self.config.get('isupdateurl')}:{e}") return False #修改是否更新状态 def update_status(self): try: # 更新状态 url = self.config.get("updatestatusurl") # 发送 GET 请求 response = requests.post(url) # 检查响应状态码 if response.status_code == 200: return True else: return False except Exception as e: logging.info(f"修改是否更新状态时出错:URL:{self.config.get('updatestatusurl')}:{e}") return False def shutdown_all(self) -> None: """清理所有线程""" with self.lock: for camera_id, thread in list(self.threads.items()): if thread.is_alive(): thread.join(timeout=1) del self.threads[camera_id] #获取任务 def getTaskconf(self,isupdate): try: # 定义请求的 URL url = self.config.get("gettaskconfurl") # 发送 GET 请求 response = requests.get(url) # 检查响应状态码 if response.status_code == 200: data = response.json() if isupdate: # 更新状态 self.update_status() return data.get("data") else: return [] except Exception as e: logging.info(f"调用获取任务时出错:URL:{self.config.get('gettaskconfurl')}:{e}") return [] # 使用示例 if __name__ == "__main__": pool = ThreadPool() is_init = True camera_data = pool.getTaskconf(False) while True: try: pool._isupdate = False # 是否更新数据 # 是否需要更新任务数据 if pool.isUpdate(): # 获取摄像机任务 camera_data = pool.getTaskconf(True) pool._isupdate = True # 更新数据 if is_init: if camera_data: for camera in camera_data: thread = pool.threads.get(camera.get("camera_id")) if not thread: logging.info(f"开始创建{camera.get('camera_id')}线程") pool.safe_start(pool.worker, camera.get('camera_id')) logging.info(f"{camera.get('camera_id')}线程创建完毕") if pool._isupdate: logging.info(f"更新线程开始") pool.shutdown_all() if camera_data: for camera in camera_data: thread = pool.threads.get(camera.get("camera_id")) if not thread: logging.info(f"开始创建{camera.get('camera_id')}线程") pool.safe_start(pool.worker, camera.get('camera_id')) logging.info(f"{camera.get('camera_id')}线程创建完毕") logging.info(f"更新线程结束") is_init = False time_sel.sleep(1) except Exception as e: logging.info(f"主线程未知错误:{e}")