zhm
2025-07-24 028e39f85bf4115024d0467613f26a1750ff004e
qwen_thread.py
@@ -1,3 +1,4 @@
import logging
import time
from concurrent.futures import ThreadPoolExecutor
import threading
@@ -26,13 +27,13 @@
        # 加载集合
        self.collection = Collection(name="smartobject")
        self.collection.load()
        if config.get('cuda') == None or config.get('cuda') == '0':
        if config.get('cuda') is None or config.get('cuda') == '0':
            self.device = f"cuda"
        else:
            self.device = f"cuda:{config.get('cuda')}"
        self.model_pool = []
        self.lock_pool = [threading.Lock() for _ in range(int(config.get("threadnum")))]
        for i in range(int(config.get("threadnum"))):
        self.lock_pool = [threading.Lock() for _ in range(int(config.get("qwenwarning")))]
        for i in range(int(config.get("qwenwarning"))):
            model = AutoModelForVision2Seq.from_pretrained(
                config.get("qwenaddr"),
                device_map=self.device,
@@ -70,27 +71,15 @@
    def tark_do(self,res,ragurl,rag_mode,max_tokens):
        try:
            # 1. 从集合A获取向量和元数据
            is_waning = 0
            is_desc = 2
            # 生成图片描述
            ks_time = datetime.now()
            desc_time = datetime.now() - ks_time
            current_time = datetime.now()
            risk_description = ""
            suggestion = ""
            # 调用规则匹配方法,判断是否预警
            is_waning = self.image_rule(res)
            # 如果预警,则生成隐患描述和处理建议
            if is_waning == 1:
                # 获取规章制度数据
                filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl)
                # 生成隐患描述
                risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
                # 生成处理建议
                suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
                #self.logger.info(f"{res['video_point_id']}执行完毕:{res['id']}:是否预警{is_waning},安全隐患:{risk_description}\n处理建议:{suggestion}")
                # 数据组
            self.logger.info(f"预警规则规则规则is_waning:{is_waning}")
            #更新数据的预警结果与数据预警状态
            data = {
                "event_level_id": res['event_level_id'],  # event_level_id
                "event_level_name": res['event_level_name'],  # event_level_id
@@ -98,7 +87,7 @@
                "video_point_id": res['video_point_id'],  # video_point_id
                "video_point_name": res['video_point_name'],
                "is_waning": is_waning,
                "is_desc": 1,
                "is_desc": 5,  #改为已经预警
                "zh_desc_class": res['zh_desc_class'],  # text_vector
                "bounding_box": res['bounding_box'],  # bounding_box
                "task_id": res['task_id'],  # task_id
@@ -119,105 +108,14 @@
            # 保存到milvus
            image_id = self.collection.insert(data).primary_keys
            res['id'] = image_id[0]
            # 图片描述生成成功
            desc = self.image_desc(res)
            if desc:
                is_desc = 2
            else:
                is_desc = 3
            # 数据组
            data = {
                "event_level_id": res['event_level_id'],  # event_level_id
                "event_level_name": res['event_level_name'],  # event_level_id
                "rule_id": res["rule_id"],
                "video_point_id": res['video_point_id'],  # video_point_id
                "video_point_name": res['video_point_name'],
                "is_waning": is_waning,
                "is_desc": is_desc,
                "zh_desc_class": desc,  # text_vector
                "bounding_box": res['bounding_box'],  # bounding_box
                "task_id": res['task_id'],  # task_id
                "task_name": res['task_name'],  # task_id
                "detect_id": res['detect_id'],  # detect_id
                "detect_time": res['detect_time'],  # detect_time
                "detect_num": res['detect_num'],
                "waning_value": res['waning_value'],
                "image_path": res['image_path'],  # image_path
                "image_desc_path": res['image_desc_path'],  # image_desc_path
                "video_path": res['video_path'],
                "text_vector": res['text_vector'],
                "risk_description": risk_description,
                "suggestion": suggestion,
                "knowledge_id": res['knowledge_id']
            }
            self.collection.delete(f"id == {res['id']}")
            # 保存到milvus
            image_id = self.collection.insert(data).primary_keys
            data = {
                "id": str(image_id[0]),
                "video_point_id": res['video_point_id'],
                "video_path": res["video_point_name"],
                "zh_desc_class": desc,
                "detect_time": res['detect_time'],
                "image_path": f"{res['image_path']}",
                "task_name": res["task_name"],
                "event_level_name": res["event_level_name"],
                "rtsp_address": f"{res['video_path']}"
            }
            # 调用rag
            asyncio.run(self.insert_json_data(ragurl, data))
            rag_time = datetime.now() - current_time
            self.logger.info(f"{res['video_point_id']}执行完毕:{image_id}运行结束总体用时:{datetime.now() - ks_time},图片描述用时{desc_time},RAG用时{rag_time}")
            if is_waning == 1:
                self.logger.info(f"{res['video_point_id']}执行完毕:{image_id},图片描述:{desc}\n隐患:{risk_description}\n建议:{suggestion}")
            self.logger.info(f"{res['video_point_id']}预警执行完毕:{image_id}运行结束总体用时:{datetime.now() - ks_time}")
            return None
        except Exception as e:
            self.logger.info(f"线程:执行模型解析时出错::{e}")
            return 0
    def image_desc(self, res_data):
        try:
            model, lock = self._acquire_model()
            image = Image.open(res_data['image_desc_path']).convert("RGB").resize((600, 600), Image.Resampling.LANCZOS)
            messages = [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                        },
                        {"type": "text", "text": "请详细描述图片中的目标信息及特征。返回格式为整段文字描述"},
                    ],
                }
            ]
            # Preparation for inference
            text = self.processor.apply_chat_template(
                messages, add_generation_prompt=True
            )
            inputs = self.processor(
                text=[text],
                images=[image],
                padding=True,
                return_tensors="pt",
            )
            inputs = inputs.to(model.device)
            with torch.inference_mode(), torch.amp.autocast(device_type=self.device, dtype=torch.float16):
                outputs = model.generate(**inputs,max_new_tokens=200,do_sample=False,num_beams=1,temperature=None,top_p=None,top_k=1,use_cache=True,repetition_penalty=1.0)
            generated_ids = outputs[:, len(inputs.input_ids[0]):]
            image_text = self.processor.batch_decode(
                generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
            )
            image_des = (image_text[0]).strip()
            #self.logger.info(f"{res_data['video_point_id']}:{res_data['id']}:{res_data['detect_time']}:{image_des}")
            return image_des
        except Exception as e:
            self.logger.info(f"线程:执行图片描述时出错:{e}")
        finally:
            # 4. 释放模型
            self._release_model(model)
            torch.cuda.empty_cache()
    def image_rule(self, res_data):
        self.logger.info(f"预警规则规则规则等级分类就是打裂缝多少积分")
        try:
            model, lock = self._acquire_model()
            image = Image.open(res_data['image_desc_path']).convert("RGB").resize((600, 600), Image.Resampling.LANCZOS)
