#2025/7/12
#milvus的upsert方法在多线程调用时多产生重复记录,修正为先删除再新增
2个文件已修改
26 ■■■■■ 已修改文件
qwen_detect.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
qwen_thread.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
qwen_detect.py
@@ -9,6 +9,7 @@
from pymilvus import connections, Collection
from logging.handlers import RotatingFileHandler
import get_mem
from multiprocessing import Process
class ThreadPool:
    def __init__(self):
@@ -96,7 +97,7 @@
                    output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
                                   "task_name", "event_level_id", "event_level_name",
                                   "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
                                   "detect_id","knowledge_id","suggestion",
                                   "detect_id","knowledge_id","suggestion","risk_description",
                                   "detect_time", "image_path", "image_desc_path", "video_path"],
                    consistency_level="Strong",
                    order_by_field="id",  # 按id字段排序
@@ -107,8 +108,9 @@
                if len(res_a) > 0:
                    sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
                    res = sorted_results[0]
                    self.collection.delete(f"id == {res['id']}")
                    # 数据组
                    data = {
                        "id": res['id'],
                        "event_level_id": res['event_level_id'],  # event_level_id
                        "event_level_name": res['event_level_name'],  # event_level_id
                        "rule_id": res["rule_id"],
@@ -116,7 +118,7 @@
                        "video_point_name": res['video_point_name'],
                        "is_waning": 0,
                        "is_desc": 1,
                        "zh_desc_class": res['zh_desc_class'],  # text_vector
                        "zh_desc_class": "",
                        "bounding_box": res['bounding_box'],  # bounding_box
                        "task_id": res['task_id'],  # task_id
                        "task_name": res['task_name'],  # task_id
@@ -128,17 +130,18 @@
                        "image_desc_path": res['image_desc_path'],  # image_desc_path
                        "video_path": res['video_path'],
                        "text_vector": res['text_vector'],
                        "knowledge_id": res['knowledge_id'],
                        "risk_description": res['risk_description'],
                        "suggestion": res['suggestion'],
                        "knowledge_id": res['knowledge_id']
                    }
                    # logging.info(f"读取图像成功: {res['id']}")
                    # 保存到milvus
                    image_id = self.collection.upsert(data).primary_keys
                    image_id = self.collection.insert(data).primary_keys
                    res['id'] = image_id[0]
                    self.pool.submit(res)
                time_sel.sleep(0.01)
            except Exception as e:
                logging.info(f"{camera_id}线程错误:{e}")
            time_sel.sleep(0.01)
    #调用是否需要更新
    def isUpdate(self):
qwen_thread.py
@@ -93,12 +93,13 @@
                    risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
                    # 生成处理建议
                    suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
                    self.logger.info(
                        f"{res['video_point_id']}执行完毕:{res['id']}:是否预警{is_waning},安全隐患:{risk_description}\n处理建议:{suggestion}")
            else:
                is_desc = 3
            # 数据组
            data = {
                "id": res['id'],
                "event_level_id": res['event_level_id'],  # event_level_id
                "event_level_name": res['event_level_name'],  # event_level_id
                "rule_id": res["rule_id"],
@@ -122,9 +123,9 @@
                "suggestion": suggestion,
                "knowledge_id": res['knowledge_id']
            }
            self.collection.delete(f"id == {res['id']}")
            # 保存到milvus
            image_id = self.collection.upsert(data).primary_keys
            image_id = self.collection.insert(data).primary_keys
            data = {
                "id": str(image_id[0]),
                "video_point_id": res['video_point_id'],
@@ -172,7 +173,7 @@
            )
            inputs = inputs.to(model.device)
            with torch.inference_mode():
                outputs = model.generate(**inputs,max_new_tokens=100)
                outputs = model.generate(**inputs,max_new_tokens=200)
            generated_ids = outputs[:, len(inputs.input_ids[0]):]
            image_text = self.processor.batch_decode(
                generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True