From a7e7050585325bdb873c6d312ea89de94215e11e Mon Sep 17 00:00:00 2001
From: shidong <shidong@jhsoft.cc>
Date: 星期六, 12 七月 2025 15:34:27 +0800
Subject: [PATCH] #2025/7/12 #milvus的upsert方法在多线程调用时会产生重复记录,修正为先删除再新增

---
 qwen_thread.py |  207 ++++++++++++++++++++++-----------------------------
 1 files changed, 90 insertions(+), 117 deletions(-)

diff --git a/qwen_thread.py b/qwen_thread.py
index 85275f9..9c348b4 100644
--- a/qwen_thread.py
+++ b/qwen_thread.py
@@ -17,55 +17,42 @@
 
 
 class qwen_thread:
-    def __init__(self, max_workers,config,model_path):
-        self.executor = ThreadPoolExecutor(max_workers=max_workers)
-        self.semaphore = threading.Semaphore(max_workers)
-        self.max_workers = max_workers
+    def __init__(self, config,logger):
+        self.config = config
+        self.max_workers = int(config.get("threadnum"))
+        self.executor = ThreadPoolExecutor(max_workers=int(config.get("threadnum")))
+        self.semaphore = threading.Semaphore(int(config.get("threadnum")))
+        self.logger = logger
+
         # 鍒濆鍖朚ilvus闆嗗悎
         connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
         # 鍔犺浇闆嗗悎
         self.collection = Collection(name="smartobject")
         self.collection.load()
 
-        self.config = config
         self.model_pool = []
-        self.lock_pool = [threading.Lock() for _ in range(max_workers)]
-        for i in range(max_workers):
+        self.lock_pool = [threading.Lock() for _ in range(int(config.get("threadnum")))]
+        for i in range(int(config.get("threadnum"))):
             model = AutoModelForVision2Seq.from_pretrained(
-                model_path,
+                config.get("qwenaddr"),
                 device_map=f"cuda:{config.get('cuda')}",
                 trust_remote_code=True,
+                use_safetensors=True,
                 torch_dtype=torch.float16
+
             ).eval()
             self.model_pool.append(model)
 
         # 鍏变韩鐨勫鐞嗗櫒 (绾跨▼瀹夊叏)
-        self.processor = AutoProcessor.from_pretrained(model_path,use_fast=True)
+        self.processor = AutoProcessor.from_pretrained(config.get("qwenaddr"), use_fast=True)
 
-
-        # 鍒涘缓瀹炰緥涓撳睘logger
-        self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
-        self.logger.setLevel(logging.INFO)
-        # 閬垮厤閲嶅娣诲姞handler
-        if not self.logger.handlers:
-            handler = RotatingFileHandler(
-                filename=os.path.join("logs", 'thread_log.log'),
-                maxBytes=10 * 1024 * 1024,
-                backupCount=3,
-                encoding='utf-8'
-            )
-            formatter = logging.Formatter(
-                '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
-            )
-            handler.setFormatter(formatter)
-            self.logger.addHandler(handler)
 
     def submit(self,res_a):
         # 灏濊瘯鑾峰彇淇″彿閲忥紙闈為樆濉烇級
         acquired = self.semaphore.acquire(blocking=False)
 
         if not acquired:
-            self.logger.info(f"绾跨▼姹犲凡婊★紝绛夊緟绌洪棽绾跨▼... (褰撳墠娲昏穬: {self.max_workers - self.semaphore._value}/{self.max_workers})")
+            #self.logger.info(f"绾跨▼姹犲凡婊★紝绛夊緟绌洪棽绾跨▼... (褰撳墠娲昏穬: {self.max_workers - self.semaphore._value}/{self.max_workers})")
             # 闃诲绛夊緟鐩村埌鏈夊彲鐢ㄧ嚎绋�
             self.semaphore.acquire(blocking=True)
 
