From a7e7050585325bdb873c6d312ea89de94215e11e Mon Sep 17 00:00:00 2001
From: shidong <shidong@jhsoft.cc>
Date: 星期六, 12 七月 2025 15:34:27 +0800
Subject: [PATCH] #2025/7/12 #milvus的upsert方法在多线程调用时多产生重复记录,修正为先删除在新增

---
 qwen_detect.py |   74 ++++++++++++++++++------------------
 1 files changed, 37 insertions(+), 37 deletions(-)

diff --git a/qwen_detect.py b/qwen_detect.py
index b27a87b..871f5da 100644
--- a/qwen_detect.py
+++ b/qwen_detect.py
@@ -9,6 +9,7 @@
 from pymilvus import connections, Collection
 from logging.handlers import RotatingFileHandler
 import get_mem
+from multiprocessing import Process
 
 class ThreadPool:
     def __init__(self):
@@ -30,7 +31,7 @@
                     # 灏嗛敭鍊煎娣诲姞鍒板瓧鍏镐腑
                     self.config[key] = value
         # 鍒涘缓瀹炰緥涓撳睘logger
-        os.makedirs("logs", exist_ok=True)
+        os.makedirs(self.config.get("logaddr"), exist_ok=True)
         logging.basicConfig(
             level=logging.INFO,
             format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s',
@@ -38,7 +39,7 @@
             handlers=[
                 # 鎸夊ぇ灏忚疆杞殑鏃ュ織鏂囦欢锛堟渶澶�10MB锛屼繚鐣�3涓浠斤級
                 RotatingFileHandler(
-                    filename=os.path.join("logs", 'qwen_log.log'),
+                    filename=os.path.join(self.config.get("logaddr"), 'qwen_log.log'),
                     maxBytes=10 * 1024 * 1024,  # 10MB
                     backupCount=3,
                     encoding='utf-8'
@@ -96,7 +97,7 @@
                     output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
                                    "task_name", "event_level_id", "event_level_name",
                                    "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
-                                   "detect_id","knowledge_id","suggestion",
+                                   "detect_id","knowledge_id","suggestion","risk_description",
                                    "detect_time", "image_path", "image_desc_path", "video_path"],
                     consistency_level="Strong",
                     order_by_field="id",  # 鎸塱d瀛楁鎺掑簭
@@ -106,42 +107,41 @@
                 # image_id = get_mem.smem_read_frame_qianwen(camera_id)
                 if len(res_a) > 0:
                     sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
-                    # 鏌ヨ鍓峃涓渶澶х殑ID
-                    num = int(self.config.get("threadnum")) - 1
-                    res_a = sorted_results[:num]
-                    for res in res_a:
-                        data = {
-                            "id": res['id'],
-                            "event_level_id": res['event_level_id'],  # event_level_id
-                            "event_level_name": res['event_level_name'],  # event_level_id
-                            "rule_id": res["rule_id"],
-                            "video_point_id": res['video_point_id'],  # video_point_id
-                            "video_point_name": res['video_point_name'],
-                            "is_waning": 0,
-                            "is_desc": 1,
-                            "zh_desc_class": res['zh_desc_class'],  # text_vector
-                            "bounding_box": res['bounding_box'],  # bounding_box
-                            "task_id": res['task_id'],  # task_id
-                            "task_name": res['task_name'],  # task_id
-                            "detect_id": res['detect_id'],  # detect_id
-                            "detect_time": res['detect_time'],  # detect_time
-                            "detect_num": res['detect_num'],
-                            "waning_value": res['waning_value'],
-                            "image_path": res['image_path'],  # image_path
-                            "image_desc_path": res['image_desc_path'],  # image_desc_path
-                            "video_path": res['video_path'],
-                            "text_vector": res['text_vector'],
-                            "knowledge_id": res['knowledge_id'],
-                            "suggestion": res['suggestion'],
-                        }
-                        # logging.info(f"璇诲彇鍥惧儚鎴愬姛: {res['id']}")
-                        # 淇濆瓨鍒癿ilvus
-                        image_id = self.collection.upsert(data).primary_keys
-                        res['id'] = image_id[0]
-                        self.pool.submit(res)
+                    res = sorted_results[0]
+                    self.collection.delete(f"id == {res['id']}")
+                    # 鏁版嵁缁�
+                    data = {
+                        "event_level_id": res['event_level_id'],  # event_level_id
+                        "event_level_name": res['event_level_name'],  # event_level_id
+                        "rule_id": res["rule_id"],
+                        "video_point_id": res['video_point_id'],  # video_point_id
+                        "video_point_name": res['video_point_name'],
+                        "is_waning": 0,
+                        "is_desc": 1,
+                        "zh_desc_class": "",
+                        "bounding_box": res['bounding_box'],  # bounding_box
+                        "task_id": res['task_id'],  # task_id
+                        "task_name": res['task_name'],  # task_id
+                        "detect_id": res['detect_id'],  # detect_id
+                        "detect_time": res['detect_time'],  # detect_time
+                        "detect_num": res['detect_num'],
+                        "waning_value": res['waning_value'],
+                        "image_path": res['image_path'],  # image_path
+                        "image_desc_path": res['image_desc_path'],  # image_desc_path
+                        "video_path": res['video_path'],
+                        "text_vector": res['text_vector'],
+                        "risk_description": res['risk_description'],
+                        "suggestion": res['suggestion'],
+                        "knowledge_id": res['knowledge_id']
+                    }
+                    # 淇濆瓨鍒癿ilvus
+                    image_id = self.collection.insert(data).primary_keys
+                    res['id'] = image_id[0]
+                    self.pool.submit(res)
+                time_sel.sleep(0.01)
             except Exception as e:
                 logging.info(f"{camera_id}绾跨▼閿欒:{e}")
-            time_sel.sleep(0.01)
+
 
     #璋冪敤鏄惁闇�瑕佹洿鏂�
     def isUpdate(self):

--
Gitblit v1.8.0