@@ -227,7 +125,7 @@
                    "role": "user",
                    "content": [
                        {"type": "image", "image": image},
                        {"type": "text", "text": f"图片中是否有{res_data['waning_value']}?请回答yes或no"},
                        {"type": "text", "text": f"请检测图片中{res_data['waning_value']}?请回答yes或no"},
                    ],
                }
            ]
@@ -254,179 +152,19 @@
            )
            image_des = (image_text[0]).strip()
            return image_des
            upper_text = image_des.upper()
            self.logger.info(f"预警规则规则规则:{upper_text}")
            if "YES" in upper_text:
                return 1
            else:
                return 0
        except Exception as e:
            self.logger.info(f"线程:执行图片描述时出错:{e}")
            return 0
        finally:
            # 4. 释放模型
            self._release_model(model)
            torch.cuda.empty_cache()
    def get_rule(self,ragurl):
        try:
            rule_text = None
            search_data = {
                "collection_name": "smart_rule",
                "query_text": "",
                "search_mode": "hybrid",
                "limit": 100,
                "weight_dense": 0.7,
                "weight_sparse": 0.3,
                "filter_expr": "",
                "output_fields": ["text"]
            }
            response = requests.post(ragurl + "/search", json=search_data)
            results = response.json().get('results')
            rule_text = ""
            ruleid = 1
            for rule in results:
                if rule['score'] >= 0:
                    rule_text = rule_text + str(ruleid) + ". " + rule['entity'].get('text') + ";\n"
                    ruleid = ruleid + 1
            # self.logger.info(len(rule_text))
            else:
                self.logger.info(f"线程:执行获取规则时出错:{response}")
            return rule_text
        except Exception as e:
            self.logger.info(f"线程:执行获取规则时出错:{e}")
            return None
    def image_rule_chat(self, image_des,rule_text, ragurl, rag_mode,max_tokens):
        try:
            content = (
                f"图片描述内容为:\n{image_des}\n规则内容:\n{rule_text}。\n请验证图片描述中是否有不符合规则的内容,不进行推理和think。返回结果格式为[xxx符合的规则id],如果没有返回[]")
            #self.logger.info(len(content))
            search_data = {
                "prompt": "",
                "messages": [
                    {
                        "role": "user",
                        "content": content
                    }
                ],
                "llm_name": rag_mode,
                "stream": False,
                "gen_conf": {
                    "temperature": 0.7,
                    "max_tokens": max_tokens
                }
            }
            response = requests.post(ragurl + "/chat", json=search_data)
            results = response.json().get('data')
            ret = re.sub(r'<think>.*?</think>', '', results, flags=re.DOTALL)
            ret = ret.replace(" ", "").replace("\t", "").replace("\n", "")
            #self.logger.info(f"{rule_text}:{ret}")
            is_waning = 0
            if len(ret) > 2:
                is_waning = 1
            return is_waning
        except Exception as e:
            self.logger.info(f"线程:执行规则匹配时出错:{image_des, rule_text, ragurl, rag_mode,e}")
            return None
    # 隐患描述
    def image_rule_chat_with_detail(self,filedata, rule_text, ragurl, rag_mode,max_tokens):
        # API调用
        content = (
            f"规章制度为:[{filedata}]\n违反内容为:[{rule_text}]\n请查询违反内容在规章制度中的安全隐患,不进行推理和think,返回简短的文字信息")
        # self.logger.info(len(content))
        search_data = {
            "prompt": "",
            "messages": [
                {
                    "role": "user",
                    "content": content
                }
            ],
            "llm_name": rag_mode,
            "stream": False,
            "gen_conf": {
                "temperature": 0.7,
                "max_tokens": max_tokens
            }
        }
        #self.logger.info(content)
        response = requests.post(ragurl + "/chat", json=search_data)
        # 从json提取data字段内容
        ret = response.json()["data"]
        # 移除<think>标签和内容
        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
        # 字符串清理,移除空格,制表符,换行符,星号
        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
        #print(f"安全隐患:{ret}")
        return ret
    #处理建议
    def image_rule_chat_suggestion(self,filedata, rule_text, ragurl, rag_mode,max_tokens):
        # 请求内容
        content = (
            f"规章制度为:[{filedata}]\n违反内容为:[{rule_text}]\n请查询违反内容在规章制度中的处理建议,不进行推理和think,返回简短的文字信息")
        response = requests.post(
            # ollama地址
            url=f"{ragurl}/chat",
            json={
                # 指定模型
                "llm_name": rag_mode,
                "messages": [
                    {"role": "user", "content": content}
                ],
                "stream": False,  # 关闭流式输出
                "gen_conf": {
                    "temperature": 0.7,
                    "max_tokens": max_tokens
                }
            }
        )
        # 从json提取data字段内容
        ret = response.json()["data"]
        # 移除<think>标签和内容
        ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
        # 字符串清理,移除空格,制表符,换行符,星号
        ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
        #print(f"处理建议:{ret}")
        return ret
    # RAG服务发送请求,获取知识库内容
    def get_filedata(self, searchtext,filter_expr, ragurl):
        search_data = {
            # 知识库集合
            "collection_name": "smart_knowledge",
            # 查询文本
            "query_text": searchtext,
            # 搜索模式
            "search_mode": "hybrid",
            # 最多返回结果
            "limit": 10,
            # 调密向量搜索权重
            "weight_dense": 0.9,
            # 稀疏向量搜索权重
            "weight_sparse": 0.1,
            # 空字符串
            "filter_expr": f"docnm_kwd in {filter_expr}",
            # 只返回 text 字段
            "output_fields": ["text"]
        }
        #print(search_data)
        # 向 ragurl + "/search" 端点发送POST请求
        response = requests.post(ragurl + "/search", json=search_data)
        # 从响应中获取'results'字段
        results = response.json().get('results')
        # 初始化 text
        text = ""
        # 遍历所有结果规则(rule),将每条规则的'entity'中的'text'字段取出.
        for rule in results:
            text = text + rule['entity'].get('text') + ";\n"
        #print(text)
        return text
    async def insert_json_data(self, ragurl, data):
        try:
            data = {'collection_name': "smartrag", "data": data, "description": ""}
            requests.post(ragurl + "/insert_json_data", json=data, timeout=(0.3, 0.3))
            #self.logger.info(f"调用录像服务:{ragurl, data}")
        except Exception as e:
            #self.logger.info(f"{self._thread_name}线程:调用录像时出错:地址:{ragurl}:{e}")
            return
    def _release_semaphore(self, future):
        self.semaphore.release()
        #self.logger.info(f"释放线程 (剩余空闲: {self.semaphore._value}/{self.max_workers})")
@@ -453,29 +191,4 @@
                self.lock_pool[i].release()
                break
    def remove_duplicate_lines(self,text):
        seen = set()
        result = []
        for line in text.split('。'):  # 按句号分割
            if line.strip() and line not in seen:
                seen.add(line)
                result.append(line)
        return '。'.join(result)
    def remove_duplicate_lines_d(self,text):
        seen = set()
        result = []
        for line in text.split(','):  # 按句号分割
            if line.strip() and line not in seen:
                seen.add(line)
                result.append(line)
        return '。'.join(result)
    def remove_duplicate_lines_n(self,text):
        seen = set()
        result = []
        for line in text.split('\n'):  # 按句号分割
            if line.strip() and line not in seen:
                seen.add(line)
                result.append(line)
        return '。'.join(result)