import json import os import re import requests import torch import logging from PIL import Image from logging.handlers import RotatingFileHandler class detect_tasks(): def __init__(self): #线程名称 self._thread_name = '' self.device = None # 初始化检测模型 self.qwen_tokenizer = None self.qwen_model = None # 初始化Milvus集合 self.collection = None def remove_duplicate_lines(self,text): seen = set() result = [] for line in text.split('。'): # 按句号分割 if line.strip() and line not in seen: seen.add(line) result.append(line) return '。'.join(result) def init_logging(self,logname): # 创建实例专属logger self.logger = logging.getLogger(f"{self.__class__}_{id(self)}") self.logger.setLevel(logging.INFO) # 避免重复添加handler if not self.logger.handlers: handler = RotatingFileHandler( filename=os.path.join("logs", logname+'_log.log'), maxBytes=10 * 1024 * 1024, backupCount=3, encoding='utf-8' ) formatter = logging.Formatter( '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s' ) handler.setFormatter(formatter) self.logger.addHandler(handler) def tark_do(self,image_id,filesavepath,ragurl,max_tokens): try : # 1. 从集合A获取向量和元数据 res_a = self.collection.query( expr=f"id == {image_id}", output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id", "task_name","event_level_id","event_level_name", "video_point_id", "detect_num", "is_waning", "waning_value", "rule_id", "detect_id", "detect_time", "image_path", "image_desc_path","video_path"], consistency_level="Strong" ) # 图片和视频地址 image = Image.open(f"{res_a[0]['image_desc_path']}").convert("RGB") # 替换为您的图片路径 conversation = [ { "role": "user", "content": [ { "type": "image", }, {"type": "text", "text": "请按以下要求描述图片:\n1. 列出主要物体\n2.不进行推理和think\n返回小于2000字的整段描述,描述中的物体信息不加数字序号"}, ], } ] # Preprocess the inputs self.logger.info("aaaa") text_prompt = self.qwen_tokenizer.apply_chat_template(conversation, add_generation_prompt=True) # Excepted output: '<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\n<|vision_start|><|image_pad|><|vision_end|>Describe this image.<|im_end|>\n<|im_start|>assistant\n' self.logger.info("bbbb") inputs = self.qwen_tokenizer( text=[text_prompt], images=[image], padding=True, return_tensors="pt" ) inputs = inputs.to("cuda") self.logger.info("cccc") torch.cuda.empty_cache() with torch.no_grad(): output_ids = self.qwen_model.generate(**inputs, max_new_tokens=50) print(output_ids.device) self.logger.info("dddd") generated_ids = [ output_ids[len(input_ids):] for input_ids, output_ids in zip(inputs.input_ids, output_ids) ] image_text = self.qwen_tokenizer.batch_decode( generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True ) self.logger.info("ffff") image_des = (image_text[0]).replace('\n', '') if len(image_des)>4 and image_des.startswith("这张图片"): image_des = image_des[4:] self.logger.info(image_des) image_des = self.remove_duplicate_lines(image_des) is_waning = 0 data = { "id": image_id, "event_level_id": res_a[0]['event_level_id'], # event_level_id "event_level_name": res_a[0]['event_level_name'], # event_level_id "rule_id": res_a[0]["rule_id"], "video_point_id": res_a[0]['video_point_id'], # video_point_id "video_point_name": res_a[0]['video_point_name'], "is_waning": is_waning, "zh_desc_class": image_des, # text_vector "bounding_box": res_a[0]['bounding_box'], # bounding_box "task_id": res_a[0]['task_id'], # task_id "task_name": res_a[0]['task_name'], # task_id "detect_id": res_a[0]['detect_id'], # detect_id "detect_time": res_a[0]['detect_time'], # detect_time "detect_num": res_a[0]['detect_num'], "waning_value": res_a[0]['waning_value'], "image_path": res_a[0]['image_path'], # image_path "image_desc_path": res_a[0]['image_desc_path'], # image_path "video_path": res_a[0]['video_path'], "text_vector": res_a[0]['text_vector'] } # 保存到milvus image_id = self.collection.upsert(data).primary_keys data = { "id": str(image_id[0]), "video_point_id": res_a[0]['video_point_id'], "video_path": res_a[0]["video_point_name"], "zh_desc_class": image_des, "detect_time": res_a[0]['detect_time'], "image_path": f"{res_a[0]['image_path']}", "task_name": res_a[0]["task_name"], "event_level_name": res_a[0]["event_level_name"], "rtsp_address": f"{res_a[0]['video_path']}" } data = {'collection_name': "smartrag","data": data,"description":""} requests.post(ragurl+"/insert_json_data", json=data ) return image_id except Exception as e: self.logger.info(f"{self._thread_name}线程:执行模型解析时出错:任务:{image_id} :{e}") return 0