From 028e39f85bf4115024d0467613f26a1750ff004e Mon Sep 17 00:00:00 2001
From: zhm <839713154@qq.com>
Date: 星期四, 24 七月 2025 09:18:19 +0800
Subject: [PATCH] 预警跟图片描述优化

---
 qwen_detect.py |  178 ++++++++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 120 insertions(+), 58 deletions(-)

diff --git a/qwen_detect.py b/qwen_detect.py
index 35f87fe..b603aef 100644
--- a/qwen_detect.py
+++ b/qwen_detect.py
@@ -1,5 +1,4 @@
 from operator import itemgetter
-import torch
 import threading
 import time as time_sel
 from typing import Dict
@@ -7,10 +6,11 @@
 import requests
 import os
 import logging
-from transformers import AutoProcessor, AutoModelForVision2Seq
 from pymilvus import connections, Collection
 from logging.handlers import RotatingFileHandler
 import get_mem
+from qwen_thread_description import qwen_thread_description
+
 
 class ThreadPool:
     def __init__(self):
@@ -31,26 +31,8 @@
                     value = value.strip()
                     # 灏嗛敭鍊煎娣诲姞鍒板瓧鍏镐腑
                     self.config[key] = value
-        # 閰嶇疆鏃ュ織
-        # 纭繚鏃ュ織鐩綍瀛樺湪
-        log_dir = "logs"
-        os.makedirs(log_dir, exist_ok=True)
-        self.threads: Dict[str, threading.Thread] = {}
-        self.lock = threading.Lock()
-
-        # 鍒濆鍖朚ilvus闆嗗悎
-        connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
-        # 鍔犺浇闆嗗悎
-        self.collection = Collection(name="smartobject")
-        self.collection.load()
-        self.pool = qwen_thread(int(self.config.get("threadnum")), self.config,self.config.get("qwenaddr"))
-        #鏄惁鏇存柊
-        self._isupdate = False
-
-        # 鍒濆鍖栧叡浜唴瀛�
-        get_mem.smem_init()
-
-        # 閰嶇疆鏃ュ織
+        # 鍒涘缓瀹炰緥涓撳睘logger
+        os.makedirs(self.config.get("logaddr"), exist_ok=True)
         logging.basicConfig(
             level=logging.INFO,
             format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s',
@@ -58,7 +40,7 @@
             handlers=[
                 # 鎸夊ぇ灏忚疆杞殑鏃ュ織鏂囦欢锛堟渶澶�10MB锛屼繚鐣�3涓浠斤級
                 RotatingFileHandler(
-                    filename=os.path.join(log_dir, 'start_log.log'),
+                    filename=os.path.join(self.config.get("logaddr"), 'qwen_log.log'),
                     maxBytes=10 * 1024 * 1024,  # 10MB
                     backupCount=3,
                     encoding='utf-8'
@@ -67,6 +49,28 @@
                 logging.StreamHandler()
             ]
         )
+        self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
+        self.logger.setLevel(logging.INFO)
+
+
+        self.threads: Dict[str, threading.Thread] = {}
+        self.lock = threading.Lock()
+
+        # 鍒濆鍖朚ilvus闆嗗悎
+        connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
+        # 鍔犺浇闆嗗悎
+        self.collection = Collection(name="smartobject")
+        self.collection.load()
+        #鍒涘缓qwen绾跨▼姹�--棰勮绾跨▼姹�
+        self.poolWarning = qwen_thread(self.config,self.logger)
+        # 鍒涘缓qwen绾跨▼姹�--鍥剧墖鎻忚堪绛�
+        self.poolDescription= qwen_thread_description(self.config, self.logger)
+        #鏄惁鏇存柊
+        self._isupdate = False
+
+        # 鍒濆鍖栧叡浜唴瀛�
+        get_mem.smem_init()
+
 
     #鍚姩绾跨▼
     def safe_start(self, target_func, camera_id):
@@ -87,40 +91,40 @@
             self.threads[camera_id] = t
             return t
 
-    # 鍚姩绾跨▼浠诲姟
-    def worker(self, camera_id):
-        while True:
-            try:
-                res_a = self.collection.query(
-                    expr=f"is_desc == 0 and video_point_id=={camera_id}",
-                    output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
-                                   "task_name", "event_level_id", "event_level_name",
-                                   "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
-                                   "detect_id",
-                                   "detect_time", "image_path", "image_desc_path", "video_path"],
-                    consistency_level="Strong",
-                    order_by_field="id",  # 鎸塱d瀛楁鎺掑簭
-                    order_by_type="desc"  # 闄嶅簭鎺掑垪
-                )
-
-                # 璇诲彇鍏变韩鍐呭瓨涓殑鍥剧墖
-                # image_id = get_mem.smem_read_frame_qianwen(camera_id)
-                if len(res_a) > 0:
-                    sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
-                    # 鏌ヨ鍓峃涓渶澶х殑ID
-                    num = int(self.config.get("threadnum")) - 1
-                    res_a = sorted_results[:num]
-                    for res in res_a:
+    # 鍚姩绾跨▼浠诲姟--棰勮
+    def workerWarning(self, camera_id):
+            while True:
+                try:
+                    res_a = self.collection.query(
+                        expr=f"is_desc == 0 and video_point_id=={camera_id}",
+                        output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name",
+                                       "task_id",
+                                       "task_name", "event_level_id", "event_level_name",
+                                       "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value",
+                                       "rule_id",
+                                       "detect_id", "knowledge_id", "suggestion", "risk_description",
+                                       "detect_time", "image_path", "image_desc_path", "video_path"],
+                        consistency_level="Strong",
+                        order_by_field="id",  # 鎸塱d瀛楁鎺掑簭
+                        order_by_type="desc"  # 闄嶅簭鎺掑垪
+                    )
+                    # 璇诲彇鍏变韩鍐呭瓨涓殑鍥剧墖
+                    # image_id = get_mem.smem_read_frame_qianwen(camera_id)
+                    if len(res_a) > 0:
+                        # sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
+                        # res = sorted_results[0]
+                        res = max(res_a, key=itemgetter("id"))
+                        self.collection.delete(f"id == {res['id']}")
+                        # 鏁版嵁缁�
                         data = {
-                            "id": res['id'],
                             "event_level_id": res['event_level_id'],  # event_level_id
                             "event_level_name": res['event_level_name'],  # event_level_id
                             "rule_id": res["rule_id"],
                             "video_point_id": res['video_point_id'],  # video_point_id
                             "video_point_name": res['video_point_name'],
                             "is_waning": 0,
-                            "is_desc": 1,
-                            "zh_desc_class": res['zh_desc_class'],  # text_vector
+                            "is_desc":4,#棰勮涓姸鎬�
+                            "zh_desc_class": res['zh_desc_class'],
                             "bounding_box": res['bounding_box'],  # bounding_box
                             "task_id": res['task_id'],  # task_id
                             "task_name": res['task_name'],  # task_id
@@ -131,19 +135,74 @@
                             "image_path": res['image_path'],  # image_path
                             "image_desc_path": res['image_desc_path'],  # image_desc_path
                             "video_path": res['video_path'],
-                            "text_vector": res['text_vector']
+                            "text_vector": res['text_vector'],
+                            "risk_description": res['risk_description'],
+                            "suggestion": res['suggestion'],
+                            "knowledge_id": res['knowledge_id']
                         }
