# shidong
# 2025-07-22 2c3655dcfa20d6ac36219b879debef5590f936d4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import ast
import asyncio
import logging
import os
from logging.handlers import RotatingFileHandler

import cv2
import numpy as np
import requests
import torch
from flask import Flask, request, jsonify
from PIL import Image
from pymilvus import connections, Collection
from transformers import AutoTokenizer, AutoModelForVision2Seq, AutoProcessor, BitsAndBytesConfig
 
# Flask application instance; the /process route is registered on it via @app.route.
app = Flask(__name__)
# Rule comparison
def _cosine_similarity(vec_a, vec_b):
    """Return the cosine similarity of two equal-shaped vectors as a float.

    A zero-norm vector has no direction, so the similarity is reported as 0.0
    instead of dividing by zero.

    Raises:
        ValueError: If the two vectors do not have the same shape.
    """
    # float64 keeps precision when the stored vectors are lower precision.
    arr_a = np.asarray(vec_a).astype(np.float64)
    arr_b = np.asarray(vec_b).astype(np.float64)
    if arr_a.shape != arr_b.shape:
        raise ValueError(f"维度不匹配: {arr_a.shape} vs {arr_b.shape}")
    denominator = np.linalg.norm(arr_a) * np.linalg.norm(arr_b)
    if denominator == 0:
        return 0.0
    return float(np.dot(arr_a, arr_b) / denominator)


def _pos_match_score(clas_a, desc_b, pos_weights):
    """Return the weighted part-of-speech match of two tagged sequences.

    Items are paired positionally; each item is a 3-tuple whose middle element
    is the tag. The summed weight is divided by the longer sequence length, and
    two empty sequences score 0.0.
    """
    longest = max(len(clas_a), len(desc_b))
    if longest == 0:
        return 0.0
    total = sum(
        pos_weights.get((pos_a, pos_b), 0)
        for (_, pos_a, _), (_, pos_b, _) in zip(clas_a, desc_b)
    )
    return total / longest


def cross_collection_vector_compare(self, image_id, video_point_id, rule_id, video_image_time, frame_id, task_id, video_path, videotaskurl):
    """Compare detection records against rule records and write the verdict back.

    Queries ``self.collection`` with the filter ``id in {image_id}`` and
    ``self.collection_rule`` with ``rule_id in {rule_id}``. For every record
    with ``detect_num > 0`` and every rule, the score is
    ``0.6 * cosine(text_vector) + 0.4 * POS match``, maximised over the entries
    of the record's ``zh_desc_class``. A pair whose score exceeds the rule's
    ``range_value`` is a hit.

    With at least one hit, a recording is requested through
    ``self.video_task`` and ``self.update_milvus`` is called with flag 1 and
    the highest (rounded) score; otherwise ``self.update_milvus`` is called
    with flag 0 and score 0.0. Any exception is caught and logged through
    ``self.logger``; nothing is returned.

    Args:
        image_id: Interpolated into the ``id in ...`` filter and forwarded to
            ``update_milvus``; presumably a list of ids - TODO confirm.
        video_point_id: Camera/point id, used in log messages and forwarded to
            ``video_task``.
        rule_id: Interpolated into the ``rule_id in ...`` filter; presumably a
            list of rule ids - TODO confirm.
        video_image_time: Forwarded to ``video_task``.
        frame_id: Forwarded to ``video_task``.
        task_id: Not used. NOTE(review): the recording request is sent with
            the literal task id "basic" - confirm this is intended.
        video_path: Forwarded to ``video_task``.
        videotaskurl: Forwarded to ``video_task``.
    """
    vector_weight = 0.6
    pos_weight = 0.4
    pos_weights = {
        ('n', 'n'): 1.0,  # noun vs noun
        ('v', 'v'): 0.8,  # verb vs verb
        ('n', 'v'): 0.3,  # noun vs verb
    }
    try:
        # 1. Detection records (vectors and metadata).
        res_a = self.collection.query(
            expr=f"id in {image_id}",
            output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "object_label", "task_id", "event_level_id",
                           "video_point_id", "detect_num", "detect_id", "detect_time", "image_path", "video_path"],
            consistency_level="Strong"
        )

        # 2. Rule records (vectors and thresholds).
        res_b = self.collection_rule.query(
            expr=f"rule_id in {rule_id}",
            output_fields=["id", "zh_desc_class", "text_vector", "range_value"],
            consistency_level="Strong"
        )

        # 3. Score every (record, rule) pair.
        results = []
        for item_a in res_a:
            if not item_a["detect_num"] > 0:
                continue  # nothing was detected in this record
            for item_b in res_b:
                vector_sim = _cosine_similarity(item_a["text_vector"], item_b["text_vector"])
                desc_a = ast.literal_eval(item_a['zh_desc_class'])
                desc_b = ast.literal_eval(item_b['zh_desc_class'])

                # Keep the best score over the record's tagged descriptions.
                similarity = 0
                for clas_a in desc_a:
                    candidate = (vector_weight * vector_sim
                                 + pos_weight * _pos_match_score(clas_a, desc_b, pos_weights))
                    if candidate > similarity:
                        similarity = candidate

                if similarity > item_b["range_value"]:
                    self.logger.info(f"相似度:{similarity}:{item_a['zh_desc_class']} {item_b['zh_desc_class']}")
                    results.append({
                        "a_id": item_a["id"],
                        "b_id": item_b["id"],
                        "similarity": round(float(similarity), 4)
                    })

        # 4. Act on the best hit, if any.
        if results:
            best = max(results, key=lambda hit: hit["similarity"])
            # Request the recording clip.
            asyncio.run(self.video_task(video_point_id, video_image_time, "basic", frame_id, video_path, videotaskurl))
            # Persist the alarm flag and score.
            self.update_milvus(image_id, res_a, 1, best['similarity'], rule_id)
        else:
            self.update_milvus(image_id, res_a, 0, 0.0, rule_id)
    except Exception as e:
        self.logger.info(f"{video_point_id}线程:规则对比时出错:{image_id,rule_id}: {e}")
 
