from datetime import datetime
import logging
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from logging.handlers import RotatingFileHandler

from langchain_openai import ChatOpenAI
from pymilvus import connections, Collection

import qwen_vllm

# Log directory/file written by every qwen_vllm_thread instance.
_LOG_DIR = "logs"
_LOG_FILE = os.path.join(_LOG_DIR, "thread_log.log")

# One RotatingFileHandler shared by all instances. Several independent
# handlers rotating the same file race on rollover (records end up in the
# renamed file on POSIX; rename fails with PermissionError on Windows).
_shared_log_handler = None
_shared_log_handler_lock = threading.Lock()


def _get_shared_log_handler():
    """Return the process-wide rotating handler for thread_log.log, creating it lazily."""
    global _shared_log_handler
    with _shared_log_handler_lock:
        if _shared_log_handler is None:
            # RotatingFileHandler does not create missing parent directories.
            os.makedirs(_LOG_DIR, exist_ok=True)
            handler = RotatingFileHandler(
                filename=_LOG_FILE,
                maxBytes=10 * 1024 * 1024,
                backupCount=3,
                encoding='utf-8'
            )
            formatter = logging.Formatter(
                '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
            )
            handler.setFormatter(formatter)
            _shared_log_handler = handler
        return _shared_log_handler


def process_task(detect, image_id, ragurl, ragmode, max_tokens):
    """Run one ``detect.tark_do`` call synchronously and print how long it took.

    Args:
        detect: Object exposing ``tark_do(image_id, ragurl, ragmode, max_tokens)``.
        image_id: Task input forwarded to ``tark_do``.
        ragurl, ragmode, max_tokens: Forwarded unchanged to ``tark_do``.

    Returns:
        Whatever ``tark_do`` returned.
    """
    start = time.perf_counter()
    image_id = detect.tark_do(image_id, ragurl, ragmode, max_tokens)
    # Report the real elapsed time (previously a hard-coded 10).
    elapsed = time.perf_counter() - start
    print(f"完成任务 {image_id} (耗时: {elapsed:.2f}s)")
    return image_id


class qwen_vllm_thread:
    """Bounded thread pool that runs ``qwen_vllm.detect_tasks.tark_do`` jobs.

    Each instance owns one ``detect_tasks`` object named after ``camera_id``,
    a handle on the Milvus ``smartobject`` collection, and a LangChain
    ``ChatOpenAI`` client pointed at ``config['vllmurl']``. ``submit`` blocks
    the caller while all ``max_workers`` slots are busy, so work never piles
    up in the executor's internal queue.
    """

    def __init__(self, max_workers, camera_id, config):
        """Create the pool and its Milvus / LLM / detector dependencies.

        Args:
            max_workers: Maximum number of tasks running at the same time.
            camera_id: Used as the detector's thread name and logging name.
            config: Mapping read with ``.get``. Keys used here: ``milvusurl``,
                ``milvusport``, ``vllmmode``, ``vllmurl``. Keys used per task:
                ``ragurl``, ``ragmode``, ``max_tokens``.
        """
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # One permit per worker; see submit().
        self.semaphore = threading.Semaphore(max_workers)
        self.max_workers = max_workers

        # Connect to Milvus under the "default" alias.
        connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
        # Open and load the collection.
        self.collection = Collection(name="smartobject")
        self.collection.load()

        # LangChain client; the model name must match vLLM's --served-model-name.
        self.llm = ChatOpenAI(
            model=config.get("vllmmode"),
            temperature=0.7,
            max_tokens=200,
            base_url=config.get("vllmurl"),
            api_key="EMPTY"
        )
        self.config = config

        # Detector that does the actual work for every submitted task.
        self.detect = qwen_vllm.detect_tasks()
        self.detect._thread_name = f"{camera_id}"
        self.detect.init_logging(f"{camera_id}")
        # Share the Milvus collection and LLM client with the detector.
        # NOTE(review): this single detector is called concurrently from every
        # worker thread; confirm detect_tasks.tark_do is thread-safe.
        self.detect.collection = self.collection
        self.detect.llm = self.llm

        # Per-instance logger that writes through the shared rotating handler.
        self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
        self.logger.setLevel(logging.INFO)
        handler = _get_shared_log_handler()
        # Avoid a duplicate handler if a logger with this name already exists
        # (id() values can be reused after an instance is garbage-collected).
        if handler not in self.logger.handlers:
            self.logger.addHandler(handler)

    def submit(self, res_a):
        """Schedule ``res_a`` for processing, blocking while every worker is busy.

        Returns:
            The ``Future`` for the task; its result is the value of
            ``detect.tark_do``.

        Raises:
            RuntimeError: If the executor has already been shut down.
        """
        # Fast path: try to take a free slot without blocking.
        if not self.semaphore.acquire(blocking=False):
            # Semaphore._value is a CPython implementation detail; only used for logging.
            active = self.max_workers - self.semaphore._value
            self.logger.info(f"线程池已满,等待空闲线程... (当前活跃: {active}/{self.max_workers})")
            # Block until a running task releases its slot.
            self.semaphore.acquire(blocking=True)
        try:
            future = self.executor.submit(self._wrap_task, res_a)
        except BaseException:
            # The task was never scheduled, so the done-callback that normally
            # releases the permit will never run: release it here.
            self.semaphore.release()
            raise
        future.add_done_callback(self._release_semaphore)
        return future

    def _wrap_task(self, res):
        """Run ``detect.tark_do`` for one item and log its duration.

        ``res`` must support ``res['id']``, which is used for log messages.
        Exceptions are logged with their traceback and re-raised, so they
        surface through the returned Future.
        """
        try:
            current_time = datetime.now()
            image_id = self.detect.tark_do(res, self.config.get("ragurl"), self.config.get("ragmode"), self.config.get("max_tokens"))
            self.logger.info(f"处理: { res['id']}完毕{image_id}:{datetime.now() - current_time}")
            return image_id
        except Exception as e:
            # ERROR level with traceback (was INFO with only str(e)).
            self.logger.exception(f"任务 { res['id']} 处理出错: {e}")
            raise

    def _release_semaphore(self, future):
        """Done-callback: return the task's slot to the semaphore and log it."""
        self.semaphore.release()
        self.logger.info(f"释放线程 (剩余空闲: {self.semaphore._value}/{self.max_workers})")

    def shutdown(self, wait=True):
        """Stop accepting new tasks; if ``wait`` is true, block until running ones finish."""
        self.executor.shutdown(wait=wait)