zhm
2025-07-24 028e39f85bf4115024d0467613f26a1750ff004e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from datetime import datetime
import logging
import os
from concurrent.futures import ThreadPoolExecutor
import threading
from logging.handlers import RotatingFileHandler
from langchain_openai import ChatOpenAI
from pymilvus import connections, Collection
import qwen_vllm
 
 
def process_task(detect,image_id,ragurl,ragmode,max_tokens):
    """Run one detection task synchronously and print how long it took.

    Args:
        detect: Object exposing ``tark_do(image_id, ragurl, ragmode, max_tokens)``.
        image_id: Task input, forwarded unchanged as the first argument.
        ragurl: Forwarded unchanged to ``tark_do``.
        ragmode: Forwarded unchanged to ``tark_do``.
        max_tokens: Forwarded unchanged to ``tark_do``.

    Returns:
        Whatever ``detect.tark_do`` returned.
    """
    # Measure the real duration; the previous message formatted the
    # literal 10 and therefore always reported "10.00s".
    started = datetime.now()
    image_id = detect.tark_do(image_id, ragurl, ragmode, max_tokens)
    elapsed = (datetime.now() - started).total_seconds()
    print(f"完成任务 {image_id} (耗时: {elapsed:.2f}s)")
    return image_id
 
 
class qwen_vllm_thread:
    """Bounded thread pool that runs ``qwen_vllm`` detection tasks.

    A semaphore sized to ``max_workers`` gates ``submit`` so that a caller
    blocks once every worker is busy instead of queueing tasks without limit
    inside the executor.
    """

    def __init__(self, max_workers,camera_id,config):
        """Connect to Milvus, build the LLM client and prepare the detector.

        Args:
            max_workers: Size of both the thread pool and the gating semaphore.
            camera_id: Used as the detector's thread name and passed to its
                ``init_logging``.
            config: Mapping read via ``.get`` for the keys ``milvusurl``,
                ``milvusport``, ``vllmmode``, ``vllmurl`` and, at task time,
                ``ragurl``, ``ragmode`` and ``max_tokens``.
        """
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.semaphore = threading.Semaphore(max_workers)
        self.max_workers = max_workers
        # Connect to Milvus and load the collection into memory.
        connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
        self.collection = Collection(name="smartobject")
        self.collection.load()

        # LangChain client pointed at the vLLM endpoint.
        self.llm = ChatOpenAI(
            model=config.get("vllmmode"),  # must match --served-model-name
            temperature=0.7,
            max_tokens=200,
            base_url=config.get("vllmurl"),
            api_key="EMPTY"
        )
        self.config = config
        # Detector that performs the actual work for each task.
        self.detect = qwen_vllm.detect_tasks()
        self.detect._thread_name = f"{camera_id}"
        self.detect.init_logging(f"{camera_id}")
        # Share the Milvus collection and LLM client with the detector.
        self.detect.collection = self.collection
        self.detect.llm = self.llm
        # Per-instance logger; the name embeds id(self).
        # NOTE(review): because the name is per-instance, each new instance
        # normally attaches its own handler to the same log file — confirm
        # that is intended.
        self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
        self.logger.setLevel(logging.INFO)
        # Avoid attaching a second handler to a logger that already has one.
        if not self.logger.handlers:
            # The file handler opens its file immediately, so the directory
            # has to exist before the handler is constructed.
            log_dir = "logs"
            os.makedirs(log_dir, exist_ok=True)
            handler = RotatingFileHandler(
                filename=os.path.join(log_dir, 'thread_log.log'),
                maxBytes=10 * 1024 * 1024,
                backupCount=3,
                encoding='utf-8'
            )
            formatter = logging.Formatter(
                '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def submit(self,res_a):
        """Schedule ``res_a`` for processing, blocking while the pool is full.

        Args:
            res_a: Task payload handed to ``_wrap_task``.

        Returns:
            The ``Future`` for the scheduled task.

        Raises:
            Whatever ``executor.submit`` raises (for example ``RuntimeError``
            once the executor has been shut down).
        """
        # Try to take a permit without blocking first, so that the "pool is
        # full" message is only logged when we really have to wait.
        acquired = self.semaphore.acquire(blocking=False)

        if not acquired:
            # _value is the semaphore's private counter; informational only.
            self.logger.info(f"线程池已满,等待空闲线程... (当前活跃: {self.max_workers - self.semaphore._value}/{self.max_workers})")
            # Block until a worker becomes free.
            self.semaphore.acquire(blocking=True)

        try:
            future = self.executor.submit(self._wrap_task, res_a)
        except BaseException:
            # No future exists, so the done-callback will never run; hand the
            # permit back here or the pool permanently loses capacity.
            self.semaphore.release()
            raise
        future.add_done_callback(self._release_semaphore)
        return future

    def _wrap_task(self, res):
        """Run the detector on ``res`` inside a worker thread and log timing.

        Args:
            res: Task payload; ``res['id']`` is read for log messages.

        Returns:
            The value returned by ``self.detect.tark_do``.

        Raises:
            Re-raises any exception from the detector after logging it.
        """
        try:
            current_time = datetime.now()
            image_id = self.detect.tark_do(res, self.config.get("ragurl"), self.config.get("ragmode"), self.config.get("max_tokens"))
            self.logger.info(f"处理: { res['id']}完毕{image_id}:{datetime.now() - current_time}")
            return image_id
        except Exception as e:
            # logger.exception records the traceback along with the message,
            # which the previous INFO-level entry dropped.
            self.logger.exception(f"任务 { res['id']} 处理出错: {e}")
            raise

    def _release_semaphore(self, future):
        """Done-callback: return the permit taken in ``submit``.

        Args:
            future: The finished future (unused).
        """
        self.semaphore.release()
        # _value is the semaphore's private counter; informational only.
        self.logger.info(f"释放线程 (剩余空闲: {self.semaphore._value}/{self.max_workers})")

    def shutdown(self):
        """Shut down the underlying executor using its default settings."""
        self.executor.shutdown()