import asyncio
import base64
import io
import json
import logging
import os
import re
from logging.handlers import RotatingFileHandler

import requests
import torch
from PIL import Image


# A reasoning block such as "<think> ... </think>" emitted before the final
# answer; DOTALL so the block may span several lines.
_THINK_BLOCK_RE = re.compile(r'<think>.*?</think>', flags=re.DOTALL)


class detect_tasks:
    """Describe detection images with a multimodal LLM and check them against rules.

    ``collection`` (used via ``upsert`` in :meth:`tark_do`) and ``llm`` (used via
    ``invoke`` in :meth:`image_desc`) start as None and are expected to be
    assigned by the owner before those methods are called.
    """

    def __init__(self):
        # Worker-thread name; only used as a prefix in log messages.
        self._thread_name = ''
        # Milvus collection; must be assigned before tark_do() is called.
        self.collection = None
        # Multimodal LLM client; must be assigned before image_desc() is called.
        self.llm = None
        # Fallback logger so error paths work even if init_logging() is never
        # called; init_logging() replaces it with a per-instance file logger.
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _dedupe_segments(text, sep):
        """Split *text* on *sep*, drop blank and repeated segments, re-join with '。'.

        First occurrences keep their order. Segments are compared verbatim
        (not stripped), so ' a' and 'a' count as different segments.
        """
        seen = set()
        kept = []
        for segment in text.split(sep):
            if segment.strip() and segment not in seen:
                seen.add(segment)
                kept.append(segment)
        return '。'.join(kept)

    def remove_duplicate_lines(self, text):
        """Remove duplicate sentences, splitting on the Chinese full stop '。'."""
        return self._dedupe_segments(text, '。')

    def remove_duplicate_lines_d(self, text):
        """Remove duplicate ','-separated clauses; the clauses are re-joined with '。'."""
        return self._dedupe_segments(text, ',')

    def remove_duplicate_lines_n(self, text):
        """Remove duplicate lines (split on newlines); the lines are re-joined with '。'."""
        return self._dedupe_segments(text, '\n')

    def init_logging(self, logname):
        """Give this instance its own logger writing to ``logs/<logname>_log.log``.

        The file rotates at 10 MiB and keeps 3 backups. The logger name contains
        ``id(self)`` so each instance gets a separate logger.
        """
        self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
        self.logger.setLevel(logging.INFO)
        # Avoid attaching a second handler if this is called more than once.
        if not self.logger.handlers:
            # RotatingFileHandler does not create missing parent directories.
            os.makedirs("logs", exist_ok=True)
            handler = RotatingFileHandler(
                filename=os.path.join("logs", logname + '_log.log'),
                maxBytes=10 * 1024 * 1024,
                backupCount=3,
                encoding='utf-8',
            )
            formatter = logging.Formatter(
                '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def image_desc(self, image_path):
        """Ask ``self.llm`` for a textual description of the JPEG at *image_path*.

        Returns the cleaned description, or None if the model returned no
        content or any error occurred (errors are logged, never raised).
        """
        try:
            image_des = None
            # Multimodal message: text instruction plus the image as a base64 data URL.
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "请详细描述图片中的目标信息及特征。返回格式为整段文字描述"},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{self.image_to_base64(image_path)}"
                            },
                        },
                    ],
                }
            ]
            response = self.llm.invoke(messages)
            if response and response.content:
                image_des = response.content
                # Strip a leading "这张图片" ("This image") prefix.
                if len(image_des) > 4 and image_des.startswith("这张图片"):
                    image_des = image_des[4:]
                # Collapse repeated sentences, clauses and lines.
                image_des = self.remove_duplicate_lines(image_des)
                image_des = self.remove_duplicate_lines_d(image_des)
                image_des = self.remove_duplicate_lines_n(image_des)
            else:
                self.logger.info(f"{self._thread_name}线程:执行图片描述时出错:{image_path, response}")
            return image_des
        except Exception as e:
            self.logger.info(f"{self._thread_name}线程:执行图片描述时出错:{image_path,e}")
            return None

    def get_rule(self, ragurl):
        """Fetch up to 100 rules from the ``smart_rule`` collection of the RAG service.

        Uses hybrid search (dense/sparse weights 0.7/0.3) with an empty query.
        Returns the rules as a numbered list ("1. <text>;\\n2. <text>;\\n..."),
        or None on any error (logged). Rules with a negative score are skipped
        and logged.
        """
        try:
            search_data = {
                "collection_name": "smart_rule",
                "query_text": "",
                "search_mode": "hybrid",
                "limit": 100,
                "weight_dense": 0.7,
                "weight_sparse": 0.3,
                "filter_expr": "",
                "output_fields": ["text"],
            }
            response = requests.post(ragurl + "/search", json=search_data)
            results = response.json().get('results')
            parts = []
            ruleid = 1
            for rule in results:
                if rule['score'] >= 0:
                    parts.append(str(ruleid) + ". " + rule['entity'].get('text') + ";\n")
                    ruleid += 1
                else:
                    self.logger.info(f"{self._thread_name}线程:执行获取规则时出错:{response}")
            return "".join(parts)
        except Exception as e:
            self.logger.info(f"{self._thread_name}线程:执行获取规则时出错:{e}")
            return None

    def image_rule_chat(self, image_des, rule_text, ragurl, rag_mode, max_tokens):
        """Ask the RAG chat endpoint whether *image_des* matches any rule in *rule_text*.

        The model is told to answer "[<matching rule ids>]" or "[]". Returns 1
        when the answer, after removing any <think>...</think> block and all
        whitespace, is longer than "[]"; 0 otherwise; None on error (logged).
        """
        try:
            content = (
                f"图片描述内容为:\n{image_des}\n规则内容:\n{rule_text}。\n请验证图片描述中是否有符合规则的内容,不进行推理和think。返回结果格式为[xxx符合的规则id],如果没有返回[]")
            search_data = {
                "prompt": "",
                "messages": [
                    {
                        "role": "user",
                        "content": content,
                    }
                ],
                "llm_name": rag_mode,
                "stream": False,
                "gen_conf": {
                    "temperature": 0.7,
                    "max_tokens": max_tokens,
                },
            }
            response = requests.post(ragurl + "/chat", json=search_data)
            results = response.json().get('data')
            # Drop any reasoning block so only the final "[...]" answer is inspected.
            ret = _THINK_BLOCK_RE.sub('', results)
            # Remove all whitespace (including '\r' and full-width spaces).
            ret = ''.join(ret.split())
            # Anything longer than the empty list "[]" means a rule matched.
            is_waning = 1 if len(ret) > 2 else 0
            return is_waning
        except Exception as e:
            self.logger.info(f"{self._thread_name}线程:执行规则匹配时出错:{image_des, rule_text, ragurl, rag_mode,e}")
            return None

    async def insert_json_data(self, ragurl, data):
        """Best-effort write of one record into the ``smartrag`` collection of the RAG service.

        Uses a 0.3 s connect/read timeout and ignores every failure (including
        timeouts). Note: ``requests`` is synchronous, so this coroutine blocks
        the event loop while the request is in flight.
        """
        try:
            payload = {'collection_name': "smartrag", "data": data, "description": ""}
            requests.post(ragurl + "/insert_json_data", json=payload, timeout=(0.3, 0.3))
        except Exception as e:
            # Deliberately not propagated: the write is fire-and-forget.
            self.logger.debug(f"{self._thread_name}线程:写入RAG失败:地址:{ragurl}:{e}")
            return

    def image_to_base64(self, image_path):
        """Return the file at *image_path* as a base64-encoded ASCII string."""
        with open(image_path, "rb") as img_file:
            return base64.b64encode(img_file.read()).decode('utf-8')

    def _push_to_rag(self, ragurl, image_id, image_des, res):
        """Send a described detection to the RAG service; failures are logged only."""
        # NOTE(review): 'video_path' receives video_point_name and 'rtsp_address'
        # receives res['video_path'] — confirm this is the intended RAG schema.
        data = {
            "id": str(image_id[0]),
            "video_point_id": res['video_point_id'],
            "video_path": res["video_point_name"],
            "zh_desc_class": image_des,
            "detect_time": res['detect_time'],
            "image_path": f"{res['image_path']}",
            "task_name": res["task_name"],
            "event_level_name": res["event_level_name"],
            "rtsp_address": f"{res['video_path']}",
        }
        coro = self.insert_json_data(ragurl, data)
        try:
            asyncio.run(coro)
        except RuntimeError as e:
            # asyncio.run() refuses to start if this thread already runs an
            # event loop; the Milvus upsert already succeeded, so only log.
            coro.close()
            self.logger.info(f"{self._thread_name}线程:写入RAG时出错:{e}")

    def tark_do(self, res, ragurl, rag_mode, max_tokens):
        """Describe one detection image, check it against its rules, and store the result.

        *res* is a detection record (dict); the keys read are listed in the
        upsert below. The description is generated from ``res['image_desc_path']``
        and checked against the rule text in ``res['waning_value']``. The record
        is upserted into ``self.collection`` with ``is_desc`` = 2 (described) or
        3 (no description). Described records are also pushed to the RAG service.

        Returns the upsert ``primary_keys``, or 0 on error (logged).
        """
        try:
            image_des = self.image_desc(f"{res['image_desc_path']}")
            if image_des:
                # NOTE: may be None if the rule check failed; stored as-is.
                is_waning = self.image_rule_chat(image_des, res['waning_value'], ragurl, rag_mode, max_tokens)
                is_desc = 2
            else:
                is_waning = 0
                is_desc = 3
            data = {
                "id": res['id'],
                "event_level_id": res['event_level_id'],
                "event_level_name": res['event_level_name'],
                "rule_id": res["rule_id"],
                "video_point_id": res['video_point_id'],
                "video_point_name": res['video_point_name'],
                "is_waning": is_waning,
                "is_desc": is_desc,
                "zh_desc_class": image_des,
                "bounding_box": res['bounding_box'],
                "task_id": res['task_id'],
                "task_name": res['task_name'],
                "detect_id": res['detect_id'],
                "detect_time": res['detect_time'],
                "detect_num": res['detect_num'],
                "waning_value": res['waning_value'],
                "image_path": res['image_path'],
                "image_desc_path": res['image_desc_path'],
                "video_path": res['video_path'],
                "text_vector": res['text_vector'],
            }
            # Save to Milvus.
            image_id = self.collection.upsert(data).primary_keys
            if is_desc == 2:
                self._push_to_rag(ragurl, image_id, image_des, res)
            return image_id
        except Exception as e:
            self.logger.info(f"{self._thread_name}线程:执行模型解析时出错:任务:{ res['id']} :{e}")
            return 0