#2025/7/12
#milvus的upsert方法在多线程调用时会产生重复记录,修正为先删除再新增
| | |
| | | from pymilvus import connections, Collection |
| | | from logging.handlers import RotatingFileHandler |
| | | import get_mem |
| | | from multiprocessing import Process |
| | | |
| | | class ThreadPool: |
| | | def __init__(self): |
| | |
| | | output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id", |
| | | "task_name", "event_level_id", "event_level_name", |
| | | "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id", |
| | | "detect_id","knowledge_id","suggestion", |
| | | "detect_id","knowledge_id","suggestion","risk_description", |
| | | "detect_time", "image_path", "image_desc_path", "video_path"], |
| | | consistency_level="Strong", |
| | | order_by_field="id", # 按id字段排序 |
| | |
| | | if len(res_a) > 0: |
| | | sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True) |
| | | res = sorted_results[0] |
| | | self.collection.delete(f"id == {res['id']}") |
| | | # 数据组 |
| | | data = { |
| | | "id": res['id'], |
| | | "event_level_id": res['event_level_id'], # event_level_id |
| | | "event_level_name": res['event_level_name'], # event_level_id |
| | | "rule_id": res["rule_id"], |
| | |
| | | "video_point_name": res['video_point_name'], |
| | | "is_waning": 0, |
| | | "is_desc": 1, |
| | | "zh_desc_class": res['zh_desc_class'], # text_vector |
| | | "zh_desc_class": "", |
| | | "bounding_box": res['bounding_box'], # bounding_box |
| | | "task_id": res['task_id'], # task_id |
| | | "task_name": res['task_name'], # task_id |
| | |
| | | "image_desc_path": res['image_desc_path'], # image_desc_path |
| | | "video_path": res['video_path'], |
| | | "text_vector": res['text_vector'], |
| | | "knowledge_id": res['knowledge_id'], |
| | | "risk_description": res['risk_description'], |
| | | "suggestion": res['suggestion'], |
| | | "knowledge_id": res['knowledge_id'] |
| | | } |
| | | # logging.info(f"读取图像成功: {res['id']}") |
| | | # 保存到milvus |
| | | image_id = self.collection.upsert(data).primary_keys |
| | | image_id = self.collection.insert(data).primary_keys |
| | | res['id'] = image_id[0] |
| | | self.pool.submit(res) |
| | | time_sel.sleep(0.01) |
| | | except Exception as e: |
| | | logging.info(f"{camera_id}线程错误:{e}") |
| | | time_sel.sleep(0.01) |
| | | |
| | | |
| | | #调用是否需要更新 |
| | | def isUpdate(self): |
| | |
| | | risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens) |
| | | # 生成处理建议 |
| | | suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens) |
| | | self.logger.info( |
| | | f"{res['video_point_id']}执行完毕:{res['id']}:是否预警{is_waning},安全隐患:{risk_description}\n处理建议:{suggestion}") |
| | | else: |
| | | is_desc = 3 |
| | | |
| | | # 数据组 |
| | | data = { |
| | | "id": res['id'], |
| | | "event_level_id": res['event_level_id'], # event_level_id |
| | | "event_level_name": res['event_level_name'], # event_level_id |
| | | "rule_id": res["rule_id"], |
| | |
| | | "suggestion": suggestion, |
| | | "knowledge_id": res['knowledge_id'] |
| | | } |
| | | |
| | | self.collection.delete(f"id == {res['id']}") |
| | | # 保存到milvus |
| | | image_id = self.collection.upsert(data).primary_keys |
| | | image_id = self.collection.insert(data).primary_keys |
| | | data = { |
| | | "id": str(image_id[0]), |
| | | "video_point_id": res['video_point_id'], |
| | |
| | | ) |
| | | inputs = inputs.to(model.device) |
| | | with torch.inference_mode(): |
| | | outputs = model.generate(**inputs,max_new_tokens=100) |
| | | outputs = model.generate(**inputs,max_new_tokens=200) |
| | | generated_ids = outputs[:, len(inputs.input_ids[0]):] |
| | | image_text = self.processor.batch_decode( |
| | | generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True |