# Start recording
async def video_task(self, video_point_id, video_image_time, task_id, frame_id, video_path, videotaskurl):
    """POST a recording request to the recording service and log the outcome.

    Builds a JSON payload from the arguments (with ``preSeconds`` and
    ``postSeconds`` fixed at 10 and ``uploadType`` fixed at "local") and sends
    it to ``videotaskurl``. The JSON body of a 200 response and the status code
    of any other response are logged. Any exception is caught and logged;
    nothing is returned.

    Args:
        video_point_id: Sent as the string ``cameraId``; also used in log lines.
        video_image_time: Sent as ``timestamp``.
        task_id: Sent as ``taskId``.
        frame_id: Sent as ``fid``.
        video_path: Sent as ``uploadUrl``.
        videotaskurl: URL of the recording service endpoint.
    """
    try:
        json_data = {
            "cameraId": f"{video_point_id}",
            "timestamp": video_image_time,
            "preSeconds": 10,
            "postSeconds": 10,
            "taskId": task_id,
            "fid": frame_id,
            "uploadUrl": video_path,
            "uploadType": "local"
        }
        self.logger.info(f"{video_point_id}线程:调用录像服务:{videotaskurl, json_data}")

        # NOTE(review): requests.post is a blocking call inside a coroutine, and
        # the (connect, read) timeouts are 0.03 s each - confirm both are intended.
        response = requests.post(videotaskurl, json=json_data, timeout=(0.03, 0.03))
        if response.status_code == 200:
            data = response.json()
            self.logger.info(f"{video_point_id}线程:录像服务返回:{data}")
        else:
            # Surface rejected requests instead of dropping them silently.
            self.logger.info(f"{video_point_id}线程:录像服务返回状态码:{response.status_code}")
    except Exception as e:
        self.logger.info(f"{self._thread_name}线程:调用录像时出错:地址:{videotaskurl}:{e}")
 
def update_milvus(self, image_id, res_a, is_waning, similarity, rule_id_list):
    """Upsert one record carrying the alarm flag, score and rule ids.

    The row uses ``image_id[0]`` as its id, takes ``is_waning``,
    ``waning_value`` and ``rule_id`` from the arguments, and copies every other
    field from ``res_a[0]``. Any exception is caught and logged through
    ``self.logger``; nothing is returned.

    Args:
        image_id: Sequence whose first element is the record id.
        res_a: Sequence whose first element is a mapping with the stored fields.
        is_waning: Value written to the ``is_waning`` field.
        similarity: Value written to the ``waning_value`` field.
        rule_id_list: Value written to the ``rule_id`` field.
    """
    # Fields carried over unchanged from the stored record, in write order.
    carried_over = (
        "zh_desc_class",
        "bounding_box",
        "object_label",
        "task_id",
        "event_level_id",
        "video_point_id",
        "detect_num",
        "detect_id",
        "detect_time",
        "image_path",
        "video_path",
    )
    try:
        upsert = self.collection.upsert
        row = {"id": image_id[0]}
        stored = res_a[0]
        row["text_vector"] = stored["text_vector"]
        row["is_waning"] = is_waning
        row["waning_value"] = similarity
        row["rule_id"] = rule_id_list
        for field_name in carried_over:
            row[field_name] = stored[field_name]
        upsert(data=[row])
    except Exception as e:
        self.logger.info(f"{self._thread_name}线程:规则对比后修改数据时出错:{image_id}:是否预警{is_waning}:预警值:{similarity}:规则id:{rule_id_list}:数据集合:{len(res_a)} :{e}")
 