@@ -73,15 +60,11 @@
         future.add_done_callback(self._release_semaphore)
         return future
 
-    def _wrap_task(self, res):
+    def _wrap_task(self, res_a):
         try:
-            #self.logger.info(f"澶勭悊: { res['id']}寮�濮�")
-            current_time = datetime.now()
-            image_id = self.tark_do(res, self.config.get("ragurl"), self.config.get("ragmode"), self.config.get("max_tokens"))
-            self.logger.info(f"澶勭悊: { res['id']}瀹屾瘯{image_id}:{datetime.now() - current_time}")
-            return image_id
+            self.tark_do(res_a, self.config.get("ragurl"), self.config.get("ragmode"), self.config.get("max_tokens"))
         except Exception as e:
-            self.logger.info(f"浠诲姟 { res['id']} 澶勭悊鍑洪敊: {e}")
+            self.logger.info(f"澶勭悊鍑洪敊: {e}")
             raise
 
     def tark_do(self,res,ragurl,rag_mode,max_tokens):
@@ -91,29 +74,32 @@
             is_desc = 2
 
             # 鐢熸垚鍥剧墖鎻忚堪
-            image_des = self.image_desc(res['image_desc_path'])
+            ks_time = datetime.now()
+            desc = self.image_desc(res)
+            desc_time = datetime.now() - ks_time
+            current_time = datetime.now()
             risk_description = ""
             suggestion = ""
             # 鍥剧墖鎻忚堪鐢熸垚鎴愬姛
-            if image_des:
+            if desc:
                 is_desc = 2
                 # 璋冪敤瑙勫垯鍖归厤鏂规硶,鍒ゆ柇鏄惁棰勮
-                is_waning = self.image_rule_chat(image_des, res['waning_value'], ragurl, rag_mode, max_tokens)
+                is_waning = self.image_rule_chat(desc, res['waning_value'], ragurl,rag_mode,max_tokens)
                 # 濡傛灉棰勮,鍒欑敓鎴愰殣鎮f弿杩板拰澶勭悊寤鸿
                 if is_waning == 1:
                     # 鑾峰彇瑙勭珷鍒跺害鏁版嵁
-                    filedata = self.get_filedata(res['waning_value'], ragurl)
+                    filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl)
                     # 鐢熸垚闅愭偅鎻忚堪
-                    risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl, rag_mode)
+                    risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
                     # 鐢熸垚澶勭悊寤鸿
-                    suggestion = self.image_rule_chat_suggestion(res['waning_value'], ragurl, rag_mode)
-
+                    suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+                    self.logger.info(
+                        f"{res['video_point_id']}鎵ц瀹屾瘯锛歿res['id']}:鏄惁棰勮{is_waning},瀹夊叏闅愭偅锛歿risk_description}\n澶勭悊寤鸿锛歿suggestion}")
             else:
                 is_desc = 3
 
             # 鏁版嵁缁�
             data = {
-                "id": res['id'],
                 "event_level_id": res['event_level_id'],  # event_level_id
                 "event_level_name": res['event_level_name'],  # event_level_id
                 "rule_id": res["rule_id"],
@@ -121,7 +107,7 @@
                 "video_point_name": res['video_point_name'],
                 "is_waning": is_waning,
                 "is_desc": is_desc,
-                "zh_desc_class": image_des,  # text_vector
+                "zh_desc_class": desc,  # text_vector
                 "bounding_box": res['bounding_box'],  # bounding_box
                 "task_id": res['task_id'],  # task_id
                 "task_name": res['task_name'],  # task_id
@@ -137,15 +123,14 @@
                 "suggestion": suggestion,
                 "knowledge_id": res['knowledge_id']
             }
-
+            self.collection.delete(f"id == {res['id']}")
             # 淇濆瓨鍒癿ilvus
