From 028e39f85bf4115024d0467613f26a1750ff004e Mon Sep 17 00:00:00 2001
From: zhm <839713154@qq.com>
Date: Thu, 24 Jul 2025 09:18:19 +0800
Subject: [PATCH] Optimize warning and image description generation

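Split the single qwen thread pool into a warning pool and an image
description pool. qwen_detect now starts a workerWarning thread per
camera that picks up new records (is_desc == 0), marks them as
warning-in-progress (is_desc = 4) and submits them to qwen_thread,
which only runs the warning rule check and stores the result with
is_desc = 5. The existing worker thread then picks up warned records
(is_desc == 5) and submits them to the new qwen_thread_description
pool, which generates the image description, hazard description and
handling suggestion (is_desc = 2 on success, 3 on failure). The model
pool sizes are controlled by the new qwenwarning and qwendescription
keys in conf.txt.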
---
 qwen_detect.py             |   76 ++++
 qwen_thread.py             |  325 +--------------------
 conf.txt                   |    2 
 qwen_thread_description.py |  464 ++++++++++++++++++++++++++++++
 4 files changed, 555 insertions(+), 312 deletions(-)

diff --git a/conf.txt b/conf.txt
index cc12af5..4e24a17 100644
--- a/conf.txt
+++ b/conf.txt
@@ -14,6 +14,8 @@
 max_tokens = 200
 threadnum = 4
 detectnum = 1
+qwendescription=1
+qwenwarning=1
 qwenaddr = /home/debian/Qwen2.5-VL-3B-Instruct-GPTQ-Int4
 cuda = 1
 logaddr=/home/debian/logs
\ No newline at end of file
diff --git a/qwen_detect.py b/qwen_detect.py
index 2b769ed..b603aef 100644
--- a/qwen_detect.py
+++ b/qwen_detect.py
@@ -9,6 +9,8 @@
 from pymilvus import connections, Collection
 from logging.handlers import RotatingFileHandler
 import get_mem
+from qwen_thread_description import qwen_thread_description
+
 
 class ThreadPool:
     def __init__(self):
@@ -59,8 +61,10 @@
         # Load the collection
         self.collection = Collection(name="smartobject")
         self.collection.load()
-        #Create the qwen thread pool
-        self.pool = qwen_thread(self.config,self.logger)
+        #Create the qwen thread pool -- warning thread pool
+        self.poolWarning = qwen_thread(self.config,self.logger)
+        # Create the qwen thread pool -- image description etc.
+        self.poolDescription = qwen_thread_description(self.config, self.logger)
         #鏄惁鏇存柊
         self._isupdate = False
 
@@ -87,12 +91,69 @@
             self.threads[camera_id] = t
             return t
 
-    # 鍚姩绾跨▼浠诲姟
+    # 鍚姩绾跨▼浠诲姟--棰勮
+    def workerWarning(self, camera_id):
+            while True:
+                try:
+                    res_a = self.collection.query(
+                        expr=f"is_desc == 0 and video_point_id=={camera_id}",
+                        output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name",
+                                       "task_id",
+                                       "task_name", "event_level_id", "event_level_name",
+                                       "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value",
+                                       "rule_id",
+                                       "detect_id", "knowledge_id", "suggestion", "risk_description",
+                                       "detect_time", "image_path", "image_desc_path", "video_path"],
+                        consistency_level="Strong",
+                        order_by_field="id",  # sort by the id field
+                        order_by_type="desc"  # descending order
+                    )
+                    # Read the image from shared memory
+                    # image_id = get_mem.smem_read_frame_qianwen(camera_id)
+                    if len(res_a) > 0:
+                        # sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
+                        # res = sorted_results[0]
+                        res = max(res_a, key=itemgetter("id"))
+                        self.collection.delete(f"id == {res['id']}")
+                        # Data record
+                        data = {
+                            "event_level_id": res['event_level_id'],  # event_level_id
+                            "event_level_name": res['event_level_name'],  # event_level_id
+                            "rule_id": res["rule_id"],
+                            "video_point_id": res['video_point_id'],  # video_point_id
+                            "video_point_name": res['video_point_name'],
+                            "is_waning": 0,
+                            "is_desc": 4,  # warning-in-progress state
+                            "zh_desc_class": res['zh_desc_class'],
+                            "bounding_box": res['bounding_box'],  # bounding_box
+                            "task_id": res['task_id'],  # task_id
+                            "task_name": res['task_name'],  # task_id
+                            "detect_id": res['detect_id'],  # detect_id
+                            "detect_time": res['detect_time'],  # detect_time
+                            "detect_num": res['detect_num'],
+                            "waning_value": res['waning_value'],
+                            "image_path": res['image_path'],  # image_path
+                            "image_desc_path": res['image_desc_path'],  # image_desc_path
+                            "video_path": res['video_path'],
+                            "text_vector": res['text_vector'],
+                            "risk_description": res['risk_description'],
+                            "suggestion": res['suggestion'],
+                            "knowledge_id": res['knowledge_id']
+                        }
+                        # Save to milvus
+                        image_id = self.collection.insert(data).primary_keys
+                        res['id'] = image_id[0]
+                        self.poolWarning.submit(res)
+                    time_sel.sleep(0.01)
+                except Exception as e:
+                    logging.info(f"{camera_id} thread error: {e}")
+
+    # Start thread task -- generate image description etc.
     def worker(self, camera_id):
         while True:
             try:
                 res_a = self.collection.query(
-                    expr=f"is_desc == 0 and video_point_id=={camera_id}",
+                    expr=f"is_desc == 5 and video_point_id=={camera_id}",
                     output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
                                    "task_name", "event_level_id", "event_level_name",
                                    "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
@@ -116,7 +177,7 @@
                         "rule_id": res["rule_id"],
                         "video_point_id": res['video_point_id'],  # video_point_id
                         "video_point_name": res['video_point_name'],
-                        "is_waning": 0,
+                        "is_waning": res['is_waning'],
                         "is_desc": 1,
                         "zh_desc_class": res['zh_desc_class'],
                         "bounding_box": res['bounding_box'],  # bounding_box
@@ -137,7 +198,7 @@
                     # Save to milvus
                     image_id = self.collection.insert(data).primary_keys
                     res['id'] = image_id[0]
-                    self.pool.submit(res)
+                    self.poolDescription.submit(res)
                 time_sel.sleep(0.01)
             except Exception as e:
                 logging.info(f"{camera_id}绾跨▼閿欒:{e}")
@@ -226,6 +287,9 @@
                         thread = pool.threads.get(camera.get("camera_id"))
                         if not thread:
                             logging.info(f"寮�濮嬪垱寤簕camera.get('camera_id')}绾跨▼")
+                            #鍏堝鏁版嵁杩涜棰勮
+                            pool.safe_start(pool.workerWarning, camera.get('camera_id'))
+                            #鍦ㄧ敓鎴愬浘鐗囨弿杩般�佸鐞嗗缓璁瓑淇℃伅
                             pool.safe_start(pool.worker, camera.get('camera_id'))
                             logging.info(f"{camera.get('camera_id')}绾跨▼鍒涘缓瀹屾瘯")
 
diff --git a/qwen_thread.py b/qwen_thread.py
index f1cb6e0..f3f8703 100644
--- a/qwen_thread.py
+++ b/qwen_thread.py
@@ -1,3 +1,4 @@
+import logging
 import time
 from concurrent.futures import ThreadPoolExecutor
 import threading
@@ -26,13 +27,13 @@
         # Load the collection
         self.collection = Collection(name="smartobject")
         self.collection.load()
-        if config.get('cuda') == None or config.get('cuda') == '0':
+        if config.get('cuda') is None or config.get('cuda') == '0':
             self.device = f"cuda"
         else:
             self.device = f"cuda:{config.get('cuda')}"
         self.model_pool = []
-        self.lock_pool = [threading.Lock() for _ in range(int(config.get("threadnum")))]
-        for i in range(int(config.get("threadnum"))):
+        self.lock_pool = [threading.Lock() for _ in range(int(config.get("qwenwarning")))]
+        for i in range(int(config.get("qwenwarning"))):
             model = AutoModelForVision2Seq.from_pretrained(
                 config.get("qwenaddr"),
                 device_map=self.device,
@@ -70,27 +71,15 @@
 
     def tark_do(self,res,ragurl,rag_mode,max_tokens):
         try:
-            # 1. Get vectors and metadata from collection A
-            is_waning = 0
-            is_desc = 2
             # Generate the image description
             ks_time = datetime.now()
-            desc_time = datetime.now() - ks_time
-            current_time = datetime.now()
+
             risk_description = ""
             suggestion = ""
             # Call the rule matching method to decide whether to warn
             is_waning = self.image_rule(res)
-            # If warning, generate the hazard description and handling suggestion
-            if is_waning == 1:
-                # Get the rules and regulations data
-                filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl)
-                # Generate the hazard description
-                risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
-                # Generate the handling suggestion
-                suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
-                #self.logger.info(f"{res['video_point_id']} finished: {res['id']}: is_waning {is_waning}, safety hazard: {risk_description}\nhandling suggestion: {suggestion}")
-                # Data record
+            self.logger.info(f"Warning rule check result is_waning: {is_waning}")
+            #Update the record's warning result and warning status
             data = {
                 "event_level_id": res['event_level_id'],  # event_level_id
                 "event_level_name": res['event_level_name'],  # event_level_id
@@ -98,7 +87,7 @@
                 "video_point_id": res['video_point_id'],  # video_point_id
                 "video_point_name": res['video_point_name'],
                 "is_waning": is_waning,
-                "is_desc": 1,
+                "is_desc": 5,  # mark as already warned
                 "zh_desc_class": res['zh_desc_class'],  # text_vector
                 "bounding_box": res['bounding_box'],  # bounding_box
                 "task_id": res['task_id'],  # task_id
@@ -119,105 +108,14 @@
             # Save to milvus
             image_id = self.collection.insert(data).primary_keys
             res['id'] = image_id[0]
-            # Image description generated successfully
-            desc = self.image_desc(res)
-            if desc:
-                is_desc = 2
-            else:
-                is_desc = 3
-            # Data record
-            data = {
-                "event_level_id": res['event_level_id'],  # event_level_id
-                "event_level_name": res['event_level_name'],  # event_level_id
-                "rule_id": res["rule_id"],
-                "video_point_id": res['video_point_id'],  # video_point_id
-                "video_point_name": res['video_point_name'],
-                "is_waning": is_waning,
-                "is_desc": is_desc,
-                "zh_desc_class": desc,  # text_vector
-                "bounding_box": res['bounding_box'],  # bounding_box
-                "task_id": res['task_id'],  # task_id
-                "task_name": res['task_name'],  # task_id
-                "detect_id": res['detect_id'],  # detect_id
-                "detect_time": res['detect_time'],  # detect_time
-                "detect_num": res['detect_num'],
-                "waning_value": res['waning_value'],
-                "image_path": res['image_path'],  # image_path
-                "image_desc_path": res['image_desc_path'],  # image_desc_path
-                "video_path": res['video_path'],
-                "text_vector": res['text_vector'],
-                "risk_description": risk_description,
-                "suggestion": suggestion,
-                "knowledge_id": res['knowledge_id']
-            }
-            self.collection.delete(f"id == {res['id']}")
-            # Save to milvus
-            image_id = self.collection.insert(data).primary_keys
-            data = {
-                "id": str(image_id[0]),
-                "video_point_id": res['video_point_id'],
-                "video_path": res["video_point_name"],
-                "zh_desc_class": desc,
-                "detect_time": res['detect_time'],
-                "image_path": f"{res['image_path']}",
-                "task_name": res["task_name"],
-                "event_level_name": res["event_level_name"],
-                "rtsp_address": f"{res['video_path']}"
-            }
-            # Call RAG
-            asyncio.run(self.insert_json_data(ragurl, data))
-            rag_time = datetime.now() - current_time
-            self.logger.info(f"{res['video_point_id']} finished: {image_id}, total elapsed: {datetime.now() - ks_time}, image description time {desc_time}, RAG time {rag_time}")
-            if is_waning == 1:
-                self.logger.info(f"{res['video_point_id']} finished: {image_id}, image description: {desc}\nhazard: {risk_description}\nsuggestion: {suggestion}")
+            self.logger.info(f"{res['video_point_id']} warning check finished: {image_id}, total elapsed: {datetime.now() - ks_time}")
+            return None
         except Exception as e:
             self.logger.info(f"绾跨▼锛氭墽琛屾ā鍨嬭В鏋愭椂鍑洪敊::{e}")
             return 0
 
-    def image_desc(self, res_data):
-        try:
-            model, lock = self._acquire_model()
-
-            image = Image.open(res_data['image_desc_path']).convert("RGB").resize((600, 600), Image.Resampling.LANCZOS)
-            messages = [
-                {
-                    "role": "user",
-                    "content": [
-                        {
-                            "type": "image",
-                        },
-                        {"type": "text", "text": "Please describe in detail the targets and their features in the image. Return the result as a single paragraph of text."},
-                    ],
-                }
-            ]
-            # Preparation for inference
-            text = self.processor.apply_chat_template(
-                messages, add_generation_prompt=True
-            )
-            inputs = self.processor(
-                text=[text],
-                images=[image],
-                padding=True,
-                return_tensors="pt",
-            )
-            inputs = inputs.to(model.device)
-            with torch.inference_mode(), torch.amp.autocast(device_type=self.device, dtype=torch.float16):
-                outputs = model.generate(**inputs,max_new_tokens=200,do_sample=False,num_beams=1,temperature=None,top_p=None,top_k=1,use_cache=True,repetition_penalty=1.0)
-            generated_ids = outputs[:, len(inputs.input_ids[0]):]
-            image_text = self.processor.batch_decode(
-                generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
-            )
-            image_des = (image_text[0]).strip()
-            #self.logger.info(f"{res_data['video_point_id']}:{res_data['id']}:{res_data['detect_time']}:{image_des}")
-            return image_des
-        except Exception as e:
-            self.logger.info(f"Thread: error during image description: {e}")
-        finally:
-            # 4. Release the model
-            self._release_model(model)
-            torch.cuda.empty_cache()
-
     def image_rule(self, res_data):
+        self.logger.info("image_rule: running the warning rule check")
         try:
             model, lock = self._acquire_model()
             image = Image.open(res_data['image_desc_path']).convert("RGB").resize((600, 600), Image.Resampling.LANCZOS)
@@ -227,7 +125,7 @@
                     "role": "user",
                     "content": [
                         {"type": "image", "image": image},
-                        {"type": "text", "text": f"Does the image contain {res_data['waning_value']}? Please answer yes or no"},
+                        {"type": "text", "text": f"Please check whether the image contains {res_data['waning_value']}. Please answer yes or no"},
                     ],
                 }
             ]
@@ -254,179 +152,19 @@
             )
 
             image_des = (image_text[0]).strip()
-            return image_des
+            upper_text = image_des.upper()
+            self.logger.info(f"Warning rule check answer: {upper_text}")
+            if "YES" in upper_text:
+                return 1
+            else:
+                return 0
         except Exception as e:
             self.logger.info(f"绾跨▼锛氭墽琛屽浘鐗囨弿杩版椂鍑洪敊:{e}")
+            return 0
         finally:
             # 4. Release the model
             self._release_model(model)
             torch.cuda.empty_cache()
-
-    def get_rule(self,ragurl):
-        try:
-            rule_text = None
-            search_data = {
-                "collection_name": "smart_rule",
-                "query_text": "",
-                "search_mode": "hybrid",
-                "limit": 100,
-                "weight_dense": 0.7,
-                "weight_sparse": 0.3,
-                "filter_expr": "",
-                "output_fields": ["text"]
-            }
-            response = requests.post(ragurl + "/search", json=search_data)
-            results = response.json().get('results')
-            rule_text = ""
-            ruleid = 1
-            for rule in results:
-                if rule['score'] >= 0:
-                    rule_text = rule_text + str(ruleid) + ". " + rule['entity'].get('text') + ";\n"
-                    ruleid = ruleid + 1
-            # self.logger.info(len(rule_text))
-            else:
-                self.logger.info(f"Thread: error while fetching rules: {response}")
-            return rule_text
-        except Exception as e:
-            self.logger.info(f"Thread: error while fetching rules: {e}")
-            return None
-
-    def image_rule_chat(self, image_des,rule_text, ragurl, rag_mode,max_tokens):
-        try:
-            content = (
-                f"The image description is:\n{image_des}\nThe rules are:\n{rule_text}.\nPlease check whether the image description contains content that violates the rules, without reasoning or thinking. Return the result in the format [id of the matched rule]; if there is none, return []")
-            #self.logger.info(len(content))
-            search_data = {
-                "prompt": "",
-                "messages": [
-                    {
-                        "role": "user",
-                        "content": content
-                    }
-                ],
-                "llm_name": rag_mode,
-                "stream": False,
-                "gen_conf": {
-                    "temperature": 0.7,
-                    "max_tokens": max_tokens
-                }
-            }
-            response = requests.post(ragurl + "/chat", json=search_data)
-            results = response.json().get('data')
-            ret = re.sub(r'<think>.*?</think>', '', results, flags=re.DOTALL)
-            ret = ret.replace(" ", "").replace("\t", "").replace("\n", "")
-            #self.logger.info(f"{rule_text}:{ret}")
-            is_waning = 0
-            if len(ret) > 2:
-                is_waning = 1
-            return is_waning
-        except Exception as e:
-            self.logger.info(f"Thread: error during rule matching: {image_des, rule_text, ragurl, rag_mode, e}")
-            return None
-
-    # Hazard description
-    def image_rule_chat_with_detail(self,filedata, rule_text, ragurl, rag_mode,max_tokens):
-        # API call
-        content = (
-            f"The rules and regulations are: [{filedata}]\nThe violation is: [{rule_text}]\nPlease look up the safety hazards of the violation under the rules and regulations, without reasoning or thinking; return a short piece of text")
-        # self.logger.info(len(content))
-        search_data = {
-            "prompt": "",
-            "messages": [
-                {
-                    "role": "user",
-                    "content": content
-                }
-            ],
-            "llm_name": rag_mode,
-            "stream": False,
-            "gen_conf": {
-                "temperature": 0.7,
-                "max_tokens": max_tokens
-            }
-        }
-        #self.logger.info(content)
-        response = requests.post(ragurl + "/chat", json=search_data)
-        # Extract the data field from the JSON response
-        ret = response.json()["data"]
-        # Remove <think> tags and their content
-        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
-        # Clean the string: remove spaces, tabs, newlines and asterisks
-        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
-        #print(f"Safety hazard: {ret}")
-        return ret
-    #Handling suggestion
-    def image_rule_chat_suggestion(self,filedata, rule_text, ragurl, rag_mode,max_tokens):
-        # Request content
-        content = (
-            f"The rules and regulations are: [{filedata}]\nThe violation is: [{rule_text}]\nPlease look up the handling suggestions for the violation under the rules and regulations, without reasoning or thinking; return a short piece of text")
-        response = requests.post(
-            # ollama address
-            url=f"{ragurl}/chat",
-            json={
-                # Specify the model
-                "llm_name": rag_mode,
-                "messages": [
-                    {"role": "user", "content": content}
-                ],
-                "stream": False,  # Disable streaming output
-                "gen_conf": {
-                    "temperature": 0.7,
-                    "max_tokens": max_tokens
-                }
-            }
-        )
-        # Extract the data field from the JSON response
-        ret = response.json()["data"]
-        # Remove <think> tags and their content
-        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
-        # Clean the string: remove spaces, tabs, newlines and asterisks
-        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
-        #print(f"Handling suggestion: {ret}")
-        return ret
-
-    # Send a request to the RAG service to fetch knowledge base content
-    def get_filedata(self, searchtext,filter_expr, ragurl):
-        search_data = {
-            # Knowledge base collection
-            "collection_name": "smart_knowledge",
-            # Query text
-            "query_text": searchtext,
-            # Search mode
-            "search_mode": "hybrid",
-            # Maximum number of results
-            "limit": 10,
-            # Dense vector search weight
-            "weight_dense": 0.9,
-            # Sparse vector search weight
-            "weight_sparse": 0.1,
-            # Filter expression
-            "filter_expr": f"docnm_kwd in {filter_expr}",
-            # Only return the text field
-            "output_fields": ["text"]
-        }
-        #print(search_data)
-        # Send a POST request to the ragurl + "/search" endpoint
-        response = requests.post(ragurl + "/search", json=search_data)
-        # Get the 'results' field from the response
-        results = response.json().get('results')
-        # Initialize text
-        text = ""
-        # Iterate over all result rules and collect the 'text' field from each rule's 'entity'.
-        for rule in results:
-            text = text + rule['entity'].get('text') + ";\n"
-        #print(text)
-        return text
-
-    async def insert_json_data(self, ragurl, data):
-        try:
-            data = {'collection_name': "smartrag", "data": data, "description": ""}
-            requests.post(ragurl + "/insert_json_data", json=data, timeout=(0.3, 0.3))
-            #self.logger.info(f"Calling the recording service: {ragurl, data}")
-        except Exception as e:
-            #self.logger.info(f"{self._thread_name} thread: error calling the recording service: address: {ragurl}: {e}")
-            return
-
     def _release_semaphore(self, future):
         self.semaphore.release()
         #self.logger.info(f"閲婃斁绾跨▼ (鍓╀綑绌洪棽: {self.semaphore._value}/{self.max_workers})")
@@ -453,29 +191,4 @@
                 self.lock_pool[i].release()
                 break
 
-
-    def remove_duplicate_lines(self,text):
-        seen = set()
-        result = []
-        for line in text.split('。'):  # split by full stop
-            if line.strip() and line not in seen:
-                seen.add(line)
-                result.append(line)
-        return '。'.join(result)
-    def remove_duplicate_lines_d(self,text):
-        seen = set()
-        result = []
-        for line in text.split(','):  # split by comma
-            if line.strip() and line not in seen:
-                seen.add(line)
-                result.append(line)
-        return '。'.join(result)
-    def remove_duplicate_lines_n(self,text):
-        seen = set()
-        result = []
-        for line in text.split('\n'):  # split by newline
-            if line.strip() and line not in seen:
-                seen.add(line)
-                result.append(line)
-        return '。'.join(result)
 
diff --git a/qwen_thread_description.py b/qwen_thread_description.py
new file mode 100644
index 0000000..4dbc849
--- /dev/null
+++ b/qwen_thread_description.py
@@ -0,0 +1,464 @@
+import logging
+import time
+from concurrent.futures import ThreadPoolExecutor
+import threading
+import torch
+from PIL import Image
+from pymilvus import connections, Collection
+from datetime import datetime
+import requests
+import asyncio
+import re
+
+from qwen_vl_utils import process_vision_info
+from transformers import AutoModelForVision2Seq, AutoProcessor
+
+
+class qwen_thread_description:
+    def __init__(self, config,logger):
+        self.config = config
+        self.max_workers = int(config.get("threadnum"))
+        self.executor = ThreadPoolExecutor(max_workers=int(config.get("threadnum")))
+        self.semaphore = threading.Semaphore(int(config.get("threadnum")))
+        self.logger = logger
+
+        # Initialize the Milvus collection
+        connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
+        # Load the collection
+        self.collection = Collection(name="smartobject")
+        self.collection.load()
+        if config.get('cuda') is None or config.get('cuda') == '0':
+            self.device = f"cuda"
+        else:
+            self.device = f"cuda:{config.get('cuda')}"
+        self.model_pool = []
+        self.lock_pool = [threading.Lock() for _ in range(int(config.get("qwendescription")))]
+        for i in range(int(config.get("qwendescription"))):
+            model = AutoModelForVision2Seq.from_pretrained(
+                config.get("qwenaddr"),
+                device_map=self.device,
+                trust_remote_code=True,
+                use_safetensors=True,
+                torch_dtype=torch.float16
+
+            ).eval()
+            model = model.to(self.device)
+            self.model_pool.append(model)
+
+        # Shared processor (thread-safe)
+        self.processor = AutoProcessor.from_pretrained(config.get("qwenaddr"), use_fast=True)
+
+
+    def submit(self,res_a):
+        # Try to acquire the semaphore (non-blocking)
+        acquired = self.semaphore.acquire(blocking=False)
+
+        if not acquired:
+            #self.logger.info(f"Thread pool is full, waiting for an idle thread... (currently active: {self.max_workers - self.semaphore._value}/{self.max_workers})")
+            # Block until a thread becomes available
+            self.semaphore.acquire(blocking=True)
+
+        future = self.executor.submit(self._wrap_task, res_a)
+        future.add_done_callback(self._release_semaphore)
+        return future
+
+    def _wrap_task(self, res_a):
+        try:
+            self.tark_do(res_a, self.config.get("ragurl"), self.config.get("ragmode"), self.config.get("max_tokens"))
+        except Exception as e:
+            self.logger.info(f"Processing error: {e}")
+            raise
+
+    def tark_do(self,res,ragurl,rag_mode,max_tokens):
+        try:
+            # 1. Get vectors and metadata from collection A
+            is_waning = 0
+            is_desc = 2
+            # Generate the image description
+            ks_time = datetime.now()
+
+            risk_description = ""
+            suggestion = ""
+            # The warning flag was already set by the warning thread pool
+            is_waning = res['is_waning']
+            self.logger.info(f"Warning flag is_waning: {is_waning}")
+            # If warning, generate the hazard description and handling suggestion
+            if is_waning == 1:
+                # Get the rules and regulations data
+                filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl)
+                # Generate the hazard description
+                risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+                # Generate the handling suggestion
+                suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+                #self.logger.info(f"{res['video_point_id']} finished: {res['id']}: is_waning {is_waning}, safety hazard: {risk_description}\nhandling suggestion: {suggestion}")
+                # Data record
+
+            desc_time = datetime.now() - ks_time
+            current_time = datetime.now()
+            # Generate the image description
+            desc = self.image_desc(res)
+            if desc:
+                is_desc = 2
+            else:
+                is_desc = 3
+            # Data record
+            data = {
+                "event_level_id": res['event_level_id'],  # event_level_id
+                "event_level_name": res['event_level_name'],  # event_level_id
+                "rule_id": res["rule_id"],
+                "video_point_id": res['video_point_id'],  # video_point_id
+                "video_point_name": res['video_point_name'],
+                "is_waning": is_waning,
+                "is_desc": is_desc,
+                "zh_desc_class": desc,  # text_vector
+                "bounding_box": res['bounding_box'],  # bounding_box
+                "task_id": res['task_id'],  # task_id
+                "task_name": res['task_name'],  # task_id
+                "detect_id": res['detect_id'],  # detect_id
+                "detect_time": res['detect_time'],  # detect_time
+                "detect_num": res['detect_num'],
+                "waning_value": res['waning_value'],
+                "image_path": res['image_path'],  # image_path
+                "image_desc_path": res['image_desc_path'],  # image_desc_path
+                "video_path": res['video_path'],
+                "text_vector": res['text_vector'],
+                "risk_description": risk_description,
+                "suggestion": suggestion,
+                "knowledge_id": res['knowledge_id']
+            }
+            self.collection.delete(f"id == {res['id']}")
+            # Save to milvus
+            image_id = self.collection.insert(data).primary_keys
+            data = {
+                "id": str(image_id[0]),
+                "video_point_id": res['video_point_id'],
+                "video_path": res["video_point_name"],
+                "zh_desc_class": desc,
+                "detect_time": res['detect_time'],
+                "image_path": f"{res['image_path']}",
+                "task_name": res["task_name"],
+                "event_level_name": res["event_level_name"],
+                "rtsp_address": f"{res['video_path']}"
+            }
+            # Call RAG
+            asyncio.run(self.insert_json_data(ragurl, data))
+            rag_time = datetime.now() - current_time
+            self.logger.info(f"{res['video_point_id']} finished: {image_id}, total elapsed: {datetime.now() - ks_time}, hazard/suggestion time {desc_time}, description and RAG time {rag_time}")
+            if is_waning == 1:
+                self.logger.info(f"{res['video_point_id']} finished: {image_id}, image description: {desc}\nhazard: {risk_description}\nsuggestion: {suggestion}")
+        except Exception as e:
+            self.logger.info(f"Thread: error during model inference: {e}")
+            return 0
+
+    def image_desc(self, res_data):
+        try:
+            model, lock = self._acquire_model()
+
+            image = Image.open(res_data['image_desc_path']).convert("RGB").resize((600, 600), Image.Resampling.LANCZOS)
+            messages = [
+                {
+                    "role": "user",
+                    "content": [
+                        {
+                            "type": "image",
+                        },
+                        {"type": "text", "text": "Please describe in detail the targets and their features in the image. Return the result as a single paragraph of text."},
+                    ],
+                }
+            ]
+            # Preparation for inference
+            text = self.processor.apply_chat_template(
+                messages, add_generation_prompt=True
+            )
+            inputs = self.processor(
+                text=[text],
+                images=[image],
+                padding=True,
+                return_tensors="pt",
+            )
+            inputs = inputs.to(model.device)
+            with torch.inference_mode(), torch.amp.autocast(device_type=self.device, dtype=torch.float16):
+                outputs = model.generate(**inputs,max_new_tokens=200,do_sample=False,num_beams=1,temperature=0.1)
+            generated_ids = outputs[:, len(inputs.input_ids[0]):]
+            image_text = self.processor.batch_decode(
+                generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
+            )
+            image_des = (image_text[0]).strip()
+            #self.logger.info(f"{res_data['video_point_id']}:{res_data['id']}:{res_data['detect_time']}:{image_des}")
+            return image_des
+        except Exception as e:
+            self.logger.info(f"Thread: error during image description: {e}")
+        finally:
+            # 4. Release the model
+            self._release_model(model)
+            torch.cuda.empty_cache()
+
+    def image_rule(self, res_data):
+        self.logger.info("image_rule: running the warning rule check")
+        try:
+            model, lock = self._acquire_model()
+            image = Image.open(res_data['image_desc_path']).convert("RGB").resize((600, 600), Image.Resampling.LANCZOS)
+
+            messages = [
+                {
+                    "role": "user",
+                    "content": [
+                        {"type": "image", "image": image},
+                        {"type": "text", "text": f"Please check whether the image contains {res_data['waning_value']}. Please answer yes or no"},
+                    ],
+                }
+            ]
+
+            # Preparation for inference
+            text = self.processor.apply_chat_template(
+                messages, tokenize=False, add_generation_prompt=True
+            )
+            image_inputs, video_inputs = process_vision_info(messages)
+            inputs = self.processor(
+                text=[text],
+                images=image_inputs,
+                videos=video_inputs,
+                padding=True,
+                return_tensors="pt",
+            )
+            inputs = inputs.to(model.device)
+
+            with torch.no_grad():
+                outputs = model.generate(**inputs, max_new_tokens=10)
+            generated_ids = outputs[:, len(inputs.input_ids[0]):]
+            image_text = self.processor.batch_decode(
+                generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
+            )
+
+            image_des = (image_text[0]).strip()
+            upper_text = image_des.upper()
+            self.logger.info(f"Warning rule check answer: {upper_text}")
+            if "YES" in upper_text:
+                return 1
+            else:
+                return 0
+        except Exception as e:
+            self.logger.info(f"Thread: error during image rule check: {e}")
+            return 0
+        finally:
+            # 4. Release the model
+            self._release_model(model)
+            torch.cuda.empty_cache()
+
+    def get_rule(self,ragurl):
+        try:
+            rule_text = None
+            search_data = {
+                "collection_name": "smart_rule",
+                "query_text": "",
+                "search_mode": "hybrid",
+                "limit": 100,
+                "weight_dense": 0.7,
+                "weight_sparse": 0.3,
+                "filter_expr": "",
+                "output_fields": ["text"]
+            }
+            response = requests.post(ragurl + "/search", json=search_data)
+            results = response.json().get('results')
+            rule_text = ""
+            ruleid = 1
+            for rule in results:
+                if rule['score'] >= 0:
+                    rule_text = rule_text + str(ruleid) + ". " + rule['entity'].get('text') + ";\n"
+                    ruleid = ruleid + 1
+            # self.logger.info(len(rule_text))
+            if not results:
+                self.logger.info(f"Thread: error while fetching rules: {response}")
+            return rule_text
+        except Exception as e:
+            self.logger.info(f"Thread: error while fetching rules: {e}")
+            return None
+
+    def image_rule_chat(self, image_des,rule_text, ragurl, rag_mode,max_tokens):
+        try:
+            content = (
+                f"The image description is:\n{image_des}\nThe rules are:\n{rule_text}.\nPlease check whether the image description contains content that violates the rules, without reasoning or thinking. Return the result in the format [id of the matched rule]; if there is none, return []")
+            #self.logger.info(len(content))
+            search_data = {
+                "prompt": "",
+                "messages": [
+                    {
+                        "role": "user",
+                        "content": content
+                    }
+                ],
+                "llm_name": rag_mode,
+                "stream": False,
+                "gen_conf": {
+                    "temperature": 0.7,
+                    "max_tokens": max_tokens
+                }
+            }
+            response = requests.post(ragurl + "/chat", json=search_data)
+            results = response.json().get('data')
+            ret = re.sub(r'<think>.*?</think>', '', results, flags=re.DOTALL)
+            ret = ret.replace(" ", "").replace("\t", "").replace("\n", "")
+            #self.logger.info(f"{rule_text}:{ret}")
+            is_waning = 0
+            if len(ret) > 2:
+                is_waning = 1
+            return is_waning
+        except Exception as e:
+            self.logger.info(f"Thread: error during rule matching: {image_des, rule_text, ragurl, rag_mode, e}")
+            return None
+
+    # Hazard description
+    def image_rule_chat_with_detail(self,filedata, rule_text, ragurl, rag_mode,max_tokens):
+        # API call
+        content = (
+            f"The rules and regulations are: [{filedata}]\nThe violation is: [{rule_text}]\nPlease look up the safety hazards of the violation under the rules and regulations, without reasoning or thinking; return a short piece of text")
+        # self.logger.info(len(content))
+        search_data = {
+            "prompt": "",
+            "messages": [
+                {
+                    "role": "user",
+                    "content": content
+                }
+            ],
+            "llm_name": rag_mode,
+            "stream": False,
+            "gen_conf": {
+                "temperature": 0.7,
+                "max_tokens": max_tokens
+            }
+        }
+        #self.logger.info(content)
+        response = requests.post(ragurl + "/chat", json=search_data)
+        # Extract the data field from the JSON response
+        ret = response.json()["data"]
+        # Remove <think> tags and their content
+        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
+        # Clean the string: remove spaces, tabs, newlines and asterisks
+        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
+        #print(f"Safety hazard: {ret}")
+        return ret
+    #Handling suggestion
+    def image_rule_chat_suggestion(self,filedata, rule_text, ragurl, rag_mode,max_tokens):
+        # Request content
+        content = (
+            f"The rules and regulations are: [{filedata}]\nThe violation is: [{rule_text}]\nPlease look up the handling suggestions for the violation under the rules and regulations, without reasoning or thinking; return a short piece of text")
+        response = requests.post(
+            # ollama address
+            url=f"{ragurl}/chat",
+            json={
+                # Specify the model
+                "llm_name": rag_mode,
+                "messages": [
+                    {"role": "user", "content": content}
+                ],
+                "stream": False,  # Disable streaming output
+                "gen_conf": {
+                    "temperature": 0.7,
+                    "max_tokens": max_tokens
+                }
+            }
+        )
+        # Extract the data field from the JSON response
+        ret = response.json()["data"]
+        # Remove <think> tags and their content
+        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
+        # Clean the string: remove spaces, tabs, newlines and asterisks
+        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
+        #print(f"Handling suggestion: {ret}")
+        return ret
+
+    # Send a request to the RAG service to fetch knowledge base content
+    def get_filedata(self, searchtext,filter_expr, ragurl):
+        search_data = {
+            # Knowledge base collection
+            "collection_name": "smart_knowledge",
+            # Query text
+            "query_text": searchtext,
+            # Search mode
+            "search_mode": "hybrid",
+            # Maximum number of results
+            "limit": 10,
+            # Dense vector search weight
+            "weight_dense": 0.9,
+            # Sparse vector search weight
+            "weight_sparse": 0.1,
+            # Filter expression
+            "filter_expr": f"docnm_kwd in {filter_expr}",
+            # Only return the text field
+            "output_fields": ["text"]
+        }
+        #print(search_data)
+        # Send a POST request to the ragurl + "/search" endpoint
+        response = requests.post(ragurl + "/search", json=search_data)
+        # Get the 'results' field from the response
+        results = response.json().get('results')
+        # Initialize text
+        text = ""
+        # Iterate over all result rules and collect the 'text' field from each rule's 'entity'.
+        for rule in results:
+            text = text + rule['entity'].get('text') + ";\n"
+        #print(text)
+        return text
+
+    async def insert_json_data(self, ragurl, data):
+        try:
+            data = {'collection_name': "smartrag", "data": data, "description": ""}
+            requests.post(ragurl + "/insert_json_data", json=data, timeout=(0.3, 0.3))
+            #self.logger.info(f"Calling the recording service: {ragurl, data}")
+        except Exception as e:
+            #self.logger.info(f"{self._thread_name} thread: error calling the recording service: address: {ragurl}: {e}")
+            return
+
+    def _release_semaphore(self, future):
+        self.semaphore.release()
+        #self.logger.info(f"Released thread (idle remaining: {self.semaphore._value}/{self.max_workers})")
+
+    def shutdown(self):
+        """Shut down safely"""
+        self.executor.shutdown(wait=False)
+        for model in self.model_pool:
+            del model
+        torch.cuda.empty_cache()
+
+    def _acquire_model(self):
+        """Acquire an idle model from the pool (simple round-robin)"""
+        while True:
+            for i, (model, lock) in enumerate(zip(self.model_pool, self.lock_pool)):
+                if lock.acquire(blocking=False):
+                    return model, lock
+            time.sleep(0.1)  # Avoid busy-waiting on the CPU
+
+    def _release_model(self, model):
+        """Release the model back to the pool"""
+        for i, m in enumerate(self.model_pool):
+            if m == model:
+                self.lock_pool[i].release()
+                break
+
+
+    def remove_duplicate_lines(self,text):
+        seen = set()
+        result = []
+        for line in text.split('。'):  # split by full stop
+            if line.strip() and line not in seen:
+                seen.add(line)
+                result.append(line)
+        return '。'.join(result)
+    def remove_duplicate_lines_d(self,text):
+        seen = set()
+        result = []
+        for line in text.split(','):  # split by comma
+            if line.strip() and line not in seen:
+                seen.add(line)
+                result.append(line)
+        return '。'.join(result)
+    def remove_duplicate_lines_n(self,text):
+        seen = set()
+        result = []
+        for line in text.split('\n'):  # split by newline
+            if line.strip() and line not in seen:
+                seen.add(line)
+                result.append(line)
+        return '。'.join(result)
+

--
Gitblit v1.8.0