# shidong
# 2025-07-08 0fb0c2c61ef6f48089b941f0e185e51288fe01b0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
 
import ast
import asyncio
import logging
import os
import threading
import time as time_sel
from datetime import datetime
from datetime import time
from logging.handlers import RotatingFileHandler
from typing import Dict

import cv2
import jieba.posseg as pseg
import numpy as np
import requests
import torch
from pymilvus import connections, Collection
from transformers import AutoModel, AutoTokenizer, AutoModelForCausalLM, AutoConfig, AutoProcessor, \
    AutoModelForVision2Seq

import detect_task
 
class ThreadPool:
    """Milvus-backed worker for rule matching and image captioning.

    On construction it reads ``./conf.txt``, configures logging (rotating file
    under ``logs/`` plus console), loads the ``Qwen2-VL-2B`` processor/model and
    loads the Milvus ``smartobject`` collection.

    NOTE(review): despite the name, nothing in this file schedules threads;
    ``threads`` and ``lock`` are initialised but not used here.
    """

    # Row fields copied unchanged from a queried ``smartobject`` record when the
    # record is written back with ``upsert``. ``id`` and the warning fields
    # (is_waning / waning_value / rule_id) are supplied by the caller.
    _RECORD_FIELDS = (
        "text_vector", "zh_desc_class", "bounding_box", "object_label",
        "task_id", "event_level_id", "video_point_id", "detect_num",
        "detect_id", "detect_time", "image_path", "video_path",
    )

    # Weights for (POS of target word, POS of rule word) pairs used by _pos_match.
    # NOTE(review): ('v', 'n') is absent, so the noun/verb cross weight only applies
    # in one direction - confirm whether that asymmetry is intended.
    _POS_WEIGHTS = {
        ('n', 'n'): 1.0,  # noun vs noun
        ('v', 'v'): 0.8,  # verb vs verb
        ('n', 'v'): 0.3,  # noun vs verb
    }

    # Caption prompt. The Qwen2-VL processor only inserts image features where the
    # text contains the image placeholder, so the vision tokens must be present.
    _CAPTION_PROMPT = "<|vision_start|><|image_pad|><|vision_end|>描述这张图片的内容"

    # Longest image side (pixels) fed to the captioning model.
    _CAPTION_MAX_SIDE = 512

    def __init__(self):
        # Settings such as milvusurl / milvusport come from a key=value file.
        self.config = self._read_config('./conf.txt')

        self.threads: Dict[str, threading.Thread] = {}
        self.lock = threading.Lock()

        # Configure logging before the slow model / Milvus start-up so anything
        # logged during start-up also reaches the rotating log file.
        log_dir = "logs"
        os.makedirs(log_dir, exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
            handlers=[
                # Size-rotated log file: at most 10 MB per file, 3 backups kept.
                RotatingFileHandler(
                    filename=os.path.join(log_dir, 'start_log.log'),
                    maxBytes=10 * 1024 * 1024,
                    backupCount=3,
                    encoding='utf-8'
                ),
                # Mirror everything to the console.
                logging.StreamHandler()
            ]
        )
        # Logger used by every method of this class.
        self.logger = logging.getLogger(__name__)

        self.device = "cuda:0" if torch.cuda.is_available() else "cpu"

        # Vision-language model used by tark_do() to caption images.
        self.qwen_tokenizer = AutoProcessor.from_pretrained("Qwen2-VL-2B", trust_remote_code=True)
        self.qwen_model = AutoModelForVision2Seq.from_pretrained(
            "Qwen2-VL-2B", device_map="auto", trust_remote_code=True
        ).eval()

        # Connect to Milvus and load the detection-result collection.
        connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
        self.collection = Collection(name="smartobject")
        self.collection.load()

        # NOTE(review): the rule collection queried by cross_collection_vector_compare()
        # is not created anywhere in this file. Assign a loaded pymilvus Collection to
        # this attribute before calling that method.
        self.collection_rule = None

    @property
    def _thread_name(self):
        """Name of the thread currently executing; used as a log-message prefix."""
        return threading.current_thread().name

    @staticmethod
    def _read_config(path):
        """Parse a ``key=value`` text file into a dict of stripped strings.

        Blank lines and lines without ``=`` are skipped. Only the first ``=``
        splits a line, so values may themselves contain ``=``.
        """
        config = {}
        with open(path, 'r', encoding='utf-8') as file:
            for line in file:
                line = line.strip()
                if not line or '=' not in line:
                    continue
                key, value = line.split('=', 1)
                config[key.strip()] = value.strip()
        return config

    @staticmethod
    def _cosine_similarity(vec_a, vec_b):
        """Return the cosine similarity of two equal-length vectors as a float.

        Computed in float64. A zero-length (all-zero) vector yields 0.0 instead of NaN.

        :raises ValueError: if the vectors' shapes differ.
        """
        a = np.asarray(vec_a, dtype=np.float64)
        b = np.asarray(vec_b, dtype=np.float64)
        if a.shape != b.shape:
            raise ValueError(f"维度不匹配: {a.shape} vs {b.shape}")
        denom = np.linalg.norm(a) * np.linalg.norm(b)
        if denom == 0:
            return 0.0
        return float(np.dot(a, b) / denom)

    @classmethod
    def _pos_match(cls, clas_a, desc_b):
        """Score how well two word lists agree on part of speech, in [0, 1].

        Both inputs are sequences of 3-tuples whose middle element is a POS tag.
        Words are paired positionally (extra words are left unpaired). The weights
        of the paired POS tags are summed, then divided by the longer length.
        Two empty lists score 0.0.
        """
        length = max(len(clas_a), len(desc_b))
        if length == 0:
            return 0.0
        total = sum(
            cls._POS_WEIGHTS.get((pos_a, pos_b), 0)
            for (_, pos_a, _), (_, pos_b, _) in zip(clas_a, desc_b)
        )
        return total / length

    @staticmethod
    def _fit_within(width, height, max_side):
        """Return ``(width, height)`` scaled down to fit inside a max_side square.

        The aspect ratio is preserved and images are never enlarged.
        """
        scale = min(max_side / width, max_side / height)
        if scale >= 1:
            return width, height
        return max(1, round(width * scale)), max(1, round(height * scale))

    @classmethod
    def _build_record(cls, row, **overrides):
        """Build an upsert payload from a queried row plus explicit field values."""
        record = {field: row[field] for field in cls._RECORD_FIELDS}
        record.update(overrides)
        return record

    @classmethod
    def _load_image_rgb(cls, image_path, max_side):
        """Read an image file as an RGB numpy array, shrunk to fit max_side.

        ``np.fromfile`` + ``cv2.imdecode`` is used instead of ``cv2.imread`` so that
        non-ASCII paths also work on Windows.

        :raises ValueError: if the file content cannot be decoded as an image.
        """
        data = np.fromfile(image_path, dtype=np.uint8)
        bgr = cv2.imdecode(data, cv2.IMREAD_COLOR)
        if bgr is None:
            raise ValueError(f"无法解码图片: {image_path}")
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        height, width = rgb.shape[:2]
        new_size = cls._fit_within(width, height, max_side)
        if new_size != (width, height):
            rgb = cv2.resize(rgb, new_size, interpolation=cv2.INTER_AREA)
        return rgb

    def cross_collection_vector_compare(self, image_id, video_point_id, rule_id, video_image_time, frame_id, task_id, video_path, videotaskurl):
        """Compare detected objects in ``self.collection`` against alarm rules.

        For each queried record with ``detect_num > 0``, a score is computed
        against every queried rule. The score is
        ``0.6 * cosine(text_vector) + 0.4 * POS match``, maximised over the
        record's per-target descriptions. A rule fires when the score is greater
        than the rule's ``range_value``.

        If any rule fires, the recording service is called and the record is
        upserted with ``is_waning=1`` and the best score. Otherwise it is upserted
        with ``is_waning=0`` and ``0.0``. Errors are logged, never raised.

        :param image_id: presumably a list of ids. It is formatted into a Milvus
            ``in`` expression, and its first element becomes the upsert id.
        :param rule_id: presumably a list of rule ids; it is formatted into a
            Milvus ``in`` expression.

        NOTE(review): ``task_id`` is accepted but unused; the recording request is
        always sent with task id ``"basic"``.
        """
        try:
            if self.collection_rule is None:
                raise RuntimeError("collection_rule 未初始化")

            # 1. Detection records (vector + metadata) for the given image ids.
            res_a = self.collection.query(
                expr=f"id in {image_id}",
                output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "object_label", "task_id", "event_level_id",
                               "video_point_id", "detect_num", "detect_id", "detect_time", "image_path", "video_path"],
                consistency_level="Strong"
            )
            if not res_a:
                # Nothing to compare, and update_milvus() needs res_a[0].
                self.logger.warning(f"{video_point_id}线程:未找到图片数据:{image_id}")
                return

            # 2. Rule records (vector + threshold) for the given rule ids.
            res_b = self.collection_rule.query(
                expr=f"rule_id in {rule_id}",
                output_fields=["id", "zh_desc_class", "text_vector", "range_value"],
                consistency_level="Strong"
            )

            # 3. Pairwise scoring.
            results = []
            for item_a in res_a:
                if item_a["detect_num"] <= 0:
                    continue  # no target detected, nothing to compare
                vec_a = np.asarray(item_a["text_vector"], dtype=np.float64)
                desc_a = ast.literal_eval(item_a['zh_desc_class'])
                for item_b in res_b:
                    vector_sim = self._cosine_similarity(vec_a, item_b["text_vector"])
                    desc_b = ast.literal_eval(item_b['zh_desc_class'])
                    # Best weighted score over every target description of item_a.
                    similarity = 0
                    for clas_a in desc_a:
                        score = 0.6 * vector_sim + 0.4 * self._pos_match(clas_a, desc_b)
                        if score > similarity:
                            similarity = score

                    if similarity > item_b["range_value"]:
                        self.logger.info(f"相似度:{similarity}:{item_a['zh_desc_class']} {item_b['zh_desc_class']}")
                        results.append({
                            "a_id": item_a["id"],
                            "b_id": item_b["id"],
                            "similarity": round(float(similarity), 4)
                        })

            # 4. Persist the outcome (and trigger recording on a match).
            if results:
                best = max(results, key=lambda r: r["similarity"])
                asyncio.run(self.video_task(video_point_id, video_image_time, "basic", frame_id, video_path, videotaskurl))
                self.update_milvus(image_id, res_a, 1, best['similarity'], rule_id)
            else:
                self.update_milvus(image_id, res_a, 0, 0.0, rule_id)
        except Exception as e:
            self.logger.error(f"{video_point_id}线程:规则对比时出错:{image_id,rule_id}: {e}", exc_info=True)

    async def video_task(self, video_point_id, video_image_time, task_id, frame_id, video_path, videotaskurl):
        """POST a recording request (10 s before / 10 s after ``video_image_time``).

        Errors are logged, never raised.

        NOTE(review): ``requests.post`` is blocking, so this coroutine never yields.
        It also uses 30 ms connect/read timeouts, so slow responses are logged as
        errors. Confirm whether fire-and-forget is the intent.
        """
        try:
            json_data = {
                "cameraId": f"{video_point_id}",
                "timestamp": video_image_time,
                "preSeconds": 10,
                "postSeconds": 10,
                "taskId": task_id,
                "fid": frame_id,
                "uploadUrl": video_path,
                "uploadType": "local"
            }
            self.logger.info(f"{video_point_id}线程:调用录像服务:{videotaskurl, json_data}")

            response = requests.post(videotaskurl, json=json_data, timeout=(0.03, 0.03))
            if response.status_code == 200:
                self.logger.info(response.json())
        except Exception as e:
            self.logger.error(f"{self._thread_name}线程:调用录像时出错:地址:{videotaskurl}:{e}")

    def update_milvus(self, image_id, res_a, is_waning, similarity, rule_id_list):
        """Upsert ``res_a[0]`` back into ``self.collection`` with a matching result.

        :param image_id: sequence of ids; ``image_id[0]`` becomes the upserted id.
        :param res_a: query result rows; only the first row is written back.
        :param is_waning: stored in ``is_waning`` (callers pass 1 on a rule match, else 0).
        :param similarity: stored in ``waning_value``.
        :param rule_id_list: stored in ``rule_id``.

        Errors are logged, never raised.
        """
        try:
            self.collection.upsert(data=[self._build_record(
                res_a[0],
                id=image_id[0],
                is_waning=is_waning,
                waning_value=similarity,
                rule_id=rule_id_list,
            )])
        except Exception as e:
            self.logger.error(f"{self._thread_name}线程:规则对比后修改数据时出错:{image_id}:是否预警{is_waning}:预警值:{similarity}:规则id:{rule_id_list}:数据集合:{len(res_a)} :{e}")

    def tark_do(self, image_id=None, image_path=None):
        """Caption one stored image with Qwen2-VL and write the caption back.

        The record ``id == image_id`` is read from ``self.collection``. The image at
        ``image_path`` (the record's own ``image_path`` when omitted) is captioned.
        The record is then upserted with the caption as ``object_label`` and all
        other fields unchanged.

        :param image_id: primary key of the record to caption; when None nothing is done.
        :param image_path: optional image path overriding the stored one.
        :return: 1 on success, 0 when there is nothing to do or an error occurred.
        """
        if image_id is None:
            # NOTE(review): the __main__ loop in this file passes no id; the source
            # of work items is not visible here.
            self.logger.debug("tark_do: 未提供image_id,跳过")
            return 0
        try:
            res_a = self.collection.query(
                expr=f"id == {image_id}",
                output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "object_label", "task_id", "event_level_id",
                               "video_point_id", "detect_num", "is_waning", "waning_value", "rule_id", "detect_id", "detect_time", "image_path", "video_path"],
                consistency_level="Strong"
            )
            if not res_a:
                self.logger.warning(f"执行任务未找到数据:{image_id}")
                return 0
            row = res_a[0]
            if image_path is None:
                image_path = row["image_path"]

            image = self._load_image_rgb(image_path, self._CAPTION_MAX_SIDE)
            # Move inputs to wherever device_map placed the model (not hard-coded cuda).
            inputs = self.qwen_tokenizer(
                text=[self._CAPTION_PROMPT], images=[image], return_tensors="pt"
            ).to(self.qwen_model.device)
            with torch.no_grad():
                outputs = self.qwen_model.generate(**inputs, max_new_tokens=50)
            # generate() returns prompt + continuation; keep only the new tokens.
            new_tokens = outputs[:, inputs["input_ids"].shape[1]:]
            response = self.qwen_tokenizer.batch_decode(new_tokens, skip_special_tokens=True)[0]
            self.logger.info(response)

            self.collection.upsert(data=[self._build_record(
                row,
                id=image_id,
                is_waning=row["is_waning"],
                waning_value=row["waning_value"],
                rule_id=row["rule_id"],
                object_label=response,
            )])
            self.logger.info(f"执行任务{image_id,image_path}:结束")
            return 1
        except Exception as e:
            self.logger.error(f"执行模型解析时出错:{image_id,image_path} :{e}", exc_info=True)
            return 0
 
 
# Script entry point: build the worker and poll tark_do() forever.
# NOTE(review): tark_do() is called without an image id here; the source of work
# items is not visible in this file.
if __name__ == "__main__":
    pool = ThreadPool()
    while True:
        try:
            done = pool.tark_do()
        except Exception as e:
            logging.exception(f"主线程未知错误:{e}")
            done = 0
        if not done:
            # Back off when a call fails or finds no work. Without this the loop
            # spins at 100% CPU and writes a log line on every iteration.
            time_sel.sleep(1)