import ast
import asyncio
import logging
import os
import threading
from logging.handlers import RotatingFileHandler
from typing import Dict

import numpy as np
import requests
import torch
from PIL import Image
from pymilvus import connections, Collection
from transformers import AutoProcessor, AutoModelForVision2Seq


class ThreadPool:
    def __init__(self):
        # Read the configuration file (one key=value pair per line).
        self.config = {}
        with open('./conf.txt', 'r', encoding='utf-8') as file:
            for line in file:
                # Strip leading/trailing whitespace (including the newline).
                line = line.strip()
                # Skip blank lines.
                if not line:
                    continue
                # Split into key and value on the first '='.
                if '=' in line:
                    key, value = line.split('=', 1)
                    # Store the trimmed key/value pair.
                    self.config[key.strip()] = value.strip()

        self.threads: Dict[str, threading.Thread] = {}
        self.lock = threading.Lock()

        # Make sure the log directory exists.
        log_dir = "logs"
        os.makedirs(log_dir, exist_ok=True)

        self.device = "cuda:0" if torch.cuda.is_available() else "cpu"

        # Initialize the vision-language model used for image captioning.
        self.qwen_tokenizer = AutoProcessor.from_pretrained("Qwen2-VL-2B", trust_remote_code=True)
        self.qwen_model = AutoModelForVision2Seq.from_pretrained(
            "Qwen2-VL-2B", device_map="auto", trust_remote_code=True
        ).eval()

        # Connect to Milvus and load the detection collection.
        connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
        self.collection = Collection(name="smartobject")
        self.collection.load()
        # Rule collection queried in cross_collection_vector_compare. The original
        # file never initialized it; the collection name below is an assumption.
        self.collection_rule = Collection(name="smartobject_rule")
        self.collection_rule.load()

        # Configure logging: a size-rotated file (10 MB max, 3 backups) plus the console.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
            handlers=[
                RotatingFileHandler(
                    filename=os.path.join(log_dir, 'start_log.log'),
                    maxBytes=10 * 1024 * 1024,  # 10 MB
                    backupCount=3,
                    encoding='utf-8'
                ),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
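    # The config reader above expects a plain-text conf.txt of key=value lines.
    # A minimal example (the values are placeholders, not from the original source):
    #
    #   milvusurl=127.0.0.1
    #   milvusport=19530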
    def cross_collection_vector_compare(self, image_id, video_point_id, rule_id, video_image_time,
                                        frame_id, task_id, video_path, videotaskurl):
        try:
            # 1. Fetch vectors and metadata from collection A (detections).
            res_a = self.collection.query(
                expr=f"id in {image_id}",
                output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "object_label",
                               "task_id", "event_level_id", "video_point_id", "detect_num",
                               "detect_id", "detect_time", "image_path", "video_path"],
                consistency_level="Strong"
            )
            # 2. Fetch vectors and metadata from collection B (rules).
            res_b = self.collection_rule.query(
                expr=f"rule_id in {rule_id}",
                output_fields=["id", "zh_desc_class", "text_vector", "range_value"],
                consistency_level="Strong"
            )
            # 3. Compute pairwise similarities.
            results = []
            pos_weights = {
                ('n', 'n'): 1.0,  # noun-noun match
                ('v', 'v'): 0.8,  # verb-verb match
                ('n', 'v'): 0.3   # noun-verb cross match
            }
            for item_a in res_a:
                if item_a["detect_num"] > 0:  # only compare frames that contain detections
                    for item_b in res_b:  # compare against every rule entry
                        similarity = 0
                        # 3a. Vector similarity (60% weight): cosine of the two text vectors.
                        vec_a = np.array(item_a["text_vector"], dtype=np.float64)  # float64 for precision
                        vec_b = np.array(item_b["text_vector"], dtype=np.float64)
                        assert vec_a.shape == vec_b.shape, f"dimension mismatch: {vec_a.shape} vs {vec_b.shape}"
                        vector_sim = np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))
                        # 3b. Part-of-speech match (40% weight).
                        desc_a = ast.literal_eval(item_a['zh_desc_class'])  # per-object semantic descriptions
                        desc_b = ast.literal_eval(item_b['zh_desc_class'])
                        for clas_a in desc_a:  # compare each object description against the rule
                            pos_match = 0
                            for (_, pos_a, _), (_, pos_b, _) in zip(clas_a, desc_b):
                                pos_match += pos_weights.get((pos_a, pos_b), 0)
                            pos_match /= max(len(clas_a), len(desc_b))
                            # Weighted score; keep the best over all object descriptions.
                            stem_int = 0.6 * vector_sim + 0.4 * pos_match
                            if stem_int > similarity:
                                similarity = stem_int
                        # 3c. If the similarity exceeds the rule threshold, record an alert.
                        if similarity > item_b["range_value"]:
                            self.logger.info(f"similarity {similarity}: {item_a['zh_desc_class']} {item_b['zh_desc_class']}")
                            results.append({
                                "a_id": item_a["id"],
                                "b_id": item_b["id"],
                                "similarity": round(float(similarity), 4)
                            })
            # 4. Sort by similarity and persist the outcome.
            if len(results) > 0:
                comparison_results = sorted(results, key=lambda x: x["similarity"], reverse=True)
                # Trigger video recording for the alert.
                asyncio.run(self.video_task(video_point_id, video_image_time, "basic", frame_id, video_path, videotaskurl))
                # Save the alert to Milvus.
                self.update_milvus(image_id, res_a, 1, comparison_results[0]['similarity'], rule_id)
            else:
                self.update_milvus(image_id, res_a, 0, 0.0, rule_id)
        except Exception as e:
            self.logger.error(f"thread {video_point_id}: rule comparison failed for {image_id, rule_id}: {e}")

    # Trigger the recording service.
    async def video_task(self, video_point_id, video_image_time, task_id, frame_id, video_path, videotaskurl):
        try:
            json_data = {
                "cameraId": f"{video_point_id}",
                "timestamp": video_image_time,
                "preSeconds": 10,
                "postSeconds": 10,
                "taskId": task_id,
                "fid": frame_id,
                "uploadUrl": video_path,
                "uploadType": "local"
            }
            self.logger.info(f"thread {video_point_id}: calling recording service: {videotaskurl, json_data}")
            # Send a POST request. The very short timeouts make this effectively
            # fire-and-forget so the detection pipeline is not blocked.
            response = requests.post(videotaskurl, json=json_data, timeout=(0.03, 0.03))
            # Check the response status code.
            if response.status_code == 200:
                self.logger.info(response.json())
        except Exception as e:
            self.logger.error(f"thread {video_point_id}: recording call failed: url {videotaskurl}: {e}")

    def update_milvus(self, image_id, res_a, is_waning, similarity, rule_id_list):
        try:
            # Upsert the comparison outcome back into Milvus.
            self.collection.upsert(
                data=[{
                    "id": image_id[0],
                    "text_vector": res_a[0]["text_vector"],
                    "is_waning": is_waning,
                    "waning_value": similarity,
                    "rule_id": rule_id_list,
                    "zh_desc_class": res_a[0]['zh_desc_class'],
                    "bounding_box": res_a[0]['bounding_box'],
                    "object_label": res_a[0]['object_label'],
                    "task_id": res_a[0]['task_id'],
                    "event_level_id": res_a[0]['event_level_id'],
                    "video_point_id": res_a[0]['video_point_id'],
                    "detect_num": res_a[0]['detect_num'],
                    "detect_id": res_a[0]['detect_id'],
                    "detect_time": res_a[0]['detect_time'],
                    "image_path": res_a[0]['image_path'],
                    "video_path": res_a[0]['video_path']
                }]
            )
        except Exception as e:
            self.logger.error(f"post-comparison update failed: {image_id}: is_waning {is_waning}: "
                              f"score {similarity}: rule ids {rule_id_list}: rows {len(res_a)}: {e}")
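    # --- Illustrative sketch (not part of the original file) ---
    # cross_collection_vector_compare scores each detection/rule pair as
    # 0.6 * cosine(vec_a, vec_b) + 0.4 * pos_match. The helper below restates
    # that formula in isolation so it can be unit-tested; the name
    # `weighted_similarity` and the keyword weights are hypothetical.
    @staticmethod
    def weighted_similarity(vec_a, vec_b, pos_match, w_vec=0.6, w_pos=0.4):
        """Combine cosine similarity and a POS-match ratio into one score."""
        vec_a = np.asarray(vec_a, dtype=np.float64)
        vec_b = np.asarray(vec_b, dtype=np.float64)
        cosine = np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))
        return w_vec * cosine + w_pos * pos_match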
    def tark_do(self, image_id, image_path):
        try:
            # 1. Fetch the record's vector and metadata from collection A.
            res_a = self.collection.query(
                expr=f"id == {image_id}",
                output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "object_label",
                               "task_id", "event_level_id", "video_point_id", "detect_num",
                               "is_waning", "waning_value", "rule_id", "detect_id", "detect_time",
                               "image_path", "video_path"],
                consistency_level="Strong"
            )
            # 2. Caption the image with the vision-language model.
            image = Image.open(image_path)
            image.thumbnail((512, 512))  # resize in place; thumbnail() returns None
            text = "Describe the content of this image"
            inputs = self.qwen_tokenizer(text=text, images=image, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.qwen_model.generate(**inputs, max_new_tokens=50)
            response = self.qwen_tokenizer.decode(outputs[0], skip_special_tokens=True)
            self.logger.info(response)
            # 3. Write the caption back to Milvus as the object label.
            self.collection.upsert(
                data=[{
                    "id": image_id,
                    "text_vector": res_a[0]["text_vector"],
                    "is_waning": res_a[0]["is_waning"],
                    "waning_value": res_a[0]["waning_value"],
                    "rule_id": res_a[0]["rule_id"],
                    "zh_desc_class": res_a[0]['zh_desc_class'],
                    "bounding_box": res_a[0]['bounding_box'],
                    "object_label": f"{response}",  # generated caption
                    "task_id": res_a[0]['task_id'],
                    "event_level_id": res_a[0]['event_level_id'],
                    "video_point_id": res_a[0]['video_point_id'],
                    "detect_num": res_a[0]['detect_num'],
                    "detect_id": res_a[0]['detect_id'],
                    "detect_time": res_a[0]['detect_time'],
                    "image_path": res_a[0]['image_path'],
                    "video_path": res_a[0]['video_path']
                }]
            )
            self.logger.info(f"task {image_id, image_path}: finished")
            return 1
        except Exception as e:
            self.logger.error(f"model inference failed: {image_id, image_path}: {e}")
            return 0


# Usage example
if __name__ == "__main__":
    pool = ThreadPool()
    while True:
        try:
            # image_id / image_path must be supplied by the upstream detection
            # pipeline; the original file left this wiring undefined.
            pool.tark_do(image_id, image_path)
        except Exception as e:
            logging.error(f"main thread: unexpected error: {e}")
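# --- Data-format note (inferred from the parsing code, not from the source) ---
# cross_collection_vector_compare parses `zh_desc_class` with ast.literal_eval
# and unpacks 3-tuples, so the stored string is assumed to be a repr() of
# (word, part-of-speech, extra) tuples, e.g. serialized jieba.posseg output.
# The example words and scores below are hypothetical:
#
#   "[[('行人', 'n', 1.0), ('行走', 'v', 0.8)]]"   # detection side: list per object
#   "[('车辆', 'n', 1.0), ('闯入', 'v', 0.9)]"     # rule side: flat tuple list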