import asyncio
|
import base64
|
import io
|
import json
|
import os
|
import re
|
|
import requests
|
import torch
|
import logging
|
from PIL import Image
|
from logging.handlers import RotatingFileHandler
|
|
|
class detect_tasks():
    """Pipeline worker that describes surveillance images with an LLM,
    matches the descriptions against configured rules, and persists the
    enriched results to Milvus and a RAG service."""

    def __init__(self):
        # Worker thread name; used only as a prefix in log messages.
        self._thread_name = ''
        # Milvus collection handle; must be assigned by the owner
        # before tark_do() is called.
        self.collection = None
|
def remove_duplicate_lines(self, text):
|
|
seen = set()
|
result = []
|
for line in text.split('。'): # 按句号分割
|
if line.strip() and line not in seen:
|
seen.add(line)
|
result.append(line)
|
return '。'.join(result)
|
|
def remove_duplicate_lines_d(self, text):
|
seen = set()
|
result = []
|
for line in text.split(','): # 按句号分割
|
if line.strip() and line not in seen:
|
seen.add(line)
|
result.append(line)
|
return '。'.join(result)
|
|
def remove_duplicate_lines_n(self, text):
|
seen = set()
|
result = []
|
for line in text.split('\n'): # 按句号分割
|
if line.strip() and line not in seen:
|
seen.add(line)
|
result.append(line)
|
return '。'.join(result)
|
|
def init_logging(self, logname):
|
# 创建实例专属logger
|
self.logger = logging.getLogger(f"{self.__class__}_{id(self)}")
|
self.logger.setLevel(logging.INFO)
|
# 避免重复添加handler
|
if not self.logger.handlers:
|
handler = RotatingFileHandler(
|
filename=os.path.join("logs", logname + '_log.log'),
|
maxBytes=10 * 1024 * 1024,
|
backupCount=3,
|
encoding='utf-8'
|
)
|
formatter = logging.Formatter(
|
'%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
|
)
|
handler.setFormatter(formatter)
|
self.logger.addHandler(handler)
|
|
def image_desc(self, image_path, ollama_url, ollama_mode="qwen2.5vl:3b"):
|
try:
|
image_des = None
|
# 图片预处理
|
img = Image.open(image_path)
|
buffered = io.BytesIO()
|
img.save(buffered, format="JPEG", quality=85)
|
img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
|
# 调用API
|
response = requests.post(
|
f"{ollama_url}/api/generate",
|
json={
|
"model": ollama_mode,
|
"prompt": "请按以下要求描述图片:\n1. 列出主要物体\n2.不进行推理和think\n返回小于2000字的整段描述,描述中的物体信息不加数字序号",
|
"images": [img_str],
|
"options": {
|
"num_gpu_layers": 35
|
},
|
"stream": False # 非流式
|
},
|
headers={"Content-Type": "application/json"}
|
)
|
result = response.json()
|
if result and result["response"]:
|
image_des = (result["response"]).replace('\n', '')
|
if len(image_des) > 4 and image_des.startswith("这张图片"):
|
image_des = image_des[4:]
|
|
image_des = self.remove_duplicate_lines(image_des)
|
image_des = self.remove_duplicate_lines_d(image_des)
|
image_des = self.remove_duplicate_lines_n(image_des)
|
# self.logger.info(image_des)
|
else:
|
self.logger.info(f"{self._thread_name}线程:执行图片描述时出错:{image_path, result, response}")
|
return image_des
|
except Exception as e:
|
self.logger.info(f"{self._thread_name}线程:执行图片描述时出错:{image_path, e}")
|
return None
|
|
def get_rule(self, ragurl):
|
try:
|
rule_text = None
|
search_data = {
|
"collection_name": "smart_rule",
|
"query_text": "",
|
"search_mode": "hybrid",
|
"limit": 100,
|
"weight_dense": 0.7,
|
"weight_sparse": 0.3,
|
"filter_expr": "",
|
"output_fields": ["text"]
|
}
|
response = requests.post(ragurl + "/search", json=search_data)
|
results = response.json().get('results')
|
rule_text = ""
|
ruleid = 1
|
for rule in results:
|
if rule['score'] >= 0:
|
rule_text = rule_text + str(ruleid) + ". " + rule['entity'].get('text') + ";\n"
|
ruleid = ruleid + 1
|
# self.logger.info(len(rule_text))
|
else:
|
self.logger.info(f"{self._thread_name}线程:执行获取规则时出错:{response}")
|
return rule_text
|
except Exception as e:
|
self.logger.info(f"{self._thread_name}线程:执行获取规则时出错:{e}")
|
return None
|
|
def image_rule_chat(self, image_des, rule_text, ollama_url, ollama_mode="qwen2.5vl:3b"):
|
try:
|
is_waning = 0
|
|
# 请求内容
|
content = (
|
f"图片描述内容为:\n{image_des}\n规则内容:\n{rule_text}。\n请验证图片描述中是否有符合规则的内容,不进行推理和think。返回结果格式为[xxx符合的规则id],如果没有返回[]")
|
|
# API调用
|
response = requests.post(
|
# ollama地址
|
url=f"{ollama_url}/api/chat",
|
json={
|
# 默认模型
|
"model": ollama_mode,
|
"messages": [
|
{"role": "user", "content": content}
|
],
|
"stream": False # 关闭流式输出
|
}
|
)
|
# 结果处理
|
ret = response.json()["message"]["content"]
|
if len(ret) > 2:
|
is_waning = 1
|
# self.logger.info(f"{self._thread_name}线程:执行规则匹配时出错:{image_des, rule_text,ret, response, ollama_url, ollama_mode}")
|
return is_waning
|
except Exception as e:
|
self.logger.info(
|
f"{self._thread_name}线程:执行规则匹配时出错:{image_des, rule_text, ollama_url, ollama_mode, e}")
|
return None
|
|
# 隐患描述
|
def image_rule_chat_with_detail(self,filedata, rule_text, ollama_url, ollama_mode="qwen2.5vl:3b"):
|
|
# API调用
|
response = requests.post(
|
# ollama地址
|
url=f"{ollama_url}/chat",
|
json={
|
"prompt":"",
|
# 请求内容
|
"messages": [
|
{
|
"role": "user",
|
"content": f"请根据规章制度[{filedata}]\n查找[{rule_text}]的安全隐患描述,不进行推理和think。返回信息小于800字"
|
}
|
],
|
# 指定模型
|
"llm_name": "qwen3:8b",
|
"stream": False, # 关闭流式输出
|
"gen_conf": {
|
"temperature": 0.7, # 控制生成随机性
|
"max_tokens": 800 # 最大输出长度
|
}
|
}
|
)
|
# 从json提取data字段内容
|
ret = response.json()["data"]
|
# 移除<think>标签和内容
|
ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
|
# 字符串清理,移除空格,制表符,换行符,星号
|
ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
|
print(ret)
|
return ret
|
#处理建议
|
def image_rule_chat_suggestion(self, rule_text, ollama_url, ollama_mode="qwen2.5vl:3b"):
|
self.logger.info("----------------------------------------------------------------")
|
content = (
|
f"请根据违规内容[{rule_text}]\n进行返回处理违规建议,不进行推理和think。返回精准信息")
|
response = requests.post(
|
url=f"{ollama_url}/chat",
|
json={
|
|
"llm_name": "qwen3:8b",
|
"messages": [
|
{"role": "user", "content": content}
|
],
|
"stream": False # 关闭流式输出
|
}
|
)
|
ret = response.json()["data"]
|
ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
|
ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
|
print(ret)
|
return ret
|
|
# print(response.json())
|
# ret = response.json()["detail"]
|
|
def get_filedata(self, searchtext, ragurl):
|
search_data = {
|
"collection_name": "smart_knowledge",
|
"query_text": searchtext,
|
"search_mode": "hybrid",
|
"limit": 100,
|
"weight_dense": 0.7,
|
"weight_sparse": 0.3,
|
"filter_expr": "",
|
"output_fields": ["text"]
|
}
|
response = requests.post(ragurl + "/search", json=search_data)
|
results = response.json().get('results')
|
text = ""
|
for rule in results:
|
text = text + rule['entity'].get('text') + ";\n"
|
|
return text
|
|
async def insert_json_data(self, ragurl, data):
|
try:
|
data = {'collection_name': "smartrag", "data": data, "description": ""}
|
requests.post(ragurl + "/insert_json_data", json=data, timeout=(0.3, 0.3))
|
# self.logger.info(f"调用录像服务:{ragurl, data}")
|
except Exception as e:
|
# self.logger.info(f"{self._thread_name}线程:调用录像时出错:地址:{ragurl}:{e}")
|
return
|
|
def tark_do(self, image_id, ollamaurl, ragurl, ollamamode, ragmode):
|
try:
|
# 1. 从集合A获取向量和元数据
|
is_waning = 0
|
res_a = self.collection.query(
|
expr=f"id == 458942504686042840",
|
output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
|
"task_name", "event_level_id", "event_level_name",
|
"video_point_id", "detect_num", "is_waning", "waning_value", "rule_id", "detect_id",
|
"detect_time", "image_path", "image_desc_path", "video_path","risk_description","suggestion","knowledge_id"],
|
consistency_level="Strong"
|
)
|
|
if not res_a:
|
self.logger.info(f"{self._thread_name}线程:未找到ID为 {image_id} 的记录")
|
return 0
|
|
|
image_des = self.image_desc(f"/opt/smart/img/1748763385.245874.jpg", ollamaurl, ollamamode)
|
risk_description = ""
|
suggestion = ""
|
if image_des:
|
is_waning = self.image_rule_chat(image_des, res_a[0]['waning_value'], ollamaurl, ollamamode)
|
|
if is_waning == 0:
|
|
filedata = self.get_filedata(res_a[0]['waning_value'], ragurl)
|
risk_description = self.image_rule_chat_with_detail(filedata,res_a[0]['waning_value'], ragurl, ragmode)
|
suggestion = self.image_rule_chat_suggestion(res_a[0]['waning_value'], ragurl, ragmode)
|
|
# 数据组
|
data = {
|
"id": image_id,
|
"event_level_id": res_a[0]['event_level_id'], # event_level_id
|
"event_level_name": res_a[0]['event_level_name'], # event_level_id
|
"rule_id": res_a[0]["rule_id"],
|
"video_point_id": res_a[0]['video_point_id'], # video_point_id
|
"video_point_name": res_a[0]['video_point_name'],
|
"is_waning": is_waning,
|
"zh_desc_class": image_des, # text_vector
|
"bounding_box": res_a[0]['bounding_box'], # bounding_box
|
"task_id": res_a[0]['task_id'], # task_id
|
"task_name": res_a[0]['task_name'], # task_id
|
"detect_id": res_a[0]['detect_id'], # detect_id
|
"detect_time": res_a[0]['detect_time'], # detect_time
|
"detect_num": res_a[0]['detect_num'],
|
"waning_value": res_a[0]['waning_value'],
|
"image_path": res_a[0]['image_path'], # image_path
|
"image_desc_path": res_a[0]['image_desc_path'], # image_desc_path
|
"video_path": res_a[0]['video_path'],
|
"text_vector": res_a[0]['text_vector'],
|
"risk_description": risk_description,
|
"suggestion": suggestion,
|
"knowledge_id": res_a[0]['knowledge_id']
|
}
|
|
|
|
# 保存到milvus
|
image_id = self.collection.upsert(data).primary_keys
|
data = {
|
"id": str(image_id[0]),
|
"video_point_id": res_a[0]['video_point_id'],
|
"video_path": res_a[0]["video_point_name"],
|
"zh_desc_class": image_des,
|
"detect_time": res_a[0]['detect_time'],
|
"image_path": f"{res_a[0]['image_path']}",
|
"task_name": res_a[0]["task_name"],
|
"event_level_name": res_a[0]["event_level_name"],
|
"rtsp_address": f"{res_a[0]['video_path']}"
|
}
|
# 调用rag
|
asyncio.run(self.insert_json_data(ragurl, data))
|
return image_id
|
except Exception as e:
|
self.logger.info(f"{self._thread_name}线程:执行模型解析时出错:任务:{image_id} :{e}")
|
return 0
|