From dabd20efe434704c96044dbd6c084e8266d1ef3f Mon Sep 17 00:00:00 2001
From: xqz <837938965@qq.com>
Date: Tue, 08 Jul 2025 18:34:40 +0800
Subject: [PATCH] Merge remote-tracking branch 'origin/develop' into develop
---
qwen_detect.py | 92 +++--
qwen_detect_batch.py | 249 ++++++++++++++
qwen_thread_batch.py | 331 +++++++++++++++++++
qwen_thread.py | 316 ++++++++++++++++++
4 files changed, 950 insertions(+), 38 deletions(-)
diff --git a/qwen_detect.py b/qwen_detect.py
index db6ccea..29788f2 100644
--- a/qwen_detect.py
+++ b/qwen_detect.py
@@ -1,18 +1,16 @@
-# 禁用tokenizer并行（必须放在所有import之前）
+from operator import itemgetter
import torch
import threading
import time as time_sel
from typing import Dict
-import qwen_task
+from qwen_thread import qwen_thread
import requests
import os
-
import logging
from transformers import AutoProcessor, AutoModelForVision2Seq
from pymilvus import connections, Collection
from logging.handlers import RotatingFileHandler
import get_mem
-
class ThreadPool:
def __init__(self):
@@ -40,24 +38,12 @@
self.threads: Dict[str, threading.Thread] = {}
self.lock = threading.Lock()
- self.device = "cuda" if torch.cuda.is_available() else "cpu"
-
- # 初始化qwenvl检测模型
- self.qwen_model = AutoModelForVision2Seq.from_pretrained(
- "./Qwen2-VL-2B-Instruct", device_map="auto",
- trust_remote_code=True,
- torch_dtype=torch.float16
- )
-
- # default processer
- self.qwen_tokenizer = AutoProcessor.from_pretrained("./Qwen2-VL-2B-Instruct")
-
# 初始化Milvus集合
connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
# 加载集合
self.collection = Collection(name="smartobject")
self.collection.load()
-
+ self.pool = qwen_thread(int(self.config.get("threadnum")), self.config,"/home/debian/Qwen2.5-VL-3B-Instruct-GPTQ-Int4")
#是否更新
self._isupdate = False
@@ -103,31 +89,61 @@
# 启动线程任务
def worker(self, camera_id):
+ """Poll ``self.collection`` forever for rows of ``camera_id`` with ``is_desc == 0``.
+
+ Each pass sorts the hits by ``id`` (largest first), keeps the first ``threadnum - 1`` of them,
+ upserts every kept row with ``is_desc = 1`` / ``is_waning = 0`` and submits it to ``self.pool``.
+ Any exception is logged and the loop continues.
+ """
- # 鍒濆鍖�
- detect = qwen_task.detect_tasks()
- detect._thread_name = f"{camera_id}" # 璁剧疆鍚嶇О
- detect.init_logging(f"{camera_id}")
- # 鍒濆鍖栨娴嬫ā鍨�
- detect.device = self.device
- # 鍒濆鍖杚wen妯″瀷
- detect.qwen_tokenizer = self.qwen_tokenizer
- detect.qwen_model = self.qwen_model
-
- # 鍒濆鍖朚ilvus闆嗗悎
- detect.collection = self.collection
-
- # 璁剧疆绾跨▼鍚姩鐘舵��
- detect._running = True
+ # NOTE(review): the loop below has no sleep/back-off, so an empty result or a repeated
+ # exception re-queries immediately — confirm this polling rate is intended.
while True:
try:
+ res_a = self.collection.query(
+ expr=f"is_desc == 0 and video_point_id=={camera_id}",
+ output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
+ "task_name", "event_level_id", "event_level_name",
+ "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
+ "detect_id",
+ "detect_time", "image_path", "image_desc_path", "video_path"],
+ consistency_level="Strong",
+ order_by_field="id", # sort by the id field
+ order_by_type="desc" # descending order
+ )
+
# 璇诲彇鍏变韩鍐呭瓨涓殑鍥剧墖
- image_id = get_mem.smem_read_frame_qianwen(camera_id)
- if image_id > 0:
- logging.info(f"璇诲彇鍥惧儚鎴愬姛: {image_id}")
- image_id = detect.tark_do(image_id,self.config.get("filesavepath"),self.config.get("ragurl"),self.config.get("max_tokens"))
- logging.info(f"澶勭悊鍥惧儚鎴愬姛: {image_id}")
+ # image_id = get_mem.smem_read_frame_qianwen(camera_id)
+ if len(res_a) > 0:
+ # The result is re-sorted locally, so the ordering does not rely on the query kwargs above.
+ sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True)
+ # Keep only the rows with the largest IDs.
+ # NOTE(review): the "- 1" means threadnum == 1 selects nothing — confirm intent.
+ num = int(self.config.get("threadnum")) - 1
+ res_a = sorted_results[:num]
+ for res in res_a:
+ # Copy of the row with is_waning reset to 0 and is_desc set to 1.
+ data = {
+ "id": res['id'],
+ "event_level_id": res['event_level_id'], # event_level_id
+ "event_level_name": res['event_level_name'], # event_level_id
+ "rule_id": res["rule_id"],
+ "video_point_id": res['video_point_id'], # video_point_id
+ "video_point_name": res['video_point_name'],
+ "is_waning": 0,
+ "is_desc": 1,
+ "zh_desc_class": res['zh_desc_class'], # text_vector
+ "bounding_box": res['bounding_box'], # bounding_box
+ "task_id": res['task_id'], # task_id
+ "task_name": res['task_name'], # task_id
+ "detect_id": res['detect_id'], # detect_id
+ "detect_time": res['detect_time'], # detect_time
+ "detect_num": res['detect_num'],
+ "waning_value": res['waning_value'],
+ "image_path": res['image_path'], # image_path
+ "image_desc_path": res['image_desc_path'], # image_desc_path
+ "video_path": res['video_path'],
+ "text_vector": res['text_vector']
+ }
+ # logging.info(f"璇诲彇鍥惧儚鎴愬姛: {res['id']}")
+ # Save to Milvus; the returned primary key replaces the row's id before submission.
+ image_id = self.collection.upsert(data).primary_keys
+ res['id'] = image_id[0]
+ # logging.info(f"璇诲彇鍥惧儚鎴愬姛: {image_id}")
+ # submit() returns a Future; it is stored but not awaited here.
+ image_id = self.pool.submit(res)
+ # image_id = pool.tark_do(image_id,self.config.get("ragurl"),self.config.get("ragmode"),self.config.get("max_tokens"))
+ # logging.info(f"澶勭悊鍥惧儚鎴愬姛: {image_id}")
+ sorted_results = None
except Exception as e:
- logging.info(f"{detect._thread_name}绾跨▼閿欒:{e}")
+ logging.info(f"{camera_id}绾跨▼閿欒:{e}")
#调用是否需要更新
def isUpdate(self):
diff --git a/qwen_detect_batch.py b/qwen_detect_batch.py
new file mode 100644
index 0000000..6389eb6
--- /dev/null
+++ b/qwen_detect_batch.py
@@ -0,0 +1,249 @@
+from operator import itemgetter
+import threading
+import time as time_sel
+from typing import Dict
+from qwen_thread_batch import qwen_thread_batch
+import requests
+import os
+import logging
+from pymilvus import connections, Collection
+from logging.handlers import RotatingFileHandler
+import get_mem
+
+class ThreadPool:
+    """Run one polling thread per camera and feed pending Milvus rows to the Qwen batch pool.
+
+    Configuration is read from ``./conf.txt`` (``key=value`` lines). Each worker thread
+    repeatedly queries the ``smartobject`` collection for rows of its camera that still have
+    ``is_desc == 0``, marks them as in progress (``is_desc = 1``) and submits them as one
+    batch to ``self.pool``.
+    """
+
+    # Columns read from, and written back to, the "smartobject" collection.
+    _RECORD_FIELDS = (
+        "id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
+        "task_name", "event_level_id", "event_level_name",
+        "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
+        "detect_id",
+        "detect_time", "image_path", "image_desc_path", "video_path",
+    )
+    # Upper bound (seconds) for the small control-plane HTTP calls, so the main loop cannot hang.
+    _HTTP_TIMEOUT = 10
+    # Pause (seconds) between polls when there is nothing to do or the last poll failed.
+    _IDLE_SLEEP = 0.2
+
+    def __init__(self):
+        """Load the configuration, set up logging, connect to Milvus and build the model pool."""
+        self.config = self._load_config('./conf.txt')
+
+        # Make sure the log directory exists before any file handler is created.
+        log_dir = "logs"
+        os.makedirs(log_dir, exist_ok=True)
+        # Configure logging first so that messages emitted during the (slow) model and
+        # Milvus initialisation below already reach the rotating log file.
+        logging.basicConfig(
+            level=logging.INFO,
+            format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s',
+            datefmt='%Y-%m-%d %H:%M:%S',
+            handlers=[
+                # Size-based rotation: at most 10 MB per file, 3 backups kept.
+                RotatingFileHandler(
+                    filename=os.path.join(log_dir, 'start_log.log'),
+                    maxBytes=10 * 1024 * 1024,
+                    backupCount=3,
+                    encoding='utf-8'
+                ),
+                # Also echo to the console.
+                logging.StreamHandler()
+            ]
+        )
+
+        self.threads: Dict[str, threading.Thread] = {}
+        self.lock = threading.Lock()
+        # Set by shutdown_all() to ask the current generation of workers to exit.
+        self._stop_event = threading.Event()
+
+        # Connect to Milvus and load the collection into memory.
+        connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
+        self.collection = Collection(name="smartobject")
+        self.collection.load()
+        self.pool = qwen_thread_batch(int(self.config.get("threadnum")), self.config,"/home/debian/Qwen2.5-VL-3B-Instruct-GPTQ-Int4")
+        # Whether the task configuration changed during the current main-loop iteration.
+        self._isupdate = False
+
+        # Initialise shared memory.
+        get_mem.smem_init()
+
+    @staticmethod
+    def _load_config(path):
+        """Parse ``key=value`` lines from ``path``; blank lines and lines without '=' are skipped."""
+        config = {}
+        with open(path, 'r', encoding='utf-8') as file:
+            for line in file:
+                # Split on the first '=' only, so values may themselves contain '='.
+                key, sep, value = line.strip().partition('=')
+                if sep:
+                    config[key.strip()] = value.strip()
+        return config
+
+    def safe_start(self, target_func, camera_id):
+        """Start ``target_func(camera_id)`` in a daemon thread and register it under ``camera_id``.
+
+        Exceptions escaping ``target_func`` are logged with a traceback instead of killing
+        the thread silently. Returns the started ``threading.Thread``.
+        """
+        def wrapped():
+            try:
+                target_func(camera_id)
+            except Exception as e:
+                logging.error(f"线程异常: {str(e)}", exc_info=True)
+
+        with self.lock:  # creation + registration must be atomic
+            t = threading.Thread(
+                target=wrapped,
+                daemon=True
+            )
+            t.start()
+            self.threads[camera_id] = t
+        return t
+
+    def worker(self, camera_id):
+        """Poll Milvus for undescribed rows of ``camera_id`` until ``shutdown_all()`` is called.
+
+        Rows are taken newest-first (largest ``id``), flagged ``is_desc = 1`` and
+        ``is_waning = 0`` through an upsert, and handed to ``self.pool`` as one batch.
+        """
+        # Capture this generation's flag: shutdown_all() replaces self._stop_event afterwards.
+        stop_event = self._stop_event
+        while not stop_event.is_set():
+            try:
+                rows = self.collection.query(
+                    expr=f"is_desc == 0 and video_point_id=={camera_id}",
+                    output_fields=list(self._RECORD_FIELDS),
+                    consistency_level="Strong",
+                    order_by_field="id",
+                    order_by_type="desc"
+                )
+                if not rows:
+                    # Nothing pending: back off instead of hammering Milvus in a tight loop.
+                    stop_event.wait(self._IDLE_SLEEP)
+                    continue
+
+                # Sort locally as well; the ordering must not depend on the query kwargs.
+                newest_first = sorted(rows, key=itemgetter("id"), reverse=True)
+                # NOTE(review): the "- 1" is kept from the original; detectnum == 1 selects
+                # nothing — confirm whether that is intended.
+                limit = int(self.config.get("detectnum")) - 1
+                batch = []
+                for res in newest_first[:limit]:
+                    data = {field: res[field] for field in self._RECORD_FIELDS}
+                    data["is_waning"] = 0
+                    data["is_desc"] = 1  # in progress
+                    image_id = self.collection.upsert(data).primary_keys
+                    res['id'] = image_id[0]
+                    batch.append(res)
+                if batch:
+                    self.pool.submit(batch)
+            except Exception as e:
+                logging.info(f"{camera_id}线程错误:{e}")
+                # Avoid a hot error loop when Milvus or the pool keeps failing.
+                stop_event.wait(self._IDLE_SLEEP)
+
+    def isUpdate(self):
+        """Return True when the ``isupdateurl`` service reports ``isChange == 1``, else False."""
+        try:
+            url = self.config.get("isupdateurl")
+            response = requests.get(url, timeout=self._HTTP_TIMEOUT)
+            if response.status_code != 200:
+                return False
+            data = response.json().get("data")
+            return data.get("isChange") == 1
+        except Exception as e:
+            logging.info(f"调用是否需要更新时出错:URL:{self.config.get('isupdateurl')}:{e}")
+            return False
+
+    def update_status(self):
+        """POST to ``updatestatusurl``; return True on HTTP 200, False otherwise."""
+        try:
+            url = self.config.get("updatestatusurl")
+            response = requests.post(url, timeout=self._HTTP_TIMEOUT)
+            return response.status_code == 200
+        except Exception as e:
+            logging.info(f"修改是否更新状态时出错:URL:{self.config.get('updatestatusurl')}:{e}")
+            return False
+
+    def shutdown_all(self) -> None:
+        """Ask all worker threads to stop, wait briefly for them and forget them.
+
+        A fresh stop flag is installed afterwards so that threads started later run normally.
+        A worker blocked inside ``self.pool.submit`` may outlive the 1 s join; it exits as
+        soon as it re-checks its (already set) flag.
+        """
+        self._stop_event.set()
+        with self.lock:
+            for camera_id, thread in list(self.threads.items()):
+                if thread.is_alive():
+                    thread.join(timeout=1)
+                del self.threads[camera_id]
+            self._stop_event = threading.Event()
+
+    def getTaskconf(self,isupdate):
+        """Fetch the task list from ``gettaskconfurl``.
+
+        When ``isupdate`` is true the change flag is acknowledged through ``update_status()``.
+        Returns the ``data`` member of the JSON response, or ``[]`` on any failure.
+        """
+        try:
+            url = self.config.get("gettaskconfurl")
+            response = requests.get(url, timeout=self._HTTP_TIMEOUT)
+            if response.status_code != 200:
+                return []
+            data = response.json()
+            if isupdate:
+                self.update_status()
+            return data.get("data")
+        except Exception as e:
+            logging.info(f"调用获取任务时出错:URL:{self.config.get('gettaskconfurl')}:{e}")
+            return []
+
+# 浣跨敤绀轰緥
+def start_missing_workers(pool, camera_data):
+    """Start a worker thread for every configured camera that is not registered in ``pool.threads``."""
+    for camera in camera_data or []:
+        camera_id = camera.get("camera_id")
+        if pool.threads.get(camera_id):
+            continue
+        logging.info(f"开始创建{camera_id}线程")
+        pool.safe_start(pool.worker, camera_id)
+        logging.info(f"{camera_id}线程创建完毕")
+
+
+if __name__ == "__main__":
+    pool = ThreadPool()
+    is_init = True
+    camera_data = pool.getTaskconf(False)
+    while True:
+        try:
+            pool._isupdate = False
+            # Re-fetch the camera tasks when the backend reports a configuration change.
+            if pool.isUpdate():
+                camera_data = pool.getTaskconf(True)
+                pool._isupdate = True
+
+            # First pass: bring up one worker per configured camera.
+            if is_init:
+                start_missing_workers(pool, camera_data)
+
+            # Configuration changed: drop the registered workers and start the new set.
+            if pool._isupdate:
+                logging.info("更新线程开始")
+                pool.shutdown_all()
+                start_missing_workers(pool, camera_data)
+                logging.info("更新线程结束")
+
+            is_init = False
+        except Exception as e:
+            logging.info(f"主线程未知错误:{e}")
+        finally:
+            # Sleep on the error path too, so a persistent failure cannot spin the CPU.
+            time_sel.sleep(1)
diff --git a/qwen_thread.py b/qwen_thread.py
new file mode 100644
index 0000000..56ba944
--- /dev/null
+++ b/qwen_thread.py
@@ -0,0 +1,316 @@
+import time
+from concurrent.futures import ThreadPoolExecutor
+import threading
+
+import torch
+from PIL import Image
+from pymilvus import connections, Collection
+from datetime import datetime
+import os
+import requests
+import asyncio
+import logging
+import re
+from logging.handlers import RotatingFileHandler
+
+from transformers import AutoModelForVision2Seq, AutoProcessor
+
+
+class qwen_thread:
+    """Bounded worker pool that describes detection images with a vision-language model.
+
+    ``max_workers`` model replicas are loaded, each paired with its own lock. A semaphore of
+    the same size makes :meth:`submit` block while all workers are busy. Every processed
+    record is written back to the ``smartobject`` Milvus collection and, when a description
+    was produced, forwarded to the RAG service.
+    """
+
+    # Columns copied verbatim when a record is written back to the "smartobject" collection.
+    _RECORD_FIELDS = (
+        "id", "event_level_id", "event_level_name", "rule_id", "video_point_id",
+        "video_point_name", "is_waning", "is_desc", "zh_desc_class", "bounding_box",
+        "task_id", "task_name", "detect_id", "detect_time", "detect_num", "waning_value",
+        "image_path", "image_desc_path", "video_path", "text_vector",
+    )
+
+    def __init__(self, max_workers,config,model_path, device="cuda:1"):
+        """Connect to Milvus and load ``max_workers`` copies of the model onto ``device``.
+
+        Args:
+            max_workers: number of worker threads and model replicas.
+            config: mapping providing at least ``milvusurl``, ``milvusport``, ``ragurl``,
+                ``ragmode`` and ``max_tokens``.
+            model_path: path passed to ``from_pretrained`` for model and processor.
+            device: ``device_map`` target for every replica (defaults to the previously
+                hard-coded ``"cuda:1"``).
+        """
+        self.executor = ThreadPoolExecutor(max_workers=max_workers)
+        self.semaphore = threading.Semaphore(max_workers)
+        self.max_workers = max_workers
+        self.device = device
+        # Connect to Milvus and load the collection.
+        connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
+        self.collection = Collection(name="smartobject")
+        self.collection.load()
+
+        self.config = config
+        self.lock_pool = [threading.Lock() for _ in range(max_workers)]
+        self.model_pool = [
+            AutoModelForVision2Seq.from_pretrained(
+                model_path,
+                device_map=device,
+                trust_remote_code=True,
+                torch_dtype=torch.float16
+            ).eval()
+            for _ in range(max_workers)
+        ]
+
+        # A single processor instance is shared by all workers.
+        self.processor = AutoProcessor.from_pretrained(model_path,use_fast=True)
+        self.logger = self._build_logger()
+
+    def _build_logger(self):
+        """Create the per-instance logger writing to ``logs/thread_log.log``."""
+        logger = logging.getLogger(f"{self.__class__}_{id(self)}")
+        logger.setLevel(logging.INFO)
+        # Do not attach a second handler if this logger name was seen before.
+        if not logger.handlers:
+            # The handler cannot open its file unless the directory exists.
+            os.makedirs("logs", exist_ok=True)
+            handler = RotatingFileHandler(
+                filename=os.path.join("logs", 'thread_log.log'),
+                maxBytes=10 * 1024 * 1024,
+                backupCount=3,
+                encoding='utf-8'
+            )
+            handler.setFormatter(logging.Formatter(
+                '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
+            ))
+            logger.addHandler(handler)
+        return logger
+
+    def submit(self,res_a):
+        """Queue one record for processing; blocks while all workers are busy.
+
+        Returns the ``Future`` of the scheduled task.
+        """
+        # Try without blocking first so that a full pool can be logged.
+        if not self.semaphore.acquire(blocking=False):
+            # _value is a CPython implementation detail; it is used for this log line only.
+            self.logger.info(f"线程池已满，等待空闲线程... (当前活跃: {self.max_workers - self.semaphore._value}/{self.max_workers})")
+            self.semaphore.acquire(blocking=True)
+
+        try:
+            future = self.executor.submit(self._wrap_task, res_a)
+        except Exception:
+            # The executor refused the task (e.g. after shutdown): give the permit back.
+            self.semaphore.release()
+            raise
+        future.add_done_callback(self._release_semaphore)
+        return future
+
+    def _wrap_task(self, res):
+        """Run :meth:`tark_do` for ``res`` with timing and error logging; re-raises failures."""
+        try:
+            current_time = datetime.now()
+            image_id = self.tark_do(res, self.config.get("ragurl"), self.config.get("ragmode"), self.config.get("max_tokens"))
+            self.logger.info(f"处理: { res['id']}完毕{image_id}:{datetime.now() - current_time}")
+            return image_id
+        except Exception as e:
+            self.logger.info(f"任务 { res['id']} 处理出错: {e}")
+            raise
+
+    def tark_do(self,res,ragurl,rag_mode,max_tokens):
+        """Describe the image of ``res``, check it against its rule text and persist the result.
+
+        ``is_desc`` is stored as 2 when a description was produced and 3 otherwise.
+        Returns the primary keys reported by the Milvus upsert, or 0 when anything failed.
+        """
+        try:
+            image_des = self.image_desc(f"{res['image_desc_path']}")
+            self.logger.info(image_des)
+            if image_des:
+                # image_rule_chat() yields 0/1; "or 0" guards against a None result.
+                is_waning = self.image_rule_chat(image_des,res['waning_value'],ragurl,rag_mode,max_tokens) or 0
+                is_desc = 2
+            else:
+                is_waning = 0
+                is_desc = 3
+
+            data = {field: res[field] for field in self._RECORD_FIELDS}
+            data["is_waning"] = is_waning
+            data["is_desc"] = is_desc
+            # Keep the stored text when no description exists, so that None is never upserted.
+            # NOTE(review): assumes the column is not nullable — confirm against the schema.
+            data["zh_desc_class"] = image_des if image_des is not None else res['zh_desc_class']
+            image_id = self.collection.upsert(data).primary_keys
+
+            if is_desc == 2:
+                # NOTE(review): "video_path" carries the point name and "rtsp_address" the
+                # video path, exactly as in the original — confirm against the RAG schema.
+                data = {
+                    "id": str(image_id[0]),
+                    "video_point_id": res['video_point_id'],
+                    "video_path": res["video_point_name"],
+                    "zh_desc_class": image_des,
+                    "detect_time": res['detect_time'],
+                    "image_path": f"{res['image_path']}",
+                    "task_name": res["task_name"],
+                    "event_level_name": res["event_level_name"],
+                    "rtsp_address": f"{res['video_path']}"
+                }
+                # Forward the description to the RAG service (best effort).
+                asyncio.run(self.insert_json_data(ragurl, data))
+            return image_id
+        except Exception as e:
+            self.logger.info(f"线程：执行模型解析时出错:任务：{e}")
+            return 0
+
+    def image_desc(self, image_path):
+        """Return the model's text description of the image at ``image_path``, or None on error."""
+        # Acquire outside the try block so the finally clause never sees an unbound lock.
+        model, lock = self._acquire_model()
+        try:
+            image = Image.open(image_path).convert("RGB")
+            image = image.resize((600, 600), Image.Resampling.LANCZOS)
+            messages = [
+                {
+                    "role": "user",
+                    "content": [
+                        {
+                            "type": "image",
+                        },
+                        {"type": "text", "text": "请详细描述图片中的目标信息及特征。返回格式为整段文字描述"},
+                    ],
+                }
+            ]
+            text = self.processor.apply_chat_template(
+                messages, add_generation_prompt=True
+            )
+            inputs = self.processor(
+                text=[text],
+                images=[image],
+                padding=True,
+                return_tensors="pt",
+            )
+            # Follow the replica's own device instead of a hard-coded "cuda:1".
+            inputs = inputs.to(model.device)
+            current_time = datetime.now()
+            outputs = model.generate(**inputs,
+                                     max_new_tokens=300,
+                                     do_sample=True,
+                                     temperature=0.7,
+                                     renormalize_logits=True
+                                     )
+            self.logger.info(f"处理完毕:{datetime.now() - current_time}")
+            # Drop the prompt tokens; only the newly generated tail is decoded.
+            generated_ids = outputs[:, len(inputs.input_ids[0]):]
+            image_text = self.processor.batch_decode(
+                generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
+            )
+            return (image_text[0]).strip()
+        except Exception as e:
+            self.logger.info(f"线程：执行图片描述时出错:{e}")
+            return None
+        finally:
+            # Hand the replica back and release cached GPU memory.
+            lock.release()
+            torch.cuda.empty_cache()
+
+    def get_rule(self,ragurl):
+        """Fetch up to 100 rules from ``<ragurl>/search`` as a numbered text block; None on error."""
+        try:
+            search_data = {
+                "collection_name": "smart_rule",
+                "query_text": "",
+                "search_mode": "hybrid",
+                "limit": 100,
+                "weight_dense": 0.7,
+                "weight_sparse": 0.3,
+                "filter_expr": "",
+                "output_fields": ["text"]
+            }
+            response = requests.post(ragurl + "/search", json=search_data)
+            results = response.json().get('results')
+            lines = []
+            for rule in results:
+                # NOTE(review): the original indentation was lost; the else branch is read
+                # as belonging to this score check — confirm.
+                if rule['score'] >= 0:
+                    lines.append(f"{len(lines) + 1}. {rule['entity'].get('text')};\n")
+                else:
+                    self.logger.info(f"线程：执行获取规则时出错:{response}")
+            return "".join(lines)
+        except Exception as e:
+            self.logger.info(f"线程：执行获取规则时出错:{e}")
+            return None
+
+    def image_rule_chat(self, image_des,rule_text, ragurl, rag_mode,max_tokens):
+        """Ask the chat endpoint whether ``image_des`` matches ``rule_text``.
+
+        Returns 1 when the answer (with any ``<think>`` section and whitespace removed) is
+        longer than the empty list ``[]``, otherwise 0. Errors are logged and yield 0.
+        """
+        try:
+            content = (
+                f"图片描述内容为：\n{image_des}\n规则内容：\n{rule_text}。\n请验证图片描述中是否有符合规则的内容，不进行推理和think。返回结果格式为[xxx符合的规则id]，如果没有返回[]")
+            search_data = {
+                "prompt": "",
+                "messages": [
+                    {
+                        "role": "user",
+                        "content": content
+                    }
+                ],
+                "llm_name": rag_mode,
+                "stream": False,
+                "gen_conf": {
+                    "temperature": 0.7,
+                    "max_tokens": max_tokens
+                }
+            }
+            response = requests.post(ragurl + "/chat", json=search_data)
+            results = response.json().get('data')
+            ret = re.sub(r'<think>.*?</think>', '', results, flags=re.DOTALL)
+            ret = ret.replace(" ", "").replace("\t", "").replace("\n", "")
+            # "[]" is two characters long; anything longer names at least one rule.
+            return 1 if len(ret) > 2 else 0
+        except Exception as e:
+            self.logger.info(f"线程：执行规则匹配时出错:{image_des, rule_text, ragurl, rag_mode,e}")
+            # Report "no warning" rather than None, which would be written into Milvus.
+            return 0
+
+    async def insert_json_data(self, ragurl, data):
+        """Best-effort POST of ``data`` to ``<ragurl>/insert_json_data`` (0.3 s timeouts)."""
+        try:
+            data = {'collection_name': "smartrag", "data": data, "description": ""}
+            requests.post(ragurl + "/insert_json_data", json=data, timeout=(0.3, 0.3))
+        except Exception as e:
+            # Deliberately non-fatal; kept at debug level because timeouts are expected here.
+            self.logger.debug(f"insert_json_data failed for {ragurl}: {e}")
+            return
+
+    def _release_semaphore(self, future):
+        """Done-callback: return the permit taken in :meth:`submit`."""
+        self.semaphore.release()
+        self.logger.info(f"释放线程 (剩余空闲: {self.semaphore._value}/{self.max_workers})")
+
+    def shutdown(self):
+        """Stop accepting work and drop the model replicas."""
+        self.executor.shutdown(wait=False)
+        # "del model" on a loop variable frees nothing; the list itself must let go.
+        self.model_pool.clear()
+        torch.cuda.empty_cache()
+
+    def _acquire_model(self):
+        """Return ``(model, lock)`` for the first idle replica, polling every 0.1 s."""
+        while True:
+            for model, lock in zip(self.model_pool, self.lock_pool):
+                if lock.acquire(blocking=False):
+                    return model, lock
+            time.sleep(0.1)  # avoid spinning while every replica is busy
+
+    def _release_model(self, model):
+        """Release the lock paired with ``model`` (matched by identity)."""
+        for i, m in enumerate(self.model_pool):
+            if m is model:
+                self.lock_pool[i].release()
+                break
+
+    @staticmethod
+    def _dedupe_segments(text, separator):
+        """Split ``text`` on ``separator``, drop blank and repeated segments, join with '。'."""
+        seen = set()
+        result = []
+        for line in text.split(separator):
+            if line.strip() and line not in seen:
+                seen.add(line)
+                result.append(line)
+        return '。'.join(result)
+
+    def remove_duplicate_lines(self,text):
+        """Remove repeated sentences from ``text`` (split on '。')."""
+        return self._dedupe_segments(text, '。')
+
+    def remove_duplicate_lines_d(self,text):
+        """Remove repeated comma-separated segments; the result is joined with '。'."""
+        return self._dedupe_segments(text, ',')
+
+    def remove_duplicate_lines_n(self,text):
+        """Remove repeated lines; the result is joined with '。'."""
+        return self._dedupe_segments(text, '\n')
+
diff --git a/qwen_thread_batch.py b/qwen_thread_batch.py
new file mode 100644
index 0000000..c4f497c
--- /dev/null
+++ b/qwen_thread_batch.py
@@ -0,0 +1,331 @@
+import time
+from concurrent.futures import ThreadPoolExecutor
+import threading
+
+import torch
+from PIL import Image
+from pymilvus import connections, Collection
+from datetime import datetime
+import os
+import requests
+import asyncio
+import logging
+import re
+from logging.handlers import RotatingFileHandler
+
+from transformers import AutoModelForVision2Seq, AutoProcessor, BitsAndBytesConfig
+
+
+class qwen_thread_batch:
+    """Bounded worker pool that describes batches of detection images with a 4-bit VL model.
+
+    ``max_workers`` model replicas are loaded, each paired with its own lock. A semaphore of
+    the same size makes :meth:`submit` block while all workers are busy. Every processed
+    record is written back to the ``smartobject`` Milvus collection and, when a description
+    was produced, forwarded to the RAG service.
+    """
+
+    # Columns copied verbatim when a record is written back to the "smartobject" collection.
+    _RECORD_FIELDS = (
+        "id", "event_level_id", "event_level_name", "rule_id", "video_point_id",
+        "video_point_name", "is_waning", "is_desc", "zh_desc_class", "bounding_box",
+        "task_id", "task_name", "detect_id", "detect_time", "detect_num", "waning_value",
+        "image_path", "image_desc_path", "video_path", "text_vector",
+    )
+
+    def __init__(self, max_workers,config,model_path, device="cuda:1"):
+        """Connect to Milvus and load ``max_workers`` quantised model replicas onto ``device``.
+
+        Args:
+            max_workers: number of worker threads and model replicas.
+            config: mapping providing at least ``milvusurl``, ``milvusport``, ``ragurl``,
+                ``ragmode`` and ``max_tokens``.
+            model_path: path passed to ``from_pretrained`` for model and processor.
+            device: ``device_map`` target for every replica (defaults to the previously
+                hard-coded ``"cuda:1"``).
+        """
+        self.executor = ThreadPoolExecutor(max_workers=max_workers)
+        self.semaphore = threading.Semaphore(max_workers)
+        self.max_workers = max_workers
+        self.device = device
+        # Connect to Milvus and load the collection.
+        connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
+        self.collection = Collection(name="smartobject")
+        self.collection.load()
+
+        self.config = config
+        self.lock_pool = [threading.Lock() for _ in range(max_workers)]
+        quant_config = BitsAndBytesConfig(
+            load_in_4bit=True,
+            bnb_4bit_compute_dtype=torch.float16,
+            bnb_4bit_quant_type="nf4",
+            bnb_4bit_use_double_quant=True
+        )
+        self.model_pool = [
+            AutoModelForVision2Seq.from_pretrained(
+                model_path,
+                device_map=device,
+                trust_remote_code=True,
+                quantization_config=quant_config,
+                use_flash_attention_2=True,
+            ).eval()
+            for _ in range(max_workers)
+        ]
+
+        # A single processor instance is shared by all workers.
+        self.processor = AutoProcessor.from_pretrained(model_path,use_fast=True)
+        self.logger = self._build_logger()
+
+    def _build_logger(self):
+        """Create the per-instance logger writing to ``logs/thread_log.log``."""
+        logger = logging.getLogger(f"{self.__class__}_{id(self)}")
+        logger.setLevel(logging.INFO)
+        # Do not attach a second handler if this logger name was seen before.
+        if not logger.handlers:
+            # The handler cannot open its file unless the directory exists.
+            os.makedirs("logs", exist_ok=True)
+            handler = RotatingFileHandler(
+                filename=os.path.join("logs", 'thread_log.log'),
+                maxBytes=10 * 1024 * 1024,
+                backupCount=3,
+                encoding='utf-8'
+            )
+            handler.setFormatter(logging.Formatter(
+                '%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s'
+            ))
+            logger.addHandler(handler)
+        return logger
+
+    def submit(self,res_a):
+        """Queue one batch (list of records) for processing; blocks while all workers are busy.
+
+        Returns the ``Future`` of the scheduled task.
+        """
+        if not self.semaphore.acquire(blocking=False):
+            # Pool is full: wait until a worker hands its permit back.
+            self.semaphore.acquire(blocking=True)
+
+        try:
+            future = self.executor.submit(self._wrap_task, res_a)
+        except Exception:
+            # The executor refused the task (e.g. after shutdown): give the permit back.
+            self.semaphore.release()
+            raise
+        future.add_done_callback(self._release_semaphore)
+        return future
+
+    def _wrap_task(self, res_a):
+        """Run :meth:`tark_do` for the batch, logging and re-raising any failure."""
+        try:
+            self.tark_do(res_a, self.config.get("ragurl"), self.config.get("ragmode"), self.config.get("max_tokens"))
+        except Exception as e:
+            self.logger.info(f"处理出错: {e}")
+            raise
+
+    def tark_do(self,res_a,ragurl,rag_mode,max_tokens):
+        """Describe every image of the batch, check the rules and persist each result.
+
+        A record is stored with ``is_desc = 2`` when it received a description and with
+        ``is_desc = 3`` otherwise — including when the whole batch inference failed, so that
+        no record is left in the in-progress state. Returns 0 if the batch failed as a
+        whole, else None.
+        """
+        try:
+            current_time = datetime.now()
+            # A failed inference yields None: treat every record as "no description".
+            desc_list = self.image_desc(res_a) or [None] * len(res_a)
+            for desc, res in zip(desc_list, res_a):
+                try:
+                    self._store_result(res, desc, ragurl, rag_mode, max_tokens)
+                except Exception as e:
+                    # One bad record must not strand the remainder of the batch.
+                    self.logger.info(f"线程：执行模型解析时出错:任务：{e}")
+            self.logger.info(f"处理完毕:{datetime.now() - current_time}:{len(res_a)}")
+        except Exception as e:
+            self.logger.info(f"线程：执行模型解析时出错:任务：{e}")
+            return 0
+
+    def _store_result(self, res, desc, ragurl, rag_mode, max_tokens):
+        """Write the outcome for one record to Milvus and, if described, to the RAG service."""
+        if desc:
+            # image_rule_chat() yields 0/1; "or 0" guards against a None result.
+            is_waning = self.image_rule_chat(desc,res['waning_value'],ragurl,rag_mode,max_tokens) or 0
+            is_desc = 2
+        else:
+            is_waning = 0
+            is_desc = 3
+
+        data = {field: res[field] for field in self._RECORD_FIELDS}
+        data["is_waning"] = is_waning
+        data["is_desc"] = is_desc
+        # Keep the stored text when no description exists, so that None is never upserted.
+        # NOTE(review): assumes the column is not nullable — confirm against the schema.
+        data["zh_desc_class"] = desc if desc is not None else res['zh_desc_class']
+        image_id = self.collection.upsert(data).primary_keys
+
+        if is_desc == 2:
+            # NOTE(review): "video_path" carries the point name and "rtsp_address" the
+            # video path, exactly as in the original — confirm against the RAG schema.
+            data = {
+                "id": str(image_id[0]),
+                "video_point_id": res['video_point_id'],
+                "video_path": res["video_point_name"],
+                "zh_desc_class": desc,
+                "detect_time": res['detect_time'],
+                "image_path": f"{res['image_path']}",
+                "task_name": res["task_name"],
+                "event_level_name": res["event_level_name"],
+                "rtsp_address": f"{res['video_path']}"
+            }
+            # Forward the description to the RAG service (best effort).
+            asyncio.run(self.insert_json_data(ragurl, data))
+
+    def image_desc(self, res_data):
+        """Return one generated description per record of ``res_data``, or None on error."""
+        # Acquire outside the try block so the finally clause never sees an unbound lock.
+        model, lock = self._acquire_model()
+        try:
+            image_data = []
+            for res in res_data:
+                image = Image.open(f"{res['image_desc_path']}").convert("RGB")
+                # All images share one size, so the batch needs no extra padding.
+                image = image.resize((448, 448), Image.Resampling.LANCZOS)
+                image_data.append(image)
+
+            messages = [
+                {
+                    "role": "user",
+                    "content": [
+                        {
+                            "type": "image",
+                        },
+                        {"type": "text", "text": "请详细描述图片中的目标信息及特征。返回格式为整段文字描述"},
+                    ],
+                }
+            ]
+            text = self.processor.apply_chat_template(
+                messages, add_generation_prompt=True
+            )
+            inputs = self.processor(
+                text=[text] * len(image_data),
+                images=[image_data],
+                padding=True,
+                return_tensors="pt",
+            )
+            # Follow the replica's own device instead of a hard-coded "cuda:1".
+            inputs = inputs.to(model.device)
+            with torch.inference_mode():
+                # NOTE(review): temperature/top_k are kept from the original although
+                # do_sample=False makes decoding greedy — confirm whether they are needed.
+                outputs = model.generate(**inputs,
+                                         max_new_tokens=50,
+                                         do_sample=False,
+                                         temperature=0.7,
+                                         top_k=40,
+                                         num_beams=1,
+                                         repetition_penalty= 1.1
+                                         )
+            # Drop the prompt tokens; only the newly generated tail is decoded.
+            generated_ids = outputs[:, len(inputs.input_ids[0]):]
+            image_text = self.processor.batch_decode(
+                generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
+            )
+            return list(image_text)
+        except Exception as e:
+            self.logger.info(f"线程：执行图片描述时出错:{e}")
+            return None
+        finally:
+            # Hand the replica back and release cached GPU memory.
+            lock.release()
+            torch.cuda.empty_cache()
+
+    def get_rule(self,ragurl):
+        """Fetch up to 100 rules from ``<ragurl>/search`` as a numbered text block; None on error."""
+        try:
+            search_data = {
+                "collection_name": "smart_rule",
+                "query_text": "",
+                "search_mode": "hybrid",
+                "limit": 100,
+                "weight_dense": 0.7,
+                "weight_sparse": 0.3,
+                "filter_expr": "",
+                "output_fields": ["text"]
+            }
+            response = requests.post(ragurl + "/search", json=search_data)
+            results = response.json().get('results')
+            lines = []
+            for rule in results:
+                # NOTE(review): the original indentation was lost; the else branch is read
+                # as belonging to this score check — confirm.
+                if rule['score'] >= 0:
+                    lines.append(f"{len(lines) + 1}. {rule['entity'].get('text')};\n")
+                else:
+                    self.logger.info(f"线程：执行获取规则时出错:{response}")
+            return "".join(lines)
+        except Exception as e:
+            self.logger.info(f"线程：执行获取规则时出错:{e}")
+            return None
+
+    def image_rule_chat(self, image_des,rule_text, ragurl, rag_mode,max_tokens):
+        """Ask the chat endpoint whether ``image_des`` matches ``rule_text``.
+
+        Returns 1 when the answer (with any ``<think>`` section and whitespace removed) is
+        longer than the empty list ``[]``, otherwise 0. Errors are logged and yield 0.
+        """
+        try:
+            content = (
+                f"图片描述内容为：\n{image_des}\n规则内容：\n{rule_text}。\n请验证图片描述中是否有符合规则的内容，不进行推理和think。返回结果格式为[xxx符合的规则id]，如果没有返回[]")
+            search_data = {
+                "prompt": "",
+                "messages": [
+                    {
+                        "role": "user",
+                        "content": content
+                    }
+                ],
+                "llm_name": rag_mode,
+                "stream": False,
+                "gen_conf": {
+                    "temperature": 0.7,
+                    "max_tokens": max_tokens
+                }
+            }
+            response = requests.post(ragurl + "/chat", json=search_data)
+            results = response.json().get('data')
+            ret = re.sub(r'<think>.*?</think>', '', results, flags=re.DOTALL)
+            ret = ret.replace(" ", "").replace("\t", "").replace("\n", "")
+            # "[]" is two characters long; anything longer names at least one rule.
+            return 1 if len(ret) > 2 else 0
+        except Exception as e:
+            self.logger.info(f"线程：执行规则匹配时出错:{image_des, rule_text, ragurl, rag_mode,e}")
+            # Report "no warning" rather than None, which would be written into Milvus.
+            return 0
+
+    async def insert_json_data(self, ragurl, data):
+        """Best-effort POST of ``data`` to ``<ragurl>/insert_json_data`` (0.3 s timeouts)."""
+        try:
+            data = {'collection_name': "smartrag", "data": data, "description": ""}
+            requests.post(ragurl + "/insert_json_data", json=data, timeout=(0.3, 0.3))
+        except Exception as e:
+            # Deliberately non-fatal; kept at debug level because timeouts are expected here.
+            self.logger.debug(f"insert_json_data failed for {ragurl}: {e}")
+            return
+
+    def _release_semaphore(self, future):
+        """Done-callback: return the permit taken in :meth:`submit`."""
+        self.semaphore.release()
+
+    def shutdown(self):
+        """Stop accepting work and drop the model replicas."""
+        self.executor.shutdown(wait=False)
+        # "del model" on a loop variable frees nothing; the list itself must let go.
+        self.model_pool.clear()
+        torch.cuda.empty_cache()
+
+    def _acquire_model(self):
+        """Return ``(model, lock)`` for the first idle replica, polling every 0.1 s."""
+        while True:
+            for model, lock in zip(self.model_pool, self.lock_pool):
+                if lock.acquire(blocking=False):
+                    return model, lock
+            time.sleep(0.1)  # avoid spinning while every replica is busy
+
+    def _release_model(self, model):
+        """Release the lock paired with ``model`` (matched by identity)."""
+        for i, m in enumerate(self.model_pool):
+            if m is model:
+                self.lock_pool[i].release()
+                break
+
+    @staticmethod
+    def _dedupe_segments(text, separator):
+        """Split ``text`` on ``separator``, drop blank and repeated segments, join with '。'."""
+        seen = set()
+        result = []
+        for line in text.split(separator):
+            if line.strip() and line not in seen:
+                seen.add(line)
+                result.append(line)
+        return '。'.join(result)
+
+    def remove_duplicate_lines(self,text):
+        """Remove repeated sentences from ``text`` (split on '。')."""
+        return self._dedupe_segments(text, '。')
+
+    def remove_duplicate_lines_d(self,text):
+        """Remove repeated comma-separated segments; the result is joined with '。'."""
+        return self._dedupe_segments(text, ',')
+
+    def remove_duplicate_lines_n(self,text):
+        """Remove repeated lines; the result is joined with '。'."""
+        return self._dedupe_segments(text, '\n')
+
--
Gitblit v1.8.0