From 89a673df861e4a6cf0f8507b96018dbaced0086d Mon Sep 17 00:00:00 2001
From: shidong <shidong@jhsoft.cc>
Date: Fri, 11 Jul 2025 14:08:45 +0800
Subject: [PATCH] #2025/7/11 #qwen_detect.py and qwen_thread.py: reworked the log output to use a single log file, which makes later troubleshooting easier #conf.txt: latest configuration fetched from the server, kept as a backup #batch processing performs far too poorly (about twice as slow as single-image processing); qwen_detect_batch.py and qwen_thread_batch.py are archived

---
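Note on the logging change: qwen_detect.py now configures logging once, with a single
RotatingFileHandler writing logs/qwen_log.log plus a console handler, creates an instance
logger, and passes that logger into qwen_thread, which previously built its own handler
writing logs/thread_log.log. A minimal, self-contained sketch of the shared pattern
(the logger name below is illustrative; the patch derives it from the class and instance id):

    import logging
    import os
    from logging.handlers import RotatingFileHandler

    os.makedirs("logs", exist_ok=True)
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=[
            # one size-rotated file shared by qwen_detect and qwen_thread (10 MB, 3 backups)
            RotatingFileHandler("logs/qwen_log.log", maxBytes=10 * 1024 * 1024,
                                backupCount=3, encoding="utf-8"),
            logging.StreamHandler(),  # mirror everything to the console
        ],
    )
    logger = logging.getLogger("qwen_detect")  # illustrative name
    # qwen_thread(config, logger) then logs through the same root handlers,
    # so both modules end up in logs/qwen_log.log.
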
 conf.txt             |   25 +-
 qwen_detect.py       |   44 ++--
 qwen_detect_batch.py |  249 -----------------
 qwen_thread.py       |   37 +--
 qwen_thread_batch.py |  432 ------------------------------------------------
 5 files changed, 47 insertions(+), 740 deletions(-)
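
conf.txt keeps the plain key = value format that qwen_detect.py parses line by line into a
dict; this revision adds the vllmurl and vllmmode keys and switches ragmode/ollamamode to
qwen3. A small sketch of that parsing and of reading the new keys (their consumer is not
part of this diff, so the last two lines are only an assumed usage):

    # Sketch: load conf.txt into a flat dict, as qwen_detect.py does.
    config = {}
    with open("./conf.txt", "r", encoding="utf-8") as fh:
        for raw in fh:
            line = raw.strip()
            if not line or "=" not in line:
                continue  # skip blank or malformed lines
            key, value = line.split("=", 1)
            config[key.strip()] = value.strip()

    vllm_url = config.get("vllmurl")    # e.g. http://192.168.1.176:8880/v1 (assumed usage)
    vllm_mode = config.get("vllmmode")  # e.g. qwen3 (assumed usage)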

diff --git a/conf.txt b/conf.txt
index ed82d7e..fad40ab 100644
--- a/conf.txt
+++ b/conf.txt
@@ -1,17 +1,18 @@
-milvusurl = 192.168.1.232
+milvusurl = 192.168.1.176
 milvusport = 19530
-isupdateurl = http://192.168.1.232:8088/v1/task/isChange?stateType=2
-gettaskconfurl = http://192.168.1.232:8088/v1/task/getTaskConf
-updatestatusurl = http://192.168.1.232:8088/v1/task/updateChangeStatus?stateType=2
-videotaskurl = http://192.168.1.232:8089/api/v1/camera/record/task
-ragurl=http://192.168.1.232:8870
-ragmode=qwen2.5vl
-ollamaurl=http://192.168.1.232:11434
-ollamamode=qwen2.5vl
+isupdateurl = http://192.168.1.176:8088/v1/task/isChange?stateType=2
+gettaskconfurl = http://192.168.1.176:8088/v1/task/getTaskConf
+updatestatusurl = http://192.168.1.176:8088/v1/task/updateChangeStatus?stateType=2
+videotaskurl = http://192.168.1.176:8089/api/v1/camera/record/task
+ragurl=http://192.168.1.176:8870
+ragmode=qwen3
+ollamaurl=http://192.168.1.176:11434
+ollamamode=qwen3
+vllmurl=http://192.168.1.176:8880/v1
+vllmmode=qwen3
 filesavepath = /opt/smart
-max_tokens = 2000
-threadnum = 3
+max_tokens = 200
+threadnum = 4
 detectnum = 1
 qwenaddr = /home/debian/Qwen2.5-VL-3B-Instruct-GPTQ-Int4
 cuda = 1
-
diff --git a/qwen_detect.py b/qwen_detect.py
index 75249d0..b27a87b 100644
--- a/qwen_detect.py
+++ b/qwen_detect.py
@@ -29,26 +29,8 @@
                     value = value.strip()
                     # Add the key/value pair to the config dict
                     self.config[key] = value
-        # Configure logging
-        # Ensure the log directory exists
-        log_dir = "logs"
-        os.makedirs(log_dir, exist_ok=True)
-        self.threads: Dict[str, threading.Thread] = {}
-        self.lock = threading.Lock()
-
-        # Initialize the Milvus connection
-        connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
-        # Load the collection
-        self.collection = Collection(name="smartobject")
-        self.collection.load()
-        self.pool = qwen_thread(self.config)
-        #鏄惁鏇存柊
-        self._isupdate = False
-
-        # Initialize shared memory
-        get_mem.smem_init()
-
-        # Configure logging
+        # Create an instance-specific logger
+        os.makedirs("logs", exist_ok=True)
         logging.basicConfig(
             level=logging.INFO,
             format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s',
@@ -56,7 +38,7 @@
             handlers=[
                 # Size-rotated log file (max 10 MB, keep 3 backups)
                 RotatingFileHandler(
-                    filename=os.path.join(log_dir, 'start_log.log'),
+                    filename=os.path.join("logs", 'qwen_log.log'),
                     maxBytes=10 * 1024 * 1024,  # 10MB
                     backupCount=3,
                     encoding='utf-8'
@@ -65,6 +47,26 @@
                 logging.StreamHandler()
             ]
         )
+        self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
+        self.logger.setLevel(logging.INFO)
+
+
+        self.threads: Dict[str, threading.Thread] = {}
+        self.lock = threading.Lock()
+
+        # Initialize the Milvus connection
+        connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
+        # Load the collection
+        self.collection = Collection(name="smartobject")
+        self.collection.load()
+        # Create the qwen thread pool
+        self.pool = qwen_thread(self.config,self.logger)
+        #鏄惁鏇存柊
+        self._isupdate = False
+
+        # Initialize shared memory
+        get_mem.smem_init()
+
 
     #鍚姩绾跨▼
     def safe_start(self, target_func, camera_id):
diff --git a/qwen_detect_batch.py b/qwen_detect_batch.py
deleted file mode 100644
index 301c1b5..0000000
--- a/qwen_detect_batch.py
+++ /dev/null
@@ -1,249 +0,0 @@
-from operator import itemgetter
-import threading
-import time as time_sel
-from typing import Dict
-from qwen_thread_batch import qwen_thread_batch
-import requests
-import os
-import logging
-from pymilvus import connections, Collection
-from logging.handlers import RotatingFileHandler
-import get_mem
-
-class ThreadPool:
-    def __init__(self):
-        # Read the configuration file
-        self.config = {}
-        with open('./conf.txt', 'r', encoding='utf-8') as file:
-            for line in file:
-                # Strip leading/trailing whitespace (including the newline)
-                line = line.strip()
-                # Skip empty lines
-                if not line:
-                    continue
-                # Split into key and value
-                if '=' in line:
-                    key, value = line.split('=', 1)
-                    # Strip whitespace around the key and value
-                    key = key.strip()
-                    value = value.strip()
-                    # Add the key/value pair to the config dict
-                    self.config[key] = value
-        # Configure logging
-        # Ensure the log directory exists
-        log_dir = "logs"
-        os.makedirs(log_dir, exist_ok=True)
-        self.threads: Dict[str, threading.Thread] = {}
-        self.lock = threading.Lock()
-
-        # Initialize the Milvus connection
-        connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
-        # Load the collection
-        self.collection = Collection(name="smartobject")
-        self.collection.load()
-        self.pool = qwen_thread_batch(int(self.config.get("threadnum")), self.config,"/home/debian/Qwen2.5-VL-3B-Instruct-GPTQ-Int4")
-        #鏄惁鏇存柊
-        self._isupdate = False
-
-        # Initialize shared memory
-        get_mem.smem_init()
-
-        # Configure logging
-        logging.basicConfig(
-            level=logging.INFO,
-            format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s',
-            datefmt='%Y-%m-%d %H:%M:%S',
-            handlers=[
-                # Size-rotated log file (max 10 MB, keep 3 backups)
-                RotatingFileHandler(
-                    filename=os.path.join(log_dir, 'start_log.log'),
-                    maxBytes=10 * 1024 * 1024,  # 10MB
-                    backupCount=3,
-                    encoding='utf-8'
-                ),
-                # Also log to the console
-                logging.StreamHandler()
-            ]
-        )
-
-    #鍚姩绾跨▼
-    def safe_start(self, target_func, camera_id):
-        """绾跨▼瀹夊叏鍚姩鏂规硶"""
-        def wrapped():
-            thread_name = threading.current_thread().name
-            try:
-                target_func(camera_id)
-            except Exception as e:
-                logging.error(f"绾跨▼寮傚父: {str(e)}", exc_info=True)
-
-        with self.lock:  # 纭繚绾跨▼瀹夊叏鍒涘缓
-            t = threading.Thread(
-                target=wrapped,
-                daemon=True  # 璁剧疆涓哄畧鎶ょ嚎绋�
-            )
-            t.start()
-            self.threads[camera_id] = t
-            return t
-
-    # 鍚姩绾跨▼浠诲姟
-    def worker(self, camera_id):
-        while True:
-            try:
-                res_a = self.collection.query(
-                    expr=f"is_desc == 0 and video_point_id=={camera_id}",
-                    output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
-                                   "task_name", "event_level_id", "event_level_name",
-                                   "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
-                                   "detect_id","knowledge_id",
-                                   "detect_time", "image_path", "image_desc_path", "video_path"],
-                    consistency_level="Strong",
-                    order_by_field="id",  # sort by the id field
-                    order_by_type="desc"  # descending order
-                )
-
-                # Read the image from shared memory
-                # image_id = get_mem.smem_read_frame_qianwen(camera_id)
-                if len(res_a) > 0:
-                    sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
-                    # Keep the N largest IDs
-                    res_a = sorted_results[:int(self.config.get("detectnum"))]
-                    res_data = []
-                    for res in res_a:
-                        data = {
-                            "id": res['id'],
-                            "event_level_id": res['event_level_id'],  # event_level_id
-                            "event_level_name": res['event_level_name'],  # event_level_id
-                            "rule_id": res["rule_id"],
-                            "video_point_id": res['video_point_id'],  # video_point_id
-                            "video_point_name": res['video_point_name'],
-                            "is_waning": 0,
-                            "is_desc": 1,
-                            "zh_desc_class": res['zh_desc_class'],  # text_vector
-                            "bounding_box": res['bounding_box'],  # bounding_box
-                            "task_id": res['task_id'],  # task_id
-                            "task_name": res['task_name'],  # task_id
-                            "detect_id": res['detect_id'],  # detect_id
-                            "detect_time": res['detect_time'],  # detect_time
-                            "detect_num": res['detect_num'],
-                            "waning_value": res['waning_value'],
-                            "image_path": res['image_path'],  # image_path
-                            "image_desc_path": res['image_desc_path'],  # image_desc_path
-                            "video_path": res['video_path'],
-                            "text_vector": res['text_vector'],
-                            "knowledge_id": res['knowledge_id']
-                        }
-                        # logging.info(f"Image read successfully: {res['id']}")
-                        # Save to Milvus
-                        image_id = self.collection.upsert(data).primary_keys
-                        res['id'] = image_id[0]
-                        res_data.append(res)
-                    self.pool.submit(res_data)
-                        # image_id = pool.tark_do(image_id,self.config.get("ragurl"),self.config.get("ragmode"),self.config.get("max_tokens"))
-                        # logging.info(f"Image processed successfully: {image_id}")
-                    sorted_results = None
-            except Exception as e:
-                logging.info(f"{camera_id} thread error: {e}")
-
-    # Ask the server whether the task config has changed
-    def isUpdate(self):
-        try:
-            # Request URL
-            url = self.config.get("isupdateurl")
-            # Send a GET request
-            response = requests.get(url)
-
-            # Check the response status code
-            if response.status_code == 200:
-                data = response.json().get("data")
-                if data.get("isChange") == 1:
-                    return True
-                else:
-                    return False
-        except Exception as e:
-            logging.info(f"Error while checking for updates: URL:{self.config.get('isupdateurl')}:{e}")
-            return False
-
-    #淇敼鏄惁鏇存柊鐘舵��
-    def update_status(self):
-        try:
-            # Update the change status
-            url = self.config.get("updatestatusurl")
-            # Send a POST request
-            response = requests.post(url)
-            # Check the response status code
-            if response.status_code == 200:
-                return True
-            else:
-                return False
-        except Exception as e:
-            logging.info(f"Error while resetting the change status: URL:{self.config.get('updatestatusurl')}:{e}")
-            return False
-
-    def shutdown_all(self) -> None:
-        """Clean up all worker threads"""
-        with self.lock:
-            for camera_id, thread in list(self.threads.items()):
-                if thread.is_alive():
-                    thread.join(timeout=1)
-                del self.threads[camera_id]
-
-    # Fetch the task configuration
-    def getTaskconf(self,isupdate):
-        try:
-            # Request URL
-            url = self.config.get("gettaskconfurl")
-            # Send a GET request
-            response = requests.get(url)
-            # Check the response status code
-            if response.status_code == 200:
-                data = response.json()
-                if isupdate:
-                    # Reset the change status
-                    self.update_status()
-                return data.get("data")
-            else:
-                return []
-        except Exception as e:
-            logging.info(f"Error while fetching tasks: URL:{self.config.get('gettaskconfurl')}:{e}")
-            return []
-
-# Usage example
-if __name__ == "__main__":
-    pool = ThreadPool()
-    is_init = True
-    camera_data = pool.getTaskconf(False)
-    while True:
-        try:
-            pool._isupdate = False  # whether the task data was refreshed
-            # Does the task data need refreshing?
-            if pool.isUpdate():
-                # Fetch the camera tasks
-                camera_data = pool.getTaskconf(True)
-                pool._isupdate = True  # task data refreshed
-
-            if is_init:
-                if camera_data:
-                    for camera in camera_data:
-                        thread = pool.threads.get(camera.get("camera_id"))
-                        if not thread:
-                            logging.info(f"Creating worker thread for {camera.get('camera_id')}")
-                            pool.safe_start(pool.worker, camera.get('camera_id'))
-                            logging.info(f"Worker thread for {camera.get('camera_id')} created")
-
-            if pool._isupdate:
-                logging.info("Thread refresh started")
-                pool.shutdown_all()
-                if camera_data:
-                    for camera in camera_data:
-                        thread = pool.threads.get(camera.get("camera_id"))
-                        if not thread:
-                            logging.info(f"Creating worker thread for {camera.get('camera_id')}")
-                            pool.safe_start(pool.worker, camera.get('camera_id'))
-                            logging.info(f"Worker thread for {camera.get('camera_id')} created")
-
-                logging.info("Thread refresh finished")
-
-            is_init = False
-            time_sel.sleep(1)
-        except Exception as e:
-            logging.info(f"Unknown error in the main thread: {e}")
diff --git a/qwen_thread.py b/qwen_thread.py
index 0a16fd8..3673dcd 100644
--- a/qwen_thread.py
+++ b/qwen_thread.py
@@ -17,11 +17,12 @@
 
 
 class qwen_thread:
-    def __init__(self, config):
+    def __init__(self, config,logger):
         self.config = config
         self.max_workers = int(config.get("threadnum"))
         self.executor = ThreadPoolExecutor(max_workers=int(config.get("threadnum")))
         self.semaphore = threading.Semaphore(int(config.get("threadnum")))
+        self.logger = logger
 
         # Initialize the Milvus connection
         connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
@@ -45,22 +46,6 @@
         # Shared processor (thread-safe)
         self.processor = AutoProcessor.from_pretrained(config.get("qwenaddr"), use_fast=True)
 
-        # Create an instance-specific logger
-        self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
-        self.logger.setLevel(logging.INFO)
-        # Avoid adding duplicate handlers
-        if not self.logger.handlers:
-            handler = RotatingFileHandler(
-                filename=os.path.join("logs", 'thread_log.log'),
-                maxBytes=10 * 1024 * 1024,
-                backupCount=3,
-                encoding='utf-8'
-            )
-            formatter = logging.Formatter(
-                '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
-            )
-            handler.setFormatter(formatter)
-            self.logger.addHandler(handler)
 
     def submit(self,res_a):
         # Try to acquire the semaphore (non-blocking)
@@ -101,13 +86,13 @@
                 # Call the rule-matching method to decide whether to warn
                 is_waning = self.image_rule_chat(desc, res['waning_value'], ragurl,rag_mode,max_tokens)
                 # If a warning is raised, generate the hazard description and handling suggestion
-                #if is_waning == 1:
-                # Fetch the rules-and-regulations text
-                filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl)
-                # Generate the hazard description
-                risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
-                # Generate the handling suggestion
-                suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+                if is_waning == 1:
+                    # Fetch the rules-and-regulations text
+                    filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl)
+                    # Generate the hazard description
+                    risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+                    # Generate the handling suggestion
+                    suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
             else:
                 is_desc = 3
 
@@ -119,7 +104,7 @@
                 "rule_id": res["rule_id"],
                 "video_point_id": res['video_point_id'],  # video_point_id
                 "video_point_name": res['video_point_name'],
-                "is_waning": 1,
+                "is_waning": is_waning,
                 "is_desc": is_desc,
                 "zh_desc_class": desc,  # text_vector
                 "bounding_box": res['bounding_box'],  # bounding_box
@@ -154,7 +139,7 @@
             # Call the RAG service
             asyncio.run(self.insert_json_data(ragurl, data))
             rag_time = datetime.now() - current_time
-            self.logger.info(f"{image_id} finished, total time: {datetime.now() - ks_time}, image description: {desc_time}, RAG: {rag_time}")
+            self.logger.info(f"{res['video_point_id']} done: {image_id} finished, total time: {datetime.now() - ks_time}, image description: {desc_time}, RAG: {rag_time}")
         except Exception as e:
             self.logger.info(f"Thread: error while running model inference: {e}")
             return 0
diff --git a/qwen_thread_batch.py b/qwen_thread_batch.py
deleted file mode 100644
index 1027151..0000000
--- a/qwen_thread_batch.py
+++ /dev/null
@@ -1,432 +0,0 @@
-import time
-from concurrent.futures import ThreadPoolExecutor
-import threading
-
-import torch
-from PIL import Image
-from pymilvus import connections, Collection
-from datetime import datetime
-import os
-import requests
-import asyncio
-import logging
-import re
-from logging.handlers import RotatingFileHandler
-
-from transformers import AutoModelForVision2Seq, AutoProcessor
-
-
-class qwen_thread_batch:
-    def __init__(self, max_workers,config,model_path):
-        self.executor = ThreadPoolExecutor(max_workers=max_workers)
-        self.semaphore = threading.Semaphore(max_workers)
-        self.max_workers = max_workers
-        # Initialize the Milvus connection
-        connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
-        # Load the collection
-        self.collection = Collection(name="smartobject")
-        self.collection.load()
-
-        self.config = config
-        self.model_pool = []
-        self.lock_pool = [threading.Lock() for _ in range(max_workers)]
-        for i in range(max_workers):
-            model = AutoModelForVision2Seq.from_pretrained(
-                model_path,
-                device_map=f"cuda:{config.get('cuda')}",
-                trust_remote_code=True,
-                use_safetensors=True,
-                torch_dtype=torch.float16
-
-            ).eval()
-            self.model_pool.append(model)
-
-        # Shared processor (thread-safe)
-        self.processor = AutoProcessor.from_pretrained(model_path,use_fast=True)
-
-
-        # Create an instance-specific logger
-        self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
-        self.logger.setLevel(logging.INFO)
-        # Avoid adding duplicate handlers
-        if not self.logger.handlers:
-            handler = RotatingFileHandler(
-                filename=os.path.join("logs", 'thread_log.log'),
-                maxBytes=10 * 1024 * 1024,
-                backupCount=3,
-                encoding='utf-8'
-            )
-            formatter = logging.Formatter(
-                '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
-            )
-            handler.setFormatter(formatter)
-            self.logger.addHandler(handler)
-
-    def submit(self,res_a):
-        # Try to acquire the semaphore (non-blocking)
-        acquired = self.semaphore.acquire(blocking=False)
-
-        if not acquired:
-            #self.logger.info(f"Thread pool full, waiting for a free worker... (active: {self.max_workers - self.semaphore._value}/{self.max_workers})")
-            # Block until a worker becomes available
-            self.semaphore.acquire(blocking=True)
-
-        future = self.executor.submit(self._wrap_task, res_a)
-        future.add_done_callback(self._release_semaphore)
-        return future
-
-    def _wrap_task(self, res_a):
-        try:
-            self.tark_do(res_a, self.config.get("ragurl"), self.config.get("ragmode"), self.config.get("max_tokens"))
-        except Exception as e:
-            self.logger.info(f"Processing error: {e}")
-            raise
-
-    def tark_do(self,res_a,ragurl,rag_mode,max_tokens):
-        try:
-            # 1. Fetch vectors and metadata from collection A
-            is_waning = 0
-            is_desc = 2
-
-            # Generate the image descriptions
-            desc_list = self.image_desc(res_a)
-            risk_description = ""
-            suggestion = ""
-            if desc_list:
-                for desc, res in zip(desc_list, res_a):
-                    # Image description generated successfully
-                    if desc:
-                        is_desc = 2
-                        # Call the rule-matching method to decide whether to warn
-                        is_waning = self.image_rule_chat(desc, res['waning_value'], ragurl,rag_mode,max_tokens)
-                        # If a warning is raised, generate the hazard description and handling suggestion
-                        if is_waning == 1:
-                            # Fetch the rules-and-regulations text
-                            filedata = self.get_filedata(res['waning_value'], ragurl)
-                            # Generate the hazard description
-                            risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl, rag_mode)
-                            # Generate the handling suggestion
-                            suggestion = self.image_rule_chat_suggestion(res['waning_value'], ragurl, rag_mode)
-                    else:
-                        is_desc = 3
-
-                    # Data record
-                    data = {
-                        "id": res['id'],
-                        "event_level_id": res['event_level_id'],  # event_level_id
-                        "event_level_name": res['event_level_name'],  # event_level_id
-                        "rule_id": res["rule_id"],
-                        "video_point_id": res['video_point_id'],  # video_point_id
-                        "video_point_name": res['video_point_name'],
-                        "is_waning": is_waning,
-                        "is_desc": is_desc,
-                        "zh_desc_class": desc,  # text_vector
-                        "bounding_box": res['bounding_box'],  # bounding_box
-                        "task_id": res['task_id'],  # task_id
-                        "task_name": res['task_name'],  # task_id
-                        "detect_id": res['detect_id'],  # detect_id
-                        "detect_time": res['detect_time'],  # detect_time
-                        "detect_num": res['detect_num'],
-                        "waning_value": res['waning_value'],
-                        "image_path": res['image_path'],  # image_path
-                        "image_desc_path": res['image_desc_path'],  # image_desc_path
-                        "video_path": res['video_path'],
-                        "text_vector": res['text_vector'],
-                        "risk_description": risk_description,
-                        "suggestion": suggestion,
-                        "knowledge_id": res['knowledge_id']
-                    }
-
-                    # Save to Milvus
-                    image_id = self.collection.upsert(data).primary_keys
-                    logging.info(image_id)
-                    data = {
-                        "id": str(image_id[0]),
-                        "video_point_id": res['video_point_id'],
-                        "video_path": res["video_point_name"],
-                        "zh_desc_class": desc,
-                        "detect_time": res['detect_time'],
-                        "image_path": f"{res['image_path']}",
-                        "task_name": res["task_name"],
-                        "event_level_name": res["event_level_name"],
-                        "rtsp_address": f"{res['video_path']}"
-                    }
-                    # Call the RAG service
-                    asyncio.run(self.insert_json_data(ragurl, data))
-        except Exception as e:
-            self.logger.info(f"Thread: error while running model inference: {e}")
-            return 0
-
-    def image_desc(self, res_data):
-        try:
-            model, lock = self._acquire_model()
-            image_data = []
-            # 1. Load the images in parallel
-            def _load_image(path):
-                return Image.open(path).convert("RGB").resize((448, 448), Image.Resampling.LANCZOS)
-
-            with ThreadPoolExecutor(max_workers=4) as executor:
-                image_data = list(executor.map(
-                    _load_image,
-                    [res['image_desc_path'] for res in res_data]
-                ))
-
-            messages = [
-                {
-                    "role": "user",
-                    "content": [
-                        {
-                            "type": "image",
-                        },
-                        {"type": "text", "text": "Please describe in detail the targets in the image and their features. Return a single paragraph of text."},
-                    ],
-                }
-            ]
-            # Preparation for inference
-            text = self.processor.apply_chat_template(
-                messages, add_generation_prompt=True
-            )
-            inputs = self.processor(
-                text=[text] * len(image_data),
-                images=[image_data],
-                padding=True,
-                return_tensors="pt",
-            )
-            inputs = inputs.to(model.device)
-            with torch.inference_mode():
-                outputs = model.generate(**inputs,max_new_tokens=100)
-            generated_ids = outputs[:, len(inputs.input_ids[0]):]
-            image_text = self.processor.batch_decode(
-                generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
-            )
-            image_des = []
-            for text in image_text:
-                image_des.append(text)
-            return image_des
-        except Exception as e:
-            self.logger.info(f"Thread: error while generating image descriptions: {e}")
-        finally:
-            # 4. Release the model
-            self._release_model(model)
-            torch.cuda.empty_cache()
-
-    def get_rule(self,ragurl):
-        try:
-            rule_text = None
-            search_data = {
-                "collection_name": "smart_rule",
-                "query_text": "",
-                "search_mode": "hybrid",
-                "limit": 100,
-                "weight_dense": 0.7,
-                "weight_sparse": 0.3,
-                "filter_expr": "",
-                "output_fields": ["text"]
-            }
-            response = requests.post(ragurl + "/search", json=search_data)
-            results = response.json().get('results')
-            rule_text = ""
-            ruleid = 1
-            for rule in results:
-                if rule['score'] >= 0:
-                    rule_text = rule_text + str(ruleid) + ". " + rule['entity'].get('text') + ";\n"
-                    ruleid = ruleid + 1
-            # self.logger.info(len(rule_text))
-            else:
-                self.logger.info(f"Thread: error while fetching rules: {response}")
-            return rule_text
-        except Exception as e:
-            self.logger.info(f"Thread: error while fetching rules: {e}")
-            return None
-
-    def image_rule_chat(self, image_des,rule_text, ragurl, rag_mode,max_tokens):
-        try:
-
-            content = (
-                f"The image description is:\n{image_des}\nThe rules are:\n{rule_text}.\nCheck whether the image description contains content that matches the rules, without reasoning or think. Return the result in the format [ids of the matching rules]; if none match, return []")
-            #self.logger.info(len(content))
-            search_data = {
-                "prompt": "",
-                "messages": [
-                    {
-                        "role": "user",
-                        "content": content
-                    }
-                ],
-                "llm_name": rag_mode,
-                "stream": False,
-                "gen_conf": {
-                    "temperature": 0.7,
-                    "max_tokens": max_tokens
-                }
-            }
-            response = requests.post(ragurl + "/chat", json=search_data)
-            results = response.json().get('data')
-            # self.logger.info(results)
-            ret = re.sub(r'<think>.*?</think>', '', results, flags=re.DOTALL)
-            ret = ret.replace(" ", "").replace("\t", "").replace("\n", "")
-            is_waning = 0
-            if len(ret) > 2:
-                is_waning = 1
-            return is_waning
-        except Exception as e:
-            self.logger.info(f"Thread: error during rule matching: {image_des, rule_text, ragurl, rag_mode,e}")
-            return None
-
-    # Hazard description
-    def image_rule_chat_with_detail(self,filedata, rule_text, ollama_url, ollama_mode="qwen2.5vl:3b"):
-
-        # API call
-        response = requests.post(
-            # ollama address
-            url=f"{ollama_url}/chat",
-            json={
-                "prompt":"",
-                # Request content
-                "messages": [
-                    {
-                        "role": "user",
-                        "content": f"Based on the rules and regulations [{filedata}]\nfind the safety-hazard description for [{rule_text}], without reasoning or think. Return less than 800 characters"
-                    }
-                ],
-                # Specify the model
-                "llm_name": "qwen3:8b",
-                "stream": False,    # disable streaming output
-                "gen_conf": {
-                    "temperature": 0.7,  # controls generation randomness
-                    "max_tokens": 800   # maximum output length
-                }
-            }
-        )
-        # Extract the data field from the JSON response
-        ret = response.json()["data"]
-        #result = response.json()
-        #ret = result.get("data") or result.get("message", {}).get("content", "")
-        # Remove <think> tags and their content
-        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
-        # String cleanup: remove spaces, tabs, newlines and asterisks
-        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
-        print(ret)
-        return ret
-    # Handling suggestion
-    def image_rule_chat_suggestion(self, rule_text, ollama_url, ollama_mode="qwen2.5vl:3b"):
-        self.logger.info("----------------------------------------------------------------")
-        # Request content
-        content = (
-            f"Based on the violation [{rule_text}]\nreturn a suggestion for handling the violation, without reasoning or think. Return precise information")
-        response = requests.post(
-            # ollama address
-            url=f"{ollama_url}/chat",
-            json={
-                # Specify the model
-                "llm_name": "qwen3:8b",
-                "messages": [
-                    {"role": "user", "content": content}
-                ],
-                "stream": False  # disable streaming output
-            }
-        )
-        # Extract the data field from the JSON response
-        ret = response.json()["data"]
-        #result = response.json()
-        #ret = result.get("data") or result.get("message", {}).get("content", "")
-        # Remove <think> tags and their content
-        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
-        # String cleanup: remove spaces, tabs, newlines and asterisks
-        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
-        print(ret)
-        return ret
-
-    # Query the RAG service for knowledge-base content
-    def get_filedata(self, searchtext, ragurl):
-        search_data = {
-            # Knowledge-base collection
-            "collection_name": "smart_knowledge",
-            # Query text
-            "query_text": searchtext,
-            # Search mode
-            "search_mode": "hybrid",
-            # Maximum number of results
-            "limit": 100,
-            # Dense-vector search weight
-            "weight_dense": 0.7,
-            # Sparse-vector search weight
-            "weight_sparse": 0.3,
-            # Empty filter expression
-            "filter_expr": "",
-            # Return only the text field
-            "output_fields": ["text"]
-        }
-        # Send a POST request to the ragurl + "/search" endpoint
-        response = requests.post(ragurl + "/search", json=search_data)
-        # Get the 'results' field from the response
-        results = response.json().get('results')
-        # Initialize text
-        text = ""
-        # Iterate over the result rules and collect each rule's 'entity' 'text' field.
-        for rule in results:
-            text = text + rule['entity'].get('text') + ";\n"
-
-        return text
-
-    async def insert_json_data(self, ragurl, data):
-        try:
-            data = {'collection_name': "smartrag", "data": data, "description": ""}
-            requests.post(ragurl + "/insert_json_data", json=data, timeout=(0.3, 0.3))
-            #self.logger.info(f"Calling the recording service: {ragurl, data}")
-        except Exception as e:
-            #self.logger.info(f"{self._thread_name} thread: error calling the recording service: address: {ragurl}: {e}")
-            return
-
-    def _release_semaphore(self, future):
-        self.semaphore.release()
-        #self.logger.info(f"Released a worker (idle: {self.semaphore._value}/{self.max_workers})")
-
-    def shutdown(self):
-        """Safe shutdown"""
-        self.executor.shutdown(wait=False)
-        for model in self.model_pool:
-            del model
-        torch.cuda.empty_cache()
-
-    def _acquire_model(self):
-        """Acquire an idle model from the pool (simple polling)"""
-        while True:
-            for i, (model, lock) in enumerate(zip(self.model_pool, self.lock_pool)):
-                if lock.acquire(blocking=False):
-                    return model, lock
-            time.sleep(0.1)  # avoid busy-waiting on the CPU
-
-    def _release_model(self, model):
-        """Release the model back to the pool"""
-        for i, m in enumerate(self.model_pool):
-            if m == model:
-                self.lock_pool[i].release()
-                break
-
-
-    def remove_duplicate_lines(self,text):
-        seen = set()
-        result = []
-        for line in text.split('。'):  # split on the Chinese full stop
-            if line.strip() and line not in seen:
-                seen.add(line)
-                result.append(line)
-        return '。'.join(result)
-    def remove_duplicate_lines_d(self,text):
-        seen = set()
-        result = []
-        for line in text.split(','):  # split on commas
-            if line.strip() and line not in seen:
-                seen.add(line)
-                result.append(line)
-        return '。'.join(result)
-    def remove_duplicate_lines_n(self,text):
-        seen = set()
-        result = []
-        for line in text.split('\n'):  # split on newlines
-            if line.strip() and line not in seen:
-                seen.add(line)
-                result.append(line)
-        return '。'.join(result)
-

--
Gitblit v1.8.0