import os
import ast
import asyncio
import logging
from logging.handlers import RotatingFileHandler

import cv2
import numpy as np
import requests
import torch
from PIL import Image
from pymilvus import connections, Collection
from transformers import AutoTokenizer, AutoModelForVision2Seq, AutoProcessor, BitsAndBytesConfig
from flask import Flask, request, jsonify

app = Flask(__name__)
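# RotatingFileHandler is imported above but never configured, while the functions
# below log through logging/self.logger. A minimal rotating-log setup sketch,
# assuming a hypothetical "logs/app.log" path (tune size and backup count as needed):
os.makedirs("logs", exist_ok=True)
_handler = RotatingFileHandler("logs/app.log", maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8")
_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(threadName)s %(message)s"))
logging.getLogger().addHandler(_handler)
logging.getLogger().setLevel(logging.INFO)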
# Rule comparison. Note: this and the two functions after it take `self` and are
# expected to be bound to a worker class providing self.collection,
# self.collection_rule, self.logger and self._thread_name.
def cross_collection_vector_compare(self, image_id, video_point_id, rule_id, video_image_time,
                                    frame_id, task_id, video_path, videotaskurl):
    try:
        # 1. Fetch vectors and metadata from collection A (detections)
        res_a = self.collection.query(
            expr=f"id in {image_id}",
            output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "object_label",
                           "task_id", "event_level_id", "video_point_id", "detect_num",
                           "detect_id", "detect_time", "image_path", "video_path"],
            consistency_level="Strong"
        )
        # 2. Fetch vectors and metadata from collection B (rules)
        res_b = self.collection_rule.query(
            expr=f"rule_id in {rule_id}",
            output_fields=["id", "zh_desc_class", "text_vector", "range_value"],
            consistency_level="Strong"
        )
        # 3. Compute pairwise similarity
        results = []
        pos_weights = {
            ('n', 'n'): 1.0,  # noun-noun match
            ('v', 'v'): 0.8,  # verb-verb match
            ('n', 'v'): 0.3   # noun-verb cross match
        }
        for item_a in res_a:
            if item_a["detect_num"] > 0:  # only compare when targets were detected
                for item_b in res_b:  # compare against every rule entry
                    similarity = 0
                    # 3a. Vector similarity (60% weight): cosine of the two text vectors
                    vec_a = np.array(item_a["text_vector"])
                    vec_b = np.array(item_b["text_vector"])
                    assert vec_a.shape == vec_b.shape, f"dimension mismatch: {vec_a.shape} vs {vec_b.shape}"
                    vec_a = vec_a.astype(np.float64)  # raise numeric precision
                    vec_b = vec_b.astype(np.float64)
                    vector_sim = np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))
                    # 3b. Part-of-speech match (40% weight)
                    desc_a = ast.literal_eval(item_a['zh_desc_class'])  # per-target semantics
                    desc_b = ast.literal_eval(item_b['zh_desc_class'])
                    for clas_a in desc_a:  # compare each target's semantics with the rule
                        pos_match = 0
                        for (_, pos_a, _), (_, pos_b, _) in zip(clas_a, desc_b):
                            pos_match += pos_weights.get((pos_a, pos_b), 0)
                        pos_match /= max(len(clas_a), len(desc_b))
                        # Combine by weight and keep the maximum over all targets
                        stem_int = 0.6 * vector_sim + 0.4 * pos_match
                        if stem_int > similarity:
                            similarity = stem_int
                    # 3c. If similarity exceeds the rule threshold, assemble alert data
                    if similarity > item_b["range_value"]:
                        logging.info(f"similarity {similarity}: {item_a['zh_desc_class']} {item_b['zh_desc_class']}")
                        results.append({
                            "a_id": item_a["id"],
                            "b_id": item_b["id"],
                            "similarity": round(float(similarity), 4)
                        })
        # 4. Sort by similarity
        if len(results) > 0:
            comparison_results = sorted(results, key=lambda x: x["similarity"], reverse=True)
            # Save the alert recording
            asyncio.run(self.video_task(video_point_id, video_image_time, "basic",
                                        frame_id, video_path, videotaskurl))
            # Persist the best match to Milvus
            self.update_milvus(image_id, res_a, 1, comparison_results[0]['similarity'], rule_id)
        else:
            self.update_milvus(image_id, res_a, 0, 0.0, rule_id)
    except Exception as e:
        self.logger.info(f"{video_point_id} thread: error during rule comparison: {image_id, rule_id}: {e}")


# Start a recording
async def video_task(self, video_point_id, video_image_time, task_id, frame_id, video_path, videotaskurl):
    try:
        json_data = {
            "cameraId": f"{video_point_id}",
            "timestamp": video_image_time,
            "preSeconds": 10,
            "postSeconds": 10,
            "taskId": task_id,
            "fid": frame_id,
            "uploadUrl": video_path,
            "uploadType": "local"
        }
        self.logger.info(f"{video_point_id} thread: calling recording service: {videotaskurl, json_data}")
        # Send the POST request; the 30 ms connect/read timeout makes this effectively
        # fire-and-forget (a timeout is swallowed by the except below)
        response = requests.post(videotaskurl, json=json_data, timeout=(0.03, 0.03))
        if response.status_code == 200:
            data = response.json()
            print(data)
    except Exception as e:
        self.logger.info(f"{self._thread_name} thread: error calling recording service at {videotaskurl}: {e}")


def update_milvus(self, image_id, res_a, is_waning, similarity, rule_id_list):
    try:
        # Upsert the comparison result back into Milvus
        self.collection.upsert(
            data=[{
                "id": image_id[0],
                "text_vector": res_a[0]["text_vector"],
                "is_waning": is_waning,
                "waning_value": similarity,
                "rule_id": rule_id_list,
                "zh_desc_class": res_a[0]['zh_desc_class'],
                "bounding_box": res_a[0]['bounding_box'],
                "object_label": res_a[0]['object_label'],
                "task_id": res_a[0]['task_id'],
                "event_level_id": res_a[0]['event_level_id'],
                "video_point_id": res_a[0]['video_point_id'],
                "detect_num": res_a[0]['detect_num'],
                "detect_id": res_a[0]['detect_id'],
                "detect_time": res_a[0]['detect_time'],
                "image_path": res_a[0]['image_path'],
                "video_path": res_a[0]['video_path']
            }]
        )
    except Exception as e:
        self.logger.info(f"{self._thread_name} thread: error updating data after rule comparison: "
                         f"{image_id}: is_waning {is_waning}: alert value {similarity}: "
                         f"rule ids {rule_id_list}: result count {len(res_a)}: {e}")
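# A minimal, self-contained sketch of the scoring rule used in
# cross_collection_vector_compare above: cosine similarity weighted 60% plus
# part-of-speech overlap weighted 40%. The helper name and the sample data are
# illustrative only; zh_desc_class entries are assumed to decode to
# (word, pos_tag, weight) triples, as the unpacking in that function suggests.
POS_WEIGHTS_SKETCH = {('n', 'n'): 1.0, ('v', 'v'): 0.8, ('n', 'v'): 0.3}

def combined_similarity_sketch(vec_a, vec_b, clas_a, desc_b):
    vec_a = np.asarray(vec_a, dtype=np.float64)
    vec_b = np.asarray(vec_b, dtype=np.float64)
    # Cosine similarity between the two text vectors
    vector_sim = np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))
    # Average POS-pair weight over the aligned semantic triples
    pos_match = sum(POS_WEIGHTS_SKETCH.get((pa, pb), 0)
                    for (_, pa, _), (_, pb, _) in zip(clas_a, desc_b))
    pos_match /= max(len(clas_a), len(desc_b))
    return 0.6 * vector_sim + 0.4 * pos_match

# Identical vectors plus a noun-noun pair score 0.6 * 1.0 + 0.4 * 1.0 = 1.0:
# combined_similarity_sketch([1.0, 0.0], [1.0, 0.0], [("人", "n", 1)], [("人员", "n", 1)])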
def tark_do(image_id, image_path, milvusurl, milvusport):
    try:
        # Initialize the Qwen2-VL detection model
        qwen_tokenizer = AutoProcessor.from_pretrained("Qwen2-VL-2B", trust_remote_code=True)
        qwen_model = AutoModelForVision2Seq.from_pretrained("Qwen2-VL-2B", device_map="auto",
                                                            trust_remote_code=True).eval()
        # Connect to Milvus and load the collection
        connections.connect("default", host=milvusurl, port=milvusport)
        collection = Collection(name="smartobject")
        collection.load()
        # 1. Fetch the vector and metadata from collection A
        res_a = collection.query(
            expr=f"id == {image_id}",
            output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "object_label",
                           "task_id", "event_level_id", "video_point_id", "detect_num",
                           "is_waning", "waning_value", "rule_id", "detect_id", "detect_time",
                           "image_path", "video_path"],
            consistency_level="Strong"
        )
        # Load and shrink the image (thumbnail() resizes in place and returns None,
        # so it must not be assigned back)
        image = Image.open(image_path)
        image.thumbnail((512, 512))
        text = "Describe the content of this image"
        # Prepare the model inputs
        inputs = qwen_tokenizer(text=text, images=image, return_tensors="pt").to("cuda")
        # Generate the description
        with torch.no_grad():
            outputs = qwen_model.generate(**inputs, max_new_tokens=50)
        response = qwen_tokenizer.decode(outputs[0], skip_special_tokens=True)
        print(response)
        # Upsert back into Milvus, with the generated description as object_label
        collection.upsert(
            data=[{
                "id": image_id,
                "text_vector": res_a[0]["text_vector"],
                "is_waning": res_a[0]["is_waning"],
                "waning_value": res_a[0]["waning_value"],
                "rule_id": res_a[0]["rule_id"],
                "zh_desc_class": res_a[0]['zh_desc_class'],
                "bounding_box": res_a[0]['bounding_box'],
                "object_label": f"{response}",
                "task_id": res_a[0]['task_id'],
                "event_level_id": res_a[0]['event_level_id'],
                "video_point_id": res_a[0]['video_point_id'],
                "detect_num": res_a[0]['detect_num'],
                "detect_id": res_a[0]['detect_id'],
                "detect_time": res_a[0]['detect_time'],
                "image_path": res_a[0]['image_path'],
                "video_path": res_a[0]['video_path']
            }]
        )
        print(f"Task {image_id}, {image_path}: finished")
        return 1
    except Exception as e:
        print(f"Error during model inference: {image_id}, {image_path}: {e}")
        return 0


# Example: text-processing endpoint
@app.route('/process', methods=['POST'])
def process_text():
    data = request.json
    try:
        # Call your processing function (e.g. the Qwen2-VL model); replace with the actual function
        result = tark_do(data.get("image_id"), data.get("image_path"),
                         data.get("milvusurl"), data.get("milvusport"))
        return jsonify({"result": result})
    except Exception as e:
        # Error handling: Flask cannot return a bare int, so wrap the failure in JSON
        return jsonify({"result": 0, "error": str(e)}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)
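# Example client call for the /process endpoint above (illustrative values only;
# host/port match the app.run defaults, IDs and paths are placeholders):
#
#   import requests
#   resp = requests.post(
#       "http://127.0.0.1:5000/process",
#       json={"image_id": 1, "image_path": "demo.jpg",
#             "milvusurl": "127.0.0.1", "milvusport": "19530"},
#   )
#   print(resp.json())  # {"result": 1} on success, {"result": 0, ...} on failure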