-            image_id = self.collection.upsert(data).primary_keys
-            logging.info(image_id)
+            image_id = self.collection.insert(data).primary_keys
             data = {
                 "id": str(image_id[0]),
                 "video_point_id": res['video_point_id'],
                 "video_path": res["video_point_name"],
-                "zh_desc_class": image_des,
+                "zh_desc_class": desc,
                 "detect_time": res['detect_time'],
                 "image_path": f"{res['image_path']}",
                 "task_name": res["task_name"],
@@ -154,17 +139,17 @@
             }
             # 璋冪敤rag
             asyncio.run(self.insert_json_data(ragurl, data))
-            return image_id
+            rag_time = datetime.now() - current_time
+            self.logger.info(f"{res['video_point_id']}鎵ц瀹屾瘯锛歿image_id}杩愯缁撴潫鎬讳綋鐢ㄦ椂:{datetime.now() - ks_time},鍥剧墖鎻忚堪鐢ㄦ椂{desc_time}锛孯AG鐢ㄦ椂{rag_time}")
         except Exception as e:
-            self.logger.info(f"绾跨▼锛氭墽琛屾ā鍨嬭В鏋愭椂鍑洪敊:浠诲姟锛歿res['id']} :{e}")
+            self.logger.info(f"绾跨▼锛氭墽琛屾ā鍨嬭В鏋愭椂鍑洪敊::{e}")
             return 0
 
-    def image_desc(self, image_path):
+    def image_desc(self, res_data):
         try:
             model, lock = self._acquire_model()
