import ast
import asyncio
import logging

import numpy as np
import requests
import torch
from PIL import Image
from pymilvus import connections, Collection
from transformers import AutoModelForVision2Seq, AutoProcessor
from flask import Flask, request, jsonify

app = Flask(__name__)


# Rule comparison
def cross_collection_vector_compare(self, image_id, video_point_id, rule_id, video_image_time, frame_id, task_id, video_path, videotaskurl):
    try:
        # 1. Fetch vectors and metadata from collection A (detections)
        res_a = self.collection.query(
            expr=f"id in {image_id}",
            output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "object_label", "task_id", "event_level_id",
                           "video_point_id", "detect_num", "detect_id", "detect_time", "image_path", "video_path"],
            consistency_level="Strong"
        )

        # 2. Fetch vectors and metadata from collection B (rules)
        res_b = self.collection_rule.query(
            expr=f"rule_id in {rule_id}",
            output_fields=["id", "zh_desc_class", "text_vector", "range_value"],
            consistency_level="Strong"
        )

        # 3. Compute pairwise similarity
        results = []
        pos_weights = {
            ('n', 'n'): 1.0,  # noun-noun match
            ('v', 'v'): 0.8,  # verb-verb match
            ('n', 'v'): 0.3   # noun-verb cross; note ('v', 'n') pairs fall through to 0
        }
        for item_a in res_a:
            if item_a["detect_num"] > 0:  # compare only when objects were detected
                for item_b in res_b:  # compare against every rule item
                    similarity = 0.0
                    # 3a. Vector similarity (60% weight): cosine of the two embeddings
                    vec_a = np.array(item_a["text_vector"], dtype=np.float64)  # float64 for precision
                    vec_b = np.array(item_b["text_vector"], dtype=np.float64)
                    assert vec_a.shape == vec_b.shape, f"dimension mismatch: {vec_a.shape} vs {vec_b.shape}"
                    vector_sim = np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))

                    # 3b. Part-of-speech match score (40% weight)
                    desc_a = ast.literal_eval(item_a['zh_desc_class'])  # detected-object semantics
                    desc_b = ast.literal_eval(item_b['zh_desc_class'])
                    for clas_a in desc_a:  # score each detected semantic against the rule
                        pos_match = 0
                        for (_, pos_a, _), (_, pos_b, _) in zip(clas_a, desc_b):
                            pos_match += pos_weights.get((pos_a, pos_b), 0)
                        pos_match /= max(len(clas_a), len(desc_b))

                        # Combine the weighted scores and keep the maximum over all semantics
                        stem_int = 0.6 * vector_sim + 0.4 * pos_match
                        if stem_int > similarity:
                            similarity = stem_int

                    # 3c. If the similarity exceeds the rule threshold, record an alert
                    if similarity > item_b["range_value"]:
                        self.logger.info(f"similarity {similarity}: {item_a['zh_desc_class']} {item_b['zh_desc_class']}")
                        results.append({
                            "a_id": item_a["id"],
                            "b_id": item_b["id"],
                            "similarity": round(float(similarity), 4)
                        })

        # 4. Sort by similarity (descending)
        if len(results) > 0:
            comparison_results = sorted(results, key=lambda x: x["similarity"], reverse=True)
            # Trigger the recording task
            asyncio.run(self.video_task(video_point_id, video_image_time, "basic", frame_id, video_path, videotaskurl))
            # Persist the alert to Milvus with the best score
            self.update_milvus(image_id, res_a, 1, comparison_results[0]['similarity'], rule_id)
        else:
            self.update_milvus(image_id, res_a, 0, 0.0, rule_id)
    except Exception as e:
        self.logger.error(f"thread {video_point_id}: error during rule comparison {image_id, rule_id}: {e}")
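

# Illustrative sketch (not called by the pipeline): how the combined score above
# behaves on made-up inputs. The toy vectors and (word, pos, extra) triples are
# assumptions for demonstration; real values come from the Milvus collections.
def _demo_combined_score():
    pos_weights = {('n', 'n'): 1.0, ('v', 'v'): 0.8, ('n', 'v'): 0.3}
    vec_a = np.array([0.1, 0.9, 0.2], dtype=np.float64)
    vec_b = np.array([0.2, 0.8, 0.1], dtype=np.float64)
    vector_sim = np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))
    clas_a = [('person', 'n', 0.9), ('run', 'v', 0.8)]    # hypothetical detection
    desc_b = [('person', 'n', 1.0), ('climb', 'v', 1.0)]  # hypothetical rule
    pos_match = sum(pos_weights.get((pa, pb), 0)
                    for (_, pa, _), (_, pb, _) in zip(clas_a, desc_b))
    pos_match /= max(len(clas_a), len(desc_b))  # here (1.0 + 0.8) / 2 = 0.9
    return 0.6 * vector_sim + 0.4 * pos_match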


# Start recording
async def video_task(self, video_point_id, video_image_time, task_id, frame_id, video_path, videotaskurl):
    try:
        json_data = {
            "cameraId": f"{video_point_id}",
            "timestamp": video_image_time,
            "preSeconds": 10,
            "postSeconds": 10,
            "taskId": task_id,
            "fid": frame_id,
            "uploadUrl": video_path,
            "uploadType": "local"
        }
        self.logger.info(f"thread {video_point_id}: calling recording service: {videotaskurl, json_data}")

        # POST the recording request; the 30 ms connect/read timeout makes this
        # effectively fire-and-forget (the response rarely arrives in time)
        response = requests.post(videotaskurl, json=json_data, timeout=(0.03, 0.03))
        # Check the response status code
        if response.status_code == 200:
            data = response.json()
            print(data)
    except Exception as e:
        self.logger.error(f"thread {self._thread_name}: error calling recording service at {videotaskurl}: {e}")
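
# Design note: because of the (0.03, 0.03) timeout above, the POST usually
# raises before a response arrives, so the 200-handling branch rarely runs. If
# fire-and-forget is the intent, submitting the request to a small thread pool
# with a normal timeout states that explicitly (a sketch, not original code):
#
#   from concurrent.futures import ThreadPoolExecutor
#   _video_pool = ThreadPoolExecutor(max_workers=2)
#   _video_pool.submit(requests.post, videotaskurl, json=json_data, timeout=5)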


def update_milvus(self, image_id, res_a, is_waning, similarity, rule_id_list):
    try:
        # Upsert the full entity back to Milvus with the alert fields updated
        self.collection.upsert(
            data=[{
                "id": image_id[0],
                "text_vector": res_a[0]["text_vector"],
                "is_waning": is_waning,
                "waning_value": similarity,
                "rule_id": rule_id_list,
                "zh_desc_class": res_a[0]['zh_desc_class'],
                "bounding_box": res_a[0]['bounding_box'],
                "object_label": res_a[0]['object_label'],
                "task_id": res_a[0]['task_id'],
                "event_level_id": res_a[0]['event_level_id'],
                "video_point_id": res_a[0]['video_point_id'],
                "detect_num": res_a[0]['detect_num'],
                "detect_id": res_a[0]['detect_id'],
                "detect_time": res_a[0]['detect_time'],
                "image_path": res_a[0]['image_path'],
                "video_path": res_a[0]['video_path']
            }]
        )
    except Exception as e:
        self.logger.error(f"thread {self._thread_name}: error updating data after rule comparison: {image_id}: "
                          f"is_waning={is_waning}, score={similarity}, rule_id={rule_id_list}, rows={len(res_a)}: {e}")
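
# The three methods above reference self.collection, self.collection_rule,
# self.logger and self._thread_name, so they presumably belong to a worker
# class defined elsewhere. A minimal sketch of the assumed wiring (class and
# rule-collection names are assumptions, not shown in this file):
#
#   class RuleWorker:
#       def __init__(self, milvusurl, milvusport, thread_name):
#           connections.connect("default", host=milvusurl, port=milvusport)
#           self.collection = Collection(name="smartobject")
#           self.collection_rule = Collection(name="smart_rule")  # assumed name
#           self.logger = logging.getLogger(thread_name)
#           self._thread_name = thread_name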


def tark_do(image_id, image_path, milvusurl, milvusport):
    try:
        # Initialize the Qwen2-VL captioning model (loaded per request here,
        # which is slow; consider loading once at module level)
        qwen_processor = AutoProcessor.from_pretrained("Qwen2-VL-2B", trust_remote_code=True)
        qwen_model = AutoModelForVision2Seq.from_pretrained("Qwen2-VL-2B", device_map="auto", trust_remote_code=True).eval()

        # Initialize the Milvus connection and load the collection
        connections.connect("default", host=milvusurl, port=milvusport)
        collection = Collection(name="smartobject")
        collection.load()

        # 1. Fetch the existing vector and metadata for this image
        res_a = collection.query(
            expr=f"id == {image_id}",
            output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "object_label", "task_id", "event_level_id",
                           "video_point_id", "detect_num", "is_waning", "waning_value", "rule_id", "detect_id", "detect_time", "image_path", "video_path"],
            consistency_level="Strong"
        )

        # 2. Load and downscale the image (thumbnail() resizes in place and returns None)
        image = Image.open(image_path)
        image.thumbnail((512, 512))
        text = "描述这张图片的内容"  # "Describe the content of this image"

        # 3. Run the model
        inputs = qwen_processor(text=text, images=image, return_tensors="pt").to(qwen_model.device)
        with torch.no_grad():
            outputs = qwen_model.generate(**inputs, max_new_tokens=50)
        response = qwen_processor.decode(outputs[0], skip_special_tokens=True)
        print(response)

        # 4. Upsert the row with the generated description as the object label
        collection.upsert(
            data=[{
                "id": image_id,
                "text_vector": res_a[0]["text_vector"],
                "is_waning": res_a[0]["is_waning"],
                "waning_value": res_a[0]["waning_value"],
                "rule_id": res_a[0]["rule_id"],
                "zh_desc_class": res_a[0]['zh_desc_class'],
                "bounding_box": res_a[0]['bounding_box'],
                "object_label": f"{response}",  # model-generated description
                "task_id": res_a[0]['task_id'],
                "event_level_id": res_a[0]['event_level_id'],
                "video_point_id": res_a[0]['video_point_id'],
                "detect_num": res_a[0]['detect_num'],
                "detect_id": res_a[0]['detect_id'],
                "detect_time": res_a[0]['detect_time'],
                "image_path": res_a[0]['image_path'],
                "video_path": res_a[0]['video_path']
            }]
        )
        print(f"task {image_id, image_path}: finished")
        return 1
    except Exception as e:
        print(f"error during model inference for {image_id, image_path}: {e}")
        return 0
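
# Note: the direct processor call above is a simplification. The official
# Qwen2-VL examples build the prompt through the chat template first, roughly
# (a sketch based on the published usage, not code from this project):
#
#   messages = [{"role": "user", "content": [
#       {"type": "image", "image": image_path},
#       {"type": "text", "text": "描述这张图片的内容"},
#   ]}]
#   prompt = qwen_processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
#   inputs = qwen_processor(text=[prompt], images=[image], return_tensors="pt")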


# Image processing endpoint
@app.route('/process', methods=['POST'])
def process_text():
    data = request.json
    try:
        # Run the Qwen2-VL analysis task; returns 1 on success, 0 on failure
        result = tark_do(data.get("image_id"), data.get("image_path"), data.get("milvusurl"), data.get("milvusport"))
        return jsonify({"result": result})
    except Exception as e:
        # Flask views must return a response object, not a bare int
        return jsonify({"result": 0, "error": str(e)}), 500
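
# Example client call (a sketch; the values below are placeholders, and the
# host/port assume the app.run() settings at the bottom of this file):
#
#   import requests
#   payload = {
#       "image_id": 1,
#       "image_path": "/data/frames/0001.jpg",  # hypothetical path
#       "milvusurl": "127.0.0.1",
#       "milvusport": "19530",                  # default Milvus port
#   }
#   print(requests.post("http://127.0.0.1:5000/process", json=payload).json())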


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)