| | |
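| | | # Disable tokenizer parallelism (must come before every import). NOTE(review): the statement that does this is not visible in this view |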
| | | import torch |
| | | from operator import itemgetter |
| | | import threading |
| | | import time as time_sel |
| | | from typing import Dict |
| | | import qwen_task |
| | | from qwen_thread import qwen_thread |
| | | import requests |
| | | import os |
| | | |
| | | import logging |
| | | from transformers import AutoProcessor, AutoModelForVision2Seq |
| | | from pymilvus import connections, Collection |
| | | from logging.handlers import RotatingFileHandler |
| | | import get_mem |
| | | |
| | | from multiprocessing import Process |
| | | |
| | | class ThreadPool: |
| | | def __init__(self): |
| | |
| | | value = value.strip() |
| | | # Store the stripped key/value pair in self.config. NOTE(review): the code that creates self.config and yields key/value is not visible in this view |
| | | self.config[key] = value |
| | | # Configure logging |
| | | # Make sure the log directory exists |
| | | log_dir = "logs" |
| | | os.makedirs(log_dir, exist_ok=True) |
| | | self.threads: Dict[str, threading.Thread] = {} |
| | | self.lock = threading.Lock() |
| | | |
| | | self.device = "cuda" if torch.cuda.is_available() else "cpu" |
| | | |
| | | # Load the Qwen-VL model from the local checkpoint directory (float16, device_map="auto") |
| | | self.qwen_model = AutoModelForVision2Seq.from_pretrained( |
| | | "./Qwen2-VL-2B-Instruct", device_map="auto", |
| | | trust_remote_code=True, |
| | | torch_dtype=torch.float16 |
| | | ) |
| | | |
| | | # Default processor, loaded from the same local checkpoint directory |
| | | self.qwen_tokenizer = AutoProcessor.from_pretrained("./Qwen2-VL-2B-Instruct") |
| | | |
| | | # Connect to Milvus using the host/port read from the config |
| | | connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport")) |
| | | # Open the "smartobject" collection and load it |
| | | self.collection = Collection(name="smartobject") |
| | | self.collection.load() |
| | | |
| | | # Update flag, initially False |
| | | self._isupdate = False |
| | | |
| | | # Initialize shared memory |
| | | get_mem.smem_init() |
| | | |
| | | # Configure logging |
| | | # Create the instance-specific logger; the log directory comes from the "logaddr" config entry |
| | | os.makedirs(self.config.get("logaddr"), exist_ok=True) |
| | | logging.basicConfig( |
| | | level=logging.INFO, |
| | | format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s', |
| | |
| | | handlers=[ |
| | | # Size-rotated log file (10 MB max, 3 backups kept). NOTE(review): two filename= arguments follow and the call is never closed - looks like old and new diff lines were both kept; only one can remain |
| | | RotatingFileHandler( |
| | | filename=os.path.join(log_dir, 'start_log.log'), |
| | | filename=os.path.join(self.config.get("logaddr"), 'qwen_log.log'), |
| | | maxBytes=10 * 1024 * 1024, # 10MB |
| | | backupCount=3, |
| | | encoding='utf-8' |
| | |
| | | logging.StreamHandler() |
| | | ] |
| | | ) |
| | | self.logger = logging.getLogger(f"{self.__class__}_{id(self)}") |
| | | self.logger.setLevel(logging.INFO) |
| | | |
| | | |
| | | self.threads: Dict[str, threading.Thread] = {} |
| | | self.lock = threading.Lock() |
| | | |
| | | # Connect to Milvus. NOTE(review): this repeats the connect/load sequence earlier in this method - confirm which copy is meant to stay |
| | | connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport")) |
| | | # Open the "smartobject" collection and load it |
| | | self.collection = Collection(name="smartobject") |
| | | self.collection.load() |
| | | # Create the qwen thread pool, handing it the config and this instance's logger |
| | | self.pool = qwen_thread(self.config,self.logger) |
| | | # Update flag, initially False (also assigned earlier in this method) |
| | | self._isupdate = False |
| | | |
| | | # Initialize shared memory (also called earlier in this method) |
| | | get_mem.smem_init() |
| | | |
| | | |
| | | # Start a thread |
| | | def safe_start(self, target_func, camera_id): |
| | |
| | | |
| | | # Thread task: the per-camera work loop |
def worker(self, camera_id):
    """Poll shared memory and Milvus for one camera; never returns.

    Sets up a ``qwen_task.detect_tasks`` helper that shares this object's
    device, Qwen model/processor and Milvus collection, then loops forever:

    1. Query ``self.collection`` for rows of this camera with ``is_desc == 0``.
    2. Read a frame id from shared memory; when it is positive, pass it to
       ``detect.tark_do`` together with the configured save path, RAG URL
       and token limit.
    3. When the query returned rows, take the one with the largest ``id``,
       delete it, insert a copy flagged ``is_desc = 1`` and submit the row
       (now carrying the new primary key) to ``self.pool``.

    Args:
        camera_id: Camera identifier; used as the helper's thread name, the
            shared-memory read key and the ``video_point_id`` filter value.

    An exception raised inside an iteration is logged once with its
    traceback, after which the loop backs off briefly and continues, so a
    persistent failure no longer spins at full speed.
    """
    poll_interval = 0.01  # seconds between successful iterations
    error_backoff = 1.0   # seconds to wait after a failed iteration

    # Loop-invariant query arguments, built once instead of per iteration.
    pending_expr = f"is_desc == 0 and video_point_id=={camera_id}"
    output_fields = [
        "id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
        "task_name", "event_level_id", "event_level_name",
        "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
        "detect_id", "knowledge_id", "suggestion", "risk_description",
        "detect_time", "image_path", "image_desc_path", "video_path",
    ]
    # Fields copied unchanged from the pending row into its replacement.
    carried_fields = (
        "event_level_id", "event_level_name", "rule_id",
        "video_point_id", "video_point_name", "bounding_box",
        "task_id", "task_name", "detect_id", "detect_time", "detect_num",
        "waning_value", "image_path", "image_desc_path", "video_path",
        "text_vector", "risk_description", "suggestion", "knowledge_id",
    )

    # Per-camera helper sharing the heavyweight resources owned by self.
    detect = qwen_task.detect_tasks()
    detect._thread_name = f"{camera_id}"
    detect.init_logging(f"{camera_id}")
    detect.device = self.device
    detect.qwen_tokenizer = self.qwen_tokenizer
    detect.qwen_model = self.qwen_model
    detect.collection = self.collection
    detect._running = True  # mark the helper as started

    while True:
        try:
            # NOTE(review): order_by_* are passed through as extra keyword
            # arguments; the newest row is selected locally below, so the
            # result does not rely on them.
            pending_rows = self.collection.query(
                expr=pending_expr,
                output_fields=output_fields,
                consistency_level="Strong",
                order_by_field="id",
                order_by_type="desc",
            )

            # Frame handed over through shared memory.
            image_id = get_mem.smem_read_frame_qianwen(camera_id)
            if image_id > 0:
                logging.info("读取图像成功: %s", image_id)
                image_id = detect.tark_do(
                    image_id,
                    self.config.get("filesavepath"),
                    self.config.get("ragurl"),
                    self.config.get("max_tokens"),
                )
                logging.info("处理图像成功: %s", image_id)

            if pending_rows:
                # Only the row with the highest id is handled per iteration.
                res = max(pending_rows, key=itemgetter("id"))
                # NOTE(review): the row is deleted before its replacement is
                # inserted, so a failed insert drops it - confirm acceptable.
                self.collection.delete(f"id == {res['id']}")

                data = {field: res[field] for field in carried_fields}
                data["is_waning"] = 0
                data["is_desc"] = 1  # flag the replacement as picked up
                data["zh_desc_class"] = ""

                new_keys = self.collection.insert(data).primary_keys
                res['id'] = new_keys[0]
                self.pool.submit(res)

            time_sel.sleep(poll_interval)
        except Exception as e:
            # Top-level boundary of the loop: log once, with traceback, and
            # back off so a persistent failure cannot flood the log.
            logging.exception("%s线程错误:%s", camera_id, e)
            time_sel.sleep(error_backoff)
| | | |
| | | |
| | | # Report whether an update is needed |
| | | def isUpdate(self): |