def tark_do(image_id, image_path, milvusurl, milvusport):
    """Describe an image with Qwen2-VL and store the text on its Milvus record.

    Loads the ``Qwen2-VL-2B`` processor and model, connects to Milvus at
    ``milvusurl:milvusport``, reads the ``smartobject`` record whose ``id``
    equals ``image_id``, generates a description for the image at
    ``image_path`` and upserts the record with ``object_label`` replaced by
    that description (all other queried fields are written back unchanged).

    Args:
        image_id: Record id; interpolated into the ``id == ...`` filter.
        image_path: Path of the image file to describe.
        milvusurl: Milvus host.
        milvusport: Milvus port.

    Returns:
        1 on success, 0 if the record is missing or any step raised.
    """
    preserved_fields = (
        "text_vector", "is_waning", "waning_value", "rule_id", "zh_desc_class",
        "bounding_box", "task_id", "event_level_id", "video_point_id",
        "detect_num", "detect_id", "detect_time", "image_path", "video_path",
    )
    try:
        # Initialise the Qwen2-VL processor and model.
        # NOTE(review): both are loaded on every call; consider loading once
        # per process if this endpoint is called repeatedly.
        qwen_tokenizer = AutoProcessor.from_pretrained("Qwen2-VL-2B", trust_remote_code=True)
        qwen_model = AutoModelForVision2Seq.from_pretrained("Qwen2-VL-2B", device_map="auto", trust_remote_code=True).eval()

        # Connect to Milvus and load the collection.
        connections.connect("default", host=milvusurl, port=milvusport)
        collection = Collection(name="smartobject")
        collection.load()

        # Fetch the stored record so its other fields survive the upsert.
        # NOTE(review): image_id is interpolated into the filter unescaped and
        # originates from the request body - validate it upstream.
        res_a = collection.query(
            expr=f"id == {image_id}",
            output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "object_label", "task_id", "event_level_id",
                           "video_point_id", "detect_num", "is_waning", "waning_value", "rule_id", "detect_id", "detect_time", "image_path", "video_path"],
            consistency_level="Strong"
        )
        if not res_a:
            # Fail before running inference when there is nothing to update.
            raise LookupError(f"未找到id为{image_id}的记录")
        record = res_a[0]

        text = "描述这张图片的内容"
        with Image.open(image_path) as image:
            # thumbnail() resizes in place and returns None, so its result
            # must not be assigned back to `image`.
            image.thumbnail((512, 512))
            # Put the inputs on the device the model was placed on instead of
            # assuming CUDA.
            inputs = qwen_tokenizer(text=text, images=image, return_tensors="pt").to(qwen_model.device)

        with torch.no_grad():
            outputs = qwen_model.generate(**inputs, max_new_tokens=50)
        # NOTE(review): the whole output sequence is decoded; this presumably
        # includes the prompt tokens - confirm that is wanted in object_label.
        response = qwen_tokenizer.decode(outputs[0], skip_special_tokens=True)
        print(response)

        # Write the record back with the new description.
        row = {field_name: record[field_name] for field_name in preserved_fields}
        row["id"] = image_id
        row["object_label"] = f"{response}"
        collection.upsert(data=[row])
        print(f"执行任务{image_id,image_path}:结束")
        return 1
    except Exception as e:
        print(f"执行模型解析时出错:{image_id,image_path} :{e}")
        return 0
 
 
# Example: image-processing endpoint
@app.route('/process', methods=['POST'])
def process_text():
    """Run ``tark_do`` for the image described in the JSON request body.

    Reads ``image_id``, ``image_path``, ``milvusurl`` and ``milvusport`` from
    the body and passes them to ``tark_do``.

    Returns:
        A JSON response ``{"result": <tark_do result>}``; ``result`` is 0 when
        ``tark_do`` raised. A body that is not a JSON object yields HTTP 400
        with ``result`` 0.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"result": 0, "error": "request body must be a JSON object"}), 400
    try:
        result = tark_do(data.get("image_id"), data.get("image_path"), data.get("milvusurl"), data.get("milvusport"))
    except Exception:
        app.logger.exception("处理请求时出错")
        result = 0
    # Flask cannot turn a bare int into a response, so wrap the status in JSON.
    return jsonify({"result": result})
 
if __name__ == '__main__':
    # The server listens on all interfaces, and Flask's debug mode exposes an
    # interactive debugger that can execute arbitrary code, so debug is off
    # unless FLASK_DEBUG is explicitly set to a truthy value.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)