-                        # logging.info(f"璇诲彇鍥惧儚鎴愬姛: {res['id']}")
                         # 淇濆瓨鍒癿ilvus
-                        image_id = self.collection.upsert(data).primary_keys
+                        image_id = self.collection.insert(data).primary_keys
                         res['id'] = image_id[0]
-                        # logging.info(f"璇诲彇鍥惧儚鎴愬姛: {image_id}")
-                        image_id = self.pool.submit(res)
-                        # image_id = pool.tark_do(image_id,self.config.get("ragurl"),self.config.get("ragmode"),self.config.get("max_tokens"))
-                        # logging.info(f"澶勭悊鍥惧儚鎴愬姛: {image_id}")
-                    sorted_results = None
+                        self.poolWarning.submit(res)
+                    time_sel.sleep(0.01)
+                except Exception as e:
+                    logging.info(f"{camera_id}绾跨▼閿欒:{e}")
+
+    # 鍚姩绾跨▼浠诲姟--鐢熸垚鍥剧墖鎻忚堪绛�
+    def worker(self, camera_id):
+        while True:
+            try:
+                res_a = self.collection.query(
+                    expr=f"is_desc == 5 and video_point_id=={camera_id}",
+                    output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
+                                   "task_name", "event_level_id", "event_level_name",
+                                   "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
+                                   "detect_id","knowledge_id","suggestion","risk_description",
+                                   "detect_time", "image_path", "image_desc_path", "video_path"],
+                    consistency_level="Strong",
+                    order_by_field="id",  # 鎸塱d瀛楁鎺掑簭
+                    order_by_type="desc"  # 闄嶅簭鎺掑垪
+                )
+                # 璇诲彇鍏变韩鍐呭瓨涓殑鍥剧墖
+                # image_id = get_mem.smem_read_frame_qianwen(camera_id)
+                if len(res_a) > 0:
+                    #sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
+                    #res = sorted_results[0]
+                    res = max(res_a, key=itemgetter("id"))
+                    self.collection.delete(f"id == {res['id']}")
+                    # 鏁版嵁缁�
+                    data = {
+                        "event_level_id": res['event_level_id'],  # event_level_id
+                        "event_level_name": res['event_level_name'],  # event_level_id
+                        "rule_id": res["rule_id"],
+                        "video_point_id": res['video_point_id'],  # video_point_id
+                        "video_point_name": res['video_point_name'],
+                        "is_waning": res['is_waning'],
+                        "is_desc": 1,
+                        "zh_desc_class": res['zh_desc_class'],
+                        "bounding_box": res['bounding_box'],  # bounding_box
+                        "task_id": res['task_id'],  # task_id
+                        "task_name": res['task_name'],  # task_id
+                        "detect_id": res['detect_id'],  # detect_id
+                        "detect_time": res['detect_time'],  # detect_time
+                        "detect_num": res['detect_num'],
+                        "waning_value": res['waning_value'],
+                        "image_path": res['image_path'],  # image_path
+                        "image_desc_path": res['image_desc_path'],  # image_desc_path
+                        "video_path": res['video_path'],
+                        "text_vector": res['text_vector'],
+                        "risk_description": res['risk_description'],
+                        "suggestion": res['suggestion'],
+                        "knowledge_id": res['knowledge_id']
+                    }
+                    # 淇濆瓨鍒癿ilvus
+                    image_id = self.collection.insert(data).primary_keys
+                    res['id'] = image_id[0]
+                    self.poolDescription.submit(res)
+                time_sel.sleep(0.01)
             except Exception as e:
                 logging.info(f"{camera_id}绾跨▼閿欒:{e}")
+
 
     #璋冪敤鏄惁闇�瑕佹洿鏂�
     def isUpdate(self):
@@ -228,6 +287,9 @@
                         thread = pool.threads.get(camera.get("camera_id"))
                         if not thread:
                             logging.info(f"寮�濮嬪垱寤簕camera.get('camera_id')}绾跨▼")
+                            #鍏堝鏁版嵁杩涜棰勮
+                            pool.safe_start(pool.workerWarning, camera.get('camera_id'))
+                            #鍦ㄧ敓鎴愬浘鐗囨弿杩般�佸鐞嗗缓璁瓑淇℃伅
                             pool.safe_start(pool.worker, camera.get('camera_id'))
                             logging.info(f"{camera.get('camera_id')}绾跨▼鍒涘缓瀹屾瘯")
 

--
Gitblit v1.8.0