from datetime import datetime
|
import logging
|
import os
|
from concurrent.futures import ThreadPoolExecutor
|
import threading
|
from logging.handlers import RotatingFileHandler
|
from langchain_openai import ChatOpenAI
|
from pymilvus import connections, Collection
|
import qwen_vllm
|
|
|
def process_task(detect, image_id, ragurl, ragmode, max_tokens):
    """Run one detection task synchronously and report how long it took.

    Args:
        detect: Object exposing ``tark_do(image_id, ragurl, ragmode, max_tokens)``.
        image_id: Task payload forwarded unchanged to ``detect.tark_do``.
        ragurl: Forwarded unchanged to ``detect.tark_do``.
        ragmode: Forwarded unchanged to ``detect.tark_do``.
        max_tokens: Forwarded unchanged to ``detect.tark_do``.

    Returns:
        Whatever ``detect.tark_do`` returns.
    """
    started = datetime.now()
    result = detect.tark_do(image_id, ragurl, ragmode, max_tokens)
    # Report the measured duration; the previous code printed a constant 10.
    elapsed = (datetime.now() - started).total_seconds()
    print(f"完成任务 {result} (耗时: {elapsed:.2f}s)")
    return result
|
|
|
class qwen_vllm_thread:
    """Bounded thread-pool front end for ``qwen_vllm.detect_tasks``.

    A semaphore with ``max_workers`` permits is acquired before each task is
    handed to the executor and released from the future's done-callback, so
    ``submit`` blocks the caller once ``max_workers`` tasks are in flight
    instead of letting work queue up inside the executor.

    NOTE(review): a single ``detect_tasks`` instance is shared by every worker
    thread; confirm that ``tark_do`` is safe to call concurrently.
    """

    def __init__(self, max_workers, camera_id, config):
        """Connect to Milvus, build the LLM client and prepare the worker pool.

        Args:
            max_workers: Size of the thread pool and number of semaphore permits.
            camera_id: Used as the detector's thread name and passed to its
                ``init_logging``.
            config: Mapping read via ``.get`` for the keys ``milvusurl``,
                ``milvusport``, ``vllmmode``, ``vllmurl``, and later
                ``ragurl``, ``ragmode`` and ``max_tokens``.
        """
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.semaphore = threading.Semaphore(max_workers)
        self.max_workers = max_workers

        # Open the Milvus connection under the "default" alias.
        connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
        # Load the collection so it can be queried.
        self.collection = Collection(name="smartobject")
        self.collection.load()

        # Build the LangChain client that talks to the vLLM endpoint.
        self.llm = ChatOpenAI(
            model=config.get("vllmmode"),  # must match the server's --served-model-name
            temperature=0.7,
            max_tokens=200,
            base_url=config.get("vllmurl"),
            api_key="EMPTY",
        )
        self.config = config

        # Create the detector and hand it the shared collection and LLM client.
        self.detect = qwen_vllm.detect_tasks()
        self.detect._thread_name = f"{camera_id}"
        self.detect.init_logging(f"{camera_id}")
        self.detect.collection = self.collection
        self.detect.llm = self.llm

        self.logger = self._build_logger()

    def _build_logger(self):
        """Return this instance's logger, attaching a rotating file handler once."""
        logger = logging.getLogger(f"{self.__class__}_{id(self)}")
        logger.setLevel(logging.INFO)
        # Skip if a handler is already attached to avoid duplicate log lines.
        if not logger.handlers:
            # RotatingFileHandler opens the file immediately and fails if the
            # directory is missing, so make sure it exists first.
            os.makedirs("logs", exist_ok=True)
            handler = RotatingFileHandler(
                filename=os.path.join("logs", 'thread_log.log'),
                maxBytes=10 * 1024 * 1024,
                backupCount=3,
                encoding='utf-8',
            )
            handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
            ))
            logger.addHandler(handler)
        return logger

    @staticmethod
    def _task_id(res):
        """Return ``res['id']`` for log messages, or a placeholder if it is absent."""
        try:
            return res['id']
        except (KeyError, IndexError, TypeError):
            return "<unknown>"

    def submit(self, res_a):
        """Queue ``res_a`` for processing, blocking while the pool is saturated.

        Args:
            res_a: Task payload forwarded to ``detect.tark_do``.

        Returns:
            The ``concurrent.futures.Future`` for the task; its result is the
            value returned by ``detect.tark_do``.

        Raises:
            RuntimeError: Propagated from the executor, e.g. after ``shutdown``.
        """
        # Try a non-blocking acquire first so the "pool full" case can be logged.
        if not self.semaphore.acquire(blocking=False):
            # _value is the semaphore's private counter; read for diagnostics only.
            self.logger.info(f"线程池已满,等待空闲线程... (当前活跃: {self.max_workers - self.semaphore._value}/{self.max_workers})")
            # Block until a worker slot is free.
            self.semaphore.acquire(blocking=True)

        try:
            future = self.executor.submit(self._wrap_task, res_a)
        except BaseException:
            # No future exists, so the done-callback will never run; give the
            # permit back here or it would be lost for good.
            self.semaphore.release()
            raise
        future.add_done_callback(self._release_semaphore)
        return future

    def _wrap_task(self, res):
        """Run one detection task in a worker thread and log its outcome.

        Args:
            res: Task payload; ``res['id']`` is used only for log messages.

        Returns:
            The value returned by ``detect.tark_do``.

        Raises:
            Exception: Whatever ``detect.tark_do`` raises, after it is logged.
        """
        # Resolve the label up front so logging can never mask the task result
        # or replace the task's own exception with a KeyError.
        task_id = self._task_id(res)
        started = datetime.now()
        try:
            image_id = self.detect.tark_do(
                res,
                self.config.get("ragurl"),
                self.config.get("ragmode"),
                self.config.get("max_tokens"),
            )
        except Exception as e:
            # logger.exception records the traceback along with the message.
            self.logger.exception(f"任务 {task_id} 处理出错: {e}")
            raise
        self.logger.info(f"处理: {task_id}完毕{image_id}:{datetime.now() - started}")
        return image_id

    def _release_semaphore(self, future):
        """Done-callback: return the worker slot taken in ``submit``."""
        self.semaphore.release()
        self.logger.info(f"释放线程 (剩余空闲: {self.semaphore._value}/{self.max_workers})")

    def shutdown(self):
        """Shut down the executor, waiting for submitted tasks to finish."""
        self.executor.shutdown()
|