zhm
2025-07-24 028e39f85bf4115024d0467613f26a1750ff004e
qwen_detect.py
@@ -1,5 +1,4 @@
from operator import itemgetter
import torch
import threading
import time as time_sel
from typing import Dict
@@ -7,10 +6,11 @@
import requests
import os
import logging
from transformers import AutoProcessor, AutoModelForVision2Seq
from pymilvus import connections, Collection
from logging.handlers import RotatingFileHandler
import get_mem
from qwen_thread_description import qwen_thread_description
class ThreadPool:
    def __init__(self):
@@ -31,26 +31,8 @@
                    value = value.strip()
                    # 将键值对添加到字典中
                    self.config[key] = value
        # 配置日志
        # 确保日志目录存在
        log_dir = "logs"
        os.makedirs(log_dir, exist_ok=True)
        self.threads: Dict[str, threading.Thread] = {}
        self.lock = threading.Lock()
        # 初始化Milvus集合
        connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
        # 加载集合
        self.collection = Collection(name="smartobject")
        self.collection.load()
        self.pool = qwen_thread(int(self.config.get("threadnum")), self.config,self.config.get("qwenaddr"))
        #是否更新
        self._isupdate = False
        # 初始化共享内存
        get_mem.smem_init()
        # 配置日志
        # 创建实例专属logger
        os.makedirs(self.config.get("logaddr"), exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s',
@@ -58,7 +40,7 @@
            handlers=[
                # 按大小轮转的日志文件(最大10MB,保留3个备份)
                RotatingFileHandler(
                    filename=os.path.join(log_dir, 'start_log.log'),
                    filename=os.path.join(self.config.get("logaddr"), 'qwen_log.log'),
                    maxBytes=10 * 1024 * 1024,  # 10MB
                    backupCount=3,
                    encoding='utf-8'
@@ -67,6 +49,28 @@
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
        self.logger.setLevel(logging.INFO)
        self.threads: Dict[str, threading.Thread] = {}
        self.lock = threading.Lock()
        # 初始化Milvus集合
        connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
        # 加载集合
        self.collection = Collection(name="smartobject")
        self.collection.load()
        #创建qwen线程池--预警线程池
        self.poolWarning = qwen_thread(self.config,self.logger)
        # 创建qwen线程池--图片描述等
        self.poolDescription= qwen_thread_description(self.config, self.logger)
        #是否更新
        self._isupdate = False
        # 初始化共享内存
        get_mem.smem_init()
    #启动线程
    def safe_start(self, target_func, camera_id):
@@ -87,40 +91,40 @@
            self.threads[camera_id] = t
            return t
    # 启动线程任务
    def worker(self, camera_id):
        while True:
            try:
                res_a = self.collection.query(
                    expr=f"is_desc == 0 and video_point_id=={camera_id}",
                    output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
                                   "task_name", "event_level_id", "event_level_name",
                                   "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
                                   "detect_id",
                                   "detect_time", "image_path", "image_desc_path", "video_path"],
                    consistency_level="Strong",
                    order_by_field="id",  # 按id字段排序
                    order_by_type="desc"  # 降序排列
                )
                # 读取共享内存中的图片
                # image_id = get_mem.smem_read_frame_qianwen(camera_id)
                if len(res_a) > 0:
                    sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
                    # 查询前N个最大的ID
                    num = int(self.config.get("threadnum")) - 1
                    res_a = sorted_results[:num]
                    for res in res_a:
    # 启动线程任务--预警
    def workerWarning(self, camera_id):
            while True:
                try:
                    res_a = self.collection.query(
                        expr=f"is_desc == 0 and video_point_id=={camera_id}",
                        output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name",
                                       "task_id",
                                       "task_name", "event_level_id", "event_level_name",
                                       "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value",
                                       "rule_id",
                                       "detect_id", "knowledge_id", "suggestion", "risk_description",
                                       "detect_time", "image_path", "image_desc_path", "video_path"],
                        consistency_level="Strong",
                        order_by_field="id",  # 按id字段排序
                        order_by_type="desc"  # 降序排列
                    )
                    # 读取共享内存中的图片
                    # image_id = get_mem.smem_read_frame_qianwen(camera_id)
                    if len(res_a) > 0:
                        # sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
                        # res = sorted_results[0]
                        res = max(res_a, key=itemgetter("id"))
                        self.collection.delete(f"id == {res['id']}")
                        # 数据组
                        data = {
                            "id": res['id'],
                            "event_level_id": res['event_level_id'],  # event_level_id
                            "event_level_name": res['event_level_name'],  # event_level_id
                            "rule_id": res["rule_id"],
                            "video_point_id": res['video_point_id'],  # video_point_id
                            "video_point_name": res['video_point_name'],
                            "is_waning": 0,
                            "is_desc": 1,
                            "zh_desc_class": res['zh_desc_class'],  # text_vector
                            "is_desc":4,#预警中状态
                            "zh_desc_class": res['zh_desc_class'],
                            "bounding_box": res['bounding_box'],  # bounding_box
                            "task_id": res['task_id'],  # task_id
                            "task_name": res['task_name'],  # task_id
@@ -131,19 +135,74 @@
                            "image_path": res['image_path'],  # image_path
                            "image_desc_path": res['image_desc_path'],  # image_desc_path
                            "video_path": res['video_path'],
                            "text_vector": res['text_vector']
                            "text_vector": res['text_vector'],
                            "risk_description": res['risk_description'],
                            "suggestion": res['suggestion'],
                            "knowledge_id": res['knowledge_id']
                        }
                        # logging.info(f"读取图像成功: {res['id']}")
                        # 保存到milvus
                        image_id = self.collection.upsert(data).primary_keys
                        image_id = self.collection.insert(data).primary_keys
                        res['id'] = image_id[0]
                        # logging.info(f"读取图像成功: {image_id}")
                        image_id = self.pool.submit(res)
                        # image_id = pool.tark_do(image_id,self.config.get("ragurl"),self.config.get("ragmode"),self.config.get("max_tokens"))
                        # logging.info(f"处理图像成功: {image_id}")
                    sorted_results = None
                        self.poolWarning.submit(res)
                    time_sel.sleep(0.01)
                except Exception as e:
                    logging.info(f"{camera_id}线程错误:{e}")
    # 启动线程任务--生成图片描述等
    def worker(self, camera_id):
        while True:
            try:
                res_a = self.collection.query(
                    expr=f"is_desc == 5 and video_point_id=={camera_id}",
                    output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
                                   "task_name", "event_level_id", "event_level_name",
                                   "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
                                   "detect_id","knowledge_id","suggestion","risk_description",
                                   "detect_time", "image_path", "image_desc_path", "video_path"],
                    consistency_level="Strong",
                    order_by_field="id",  # 按id字段排序
                    order_by_type="desc"  # 降序排列
                )
                # 读取共享内存中的图片
                # image_id = get_mem.smem_read_frame_qianwen(camera_id)
                if len(res_a) > 0:
                    #sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
                    #res = sorted_results[0]
                    res = max(res_a, key=itemgetter("id"))
                    self.collection.delete(f"id == {res['id']}")
                    # 数据组
                    data = {
                        "event_level_id": res['event_level_id'],  # event_level_id
                        "event_level_name": res['event_level_name'],  # event_level_id
                        "rule_id": res["rule_id"],
                        "video_point_id": res['video_point_id'],  # video_point_id
                        "video_point_name": res['video_point_name'],
                        "is_waning": res['is_waning'],
                        "is_desc": 1,
                        "zh_desc_class": res['zh_desc_class'],
                        "bounding_box": res['bounding_box'],  # bounding_box
                        "task_id": res['task_id'],  # task_id
                        "task_name": res['task_name'],  # task_id
                        "detect_id": res['detect_id'],  # detect_id
                        "detect_time": res['detect_time'],  # detect_time
                        "detect_num": res['detect_num'],
                        "waning_value": res['waning_value'],
                        "image_path": res['image_path'],  # image_path
                        "image_desc_path": res['image_desc_path'],  # image_desc_path
                        "video_path": res['video_path'],
                        "text_vector": res['text_vector'],
                        "risk_description": res['risk_description'],
                        "suggestion": res['suggestion'],
                        "knowledge_id": res['knowledge_id']
                    }
                    # 保存到milvus
                    image_id = self.collection.insert(data).primary_keys
                    res['id'] = image_id[0]
                    self.poolDescription.submit(res)
                time_sel.sleep(0.01)
            except Exception as e:
                logging.info(f"{camera_id}线程错误:{e}")
    #调用是否需要更新
    def isUpdate(self):
@@ -228,6 +287,9 @@
                        thread = pool.threads.get(camera.get("camera_id"))
                        if not thread:
                            logging.info(f"开始创建{camera.get('camera_id')}线程")
                            #先对数据进行预警
                            pool.safe_start(pool.workerWarning, camera.get('camera_id'))
                            #在生成图片描述、处理建议等信息
                            pool.safe_start(pool.worker, camera.get('camera_id'))
                            logging.info(f"{camera.get('camera_id')}线程创建完毕")