import asyncio
import base64
import io
import json
import logging
import os
import re
from logging.handlers import RotatingFileHandler

import requests
import torch
from PIL import Image


class detect_tasks():
    """Pipeline worker for one detection record.

    Describes an image via an Ollama vision model, matches the description
    against rules held in a RAG service, enriches warnings with a hazard
    description and handling suggestion, and persists the result to Milvus.
    """

    def __init__(self):
        # Worker thread name, used as a prefix in log messages.
        self._thread_name = ''
        # Milvus collection handle; must be assigned externally before
        # tark_do() is called.
        self.collection = None

    @staticmethod
    def _dedupe(text, sep):
        """Split *text* on *sep*, drop repeated non-blank segments (keeping
        the first occurrence, in order) and re-join with '。'.

        NOTE(review): the original code joined with '。' regardless of the
        split separator (even for ',' and '\\n'); that behavior is preserved
        here — confirm it is intended.
        """
        seen = set()
        kept = []
        for segment in text.split(sep):
            if segment.strip() and segment not in seen:
                seen.add(segment)
                kept.append(segment)
        return '。'.join(kept)

    def remove_duplicate_lines(self, text):
        """Remove duplicate sentences separated by the Chinese full stop '。'."""
        return self._dedupe(text, '。')

    def remove_duplicate_lines_d(self, text):
        """Remove duplicate segments separated by the Chinese comma ','."""
        return self._dedupe(text, ',')

    def remove_duplicate_lines_n(self, text):
        """Remove duplicate segments separated by newlines."""
        return self._dedupe(text, '\n')

    def init_logging(self, logname):
        """Attach a per-instance rotating file handler.

        Logs go to logs/<logname>_log.log, rotating at 10 MiB with 3 backups.
        The logger name includes id(self) so instances do not share handlers.
        """
        self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
        self.logger.setLevel(logging.INFO)
        # Guard against duplicate handlers if called more than once.
        if not self.logger.handlers:
            handler = RotatingFileHandler(
                filename=os.path.join("logs", logname + '_log.log'),
                maxBytes=10 * 1024 * 1024,
                backupCount=3,
                encoding='utf-8'
            )
            formatter = logging.Formatter(
                '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def image_desc(self, image_path, ollama_url, ollama_mode="qwen2.5vl:3b"):
        """Ask the Ollama vision model to describe the image at *image_path*.

        Returns the cleaned, de-duplicated description string, or None on
        failure (errors are logged, not raised).
        """
        try:
            image_des = None
            # Re-encode the image as JPEG and base64 for the Ollama API.
            img = Image.open(image_path)
            buffered = io.BytesIO()
            img.save(buffered, format="JPEG", quality=85)
            img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
            response = requests.post(
                f"{ollama_url}/api/generate",
                json={
                    "model": ollama_mode,
                    "prompt": "请按以下要求描述图片:\n1. 列出主要物体\n2.不进行推理和think\n返回小于2000字的整段描述,描述中的物体信息不加数字序号",
                    "images": [img_str],
                    "options": {
                        "num_gpu_layers": 35
                    },
                    "stream": False  # non-streaming response
                },
                headers={"Content-Type": "application/json"}
            )
            result = response.json()
            if result and result["response"]:
                image_des = (result["response"]).replace('\n', '')
                # Strip the boilerplate leading "这张图片" (4 characters).
                if len(image_des) > 4 and image_des.startswith("这张图片"):
                    image_des = image_des[4:]
                image_des = self.remove_duplicate_lines(image_des)
                image_des = self.remove_duplicate_lines_d(image_des)
                image_des = self.remove_duplicate_lines_n(image_des)
            else:
                self.logger.info(f"{self._thread_name}线程:执行图片描述时出错:{image_path, result, response}")
            return image_des
        except Exception as e:
            self.logger.info(f"{self._thread_name}线程:执行图片描述时出错:{image_path, e}")
            return None

    def get_rule(self, ragurl):
        """Fetch all rules from the RAG service's 'smart_rule' collection.

        Returns the rules formatted as a numbered ";\\n"-separated list, or
        None on error.
        """
        try:
            rule_text = None
            search_data = {
                "collection_name": "smart_rule",
                "query_text": "",
                "search_mode": "hybrid",
                "limit": 100,
                "weight_dense": 0.7,
                "weight_sparse": 0.3,
                "filter_expr": "",
                "output_fields": ["text"]
            }
            response = requests.post(ragurl + "/search", json=search_data)
            results = response.json().get('results')
            rule_text = ""
            ruleid = 1
            for rule in results:
                if rule['score'] >= 0:
                    rule_text = rule_text + str(ruleid) + ". " + rule['entity'].get('text') + ";\n"
                    ruleid = ruleid + 1
                else:
                    self.logger.info(f"{self._thread_name}线程:执行获取规则时出错:{response}")
            return rule_text
        except Exception as e:
            self.logger.info(f"{self._thread_name}线程:执行获取规则时出错:{e}")
            return None

    def image_rule_chat(self, image_des, rule_text, ollama_url, ollama_mode="qwen2.5vl:3b"):
        """Ask the LLM whether *image_des* matches any rule in *rule_text*.

        The model is instructed to answer "[matching rule ids]" or "[]";
        returns 1 when the reply is longer than the empty list "[]",
        0 otherwise, and None on error.
        """
        try:
            is_waning = 0
            content = (
                f"图片描述内容为:\n{image_des}\n规则内容:\n{rule_text}。\n请验证图片描述中是否有符合规则的内容,不进行推理和think。返回结果格式为[xxx符合的规则id],如果没有返回[]")
            response = requests.post(
                url=f"{ollama_url}/api/chat",
                json={
                    "model": ollama_mode,
                    "messages": [
                        {"role": "user", "content": content}
                    ],
                    "stream": False  # disable streaming output
                }
            )
            ret = response.json()["message"]["content"]
            # Anything longer than "[]" means at least one rule id was returned.
            if len(ret) > 2:
                is_waning = 1
            return is_waning
        except Exception as e:
            self.logger.info(
                f"{self._thread_name}线程:执行规则匹配时出错:{image_des, rule_text, ollama_url, ollama_mode, e}")
            return None

    # Hazard description
    def image_rule_chat_with_detail(self, filedata, rule_text, ollama_url, ollama_mode="qwen2.5vl:3b"):
        """Ask the RAG chat endpoint for a hazard description (< 800 chars)
        of *rule_text* grounded in the regulations in *filedata*."""
        response = requests.post(
            url=f"{ollama_url}/chat",
            json={
                "prompt": "",
                "messages": [
                    {
                        "role": "user",
                        "content": f"请根据规章制度[{filedata}]\n查找[{rule_text}]的安全隐患描述,不进行推理和think。返回信息小于800字"
                    }
                ],
                "llm_name": "qwen3:8b",
                "stream": False,  # disable streaming output
                "gen_conf": {
                    "temperature": 0.7,  # generation randomness
                    "max_tokens": 800  # maximum output length
                }
            }
        )
        # Extract the 'data' field from the JSON reply.
        ret = response.json()["data"]
        # BUG FIX: the original pattern r'.*?' only matched empty strings and
        # removed nothing; strip the model's <think>...</think> block instead.
        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
        # Clean up: remove spaces, tabs, newlines and markdown asterisks.
        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**", "")
        print(ret)
        return ret

    # Handling suggestion
    def image_rule_chat_suggestion(self, rule_text, ollama_url, ollama_mode="qwen2.5vl:3b"):
        """Ask the RAG chat endpoint for a handling suggestion for the
        violation described by *rule_text*."""
        self.logger.info("----------------------------------------------------------------")
        content = (
            f"请根据违规内容[{rule_text}]\n进行返回处理违规建议,不进行推理和think。返回精准信息")
        response = requests.post(
            url=f"{ollama_url}/chat",
            json={
                "llm_name": "qwen3:8b",
                "messages": [
                    {"role": "user", "content": content}
                ],
                "stream": False  # disable streaming output
            }
        )
        ret = response.json()["data"]
        # BUG FIX: the original pattern r'.*?' only matched empty strings and
        # removed nothing; strip the model's <think>...</think> block instead.
        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**", "")
        print(ret)
        return ret

    def get_filedata(self, searchtext, ragurl):
        """Hybrid-search the 'smart_knowledge' collection for *searchtext*
        and return the concatenated text of all hits."""
        search_data = {
            "collection_name": "smart_knowledge",
            "query_text": searchtext,
            "search_mode": "hybrid",
            "limit": 100,
            "weight_dense": 0.7,
            "weight_sparse": 0.3,
            "filter_expr": "",
            "output_fields": ["text"]
        }
        response = requests.post(ragurl + "/search", json=search_data)
        results = response.json().get('results')
        text = ""
        for rule in results:
            text = text + rule['entity'].get('text') + ";\n"
        return text

    async def insert_json_data(self, ragurl, data):
        """Best-effort push of *data* into the 'smartrag' collection.

        The very short timeout and swallowed exception make this a
        fire-and-forget notification; failures are deliberately ignored.
        """
        try:
            payload = {'collection_name': "smartrag", "data": data, "description": ""}
            requests.post(ragurl + "/insert_json_data", json=payload, timeout=(0.3, 0.3))
        except Exception:
            return

    def tark_do(self, image_id, ollamaurl, ragurl, ollamamode, ragmode):
        """Process one detection record end to end.

        Fetches the record from Milvus, describes its image, matches rules,
        optionally generates hazard description/suggestion, upserts the
        enriched record and forwards a summary to the RAG service.
        Returns the upserted primary key(s), or 0 on failure.
        """
        try:
            is_waning = 0
            # 1. Fetch vector and metadata for this record from collection A.
            # BUG FIX: the original queried the hard-coded debug id
            # 458942504686042840 instead of the image_id parameter.
            res_a = self.collection.query(
                expr=f"id == {image_id}",
                output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name",
                               "task_id", "task_name", "event_level_id", "event_level_name", "video_point_id",
                               "detect_num", "is_waning", "waning_value", "rule_id", "detect_id", "detect_time",
                               "image_path", "image_desc_path", "video_path", "risk_description", "suggestion",
                               "knowledge_id"],
                consistency_level="Strong"
            )
            if not res_a:
                self.logger.info(f"{self._thread_name}线程:未找到ID为 {image_id} 的记录")
                return 0
            # BUG FIX: describe this record's own image; the original passed a
            # hard-coded debug path (/opt/smart/img/1748763385.245874.jpg).
            image_des = self.image_desc(res_a[0]['image_path'], ollamaurl, ollamamode)
            risk_description = ""
            suggestion = ""
            if image_des:
                is_waning = self.image_rule_chat(image_des, res_a[0]['waning_value'], ollamaurl, ollamamode)
                # NOTE(review): details are generated when is_waning == 0; this
                # mirrors the original code — confirm the condition is intended
                # (is_waning == 1 would seem more natural for a warning).
                if is_waning == 0:
                    filedata = self.get_filedata(res_a[0]['waning_value'], ragurl)
                    risk_description = self.image_rule_chat_with_detail(filedata, res_a[0]['waning_value'],
                                                                        ragurl, ragmode)
                    suggestion = self.image_rule_chat_suggestion(res_a[0]['waning_value'], ragurl, ragmode)
            # Assemble the enriched record for upsert.
            data = {
                "id": image_id,
                "event_level_id": res_a[0]['event_level_id'],
                "event_level_name": res_a[0]['event_level_name'],
                "rule_id": res_a[0]["rule_id"],
                "video_point_id": res_a[0]['video_point_id'],
                "video_point_name": res_a[0]['video_point_name'],
                "is_waning": is_waning,
                "zh_desc_class": image_des,
                "bounding_box": res_a[0]['bounding_box'],
                "task_id": res_a[0]['task_id'],
                "task_name": res_a[0]['task_name'],
                "detect_id": res_a[0]['detect_id'],
                "detect_time": res_a[0]['detect_time'],
                "detect_num": res_a[0]['detect_num'],
                "waning_value": res_a[0]['waning_value'],
                "image_path": res_a[0]['image_path'],
                "image_desc_path": res_a[0]['image_desc_path'],
                "video_path": res_a[0]['video_path'],
                "text_vector": res_a[0]['text_vector'],
                "risk_description": risk_description,
                "suggestion": suggestion,
                "knowledge_id": res_a[0]['knowledge_id']
            }
            # Save to Milvus; upsert returns the new primary keys.
            image_id = self.collection.upsert(data).primary_keys
            data = {
                "id": str(image_id[0]),
                "video_point_id": res_a[0]['video_point_id'],
                "video_path": res_a[0]["video_point_name"],
                "zh_desc_class": image_des,
                "detect_time": res_a[0]['detect_time'],
                "image_path": f"{res_a[0]['image_path']}",
                "task_name": res_a[0]["task_name"],
                "event_level_name": res_a[0]["event_level_name"],
                "rtsp_address": f"{res_a[0]['video_path']}"
            }
            # Notify the RAG service (fire-and-forget).
            asyncio.run(self.insert_json_data(ragurl, data))
            return image_id
        except Exception as e:
            self.logger.info(f"{self._thread_name}线程:执行模型解析时出错:任务:{image_id} :{e}")
            return 0