import ast
import asyncio
import logging
import os
import threading
import time as time_sel
from datetime import datetime
from logging.handlers import RotatingFileHandler
from typing import Dict

import cv2
import numpy as np
import requests
import torch
from PIL import Image
from pymilvus import connections, Collection
from transformers import AutoModelForVision2Seq, AutoProcessor

import detect_task


class ThreadPool:
    def __init__(self):
        # Read the configuration file (simple "key = value" lines).
        self.config = {}
        with open('./conf.txt', 'r', encoding='utf-8') as file:
            for line in file:
                # Strip leading/trailing whitespace (including the newline).
                line = line.strip()
                # Skip blank lines.
                if not line:
                    continue
                # Split into key and value on the first '='.
                if '=' in line:
                    key, value = line.split('=', 1)
                    # Strip whitespace around both key and value.
                    key = key.strip()
                    value = value.strip()
                    # Store the pair in the config dict.
                    self.config[key] = value
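
        # Example conf.txt (a minimal sketch; only the keys this module
        # actually reads are shown, and the values are placeholders):
        #
        #   milvusurl = 127.0.0.1
        #   milvusport = 19530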

        self.threads: Dict[str, threading.Thread] = {}
        self.lock = threading.Lock()

        # Make sure the log directory exists before configuring logging.
        log_dir = "logs"
        os.makedirs(log_dir, exist_ok=True)

        # Select GPU if available, otherwise fall back to CPU.
        self.device = "cuda:0" if torch.cuda.is_available() else "cpu"

        # Initialize the vision-language model (Qwen2-VL-2B) and its processor.
        self.qwen_tokenizer = AutoProcessor.from_pretrained("Qwen2-VL-2B", trust_remote_code=True)
        self.qwen_model = AutoModelForVision2Seq.from_pretrained(
            "Qwen2-VL-2B", device_map="auto", trust_remote_code=True
        ).eval()

        # Connect to Milvus and load the collections.
        connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
        self.collection = Collection(name="smartobject")
        self.collection.load()
        # cross_collection_vector_compare() also reads self.collection_rule;
        # the rule collection's name below is an assumption and should be
        # adjusted to match the actual schema.
        self.collection_rule = Collection(name="smartrule")  # assumed name
        self.collection_rule.load()
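
        # Fields referenced on the "smartobject" collection elsewhere in this
        # module (the authoritative schema lives in Milvus; this is a summary):
        #   id, zh_desc_class, text_vector, bounding_box, object_label,
        #   task_id, event_level_id, video_point_id, detect_num, is_waning,
        #   waning_value, rule_id, detect_id, detect_time, image_path, video_path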

        # Configure logging: rotate by size and also echo to the console.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
            handlers=[
                # Size-rotated log file (10 MB max, 3 backups kept).
                RotatingFileHandler(
                    filename=os.path.join(log_dir, 'start_log.log'),
                    maxBytes=10 * 1024 * 1024,  # 10 MB
                    backupCount=3,
                    encoding='utf-8'
                ),
                # Mirror log output to the console.
                logging.StreamHandler()
            ]
        )
        # The methods below log through self.logger, so bind it here.
        self.logger = logging.getLogger(__name__)
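        # Note: logging.basicConfig() only configures the root logger once per
        # process; if the host application has already installed handlers, the
        # call above is a no-op.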

    def cross_collection_vector_compare(self, image_id, video_point_id, rule_id, video_image_time,
                                        frame_id, task_id, video_path, videotaskurl):
        try:
            # 1. Fetch vectors and metadata from collection A (detected objects).
            res_a = self.collection.query(
                expr=f"id in {image_id}",
                output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "object_label",
                               "task_id", "event_level_id", "video_point_id", "detect_num",
                               "detect_id", "detect_time", "image_path", "video_path"],
                consistency_level="Strong"
            )

            # 2. Fetch vectors and metadata from collection B (rules).
            res_b = self.collection_rule.query(
                expr=f"rule_id in {rule_id}",
                output_fields=["id", "zh_desc_class", "text_vector", "range_value"],
                consistency_level="Strong"
            )

            # 3. Compute pairwise similarities.
            results = []
            pos_weights = {
                ('n', 'n'): 1.0,  # noun-noun match
                ('v', 'v'): 0.8,  # verb-verb match
                ('n', 'v'): 0.3   # noun-verb cross match
            }
            # Iterate over the result set.
            for item_a in res_a:
                if item_a["detect_num"] > 0:  # only compare when objects were detected
                    for item_b in res_b:  # compare against every rule entry
                        similarity = 0
                        # 3a. Vector similarity (60% weight): cosine similarity
                        # between the two embeddings.
                        vec_a = np.array(item_a["text_vector"])
                        vec_b = np.array(item_b["text_vector"])
                        assert vec_a.shape == vec_b.shape, f"dimension mismatch: {vec_a.shape} vs {vec_b.shape}"
                        vec_a = vec_a.astype(np.float64)  # improve numerical precision
                        vec_b = vec_b.astype(np.float64)
                        vector_sim = np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))

                        # 3b. Part-of-speech match score (40% weight).
                        desc_a = ast.literal_eval(item_a['zh_desc_class'])  # object semantics
                        desc_b = ast.literal_eval(item_b['zh_desc_class'])
                        for clas_a in desc_a:  # compare against each object's semantics
                            pos_match = 0
                            for (_, pos_a, _), (_, pos_b, _) in zip(clas_a, desc_b):
                                pos_match += pos_weights.get((pos_a, pos_b), 0)
                            pos_match /= max(len(clas_a), len(desc_b), 1)  # guard against division by zero

                            # Combine the weighted scores and keep the maximum.
                            stem_int = 0.6 * vector_sim + 0.4 * pos_match
                            if stem_int > similarity:
                                similarity = stem_int

                        # 3c. If the similarity exceeds the rule's threshold,
                        # assemble an alert record.
                        if similarity > item_b["range_value"]:
                            self.logger.info(f"similarity {similarity}: {item_a['zh_desc_class']} {item_b['zh_desc_class']}")
                            results.append({
                                "a_id": item_a["id"],
                                "b_id": item_b["id"],
                                "similarity": round(float(similarity), 4)
                            })

            # 4. Sort by similarity.
            if len(results) > 0:
                comparison_results = sorted(results, key=lambda x: x["similarity"], reverse=True)
                # Record the alert video clip.
                asyncio.run(self.video_task(video_point_id, video_image_time, "basic", frame_id, video_path, videotaskurl))
                # Persist the alert to Milvus.
                self.update_milvus(image_id, res_a, 1, comparison_results[0]['similarity'], rule_id)
            else:
                self.update_milvus(image_id, res_a, 0, 0.0, rule_id)
        except Exception as e:
            self.logger.info(f"thread {video_point_id}: error during rule comparison: {image_id, rule_id}: {e}")

    # Trigger video recording around the alert frame.
    async def video_task(self, video_point_id, video_image_time, task_id, frame_id, video_path, videotaskurl):
        try:
            json_data = {
                "cameraId": f"{video_point_id}",
                "timestamp": video_image_time,
                "preSeconds": 10,
                "postSeconds": 10,
                "taskId": task_id,
                "fid": frame_id,
                "uploadUrl": video_path,
                "uploadType": "local"
            }
            self.logger.info(f"thread {video_point_id}: calling recording service: {videotaskurl, json_data}")

            # Send a POST request. The (connect, read) timeouts are tiny, so
            # this call is effectively fire-and-forget.
            response = requests.post(videotaskurl, json=json_data, timeout=(0.03, 0.03))
            # Check the response status code.
            if response.status_code == 200:
                data = response.json()
                print(data)
        except Exception as e:
            self.logger.info(f"thread {video_point_id}: error calling recording service at {videotaskurl}: {e}")

    def update_milvus(self, image_id, res_a, is_waning, similarity, rule_id_list):
        try:
            # Persist the comparison result back to Milvus. Upsert replaces the
            # whole entity, so every field is carried over from the query result.
            self.collection.upsert(
                data=[{
                    "id": image_id[0],
                    "text_vector": res_a[0]["text_vector"],
                    "is_waning": is_waning,
                    "waning_value": similarity,
                    "rule_id": rule_id_list,
                    "zh_desc_class": res_a[0]['zh_desc_class'],
                    "bounding_box": res_a[0]['bounding_box'],
                    "object_label": res_a[0]['object_label'],
                    "task_id": res_a[0]['task_id'],
                    "event_level_id": res_a[0]['event_level_id'],
                    "video_point_id": res_a[0]['video_point_id'],
                    "detect_num": res_a[0]['detect_num'],
                    "detect_id": res_a[0]['detect_id'],
                    "detect_time": res_a[0]['detect_time'],
                    "image_path": res_a[0]['image_path'],
                    "video_path": res_a[0]['video_path']
                }]
            )
        except Exception as e:
            self.logger.info(f"thread {threading.current_thread().name}: error updating data after rule comparison: "
                             f"{image_id}: is_waning={is_waning}, score={similarity}, rule ids={rule_id_list}, "
                             f"result count={len(res_a)}: {e}")

    def tark_do(self, image_id, image_path):
        try:
            # 1. Fetch the entity's vector and metadata from the collection.
            res_a = self.collection.query(
                expr=f"id == {image_id}",
                output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "object_label",
                               "task_id", "event_level_id", "video_point_id", "detect_num",
                               "is_waning", "waning_value", "rule_id", "detect_id", "detect_time",
                               "image_path", "video_path"],
                consistency_level="Strong"
            )
            # Load and downscale the image.
            image = Image.open(image_path)
            image.thumbnail((512, 512))  # thumbnail() resizes in place and returns None
            text = "描述这张图片的内容"  # prompt: "Describe the content of this image"
            # Prepare model inputs.
            inputs = self.qwen_tokenizer(text=text, images=image, return_tensors="pt").to(self.device)
            # Generate the description.
            with torch.no_grad():
                outputs = self.qwen_model.generate(**inputs, max_new_tokens=50)
            response = self.qwen_tokenizer.decode(outputs[0], skip_special_tokens=True)
            print(response)

            # 2. Write the generated description back to Milvus.
            self.collection.upsert(
                data=[{
                    "id": image_id,
                    "text_vector": res_a[0]["text_vector"],
                    "is_waning": res_a[0]["is_waning"],
                    "waning_value": res_a[0]["waning_value"],
                    "rule_id": res_a[0]["rule_id"],
                    "zh_desc_class": res_a[0]['zh_desc_class'],
                    "bounding_box": res_a[0]['bounding_box'],
                    "object_label": f"{response}",  # replace the label with the generated description
                    "task_id": res_a[0]['task_id'],
                    "event_level_id": res_a[0]['event_level_id'],
                    "video_point_id": res_a[0]['video_point_id'],
                    "detect_num": res_a[0]['detect_num'],
                    "detect_id": res_a[0]['detect_id'],
                    "detect_time": res_a[0]['detect_time'],
                    "image_path": res_a[0]['image_path'],
                    "video_path": res_a[0]['video_path']
                }]
            )
            print(f"task {image_id, image_path}: finished")
            return 1
        except Exception as e:
            print(f"error during model inference for {image_id, image_path}: {e}")
            return 0
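
    # Example usage (a sketch; image_id must exist in the collection and
    # image_path must point to a readable image file):
    #
    #   pool = ThreadPool()
    #   pool.tark_do(image_id=1001, image_path="./frames/1001.jpg")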


# Usage example
if __name__ == "__main__":
    pool = ThreadPool()
    while True:
        try:
            # Placeholder arguments: the id/path of the next pending image are
            # expected to come from the upstream detection pipeline (e.g. detect_task).
            pool.tark_do(image_id=0, image_path="./example.jpg")
        except Exception as e:
            logging.info(f"unknown error in main thread: {e}")
        time_sel.sleep(1)  # avoid busy-looping between polls