From a7e7050585325bdb873c6d312ea89de94215e11e Mon Sep 17 00:00:00 2001
From: shidong <shidong@jhsoft.cc>
Date: 星期六, 12 七月 2025 15:34:27 +0800
Subject: [PATCH] #2025/7/12 #milvus的upsert方法在多线程调用时多产生重复记录,修正为先删除在新增

---
 qwen_thread.py |   46 ++++++++++++++++------------------------------
 1 files changed, 16 insertions(+), 30 deletions(-)

diff --git a/qwen_thread.py b/qwen_thread.py
index 0a16fd8..9c348b4 100644
--- a/qwen_thread.py
+++ b/qwen_thread.py
@@ -17,11 +17,12 @@
 
 
 class qwen_thread:
-    def __init__(self, config):
+    def __init__(self, config,logger):
         self.config = config
         self.max_workers = int(config.get("threadnum"))
         self.executor = ThreadPoolExecutor(max_workers=int(config.get("threadnum")))
         self.semaphore = threading.Semaphore(int(config.get("threadnum")))
+        self.logger = logger
 
         # 鍒濆鍖朚ilvus闆嗗悎
         connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
@@ -45,22 +46,6 @@
         # 鍏变韩鐨勫鐞嗗櫒 (绾跨▼瀹夊叏)
         self.processor = AutoProcessor.from_pretrained(config.get("qwenaddr"), use_fast=True)
 
-        # 鍒涘缓瀹炰緥涓撳睘logger
-        self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
-        self.logger.setLevel(logging.INFO)
-        # 閬垮厤閲嶅娣诲姞handler
-        if not self.logger.handlers:
-            handler = RotatingFileHandler(
-                filename=os.path.join("logs", 'thread_log.log'),
-                maxBytes=10 * 1024 * 1024,
-                backupCount=3,
-                encoding='utf-8'
-            )
-            formatter = logging.Formatter(
-                '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
-            )
-            handler.setFormatter(formatter)
-            self.logger.addHandler(handler)
 
     def submit(self,res_a):
         # 灏濊瘯鑾峰彇淇″彿閲忥紙闈為樆濉烇級
@@ -101,25 +86,26 @@
                 # 璋冪敤瑙勫垯鍖归厤鏂规硶,鍒ゆ柇鏄惁棰勮
                 is_waning = self.image_rule_chat(desc, res['waning_value'], ragurl,rag_mode,max_tokens)
                 # 濡傛灉棰勮,鍒欑敓鎴愰殣鎮f弿杩板拰澶勭悊寤鸿
-                #if is_waning == 1:
-                # 鑾峰彇瑙勭珷鍒跺害鏁版嵁
-                filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl)
-                # 鐢熸垚闅愭偅鎻忚堪
-                risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
-                # 鐢熸垚澶勭悊寤鸿
-                suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+                if is_waning == 1:
+                    # 鑾峰彇瑙勭珷鍒跺害鏁版嵁
+                    filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl)
+                    # 鐢熸垚闅愭偅鎻忚堪
+                    risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+                    # 鐢熸垚澶勭悊寤鸿
+                    suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+                    self.logger.info(
+                        f"{res['video_point_id']}鎵ц瀹屾瘯锛歿res['id']}:鏄惁棰勮{is_waning},瀹夊叏闅愭偅锛歿risk_description}\n澶勭悊寤鸿锛歿suggestion}")
             else:
                 is_desc = 3
 
             # 鏁版嵁缁�
             data = {
-                "id": res['id'],
                 "event_level_id": res['event_level_id'],  # event_level_id
                 "event_level_name": res['event_level_name'],  # event_level_id
                 "rule_id": res["rule_id"],
                 "video_point_id": res['video_point_id'],  # video_point_id
                 "video_point_name": res['video_point_name'],
-                "is_waning": 1,
+                "is_waning": is_waning,
                 "is_desc": is_desc,
                 "zh_desc_class": desc,  # text_vector
                 "bounding_box": res['bounding_box'],  # bounding_box
@@ -137,9 +123,9 @@
                 "suggestion": suggestion,
                 "knowledge_id": res['knowledge_id']
             }
-
+            self.collection.delete(f"id == {res['id']}")
             # 淇濆瓨鍒癿ilvus
-            image_id = self.collection.upsert(data).primary_keys
+            image_id = self.collection.insert(data).primary_keys
             data = {
                 "id": str(image_id[0]),
                 "video_point_id": res['video_point_id'],
@@ -154,7 +140,7 @@
             # 璋冪敤rag
             asyncio.run(self.insert_json_data(ragurl, data))
             rag_time = datetime.now() - current_time
-            self.logger.info(f"{image_id}杩愯缁撴潫鎬讳綋鐢ㄦ椂:{datetime.now() - ks_time},鍥剧墖鎻忚堪鐢ㄦ椂{desc_time}锛孯AG鐢ㄦ椂{rag_time}")
+            self.logger.info(f"{res['video_point_id']}鎵ц瀹屾瘯锛歿image_id}杩愯缁撴潫鎬讳綋鐢ㄦ椂:{datetime.now() - ks_time},鍥剧墖鎻忚堪鐢ㄦ椂{desc_time}锛孯AG鐢ㄦ椂{rag_time}")
         except Exception as e:
             self.logger.info(f"绾跨▼锛氭墽琛屾ā鍨嬭В鏋愭椂鍑洪敊::{e}")
             return 0
@@ -187,7 +173,7 @@
             )
             inputs = inputs.to(model.device)
             with torch.inference_mode():
-                outputs = model.generate(**inputs,max_new_tokens=100)
+                outputs = model.generate(**inputs,max_new_tokens=200)
             generated_ids = outputs[:, len(inputs.input_ids[0]):]
             image_text = self.processor.batch_decode(
                 generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True

--
Gitblit v1.8.0