From a7e7050585325bdb873c6d312ea89de94215e11e Mon Sep 17 00:00:00 2001
From: shidong <shidong@jhsoft.cc>
Date: Sat, 12 Jul 2025 15:34:27 +0800
Subject: [PATCH] #2025/7/12 #Milvus upsert produces duplicate records under multi-threaded calls; fixed to delete first, then insert
---
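Note: the commit message reports that concurrent upsert calls left duplicate
rows for the same logical record; the patch replaces upsert with an explicit
delete of the stale row followed by a plain insert (the "id" field is dropped
from the data dict, so the new row gets an auto-generated primary key). Below
is a minimal sketch of that write path, assuming a pymilvus Collection with an
auto-generated int64 primary key named "id"; save_record and its lock are
illustrative additions, not part of this patch:

    import threading
    from pymilvus import Collection

    _write_lock = threading.Lock()  # hypothetical guard for interleaved writers

    def save_record(collection: Collection, old_id: int, data: dict) -> int:
        """Replace the row identified by old_id with data, duplicate-free."""
        with _write_lock:
            # Drop the stale row first, so two writers racing on the same
            # record cannot each leave behind an extra copy.
            collection.delete(f"id == {old_id}")
            # Insert the fresh copy; primary_keys echoes the auto-generated id.
            return collection.insert(data).primary_keys[0]

Serializing the two steps (here with a process-local lock) closes the window
in which two writers both act on the same record at once.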
qwen_thread.py | 46 ++++++++++++++++------------------------------
1 file changed, 16 insertions(+), 30 deletions(-)
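The diff also shows the logger being injected through the constructor instead
of each instance building its own RotatingFileHandler (removed block below).
A minimal sketch of how the caller might build the shared logger it passes to
qwen_thread(config, logger), reusing the removed handler configuration; the
logger name "qwen_thread" is an assumption:

    import logging
    import os
    from logging.handlers import RotatingFileHandler

    def build_shared_logger() -> logging.Logger:
        # One process-wide logger, configured once and handed to every
        # qwen_thread instance, rather than one handler per instance.
        logger = logging.getLogger("qwen_thread")
        logger.setLevel(logging.INFO)
        if not logger.handlers:  # avoid stacking handlers on repeat calls
            os.makedirs("logs", exist_ok=True)
            handler = RotatingFileHandler(
                filename=os.path.join("logs", "thread_log.log"),
                maxBytes=10 * 1024 * 1024,
                backupCount=3,
                encoding="utf-8",
            )
            handler.setFormatter(logging.Formatter(
                "%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() "
                "- %(levelname)s: %(message)s"
            ))
            logger.addHandler(handler)
        return logger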
diff --git a/qwen_thread.py b/qwen_thread.py
index 0a16fd8..9c348b4 100644
--- a/qwen_thread.py
+++ b/qwen_thread.py
@@ -17,11 +17,12 @@
class qwen_thread:
- def __init__(self, config):
+ def __init__(self, config,logger):
self.config = config
self.max_workers = int(config.get("threadnum"))
self.executor = ThreadPoolExecutor(max_workers=int(config.get("threadnum")))
self.semaphore = threading.Semaphore(int(config.get("threadnum")))
+ self.logger = logger
# Initialize the Milvus collection
connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
@@ -45,22 +46,6 @@
# Shared processor (thread-safe)
self.processor = AutoProcessor.from_pretrained(config.get("qwenaddr"), use_fast=True)
- # Create a per-instance logger
- self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
- self.logger.setLevel(logging.INFO)
- # Avoid adding duplicate handlers
- if not self.logger.handlers:
- handler = RotatingFileHandler(
- filename=os.path.join("logs", 'thread_log.log'),
- maxBytes=10 * 1024 * 1024,
- backupCount=3,
- encoding='utf-8'
- )
- formatter = logging.Formatter(
- '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
- )
- handler.setFormatter(formatter)
- self.logger.addHandler(handler)
def submit(self,res_a):
# Try to acquire the semaphore (non-blocking)
@@ -101,25 +86,26 @@
# Call the rule-matching method to decide whether to raise a warning
is_waning = self.image_rule_chat(desc, res['waning_value'], ragurl,rag_mode,max_tokens)
# If a warning is raised, generate a hazard description and handling suggestion
- #if is_waning == 1:
- # Fetch the rules-and-regulations data
- filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl)
- # Generate the hazard description
- risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
- # Generate the handling suggestion
- suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+ if is_waning == 1:
+ # Fetch the rules-and-regulations data
+ filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl)
+ # Generate the hazard description
+ risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+ # Generate the handling suggestion
+ suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+ self.logger.info(
+ f"{res['video_point_id']}鎵ц瀹屾瘯锛歿res['id']}:鏄惁棰勮{is_waning},瀹夊叏闅愭偅锛歿risk_description}\n澶勭悊寤鸿锛歿suggestion}")
else:
is_desc = 3
# Assemble the data record
data = {
- "id": res['id'],
"event_level_id": res['event_level_id'], # event_level_id
"event_level_name": res['event_level_name'], # event_level_id
"rule_id": res["rule_id"],
"video_point_id": res['video_point_id'], # video_point_id
"video_point_name": res['video_point_name'],
- "is_waning": 1,
+ "is_waning": is_waning,
"is_desc": is_desc,
"zh_desc_class": desc, # text_vector
"bounding_box": res['bounding_box'], # bounding_box
@@ -137,9 +123,9 @@
"suggestion": suggestion,
"knowledge_id": res['knowledge_id']
}
-
+ self.collection.delete(f"id == {res['id']}")
# Save to Milvus
- image_id = self.collection.upsert(data).primary_keys
+ image_id = self.collection.insert(data).primary_keys
data = {
"id": str(image_id[0]),
"video_point_id": res['video_point_id'],
@@ -154,7 +140,7 @@
# Call the RAG service
asyncio.run(self.insert_json_data(ragurl, data))
rag_time = datetime.now() - current_time
- self.logger.info(f"{image_id} run finished, total time: {datetime.now() - ks_time}, image description time: {desc_time}, RAG time: {rag_time}")
+ self.logger.info(f"{res['video_point_id']} done: {image_id} run finished, total time: {datetime.now() - ks_time}, image description time: {desc_time}, RAG time: {rag_time}")
except Exception as e:
self.logger.info(f"绾跨▼锛氭墽琛屾ā鍨嬭В鏋愭椂鍑洪敊::{e}")
return 0
@@ -187,7 +173,7 @@
)
inputs = inputs.to(model.device)
with torch.inference_mode():
- outputs = model.generate(**inputs,max_new_tokens=100)
+ outputs = model.generate(**inputs,max_new_tokens=200)
generated_ids = outputs[:, len(inputs.input_ids[0]):]
image_text = self.processor.batch_decode(
generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
--
Gitblit v1.8.0