From fbce9b151c32f7deba641cb7edc27d6822a2eea7 Mon Sep 17 00:00:00 2001
From: shidong <shidong@jhsoft.cc>
Date: Thu, 03 Jul 2025 11:55:39 +0800
Subject: [PATCH] #2025/7/3 #Updated qwen_detect.py: reworked the image-description step to use multi-threading; currently a test version #Added qwen_thread.py: multi-threaded processing for generating single image descriptions; currently a test version

---
 qwen_detect.py |   92 +++++++-----
 qwen_thread.py |  316 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 370 insertions(+), 38 deletions(-)

diff --git a/qwen_detect.py b/qwen_detect.py
index db6ccea..29788f2 100644
--- a/qwen_detect.py
+++ b/qwen_detect.py
@@ -1,18 +1,16 @@
-# Disable tokenizer parallelism (must be set before all imports)
+from operator import itemgetter
 import torch
 import threading
 import time as time_sel
 from typing import Dict
-import qwen_task
+from qwen_thread import qwen_thread
 import requests
 import os
-
 import logging
 from transformers import AutoProcessor, AutoModelForVision2Seq
 from pymilvus import connections, Collection
 from logging.handlers import RotatingFileHandler
 import get_mem
-
 
 class ThreadPool:
     def __init__(self):
@@ -40,24 +38,12 @@
         self.threads: Dict[str, threading.Thread] = {}
         self.lock = threading.Lock()
 
-        self.device = "cuda" if torch.cuda.is_available() else "cpu"
-
-        # Initialize the Qwen-VL detection model
-        self.qwen_model = AutoModelForVision2Seq.from_pretrained(
-            "./Qwen2-VL-2B-Instruct", device_map="auto",
-            trust_remote_code=True,
-            torch_dtype=torch.float16
-        )
-
-        # default processor
-        self.qwen_tokenizer = AutoProcessor.from_pretrained("./Qwen2-VL-2B-Instruct")
-
         # 鍒濆鍖朚ilvus闆嗗悎
         connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
         # 鍔犺浇闆嗗悎
         self.collection = Collection(name="smartobject")
         self.collection.load()
-
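+        # Worker pool that owns the Qwen VLM replicas; "threadnum" comes from the
+        # config file and the model path below is machine-specific (adjust as needed)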
+        self.pool = qwen_thread(int(self.config.get("threadnum")), self.config,"/home/debian/Qwen2.5-VL-3B-Instruct-GPTQ-Int4")
         #鏄惁鏇存柊
         self._isupdate = False
 
@@ -103,31 +89,61 @@
 
     # 鍚姩绾跨▼浠诲姟
     def worker(self, camera_id):
-        # Initialization
-        detect = qwen_task.detect_tasks()
-        detect._thread_name = f"{camera_id}"  # set the thread name
-        detect.init_logging(f"{camera_id}")
-        # Initialize the detection model
-        detect.device = self.device
-        # Initialize the Qwen model
-        detect.qwen_tokenizer = self.qwen_tokenizer
-        detect.qwen_model = self.qwen_model
-
-        # Initialize the Milvus collection
-        detect.collection = self.collection
-
-        # Set the thread running flag
-        detect._running = True
         while True:
             try:
+                res_a = self.collection.query(
+                    expr=f"is_desc == 0 and video_point_id=={camera_id}",
+                    output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
+                                   "task_name", "event_level_id", "event_level_name",
+                                   "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
+                                   "detect_id",
+                                   "detect_time", "image_path", "image_desc_path", "video_path"],
+                    consistency_level="Strong",
+                    order_by_field="id",  # sort by the id field
+                    order_by_type="desc"  # in descending order
+                )
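+                # The results are re-sorted client-side below; whether query()
+                # honours the order_by_* kwargs may depend on the pymilvus version (assumption)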
+
                 # Read the image from shared memory
-                image_id = get_mem.smem_read_frame_qianwen(camera_id)
-                if image_id > 0:
-                    logging.info(f"Read image successfully: {image_id}")
-                    image_id = detect.tark_do(image_id,self.config.get("filesavepath"),self.config.get("ragurl"),self.config.get("max_tokens"))
-                    logging.info(f"Processed image successfully: {image_id}")
+                # image_id = get_mem.smem_read_frame_qianwen(camera_id)
+                if len(res_a) > 0:
+                    sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
+                    # Keep the top N records by id (N = threadnum - 1)
+                    num = int(self.config.get("threadnum")) - 1
+                    res_a = sorted_results[:num]
+                    for res in res_a:
+                        data = {
+                            "id": res['id'],
+                            "event_level_id": res['event_level_id'],  # event_level_id
+                            "event_level_name": res['event_level_name'],  # event_level_name
+                            "rule_id": res["rule_id"],
+                            "video_point_id": res['video_point_id'],  # video_point_id
+                            "video_point_name": res['video_point_name'],
+                            "is_waning": 0,
+                            "is_desc": 1,
+                            "zh_desc_class": res['zh_desc_class'],  # zh_desc_class
+                            "bounding_box": res['bounding_box'],  # bounding_box
+                            "task_id": res['task_id'],  # task_id
+                            "task_name": res['task_name'],  # task_name
+                            "detect_id": res['detect_id'],  # detect_id
+                            "detect_time": res['detect_time'],  # detect_time
+                            "detect_num": res['detect_num'],
+                            "waning_value": res['waning_value'],
+                            "image_path": res['image_path'],  # image_path
+                            "image_desc_path": res['image_desc_path'],  # image_desc_path
+                            "video_path": res['video_path'],
+                            "text_vector": res['text_vector']
+                        }
+                        # logging.info(f"Read image successfully: {res['id']}")
+                        # Save back to Milvus (is_desc=1 marks the record as claimed)
+                        image_id = self.collection.upsert(data).primary_keys
+                        res['id'] = image_id[0]
+                        # logging.info(f"Read image successfully: {image_id}")
+                        image_id = self.pool.submit(res)
+                        # image_id = pool.tark_do(image_id,self.config.get("ragurl"),self.config.get("ragmode"),self.config.get("max_tokens"))
+                        # logging.info(f"Processed image successfully: {image_id}")
+                    sorted_results = None
+                else:
+                    # Nothing pending for this camera; back off briefly instead of busy-polling Milvus
+                    time_sel.sleep(1)
             except Exception as e:
-                logging.info(f"{detect._thread_name} thread error: {e}")
+                logging.info(f"{camera_id} thread error: {e}")
 
     # Check whether an update is needed
     def isUpdate(self):
diff --git a/qwen_thread.py b/qwen_thread.py
new file mode 100644
index 0000000..56ba944
--- /dev/null
+++ b/qwen_thread.py
@@ -0,0 +1,316 @@
+import time
+from concurrent.futures import ThreadPoolExecutor
+import threading
+
+import torch
+from PIL import Image
+from pymilvus import connections, Collection
+from datetime import datetime
+import os
+import requests
+import asyncio
+import logging
+import re
+from logging.handlers import RotatingFileHandler
+
+from transformers import AutoModelForVision2Seq, AutoProcessor
+
+
+class qwen_thread:
+    def __init__(self, max_workers, config, model_path):
+        self.executor = ThreadPoolExecutor(max_workers=max_workers)
+        self.semaphore = threading.Semaphore(max_workers)
+        self.max_workers = max_workers
+        # Initialize the Milvus collection
+        connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
+        # Load the collection
+        self.collection = Collection(name="smartobject")
+        self.collection.load()
+
+        self.config = config
+        self.model_pool = []
+        self.lock_pool = [threading.Lock() for _ in range(max_workers)]
+        for i in range(max_workers):
+            model = AutoModelForVision2Seq.from_pretrained(
+                model_path,
+                device_map="cuda:1",
+                trust_remote_code=True,
+                torch_dtype=torch.float16
+            ).eval()
+            self.model_pool.append(model)
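+        # One model replica per worker thread, each paired with a lock from
+        # lock_pool below, so generate() never runs concurrently on the same
+        # weights; every replica is placed on cuda:1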
+
+        # Shared processor (thread-safe)
+        self.processor = AutoProcessor.from_pretrained(model_path, use_fast=True)
+
+
+        # Create an instance-specific logger
+        self.logger = logging.getLogger(f"{self.__class__.__name__}_{id(self)}")
+        self.logger.setLevel(logging.INFO)
+        # Avoid adding duplicate handlers
+        if not self.logger.handlers:
+            os.makedirs("logs", exist_ok=True)  # make sure the log directory exists
+            handler = RotatingFileHandler(
+                filename=os.path.join("logs", 'thread_log.log'),
+                maxBytes=10 * 1024 * 1024,
+                backupCount=3,
+                encoding='utf-8'
+            )
+            formatter = logging.Formatter(
+                '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
+            )
+            handler.setFormatter(formatter)
+            self.logger.addHandler(handler)
+
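+    # Minimal usage sketch (a hedged example; the concrete values are assumptions):
+    #   pool = qwen_thread(4, config, "/path/to/Qwen2.5-VL-3B-Instruct-GPTQ-Int4")
+    #   future = pool.submit(res)    # res: one "smartobject" record dict, see tark_do()
+    #   image_id = future.result()   # list of upsert primary keys, or 0 if processing failed
+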
+    def submit(self, res_a):
+        # Try to acquire the semaphore (non-blocking)
+        acquired = self.semaphore.acquire(blocking=False)
+
+        if not acquired:
+            self.logger.info(f"Thread pool is full, waiting for an idle worker... (active: {self.max_workers - self.semaphore._value}/{self.max_workers})")
+            # Block until a worker becomes available
+            self.semaphore.acquire(blocking=True)
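+        # Backpressure: the semaphore mirrors the executor's worker count, so a
+        # caller blocks here until a worker frees up; the slot is released again
+        # in _release_semaphore() once the submitted future completes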
+
+        future = self.executor.submit(self._wrap_task, res_a)
+        future.add_done_callback(self._release_semaphore)
+        return future
+
+    def _wrap_task(self, res):
+        try:
+            # self.logger.info(f"Processing {res['id']} started")
+            current_time = datetime.now()
+            image_id = self.tark_do(res, self.config.get("ragurl"), self.config.get("ragmode"), self.config.get("max_tokens"))
+            self.logger.info(f"Processed {res['id']} -> {image_id} in {datetime.now() - current_time}")
+            return image_id
+        except Exception as e:
+            self.logger.info(f"Task {res['id']} failed: {e}")
+            raise
+
+    def tark_do(self, res, ragurl, rag_mode, max_tokens):
+        try:
+            # 1. Use the vector and metadata fetched from the collection
+            is_waning = 0
+            image_des = self.image_desc(f"{res['image_desc_path']}")
+            self.logger.info(image_des)
+            if image_des:
+                # rule_text = self.get_rule(ragurl)
+                is_waning = self.image_rule_chat(image_des, res['waning_value'], ragurl, rag_mode, max_tokens)
+                is_desc = 2
+            else:
+                is_waning = 0
+                is_desc = 3
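+            # is_desc codes: 2 = description generated, 3 = description failed
+            # (0 = pending and 1 = claimed are written by the worker in qwen_detect.py)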
+            data = {
+                "id": res['id'],
+                "event_level_id": res['event_level_id'],  # event_level_id
+                "event_level_name": res['event_level_name'],  # event_level_name
+                "rule_id": res["rule_id"],
+                "video_point_id": res['video_point_id'],  # video_point_id
+                "video_point_name": res['video_point_name'],
+                "is_waning": is_waning,
+                "is_desc": is_desc,
+                "zh_desc_class": image_des,  # newly generated description
+                "bounding_box": res['bounding_box'],  # bounding_box
+                "task_id": res['task_id'],  # task_id
+                "task_name": res['task_name'],  # task_name
+                "detect_id": res['detect_id'],  # detect_id
+                "detect_time": res['detect_time'],  # detect_time
+                "detect_num": res['detect_num'],
+                "waning_value": res['waning_value'],
+                "image_path": res['image_path'],  # image_path
+                "image_desc_path": res['image_desc_path'],  # image_desc_path
+                "video_path": res['video_path'],
+                "text_vector": res['text_vector']
+            }
+            # Save the result back to Milvus
+            image_id = self.collection.upsert(data).primary_keys
+            if is_desc == 2:
+                data = {
+                    "id": str(image_id[0]),
+                    "video_point_id": res['video_point_id'],
+                    "video_path": res["video_point_name"],
+                    "zh_desc_class": image_des,
+                    "detect_time": res['detect_time'],
+                    "image_path": f"{res['image_path']}",
+                    "task_name": res["task_name"],
+                    "event_level_name": res["event_level_name"],
+                    "rtsp_address": f"{res['video_path']}"
+                }
+                # Push the record to the RAG service
+                asyncio.run(self.insert_json_data(ragurl, data))
+            return image_id
+        except Exception as e:
+            self.logger.info(f"Thread: error while running model parsing for the task: {e}")
+            return 0
+
+    def image_desc(self, image_path):
+        # Acquire a model replica before the try block so the finally clause can
+        # always release it, even when opening the image fails
+        model, lock = self._acquire_model()
+        try:
+            # 2. Process the image
+            image = Image.open(image_path).convert("RGB")  # image_desc_path from the Milvus record
+            image = image.resize((600, 600), Image.Resampling.LANCZOS)  # high-quality downscale
+            messages = [
+                {
+                    "role": "user",
+                    "content": [
+                        {
+                            "type": "image",
+                        },
+                        {"type": "text", "text": "请详细描述图片中的目标信息及特征。返回格式为整段文字描述"},  # "Describe the targets in the image and their characteristics in detail; return one continuous paragraph of text."
+                    ],
+                }
+            ]
+            # Preparation for inference
+            text = self.processor.apply_chat_template(
+                messages, add_generation_prompt=True
+            )
+            inputs = self.processor(
+                text=[text],
+                images=[image],
+                padding=True,
+                return_tensors="pt",
+            )
+            inputs = inputs.to("cuda:1")
+            current_time = datetime.now()
+            outputs = model.generate(
+                **inputs,
+                max_new_tokens=300,
+                do_sample=True,
+                temperature=0.7,
+                renormalize_logits=True
+            )
+            self.logger.info(f"Generation finished in {datetime.now() - current_time}")
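+            # Keep only the newly generated tokens; the first len(inputs.input_ids[0])
+            # positions are the prompt tokens echoed back by generate()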
+            generated_ids = outputs[:, len(inputs.input_ids[0]):]
+            image_text = self.processor.batch_decode(
+                generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
+            )
+            image_des = (image_text[0]).strip()
+            return image_des
+        except Exception as e:
+            self.logger.info(f"Thread: error while generating the image description: {e}")
+        finally:
+            # 4. Release the model back to the pool
+            self._release_model(model)
+            torch.cuda.empty_cache()
+
+    def get_rule(self, ragurl):
+        try:
+            rule_text = None
+            search_data = {
+                "collection_name": "smart_rule",
+                "query_text": "",
+                "search_mode": "hybrid",
+                "limit": 100,
+                "weight_dense": 0.7,
+                "weight_sparse": 0.3,
+                "filter_expr": "",
+                "output_fields": ["text"]
+            }
+            response = requests.post(ragurl + "/search", json=search_data)
+            results = response.json().get('results')
+            rule_text = ""
+            ruleid = 1
+            if results:
+                for rule in results:
+                    if rule['score'] >= 0:
+                        rule_text = rule_text + str(ruleid) + ". " + rule['entity'].get('text') + ";\n"
+                        ruleid = ruleid + 1
+                # self.logger.info(len(rule_text))
+            else:
+                self.logger.info(f"Thread: error while fetching rules: {response}")
+            return rule_text
+        except Exception as e:
+            self.logger.info(f"Thread: error while fetching rules: {e}")
+            return None
+
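+    # Note: the get_rule() call in tark_do() is currently commented out, so
+    # image_rule_chat() receives res['waning_value'] as its rule text instead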
+    def image_rule_chat(self, image_des, rule_text, ragurl, rag_mode, max_tokens):
+        try:
+            # Prompt (translated): "The image description is: ...; the rules are: ...;
+            # verify whether the description matches any rule, without reasoning or
+            # 'think'; answer in the form [matched rule id], or [] if none."
+            content = (
+                f"图片描述内容为：\n{image_des}\n规则内容：\n{rule_text}。\n请验证图片描述中是否有符合规则的内容，不进行推理和think。返回结果格式为[xxx符合的规则id]，如果没有返回[]")
+            # self.logger.info(content)
+            #self.logger.info(len(content))
+            search_data = {
+                "prompt": "",
+                "messages": [
+                    {
+                        "role": "user",
+                        "content": content
+                    }
+                ],
+                "llm_name": rag_mode,
+                "stream": False,
+                "gen_conf": {
+                    "temperature": 0.7,
+                    "max_tokens": max_tokens
+                }
+            }
+            response = requests.post(ragurl + "/chat", json=search_data)
+            results = response.json().get('data')
+            #self.logger.info(len(results))
+            # self.logger.info(results)
+            ret = re.sub(r'<think>.*?</think>', '', results, flags=re.DOTALL)
+            ret = ret.replace(" ", "").replace("\t", "").replace("\n", "")
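+            # The endpoint is expected to answer like "[2]" or "[]"; once <think>
+            # blocks and whitespace are stripped, anything longer than "[]" counts as a match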
+            is_waning = 0
+            if len(ret) > 2:
+                is_waning = 1
+            return is_waning
+        except Exception as e:
+            self.logger.info(f"Thread: error while matching rules: {image_des, rule_text, ragurl, rag_mode, e}")
+            return None
+
+    async def insert_json_data(self, ragurl, data):
+        try:
+            data = {'collection_name': "smartrag", "data": data, "description": ""}
+            requests.post(ragurl + "/insert_json_data", json=data, timeout=(0.3, 0.3))
+            # self.logger.info(f"Called the recording service: {ragurl, data}")
+        except Exception as e:
+            # The RAG push is best-effort: log the failure and continue
+            self.logger.info(f"Thread: error while calling the recording service at {ragurl}: {e}")
+            return
+
+    def _release_semaphore(self, future):
+        self.semaphore.release()
+        self.logger.info(f"Worker released (idle: {self.semaphore._value}/{self.max_workers})")
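+        # Semaphore._value is an internal attribute; it is read here only for logging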
+
+    def shutdown(self):
+        """Shut down safely and release the model replicas."""
+        self.executor.shutdown(wait=False)
+        # Deleting the loop variable alone would not free the models; drop the pool's references instead
+        self.model_pool.clear()
+        torch.cuda.empty_cache()
+
+    def _acquire_model(self):
+        """Get an idle model from the pool (simple round-robin scan)."""
+        while True:
+            for model, lock in zip(self.model_pool, self.lock_pool):
+                if lock.acquire(blocking=False):
+                    return model, lock
+            time.sleep(0.1)  # back off briefly to avoid spinning the CPU
+
+    def _release_model(self, model):
+        """Release the model's lock back to the pool."""
+        for i, m in enumerate(self.model_pool):
+            if m is model:
+                self.lock_pool[i].release()
+                break
+
+
+    def remove_duplicate_lines(self, text):
+        seen = set()
+        result = []
+        for line in text.split('。'):  # split on the Chinese full stop
+            if line.strip() and line not in seen:
+                seen.add(line)
+                result.append(line)
+        return '。'.join(result)
+
+    def remove_duplicate_lines_d(self, text):
+        seen = set()
+        result = []
+        for line in text.split(','):  # split on commas
+            if line.strip() and line not in seen:
+                seen.add(line)
+                result.append(line)
+        return '。'.join(result)
+
+    def remove_duplicate_lines_n(self, text):
+        seen = set()
+        result = []
+        for line in text.split('\n'):  # split on newlines
+            if line.strip() and line not in seen:
+                seen.add(line)
+                result.append(line)
+        return '。'.join(result)
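+    # Example: remove_duplicate_lines("A。B。A。") returns "A。B". These helpers drop
+    # sentences the VLM repeats verbatim; they are not called anywhere in this patch yet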
+

--
Gitblit v1.8.0