#2025/7/11
#qwen_detect.py和qwen_thread.py优化了日志输出:两者改为共用同一个日志(logs/qwen_log.log),不再各自单独写日志文件,方便后期排查问题
#conf.txt:从服务器获取的最新配置文件,在此留作备份
#批量处理的效果太差(比单张处理慢两倍),相关代码仅作留存:qwen_detect_batch.py和qwen_thread_batch.py
| | |
| | | milvusurl = 192.168.1.232 |
| | | milvusurl = 192.168.1.176 |
| | | milvusport = 19530 |
| | | isupdateurl = http://192.168.1.232:8088/v1/task/isChange?stateType=2 |
| | | gettaskconfurl = http://192.168.1.232:8088/v1/task/getTaskConf |
| | | updatestatusurl = http://192.168.1.232:8088/v1/task/updateChangeStatus?stateType=2 |
| | | videotaskurl = http://192.168.1.232:8089/api/v1/camera/record/task |
| | | ragurl=http://192.168.1.232:8870 |
| | | ragmode=qwen2.5vl |
| | | ollamaurl=http://192.168.1.232:11434 |
| | | ollamamode=qwen2.5vl |
| | | isupdateurl = http://192.168.1.176:8088/v1/task/isChange?stateType=2 |
| | | gettaskconfurl = http://192.168.1.176:8088/v1/task/getTaskConf |
| | | updatestatusurl = http://192.168.1.176:8088/v1/task/updateChangeStatus?stateType=2 |
| | | videotaskurl = http://192.168.1.176:8089/api/v1/camera/record/task |
| | | ragurl=http://192.168.1.176:8870 |
| | | ragmode=qwen3 |
| | | ollamaurl=http://192.168.1.176:11434 |
| | | ollamamode=qwen3 |
| | | vllmurl=http://192.168.1.176:8880/v1 |
| | | vllmmode=qwen3 |
| | | filesavepath = /opt/smart |
| | | max_tokens = 2000 |
| | | threadnum = 3 |
| | | max_tokens = 200 |
| | | threadnum = 4 |
| | | detectnum = 1 |
| | | qwenaddr = /home/debian/Qwen2.5-VL-3B-Instruct-GPTQ-Int4 |
| | | cuda = 1 |
| | | |
| | |
| | | value = value.strip() |
| | | # 将键值对添加到字典中 |
| | | self.config[key] = value |
| | | # 配置日志 |
| | | # 确保日志目录存在 |
| | | log_dir = "logs" |
| | | os.makedirs(log_dir, exist_ok=True) |
| | | self.threads: Dict[str, threading.Thread] = {} |
| | | self.lock = threading.Lock() |
| | | |
| | | # 初始化Milvus集合 |
| | | connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport")) |
| | | # 加载集合 |
| | | self.collection = Collection(name="smartobject") |
| | | self.collection.load() |
| | | self.pool = qwen_thread(self.config) |
| | | #是否更新 |
| | | self._isupdate = False |
| | | |
| | | # 初始化共享内存 |
| | | get_mem.smem_init() |
| | | |
| | | # 配置日志 |
| | | # 创建实例专属logger |
| | | os.makedirs("logs", exist_ok=True) |
| | | logging.basicConfig( |
| | | level=logging.INFO, |
| | | format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s', |
| | |
| | | handlers=[ |
| | | # 按大小轮转的日志文件(最大10MB,保留3个备份) |
| | | RotatingFileHandler( |
| | | filename=os.path.join(log_dir, 'start_log.log'), |
| | | filename=os.path.join("logs", 'qwen_log.log'), |
| | | maxBytes=10 * 1024 * 1024, # 10MB |
| | | backupCount=3, |
| | | encoding='utf-8' |
| | |
| | | logging.StreamHandler() |
| | | ] |
| | | ) |
| | | self.logger = logging.getLogger(f"{self.__class__}_{id(self)}") |
| | | self.logger.setLevel(logging.INFO) |
| | | |
| | | |
| | | self.threads: Dict[str, threading.Thread] = {} |
| | | self.lock = threading.Lock() |
| | | |
| | | # 初始化Milvus集合 |
| | | connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport")) |
| | | # 加载集合 |
| | | self.collection = Collection(name="smartobject") |
| | | self.collection.load() |
| | | #创建qwen线程池 |
| | | self.pool = qwen_thread(self.config,self.logger) |
| | | #是否更新 |
| | | self._isupdate = False |
| | | |
| | | # 初始化共享内存 |
| | | get_mem.smem_init() |
| | | |
| | | |
| | | #启动线程 |
| | | def safe_start(self, target_func, camera_id): |
| | |
| | | |
| | | |
| | | class qwen_thread: |
| | | def __init__(self, config): |
| | | def __init__(self, config,logger): |
| | | self.config = config |
| | | self.max_workers = int(config.get("threadnum")) |
| | | self.executor = ThreadPoolExecutor(max_workers=int(config.get("threadnum"))) |
| | | self.semaphore = threading.Semaphore(int(config.get("threadnum"))) |
| | | self.logger = logger |
| | | |
| | | # 初始化Milvus集合 |
| | | connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport")) |
| | |
| | | # 共享的处理器 (线程安全) |
| | | self.processor = AutoProcessor.from_pretrained(config.get("qwenaddr"), use_fast=True) |
| | | |
| | | # 创建实例专属logger |
| | | self.logger = logging.getLogger(f"{self.__class__}_{id(self)}") |
| | | self.logger.setLevel(logging.INFO) |
| | | # 避免重复添加handler |
| | | if not self.logger.handlers: |
| | | handler = RotatingFileHandler( |
| | | filename=os.path.join("logs", 'thread_log.log'), |
| | | maxBytes=10 * 1024 * 1024, |
| | | backupCount=3, |
| | | encoding='utf-8' |
| | | ) |
| | | formatter = logging.Formatter( |
| | | '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s' |
| | | ) |
| | | handler.setFormatter(formatter) |
| | | self.logger.addHandler(handler) |
| | | |
| | | def submit(self,res_a): |
| | | # 尝试获取信号量(非阻塞) |
| | |
| | | # 调用规则匹配方法,判断是否预警 |
| | | is_waning = self.image_rule_chat(desc, res['waning_value'], ragurl,rag_mode,max_tokens) |
| | | # 如果预警,则生成隐患描述和处理建议 |
| | | #if is_waning == 1: |
| | | # 获取规章制度数据 |
| | | filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl) |
| | | # 生成隐患描述 |
| | | risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens) |
| | | # 生成处理建议 |
| | | suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens) |
| | | if is_waning == 1: |
| | | # 获取规章制度数据 |
| | | filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl) |
| | | # 生成隐患描述 |
| | | risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens) |
| | | # 生成处理建议 |
| | | suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens) |
| | | else: |
| | | is_desc = 3 |
| | | |
| | |
| | | "rule_id": res["rule_id"], |
| | | "video_point_id": res['video_point_id'], # video_point_id |
| | | "video_point_name": res['video_point_name'], |
| | | "is_waning": 1, |
| | | "is_waning": is_waning, |
| | | "is_desc": is_desc, |
| | | "zh_desc_class": desc, # text_vector |
| | | "bounding_box": res['bounding_box'], # bounding_box |
| | |
| | | # 调用rag |
| | | asyncio.run(self.insert_json_data(ragurl, data)) |
| | | rag_time = datetime.now() - current_time |
| | | self.logger.info(f"{image_id}运行结束总体用时:{datetime.now() - ks_time},图片描述用时{desc_time},RAG用时{rag_time}") |
| | | self.logger.info(f"{res['video_point_id']}执行完毕:{image_id}运行结束总体用时:{datetime.now() - ks_time},图片描述用时{desc_time},RAG用时{rag_time}") |
| | | except Exception as e: |
| | | self.logger.info(f"线程:执行模型解析时出错::{e}") |
| | | return 0 |