From 112aace08718ad0ead624286fe09e4bf941dee5a Mon Sep 17 00:00:00 2001
From: zhm <839713154@qq.com>
Date: Fri, 25 Jul 2025 14:16:33 +0800
Subject: [PATCH] Update alerting and image description handling -- change the detection content

---
 qwen_thread.py |  264 ++++++++++++++--------------------------------------
 1 file changed, 71 insertions(+), 193 deletions(-)

diff --git a/qwen_thread.py b/qwen_thread.py
index 56ba944..4c3a1d6 100644
--- a/qwen_thread.py
+++ b/qwen_thread.py
@@ -1,71 +1,60 @@
+import logging
 import time
 from concurrent.futures import ThreadPoolExecutor
 import threading
-
 import torch
 from PIL import Image
 from pymilvus import connections, Collection
 from datetime import datetime
-import os
 import requests
 import asyncio
-import logging
 import re
-from logging.handlers import RotatingFileHandler
 
+from qwen_vl_utils import process_vision_info
 from transformers import AutoModelForVision2Seq, AutoProcessor
 
 
 class qwen_thread:
-    def __init__(self, max_workers,config,model_path):
-        self.executor = ThreadPoolExecutor(max_workers=max_workers)
-        self.semaphore = threading.Semaphore(max_workers)
-        self.max_workers = max_workers
+    def __init__(self, config,logger):
+        self.config = config
+        self.max_workers = int(config.get("threadnum"))
+        self.executor = ThreadPoolExecutor(max_workers=int(config.get("threadnum")))
+        self.semaphore = threading.Semaphore(int(config.get("threadnum")))
+        self.logger = logger
+
         # 鍒濆鍖朚ilvus闆嗗悎
         connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
         # Load the collection
         self.collection = Collection(name="smartobject")
         self.collection.load()
-
-        self.config = config
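+        # Pick the CUDA device: use the default "cuda" device when the "cuda" setting is missing or "0", otherwise the given index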
+        if config.get('cuda') is None or config.get('cuda') == '0':
+            self.device = f"cuda"
+        else:
+            self.device = f"cuda:{config.get('cuda')}"
         self.model_pool = []
-        self.lock_pool = [threading.Lock() for _ in range(max_workers)]
-        for i in range(max_workers):
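+        # Build a pool of model instances (sized by the "qwenwarning" setting), each guarded by its own lock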
+        self.lock_pool = [threading.Lock() for _ in range(int(config.get("qwenwarning")))]
+        for i in range(int(config.get("qwenwarning"))):
             model = AutoModelForVision2Seq.from_pretrained(
-                model_path,
-                device_map="cuda:1",
+                config.get("qwenaddr"),
+                device_map=self.device,
                 trust_remote_code=True,
+                use_safetensors=True,
                 torch_dtype=torch.float16
+
             ).eval()
+            model = model.to(self.device)
             self.model_pool.append(model)
 
         # Shared processor (thread-safe)
-        self.processor = AutoProcessor.from_pretrained(model_path,use_fast=True)
+        self.processor = AutoProcessor.from_pretrained(config.get("qwenaddr"), use_fast=True)
 
-
-        # Create an instance-specific logger
-        self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
-        self.logger.setLevel(logging.INFO)
-        # Avoid adding duplicate handlers
-        if not self.logger.handlers:
-            handler = RotatingFileHandler(
-                filename=os.path.join("logs", 'thread_log.log'),
-                maxBytes=10 * 1024 * 1024,
-                backupCount=3,
-                encoding='utf-8'
-            )
-            formatter = logging.Formatter(
-                '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
-            )
-            handler.setFormatter(formatter)
-            self.logger.addHandler(handler)
 
     def submit(self,res_a):
         # Try to acquire the semaphore (non-blocking)
         acquired = self.semaphore.acquire(blocking=False)
 
         if not acquired:
-            self.logger.info(f"绾跨▼姹犲凡婊★紝绛夊緟绌洪棽绾跨▼... (褰撳墠娲昏穬: {self.max_workers - self.semaphore._value}/{self.max_workers})")
+            #self.logger.info(f"绾跨▼姹犲凡婊★紝绛夊緟绌洪棽绾跨▼... (褰撳墠娲昏穬: {self.max_workers - self.semaphore._value}/{self.max_workers})")
             # 闃诲绛夊緟鐩村埌鏈夊彲鐢ㄧ嚎绋�
             self.semaphore.acquire(blocking=True)
 
@@ -73,40 +62,33 @@
         future.add_done_callback(self._release_semaphore)
         return future
 
-    def _wrap_task(self, res):
+    def _wrap_task(self, res_a):
         try:
-            #self.logger.info(f"澶勭悊: { res['id']}寮�濮�")
-            current_time = datetime.now()
-            image_id = self.tark_do(res, self.config.get("ragurl"), self.config.get("ragmode"), self.config.get("max_tokens"))
-            self.logger.info(f"澶勭悊: { res['id']}瀹屾瘯{image_id}:{datetime.now() - current_time}")
-            return image_id
+            self.tark_do(res_a, self.config.get("ragurl"), self.config.get("ragmode"), self.config.get("max_tokens"))
         except Exception as e:
-            self.logger.info(f"浠诲姟 { res['id']} 澶勭悊鍑洪敊: {e}")
+            self.logger.info(f"澶勭悊鍑洪敊: {e}")
             raise
 
     def tark_do(self,res,ragurl,rag_mode,max_tokens):
-        try :
-            # 1. Fetch vectors and metadata from collection A
-            is_waning = 0
-            image_des = self.image_desc(f"{res['image_desc_path']}")
-            self.logger.info(image_des)
-            if image_des:
-                # rule_text = self.get_rule(ragurl)
-                is_waning = self.image_rule_chat(image_des,res['waning_value'],ragurl,rag_mode,max_tokens)
-                is_desc = 2
-            else:
-                is_waning = 0
-                is_desc = 3
+        try:
+            # Record the start time of this alert task
+            ks_time = datetime.now()
+
+            risk_description = ""
+            suggestion = ""
+            # Call the rule-matching method to decide whether to raise an alert
+            is_waning = self.image_rule(res)
+            self.logger.info(f"Alert rule result is_waning: {is_waning}")
+            # Update the record's alert result and alert status
             data = {
-                "id": res['id'],
                 "event_level_id": res['event_level_id'],  # event_level_id
                 "event_level_name": res['event_level_name'],  # event_level_id
                 "rule_id": res["rule_id"],
                 "video_point_id": res['video_point_id'],  # video_point_id
                 "video_point_name": res['video_point_name'],
                 "is_waning": is_waning,
-                "is_desc": is_desc,
-                "zh_desc_class": image_des,  # text_vector
+                "is_desc": 5,  #鏀逛负宸茬粡棰勮
+                "zh_desc_class": res['zh_desc_class'],  # text_vector
                 "bounding_box": res['bounding_box'],  # bounding_box
                 "task_id": res['task_id'],  # task_id
                 "task_name": res['task_name'],  # task_id
@@ -117,154 +99,75 @@
                 "image_path": res['image_path'],  # image_path
                 "image_desc_path": res['image_desc_path'],  # image_desc_path
                 "video_path": res['video_path'],
-                "text_vector": res['text_vector']
+                "text_vector": res['text_vector'],
+                "risk_description": risk_description,
+                "suggestion": suggestion,
+                "knowledge_id": res['knowledge_id']
             }
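+            # Replace the stored record: delete the old row, then insert so Milvus assigns a new primary key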
+            self.collection.delete(f"id == {res['id']}")
             # Save to Milvus
-            image_id = self.collection.upsert(data).primary_keys
-            if is_desc == 2:
-                data = {
-                    "id": str(image_id[0]),
-                    "video_point_id": res['video_point_id'],
-                    "video_path": res["video_point_name"],
-                    "zh_desc_class": image_des,
-                    "detect_time": res['detect_time'],
-                    "image_path": f"{res['image_path']}",
-                    "task_name": res["task_name"],
-                    "event_level_name": res["event_level_name"],
-                    "rtsp_address": f"{res['video_path']}"
-                }
-                # Call the RAG service
-                asyncio.run(self.insert_json_data(ragurl, data))
-            return image_id
+            image_id = self.collection.insert(data).primary_keys
+            res['id'] = image_id[0]
+            self.logger.info(f"{res['video_point_id']}棰勮鎵ц瀹屾瘯锛歿image_id}杩愯缁撴潫鎬讳綋鐢ㄦ椂:{datetime.now() - ks_time}")
+            return None
         except Exception as e:
-            self.logger.info(f"绾跨▼锛氭墽琛屾ā鍨嬭В鏋愭椂鍑洪敊:浠诲姟锛歿e}")
+            self.logger.info(f"绾跨▼锛氭墽琛屾ā鍨嬭В鏋愭椂鍑洪敊::{e}")
             return 0
 
-    def image_desc(self, image_path):
+    def image_rule(self, res_data):
+        self.logger.info(f"棰勮瑙勫垯瑙勫垯瑙勫垯绛夌骇鍒嗙被灏辨槸鎵撹缂濆灏戠Н鍒�")
         try:
             model, lock = self._acquire_model()
-            # 2. Process the image
-            image = Image.open(image_path).convert("RGB")  # replace with your image path
-            image = image.resize((600, 600), Image.Resampling.LANCZOS)  # high-quality downscaling
+            image = Image.open(res_data['image_desc_path']).convert("RGB").resize((600, 600), Image.Resampling.LANCZOS)
+
             messages = [
                 {
                     "role": "user",
                     "content": [
-                        {
-                            "type": "image",
-                        },
-                        {"type": "text", "text": "璇疯缁嗘弿杩板浘鐗囦腑鐨勭洰鏍囦俊鎭強鐗瑰緛銆傝繑鍥炴牸寮忎负鏁存鏂囧瓧鎻忚堪"},
+                        {"type": "image", "image": image},
+                        {"type": "text", "text": f"璇锋娴嬪浘鐗囦腑鏄惁{res_data['waning_value']}?璇峰洖绛攜es鎴杗o"},
                     ],
                 }
             ]
+
             # Preparation for inference
             text = self.processor.apply_chat_template(
-                messages, add_generation_prompt=True
+                messages, tokenize=False, add_generation_prompt=True
             )
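+            # Extract the image/video inputs referenced in the chat messages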
+            image_inputs, video_inputs = process_vision_info(messages)
             inputs = self.processor(
                 text=[text],
-                images=[image],
+                images=image_inputs,
+                videos=video_inputs,
                 padding=True,
                 return_tensors="pt",
             )
-            inputs = inputs.to("cuda:1")
-            current_time = datetime.now()
-            outputs = model.generate(**inputs,
-                                               max_new_tokens=300,
-                                               do_sample=True,
-                                               temperature=0.7,
-                                               renormalize_logits=True
-                                               )
-            print(f"澶勭悊瀹屾瘯:{datetime.now() - current_time}")
+            inputs = inputs.to(model.device)
+
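+            # A small max_new_tokens budget is enough for a yes/no answer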
+            with torch.no_grad():
+                outputs = model.generate(**inputs, max_new_tokens=10)
             generated_ids = outputs[:, len(inputs.input_ids[0]):]
             image_text = self.processor.batch_decode(
                 generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
             )
+
             image_des = (image_text[0]).strip()
-            return image_des
+            upper_text = image_des.upper()
+            self.logger.info(f"棰勮瑙勫垯瑙勫垯瑙勫垯锛歿upper_text}")
+            if "YES" in upper_text:
+                return 1
+            else:
+                return 0
         except Exception as e:
             self.logger.info(f"绾跨▼锛氭墽琛屽浘鐗囨弿杩版椂鍑洪敊:{e}")
+            return 0
         finally:
             # 4. Release the model
             self._release_model(model)
             torch.cuda.empty_cache()
-
-    def get_rule(self,ragurl):
-        try:
-            rule_text = None
-            search_data = {
-                "collection_name": "smart_rule",
-                "query_text": "",
-                "search_mode": "hybrid",
-                "limit": 100,
-                "weight_dense": 0.7,
-                "weight_sparse": 0.3,
-                "filter_expr": "",
-                "output_fields": ["text"]
-            }
-            response = requests.post(ragurl + "/search", json=search_data)
-            results = response.json().get('results')
-            rule_text = ""
-            ruleid = 1
-            for rule in results:
-                if rule['score'] >= 0:
-                    rule_text = rule_text + str(ruleid) + ". " + rule['entity'].get('text') + ";\n"
-                    ruleid = ruleid + 1
-            # self.logger.info(len(rule_text))
-            else:
-                self.logger.info(f"绾跨▼锛氭墽琛岃幏鍙栬鍒欐椂鍑洪敊:{response}")
-            return rule_text
-        except Exception as e:
-            self.logger.info(f"绾跨▼锛氭墽琛岃幏鍙栬鍒欐椂鍑洪敊:{e}")
-            return None
-
-    def image_rule_chat(self, image_des,rule_text, ragurl, rag_mode,max_tokens):
-        try:
-            content = (
-                f"鍥剧墖鎻忚堪鍐呭涓猴細\n{image_des}\n瑙勫垯鍐呭锛歕n{rule_text}銆俓n璇烽獙璇佸浘鐗囨弿杩颁腑鏄惁鏈夌鍚堣鍒欑殑鍐呭锛屼笉杩涜鎺ㄧ悊鍜宼hink銆傝繑鍥炵粨鏋滄牸寮忎负[xxx绗﹀悎鐨勮鍒檌d]锛屽鏋滄病鏈夎繑鍥瀃]")
-            # self.logger.info(content)
-            #self.logger.info(len(content))
-            search_data = {
-                "prompt": "",
-                "messages": [
-                    {
-                        "role": "user",
-                        "content": content
-                    }
-                ],
-                "llm_name": rag_mode,
-                "stream": False,
-                "gen_conf": {
-                    "temperature": 0.7,
-                    "max_tokens": max_tokens
-                }
-            }
-            response = requests.post(ragurl + "/chat", json=search_data)
-            results = response.json().get('data')
-            #self.logger.info(len(results))
-            # self.logger.info(results)
-            ret = re.sub(r'<think>.*?</think>', '', results, flags=re.DOTALL)
-            ret = ret.replace(" ", "").replace("\t", "").replace("\n", "")
-            is_waning = 0
-            if len(ret) > 2:
-                is_waning = 1
-            return is_waning
-        except Exception as e:
-            self.logger.info(f"绾跨▼锛氭墽琛岃鍒欏尮閰嶆椂鍑洪敊:{image_des, rule_text, ragurl, rag_mode,e}")
-            return None
-
-    async def insert_json_data(self, ragurl, data):
-        try:
-            data = {'collection_name': "smartrag", "data": data, "description": ""}
-            requests.post(ragurl + "/insert_json_data", json=data, timeout=(0.3, 0.3))
-            #self.logger.info(f"璋冪敤褰曞儚鏈嶅姟:{ragurl, data}")
-        except Exception as e:
-            #self.logger.info(f"{self._thread_name}绾跨▼锛氳皟鐢ㄥ綍鍍忔椂鍑洪敊:鍦板潃锛歿ragurl}锛歿e}")
-            return
-
     def _release_semaphore(self, future):
         self.semaphore.release()
-        self.logger.info(f"閲婃斁绾跨▼ (鍓╀綑绌洪棽: {self.semaphore._value}/{self.max_workers})")
+        #self.logger.info(f"閲婃斁绾跨▼ (鍓╀綑绌洪棽: {self.semaphore._value}/{self.max_workers})")
 
     def shutdown(self):
         """瀹夊叏鍏抽棴"""
@@ -288,29 +191,4 @@
                 self.lock_pool[i].release()
                 break
 
-
-    def remove_duplicate_lines(self,text):
-        seen = set()
-        result = []
-        for line in text.split('。'):  # split on the Chinese full stop
-            if line.strip() and line not in seen:
-                seen.add(line)
-                result.append(line)
-        return '。'.join(result)
-    def remove_duplicate_lines_d(self,text):
-        seen = set()
-        result = []
-        for line in text.split(','):  # split on commas
-            if line.strip() and line not in seen:
-                seen.add(line)
-                result.append(line)
-        return '。'.join(result)
-    def remove_duplicate_lines_n(self,text):
-        seen = set()
-        result = []
-        for line in text.split('\n'):  # split on newlines
-            if line.strip() and line not in seen:
-                seen.add(line)
-                result.append(line)
-        return '。'.join(result)
 

--
Gitblit v1.8.0