From a7e7050585325bdb873c6d312ea89de94215e11e Mon Sep 17 00:00:00 2001
From: shidong <shidong@jhsoft.cc>
Date: 星期六, 12 七月 2025 15:34:27 +0800
Subject: [PATCH] #2025/7/12 #milvus的upsert方法在多线程调用时多产生重复记录,修正为先删除在新增
---
qwen_detect.py | 17 ++++++++++-------
1 files changed, 10 insertions(+), 7 deletions(-)
diff --git a/qwen_detect.py b/qwen_detect.py
index 2cd6ab8..871f5da 100644
--- a/qwen_detect.py
+++ b/qwen_detect.py
@@ -9,6 +9,7 @@
from pymilvus import connections, Collection
from logging.handlers import RotatingFileHandler
import get_mem
+from multiprocessing import Process
class ThreadPool:
def __init__(self):
@@ -96,7 +97,7 @@
output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
"task_name", "event_level_id", "event_level_name",
"video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
- "detect_id","knowledge_id","suggestion",
+ "detect_id","knowledge_id","suggestion","risk_description",
"detect_time", "image_path", "image_desc_path", "video_path"],
consistency_level="Strong",
order_by_field="id", # 鎸塱d瀛楁鎺掑簭
@@ -107,8 +108,9 @@
if len(res_a) > 0:
sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
res = sorted_results[0]
+ self.collection.delete(f"id == {res['id']}")
+ # 鏁版嵁缁�
data = {
- "id": res['id'],
"event_level_id": res['event_level_id'], # event_level_id
"event_level_name": res['event_level_name'], # event_level_id
"rule_id": res["rule_id"],
@@ -116,7 +118,7 @@
"video_point_name": res['video_point_name'],
"is_waning": 0,
"is_desc": 1,
- "zh_desc_class": res['zh_desc_class'], # text_vector
+ "zh_desc_class": "",
"bounding_box": res['bounding_box'], # bounding_box
"task_id": res['task_id'], # task_id
"task_name": res['task_name'], # task_id
@@ -128,17 +130,18 @@
"image_desc_path": res['image_desc_path'], # image_desc_path
"video_path": res['video_path'],
"text_vector": res['text_vector'],
- "knowledge_id": res['knowledge_id'],
+ "risk_description": res['risk_description'],
"suggestion": res['suggestion'],
+ "knowledge_id": res['knowledge_id']
}
- # logging.info(f"璇诲彇鍥惧儚鎴愬姛: {res['id']}")
# 淇濆瓨鍒癿ilvus
- image_id = self.collection.upsert(data).primary_keys
+ image_id = self.collection.insert(data).primary_keys
res['id'] = image_id[0]
self.pool.submit(res)
+ time_sel.sleep(0.01)
except Exception as e:
logging.info(f"{camera_id}绾跨▼閿欒:{e}")
- time_sel.sleep(0.01)
+
#璋冪敤鏄惁闇�瑕佹洿鏂�
def isUpdate(self):
--
Gitblit v1.8.0