From a7e7050585325bdb873c6d312ea89de94215e11e Mon Sep 17 00:00:00 2001
From: shidong <shidong@jhsoft.cc>
Date: Sat, 12 Jul 2025 15:34:27 +0800
Subject: [PATCH] #2025/7/12 #milvus的upsert方法在多线程调用时会产生重复记录,修正为先删除再新增

---
 qwen_detect.py |   17 ++++++++++-------
 qwen_thread.py |    9 +++++----
 2 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/qwen_detect.py b/qwen_detect.py
index 2cd6ab8..871f5da 100644
--- a/qwen_detect.py
+++ b/qwen_detect.py
@@ -9,6 +9,7 @@
 from pymilvus import connections, Collection
 from logging.handlers import RotatingFileHandler
 import get_mem
+from multiprocessing import Process
 
 class ThreadPool:
     def __init__(self):
@@ -96,7 +97,7 @@
                     output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
                                    "task_name", "event_level_id", "event_level_name",
                                    "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
-                                   "detect_id","knowledge_id","suggestion",
+                                   "detect_id","knowledge_id","suggestion","risk_description",
                                    "detect_time", "image_path", "image_desc_path", "video_path"],
                     consistency_level="Strong",
                     order_by_field="id",  # 按id字段排序
@@ -107,8 +108,9 @@
                 if len(res_a) > 0:
                     sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
                     res = sorted_results[0]
+                    self.collection.delete(f"id == {res['id']}")
+                    # 数据组
                     data = {
-                        "id": res['id'],
                         "event_level_id": res['event_level_id'],  # event_level_id
                         "event_level_name": res['event_level_name'],  # event_level_id
                         "rule_id": res["rule_id"],
@@ -116,7 +118,7 @@
                         "video_point_name": res['video_point_name'],
                         "is_waning": 0,
                         "is_desc": 1,
-                        "zh_desc_class": res['zh_desc_class'],  # text_vector
+                        "zh_desc_class": "",
                         "bounding_box": res['bounding_box'],  # bounding_box
                         "task_id": res['task_id'],  # task_id
                         "task_name": res['task_name'],  # task_id
@@ -128,17 +130,18 @@
                         "image_desc_path": res['image_desc_path'],  # image_desc_path
                         "video_path": res['video_path'],
                         "text_vector": res['text_vector'],
-                        "knowledge_id": res['knowledge_id'],
+                        "risk_description": res['risk_description'],
                         "suggestion": res['suggestion'],
+                        "knowledge_id": res['knowledge_id']
                     }
-                    # logging.info(f"读取图像成功: {res['id']}")
                     # 保存到milvus
-                    image_id = self.collection.upsert(data).primary_keys
+                    image_id = self.collection.insert(data).primary_keys
                     res['id'] = image_id[0]
                     self.pool.submit(res)
+                time_sel.sleep(0.01)
             except Exception as e:
                 logging.info(f"{camera_id}线程错误:{e}")
-            time_sel.sleep(0.01)
+
 
     #调用是否需要更新
     def isUpdate(self):
diff --git a/qwen_thread.py b/qwen_thread.py
index 3673dcd..9c348b4 100644
--- a/qwen_thread.py
+++ b/qwen_thread.py
@@ -93,12 +93,13 @@
                     risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
                     # 生成处理建议
                     suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+                    self.logger.info(
+                        f"{res['video_point_id']}执行完毕：{res['id']}:是否预警{is_waning},安全隐患：{risk_description}\n处理建议：{suggestion}")
             else:
                 is_desc = 3
 
             # 数据组
             data = {
-                "id": res['id'],
                 "event_level_id": res['event_level_id'],  # event_level_id
                 "event_level_name": res['event_level_name'],  # event_level_id
                 "rule_id": res["rule_id"],
@@ -122,9 +123,9 @@
                 "suggestion": suggestion,
                 "knowledge_id": res['knowledge_id']
             }
-
+            self.collection.delete(f"id == {res['id']}")
             # 保存到milvus
-            image_id = self.collection.upsert(data).primary_keys
+            image_id = self.collection.insert(data).primary_keys
             data = {
                 "id": str(image_id[0]),
                 "video_point_id": res['video_point_id'],
@@ -172,7 +173,7 @@
             )
             inputs = inputs.to(model.device)
             with torch.inference_mode():
-                outputs = model.generate(**inputs,max_new_tokens=100)
+                outputs = model.generate(**inputs,max_new_tokens=200)
             generated_ids = outputs[:, len(inputs.input_ids[0]):]
             image_text = self.processor.batch_decode(
                 generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True

--
Gitblit v1.8.0