# zhm
# 2025-07-24 028e39f85bf4115024d0467613f26a1750ff004e
# qwen_ollama.py
# @@ -10,7 +10,7 @@  (diff hunk marker left in source — not code)
import logging
from logging.handlers import RotatingFileHandler

import torch
from PIL import Image
from transformers import AutoModelForVision2Seq, AutoProcessor
class detect_tasks():
    def __init__(self):
        # @@ -19,6 +19,20 @@  (diff hunk marker left in source — not code)
        # 初始化Milvus集合
        self.collection = None
        self.model = AutoModelForVision2Seq.from_pretrained(
            "./Qwen2.5-VL-3B-Instruct-GPTQ-Int4",
            device_map="auto"
        )
        self.processor = AutoProcessor.from_pretrained(
            "./Qwen2.5-VL-3B-Instruct-GPTQ-Int4",
            trust_remote_code=True,
            use_fast=True  # 强制启用快速处理器
        )
    def remove_duplicate_lines(self, text):
    # @@ -67,43 +81,50 @@  (diff hunk marker left in source — not code)
            self.logger.addHandler(handler)
    def image_desc(self, image_path, ollama_url, ollama_mode="qwen2.5vl:3b"):
        try:
            image_des = None
            # 图片预处理
            img = Image.open(image_path)
            buffered = io.BytesIO()
            img.save(buffered, format="JPEG", quality=85)
            img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
            # 调用API
            response = requests.post(
                f"{ollama_url}/api/generate",
                json={
                    "model": ollama_mode,
                    "prompt": "请按以下要求描述图片:\n1. 列出主要物体\n2.不进行推理和think\n返回小于2000字的整段描述,描述中的物体信息不加数字序号",
                    "images": [img_str],
                    "options": {
                        "num_gpu_layers": 35
                    },
                    "stream": False  # 非流式
                },
                headers={"Content-Type": "application/json"}
            )
            result = response.json()
            if result and result["response"]:
                image_des = (result["response"]).replace('\n', '')
                if len(image_des) > 4 and image_des.startswith("这张图片"):
                    image_des = image_des[4:]
        image = Image.open(image_path).convert("RGB")  # 替换为您的图片路径
        image = image.resize((600, 600), Image.Resampling.LANCZOS)  # 高质量缩放
                image_des = self.remove_duplicate_lines(image_des)
                image_des = self.remove_duplicate_lines_d(image_des)
                image_des = self.remove_duplicate_lines_n(image_des)
                # self.logger.info(image_des)
            else:
                self.logger.info(f"{self._thread_name}线程:执行图片描述时出错:{image_path, result, response}")
            return image_des
        except Exception as e:
            self.logger.info(f"{self._thread_name}线程:执行图片描述时出错:{image_path, e}")
            return None
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                    },
                    {"type": "text", "text": "请详细描述图片中的目标信息及特征。返回格式为整段文字描述"},
                ],
            }
        ]
        # Preparation for inference
        text = self.processor.apply_chat_template(
            messages, add_generation_prompt=True
        )
        inputs = self.processor(
            text=[text],
            images=[image],
            padding=True,
            return_tensors="pt",
        )
        inputs = inputs.to("cuda")
        with torch.no_grad():
            outputs = self.model.generate(**inputs,
                                     max_new_tokens=300,
                                     do_sample=True,
                                     temperature=0.7,
                                     renormalize_logits=True
                                     )
        generated_ids = outputs[:, len(inputs.input_ids[0]):]
        image_text = self.processor.batch_decode(
            generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
        )
        image_des = (image_text[0]).strip()
        return image_des
    def get_rule(self, ragurl):
        try:
        # @@ -193,6 +214,8 @@  (diff hunk marker left in source — not code)
        )
        # 从json提取data字段内容
        ret = response.json()["data"]
        #result = response.json()
        #ret = result.get("data") or result.get("message", {}).get("content", "")
        # 移除<think>标签和内容
        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
        # 字符串清理,移除空格,制表符,换行符,星号
    # @@ -202,12 +225,14 @@  (diff hunk marker left in source — not code)
    #处理建议
    def image_rule_chat_suggestion(self, rule_text, ollama_url, ollama_mode="qwen2.5vl:3b"):
        self.logger.info("----------------------------------------------------------------")
        # 请求内容
        content = (
            f"请根据违规内容[{rule_text}]\n进行返回处理违规建议,不进行推理和think。返回精准信息")
        response = requests.post(
            # ollama地址
            url=f"{ollama_url}/chat",
            json={
                # 指定模型
                "llm_name": "qwen3:8b",
                "messages": [
                    {"role": "user", "content": content}
@@ -215,118 +240,131 @@
                "stream": False  # 关闭流式输出
            }
        )
        # 从json提取data字段内容
        ret = response.json()["data"]
        #result = response.json()
        #ret = result.get("data") or result.get("message", {}).get("content", "")
        # 移除<think>标签和内容
        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
        # 字符串清理,移除空格,制表符,换行符,星号
        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
        print(ret)
        return ret
        # print(response.json())
        # ret = response.json()["detail"]
    # RAG服务发送请求,获取知识库内容
    def get_filedata(self, searchtext, ragurl):
        search_data = {
            # 知识库集合
            "collection_name": "smart_knowledge",
            # 查询文本
            "query_text": searchtext,
            # 搜索模式
            "search_mode": "hybrid",
            # 最多返回结果
            "limit": 100,
            # 调密向量搜索权重
            "weight_dense": 0.7,
            # 稀疏向量搜索权重
            "weight_sparse": 0.3,
            # 空字符串
            "filter_expr": "",
            # 只返回 text 字段
            "output_fields": ["text"]
        }
        # 向 ragurl + "/search" 端点发送POST请求
        response = requests.post(ragurl + "/search", json=search_data)
        # 从响应中获取'results'字段
        results = response.json().get('results')
        # 初始化 text
        text = ""
        # 遍历所有结果规则(rule),将每条规则的'entity'中的'text'字段取出.
        for rule in results:
            text = text + rule['entity'].get('text') + ";\n"
        return text
    # 向RAG系统插入json格式数据
    async def insert_json_data(self, ragurl, data):
        try:
            # data 要插入的数据
            data = {'collection_name': "smartrag", "data": data, "description": ""}
            # 服务的基地址
            requests.post(ragurl + "/insert_json_data", json=data, timeout=(0.3, 0.3))
            # self.logger.info(f"调用录像服务:{ragurl, data}")
        except Exception as e:
            # self.logger.info(f"{self._thread_name}线程:调用录像时出错:地址:{ragurl}:{e}")
            return
    def tark_do(self, image_id, ollamaurl, ragurl, ollamamode, ragmode):
    def tark_do(self, res, ollamaurl, ragurl, ollamamode, ragmode):
        try:
            # 1. 从集合A获取向量和元数据
            is_waning = 0
            res_a = self.collection.query(
                expr=f"id == 458942504686042840",
                output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
                               "task_name", "event_level_id", "event_level_name",
                               "video_point_id", "detect_num", "is_waning", "waning_value", "rule_id", "detect_id",
                               "detect_time", "image_path", "image_desc_path", "video_path","risk_description","suggestion","knowledge_id"],
                consistency_level="Strong"
            )
            is_desc = 2
            if not res_a:
                self.logger.info(f"{self._thread_name}线程:未找到ID为 {image_id} 的记录")
                return 0
            image_des = self.image_desc(f"/opt/smart/img/1748763385.245874.jpg", ollamaurl, ollamamode)
            # 生成图片描述
            image_des = self.image_desc(res['image_desc_path'], ollamaurl, ollamamode)
            risk_description = ""
            suggestion = ""
            # 图片描述生成成功
            if image_des:
                is_waning = self.image_rule_chat(image_des, res_a[0]['waning_value'], ollamaurl, ollamamode)
                is_desc = 2
                # 调用规则匹配方法,判断是否预警
                is_waning = self.image_rule_chat(image_des, res['waning_value'], ollamaurl, ollamamode)
                # 如果预警,则生成隐患描述和处理建议
                if is_waning == 1:
                    # 获取规章制度数据
                    filedata = self.get_filedata(res['waning_value'], ragurl)
                    # 生成隐患描述
                    risk_description = self.image_rule_chat_with_detail(filedata,res['waning_value'], ragurl, ragmode)
                    # 生成处理建议
                    suggestion = self.image_rule_chat_suggestion(res['waning_value'], ragurl, ragmode)
                if is_waning == 0:
                    filedata = self.get_filedata(res_a[0]['waning_value'], ragurl)
                    risk_description = self.image_rule_chat_with_detail(filedata,res_a[0]['waning_value'], ragurl, ragmode)
                    suggestion = self.image_rule_chat_suggestion(res_a[0]['waning_value'], ragurl, ragmode)
            else:
                is_desc = 3
            # 数据组
            data = {
                "id": image_id,
                "event_level_id": res_a[0]['event_level_id'],  # event_level_id
                "event_level_name": res_a[0]['event_level_name'],  # event_level_id
                "rule_id": res_a[0]["rule_id"],
                "video_point_id": res_a[0]['video_point_id'],  # video_point_id
                "video_point_name": res_a[0]['video_point_name'],
                "id": res['id'],
                "event_level_id": res['event_level_id'],  # event_level_id
                "event_level_name": res['event_level_name'],  # event_level_id
                "rule_id": res["rule_id"],
                "video_point_id": res['video_point_id'],  # video_point_id
                "video_point_name": res['video_point_name'],
                "is_waning": is_waning,
                "is_desc": is_desc,
                "zh_desc_class": image_des,  # text_vector
                "bounding_box": res_a[0]['bounding_box'],  # bounding_box
                "task_id": res_a[0]['task_id'],  # task_id
                "task_name": res_a[0]['task_name'],  # task_id
                "detect_id": res_a[0]['detect_id'],  # detect_id
                "detect_time": res_a[0]['detect_time'],  # detect_time
                "detect_num": res_a[0]['detect_num'],
                "waning_value": res_a[0]['waning_value'],
                "image_path": res_a[0]['image_path'],  # image_path
                "image_desc_path": res_a[0]['image_desc_path'],  # image_desc_path
                "video_path": res_a[0]['video_path'],
                "text_vector": res_a[0]['text_vector'],
                "bounding_box": res['bounding_box'],  # bounding_box
                "task_id": res['task_id'],  # task_id
                "task_name": res['task_name'],  # task_id
                "detect_id": res['detect_id'],  # detect_id
                "detect_time": res['detect_time'],  # detect_time
                "detect_num": res['detect_num'],
                "waning_value": res['waning_value'],
                "image_path": res['image_path'],  # image_path
                "image_desc_path": res['image_desc_path'],  # image_desc_path
                "video_path": res['video_path'],
                "text_vector": res['text_vector'],
                "risk_description": risk_description,
                "suggestion": suggestion,
                "knowledge_id": res_a[0]['knowledge_id']
                "knowledge_id": res['knowledge_id']
            }
            # 保存到milvus
            image_id = self.collection.upsert(data).primary_keys
            logging.info(image_id)
            data = {
                "id": str(image_id[0]),
                "video_point_id": res_a[0]['video_point_id'],
                "video_path": res_a[0]["video_point_name"],
                "video_point_id": res['video_point_id'],
                "video_path": res["video_point_name"],
                "zh_desc_class": image_des,
                "detect_time": res_a[0]['detect_time'],
                "image_path": f"{res_a[0]['image_path']}",
                "task_name": res_a[0]["task_name"],
                "event_level_name": res_a[0]["event_level_name"],
                "rtsp_address": f"{res_a[0]['video_path']}"
                "detect_time": res['detect_time'],
                "image_path": f"{res['image_path']}",
                "task_name": res["task_name"],
                "event_level_name": res["event_level_name"],
                "rtsp_address": f"{res['video_path']}"
            }
            # 调用rag
            asyncio.run(self.insert_json_data(ragurl, data))
            return image_id
        except Exception as e:
            self.logger.info(f"{self._thread_name}线程:执行模型解析时出错:任务:{image_id} :{e}")
            return 0
            self.logger.info(f"{self._thread_name}线程:执行模型解析时出错:任务:{res['id']} :{e}")
            return 0