-            # 2. 澶勭悊鍥惧儚
-            image = Image.open(image_path).convert("RGB")  # 鏇挎崲涓烘偍鐨勫浘鐗囪矾寰�
-            image = image.resize((600, 600), Image.Resampling.LANCZOS)  # 楂樿川閲忕缉鏀�
+
+            image = Image.open(res_data['image_desc_path']).convert("RGB").resize((600, 600), Image.Resampling.LANCZOS)
             messages = [
                 {
                     "role": "user",
@@ -187,14 +172,8 @@
                 return_tensors="pt",
             )
             inputs = inputs.to(model.device)
-            current_time = datetime.now()
-            outputs = model.generate(**inputs,
-                                               max_new_tokens=300,
-                                               do_sample=True,
-                                               temperature=0.7,
-                                               renormalize_logits=True
-                                               )
-            print(f"澶勭悊瀹屾瘯:{datetime.now() - current_time}")
+            with torch.inference_mode():
+                outputs = model.generate(**inputs,max_new_tokens=200)
             generated_ids = outputs[:, len(inputs.input_ids[0]):]
             image_text = self.processor.batch_decode(
                 generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
@@ -241,7 +220,6 @@
         try:
             content = (
                 f"鍥剧墖鎻忚堪鍐呭涓猴細\n{image_des}\n瑙勫垯鍐呭锛歕n{rule_text}銆俓n璇烽獙璇佸浘鐗囨弿杩颁腑鏄惁鏈夌鍚堣鍒欑殑鍐呭锛屼笉杩涜鎺ㄧ悊鍜宼hink銆傝繑鍥炵粨鏋滄牸寮忎负[xxx绗﹀悎鐨勮鍒檌d]锛屽鏋滄病鏈夎繑鍥瀃]")
-            # self.logger.info(content)
             #self.logger.info(len(content))
             search_data = {
                 "prompt": "",
@@ -260,7 +238,6 @@
             }
             response = requests.post(ragurl + "/chat", json=search_data)
             results = response.json().get('data')
-            #self.logger.info(len(results))
             # self.logger.info(results)
             ret = re.sub(r'<think>.*?</think>', '', results, flags=re.DOTALL)
             ret = ret.replace(" ", "").replace("\t", "").replace("\n", "")
@@ -273,72 +250,67 @@
             return None
 
     # 闅愭偅鎻忚堪
-    def image_rule_chat_with_detail(self, filedata, rule_text, ollama_url, ollama_mode="qwen2.5vl:3b"):
-
+    def image_rule_chat_with_detail(self,filedata, rule_text, ragurl, rag_mode,max_tokens):
         # API璋冪敤
+        content = (
+            f"瑙勭珷鍒跺害涓猴細[{filedata}]\n杩濆弽鍐呭涓猴細[{rule_text}]\n璇锋煡璇㈣繚鍙嶅唴瀹瑰湪瑙勭珷鍒跺害涓殑瀹夊叏闅愭偅锛屼笉杩涜鎺ㄧ悊鍜宼hink锛岃繑鍥炵畝鐭殑鏂囧瓧淇℃伅")
+        # self.logger.info(len(content))
+        search_data = {
+            "prompt": "",
+            "messages": [
+                {
+                    "role": "user",
+                    "content": content
+                }
+            ],
+            "llm_name": rag_mode,
+            "stream": False,
+            "gen_conf": {
+                "temperature": 0.7,
+                "max_tokens": max_tokens
+            }
+        }
+        response = requests.post(ragurl + "/chat", json=search_data)
+        # 浠巎son鎻愬彇data瀛楁鍐呭
+        ret = response.json()["data"]
+        # 绉婚櫎<think>鏍囩鍜屽唴瀹�
+        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
+        # 瀛楃涓叉竻鐞�,绉婚櫎绌烘牸,鍒惰〃绗�,鎹㈣绗�,鏄熷彿
+        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
+        #print(f"瀹夊叏闅愭偅:{ret}")
+        return ret
+    #澶勭悊寤鸿
+    def image_rule_chat_suggestion(self,filedata, rule_text, ragurl, rag_mode,max_tokens):
+        # 璇锋眰鍐呭
+        content = (
+            f"瑙勭珷鍒跺害涓猴細[{filedata}]\n杩濆弽鍐呭涓猴細[{rule_text}]\n璇锋煡璇㈣繚鍙嶅唴瀹瑰湪瑙勭珷鍒跺害涓殑澶勭悊寤鸿锛屼笉杩涜鎺ㄧ悊鍜宼hink锛岃繑鍥炵畝鐭殑鏂囧瓧淇℃伅")
         response = requests.post(
             # ollama鍦板潃
-            url=f"{ollama_url}/chat",
+            url=f"{ragurl}/chat",
             json={
-                "prompt": "",
-                # 璇锋眰鍐呭
-                "messages": [
-                    {
-                        "role": "user",
-                        "content": f"璇锋牴鎹绔犲埗搴{filedata}]\n鏌ユ壘[{rule_text}]鐨勫畨鍏ㄩ殣鎮f弿杩帮紝涓嶈繘琛屾帹鐞嗗拰think銆傝繑鍥炰俊鎭皬浜�800瀛�"
-                    }
-                ],
                 # 鎸囧畾妯″瀷
-                "llm_name": "qwen3:8b",
+                "llm_name": rag_mode,
+                "messages": [
+                    {"role": "user", "content": content}
+                ],
                 "stream": False,  # 鍏抽棴娴佸紡杈撳嚭
                 "gen_conf": {
-                    "temperature": 0.7,  # 鎺у埗鐢熸垚闅忔満鎬�
-                    "max_tokens": 800  # 鏈�澶ц緭鍑洪暱搴�
+                    "temperature": 0.7,
+                    "max_tokens": max_tokens
                 }
             }
         )
         # 浠巎son鎻愬彇data瀛楁鍐呭
         ret = response.json()["data"]
-        # result = response.json()
-        # ret = result.get("data") or result.get("message", {}).get("content", "")
         # 绉婚櫎<think>鏍囩鍜屽唴瀹�
         ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
         # 瀛楃涓叉竻鐞�,绉婚櫎绌烘牸,鍒惰〃绗�,鎹㈣绗�,鏄熷彿
-        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**", "")
-        print(ret)
-        return ret
-
-    # 澶勭悊寤鸿
-    def image_rule_chat_suggestion(self, rule_text, ollama_url, ollama_mode="qwen2.5vl:3b"):
-        self.logger.info("----------------------------------------------------------------")
-        # 璇锋眰鍐呭
-        content = (
-            f"璇锋牴鎹繚瑙勫唴瀹筟{rule_text}]\n杩涜杩斿洖澶勭悊杩濊寤鸿锛屼笉杩涜鎺ㄧ悊鍜宼hink銆傝繑鍥炵簿鍑嗕俊鎭�")
-        response = requests.post(
-            # ollama鍦板潃
-            url=f"{ollama_url}/chat",
-            json={
-                # 鎸囧畾妯″瀷
-                "llm_name": "qwen3:8b",
-                "messages": [
-                    {"role": "user", "content": content}
-                ],
-                "stream": False  # 鍏抽棴娴佸紡杈撳嚭
-            }
-        )
-        # 浠巎son鎻愬彇data瀛楁鍐呭
-        ret = response.json()["data"]
-        # result = response.json()
-        # ret = result.get("data") or result.get("message", {}).get("content", "")
-        # 绉婚櫎<think>鏍囩鍜屽唴瀹�
-        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
-        # 瀛楃涓叉竻鐞�,绉婚櫎绌烘牸,鍒惰〃绗�,鎹㈣绗�,鏄熷彿
-        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**", "")
-        print(ret)
+        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
+        #print(f"澶勭悊寤鸿:{ret}")
         return ret
 
     # RAG鏈嶅姟鍙戦�佽姹�,鑾峰彇鐭ヨ瘑搴撳唴瀹�
-    def get_filedata(self, searchtext, ragurl):
+    def get_filedata(self, searchtext,filter_expr, ragurl):
         search_data = {
             # 鐭ヨ瘑搴撻泦鍚�
             "collection_name": "smart_knowledge",
@@ -347,16 +319,17 @@
             # 鎼滅储妯″紡
             "search_mode": "hybrid",
             # 鏈�澶氳繑鍥炵粨鏋�
-            "limit": 100,
+            "limit": 10,
             # 璋冨瘑鍚戦噺鎼滅储鏉冮噸
-            "weight_dense": 0.7,
+            "weight_dense": 0.9,
             # 绋�鐤忓悜閲忔悳绱㈡潈閲�
-            "weight_sparse": 0.3,
+            "weight_sparse": 0.1,
             # 绌哄瓧绗︿覆
-            "filter_expr": "",
+            "filter_expr": f"docnm_kwd in {filter_expr}",
             # 鍙繑鍥� text 瀛楁
             "output_fields": ["text"]
         }
+        #print(search_data)
         # 鍚� ragurl + "/search" 绔偣鍙戦�丳OST璇锋眰
         response = requests.post(ragurl + "/search", json=search_data)
         # 浠庡搷搴斾腑鑾峰彇'results'瀛楁
@@ -366,7 +339,7 @@
         # 閬嶅巻鎵�鏈夌粨鏋滆鍒�(rule)锛屽皢姣忔潯瑙勫垯鐨�'entity'涓殑'text'瀛楁鍙栧嚭.
         for rule in results:
             text = text + rule['entity'].get('text') + ";\n"
-
+        #print(text)
         return text
 
     async def insert_json_data(self, ragurl, data):
@@ -380,7 +353,7 @@
 
     def _release_semaphore(self, future):
         self.semaphore.release()
-        self.logger.info(f"閲婃斁绾跨▼ (鍓╀綑绌洪棽: {self.semaphore._value}/{self.max_workers})")
+        #self.logger.info(f"閲婃斁绾跨▼ (鍓╀綑绌洪棽: {self.semaphore._value}/{self.max_workers})")
 
     def shutdown(self):
         """瀹夊叏鍏抽棴"""

--
Gitblit v1.8.0