From 028e39f85bf4115024d0467613f26a1750ff004e Mon Sep 17 00:00:00 2001
From: zhm <839713154@qq.com>
Date: 星期四, 24 七月 2025 09:18:19 +0800
Subject: [PATCH] 预警跟图片描述优化
---
qwen_detect.py | 76 ++++
qwen_thread.py | 325 +--------------------
conf.txt | 2
qwen_thread_description.py | 464 ++++++++++++++++++++++++++++++
4 files changed, 555 insertions(+), 312 deletions(-)
diff --git a/conf.txt b/conf.txt
index cc12af5..4e24a17 100644
--- a/conf.txt
+++ b/conf.txt
@@ -14,6 +14,8 @@
max_tokens = 200
threadnum = 4
detectnum = 1
+qwendescription=1
+qwenwarning=1
qwenaddr = /home/debian/Qwen2.5-VL-3B-Instruct-GPTQ-Int4
cuda = 1
logaddr=/home/debian/logs
\ No newline at end of file
diff --git a/qwen_detect.py b/qwen_detect.py
index 2b769ed..b603aef 100644
--- a/qwen_detect.py
+++ b/qwen_detect.py
@@ -9,6 +9,8 @@
from pymilvus import connections, Collection
from logging.handlers import RotatingFileHandler
import get_mem
+from qwen_thread_description import qwen_thread_description
+
class ThreadPool:
def __init__(self):
@@ -59,8 +61,10 @@
# 鍔犺浇闆嗗悎
self.collection = Collection(name="smartobject")
self.collection.load()
- #鍒涘缓qwen绾跨▼姹�
- self.pool = qwen_thread(self.config,self.logger)
+ #鍒涘缓qwen绾跨▼姹�--棰勮绾跨▼姹�
+ self.poolWarning = qwen_thread(self.config,self.logger)
+ # 鍒涘缓qwen绾跨▼姹�--鍥剧墖鎻忚堪绛�
+ self.poolDescription= qwen_thread_description(self.config, self.logger)
#鏄惁鏇存柊
self._isupdate = False
@@ -87,12 +91,69 @@
self.threads[camera_id] = t
return t
- # 鍚姩绾跨▼浠诲姟
+ # 鍚姩绾跨▼浠诲姟--棰勮
+    def workerWarning(self, camera_id):
+        """Poll Milvus for new records of one camera and queue them for warning detection.
+
+        Runs forever. Each iteration queries records with ``is_desc == 0`` for
+        ``camera_id``, picks the one with the largest ``id``, replaces it with a
+        copy whose ``is_desc`` is 4 and ``is_waning`` is 0, and submits the
+        record (carrying the new primary key) to ``self.poolWarning``.
+
+        Args:
+            camera_id: Value matched against ``video_point_id`` in the query.
+        """
+        while True:
+            try:
+                res_a = self.collection.query(
+                    expr=f"is_desc == 0 and video_point_id=={camera_id}",
+                    output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name",
+                                   "task_id",
+                                   "task_name", "event_level_id", "event_level_name",
+                                   "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value",
+                                   "rule_id",
+                                   "detect_id", "knowledge_id", "suggestion", "risk_description",
+                                   "detect_time", "image_path", "image_desc_path", "video_path"],
+                    consistency_level="Strong",
+                    order_by_field="id",  # sort by the id field
+                    order_by_type="desc"  # descending order
+                )
+                if len(res_a) > 0:
+                    # Only the newest record (largest id) is processed per iteration.
+                    res = max(res_a, key=itemgetter("id"))
+                    self.collection.delete(f"id == {res['id']}")
+                    # Replacement row: same payload, flagged as "warning in progress".
+                    data = {
+                        "event_level_id": res['event_level_id'],
+                        "event_level_name": res['event_level_name'],
+                        "rule_id": res["rule_id"],
+                        "video_point_id": res['video_point_id'],
+                        "video_point_name": res['video_point_name'],
+                        "is_waning": 0,
+                        "is_desc": 4,  # state: warning detection in progress
+                        "zh_desc_class": res['zh_desc_class'],
+                        "bounding_box": res['bounding_box'],
+                        "task_id": res['task_id'],
+                        "task_name": res['task_name'],
+                        "detect_id": res['detect_id'],
+                        "detect_time": res['detect_time'],
+                        "detect_num": res['detect_num'],
+                        "waning_value": res['waning_value'],
+                        "image_path": res['image_path'],
+                        "image_desc_path": res['image_desc_path'],
+                        "video_path": res['video_path'],
+                        "text_vector": res['text_vector'],
+                        "risk_description": res['risk_description'],
+                        "suggestion": res['suggestion'],
+                        "knowledge_id": res['knowledge_id']
+                    }
+                    # Save to Milvus and hand the record on under its new primary key.
+                    image_id = self.collection.insert(data).primary_keys
+                    res['id'] = image_id[0]
+                    self.poolWarning.submit(res)
+            except Exception as e:
+                logging.info(f"{camera_id}绾跨▼閿欒:{e}")
+            finally:
+                # Pause on every iteration, failed ones included, so a persistent
+                # query/insert error cannot turn this loop into a busy spin.
+                time_sel.sleep(0.01)
+
+ # Start thread task -- generate image descriptions, etc.
def worker(self, camera_id):
while True:
try:
res_a = self.collection.query(
- expr=f"is_desc == 0 and video_point_id=={camera_id}",
+ expr=f"is_desc == 5 and video_point_id=={camera_id}",
output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id",
"task_name", "event_level_id", "event_level_name",
"video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id",
@@ -116,7 +177,7 @@
"rule_id": res["rule_id"],
"video_point_id": res['video_point_id'], # video_point_id
"video_point_name": res['video_point_name'],
- "is_waning": 0,
+ "is_waning": res['is_waning'],
"is_desc": 1,
"zh_desc_class": res['zh_desc_class'],
"bounding_box": res['bounding_box'], # bounding_box
@@ -137,7 +198,7 @@
# 淇濆瓨鍒癿ilvus
image_id = self.collection.insert(data).primary_keys
res['id'] = image_id[0]
- self.pool.submit(res)
+ self.poolDescription.submit(res)
time_sel.sleep(0.01)
except Exception as e:
logging.info(f"{camera_id}绾跨▼閿欒:{e}")
@@ -226,6 +287,9 @@
thread = pool.threads.get(camera.get("camera_id"))
if not thread:
logging.info(f"寮�濮嬪垱寤簕camera.get('camera_id')}绾跨▼")
+ #鍏堝鏁版嵁杩涜棰勮
+ pool.safe_start(pool.workerWarning, camera.get('camera_id'))
+ #鍦ㄧ敓鎴愬浘鐗囨弿杩般�佸鐞嗗缓璁瓑淇℃伅
pool.safe_start(pool.worker, camera.get('camera_id'))
logging.info(f"{camera.get('camera_id')}绾跨▼鍒涘缓瀹屾瘯")
diff --git a/qwen_thread.py b/qwen_thread.py
index f1cb6e0..f3f8703 100644
--- a/qwen_thread.py
+++ b/qwen_thread.py
@@ -1,3 +1,4 @@
+import logging
import time
from concurrent.futures import ThreadPoolExecutor
import threading
@@ -26,13 +27,13 @@
# 鍔犺浇闆嗗悎
self.collection = Collection(name="smartobject")
self.collection.load()
- if config.get('cuda') == None or config.get('cuda') == '0':
+ if config.get('cuda') is None or config.get('cuda') == '0':
self.device = f"cuda"
else:
self.device = f"cuda:{config.get('cuda')}"
self.model_pool = []
- self.lock_pool = [threading.Lock() for _ in range(int(config.get("threadnum")))]
- for i in range(int(config.get("threadnum"))):
+ self.lock_pool = [threading.Lock() for _ in range(int(config.get("qwenwarning")))]
+ for i in range(int(config.get("qwenwarning"))):
model = AutoModelForVision2Seq.from_pretrained(
config.get("qwenaddr"),
device_map=self.device,
@@ -70,27 +71,15 @@
def tark_do(self,res,ragurl,rag_mode,max_tokens):
try:
- # 1. 浠庨泦鍚圓鑾峰彇鍚戦噺鍜屽厓鏁版嵁
- is_waning = 0
- is_desc = 2
# 鐢熸垚鍥剧墖鎻忚堪
ks_time = datetime.now()
- desc_time = datetime.now() - ks_time
- current_time = datetime.now()
+
risk_description = ""
suggestion = ""
# 璋冪敤瑙勫垯鍖归厤鏂规硶,鍒ゆ柇鏄惁棰勮
is_waning = self.image_rule(res)
- # 濡傛灉棰勮,鍒欑敓鎴愰殣鎮f弿杩板拰澶勭悊寤鸿
- if is_waning == 1:
- # 鑾峰彇瑙勭珷鍒跺害鏁版嵁
- filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl)
- # 鐢熸垚闅愭偅鎻忚堪
- risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
- # 鐢熸垚澶勭悊寤鸿
- suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
- #self.logger.info(f"{res['video_point_id']}鎵ц瀹屾瘯锛歿res['id']}:鏄惁棰勮{is_waning},瀹夊叏闅愭偅锛歿risk_description}\n澶勭悊寤鸿锛歿suggestion}")
- # 鏁版嵁缁�
+ self.logger.info(f"棰勮瑙勫垯瑙勫垯瑙勫垯is_waning锛歿is_waning}")
+ #鏇存柊鏁版嵁鐨勯璀︾粨鏋滀笌鏁版嵁棰勮鐘舵��
data = {
"event_level_id": res['event_level_id'], # event_level_id
"event_level_name": res['event_level_name'], # event_level_id
@@ -98,7 +87,7 @@
"video_point_id": res['video_point_id'], # video_point_id
"video_point_name": res['video_point_name'],
"is_waning": is_waning,
- "is_desc": 1,
+ "is_desc": 5, #鏀逛负宸茬粡棰勮
"zh_desc_class": res['zh_desc_class'], # text_vector
"bounding_box": res['bounding_box'], # bounding_box
"task_id": res['task_id'], # task_id
@@ -119,105 +108,14 @@
# 淇濆瓨鍒癿ilvus
image_id = self.collection.insert(data).primary_keys
res['id'] = image_id[0]
- # 鍥剧墖鎻忚堪鐢熸垚鎴愬姛
- desc = self.image_desc(res)
- if desc:
- is_desc = 2
- else:
- is_desc = 3
- # 鏁版嵁缁�
- data = {
- "event_level_id": res['event_level_id'], # event_level_id
- "event_level_name": res['event_level_name'], # event_level_id
- "rule_id": res["rule_id"],
- "video_point_id": res['video_point_id'], # video_point_id
- "video_point_name": res['video_point_name'],
- "is_waning": is_waning,
- "is_desc": is_desc,
- "zh_desc_class": desc, # text_vector
- "bounding_box": res['bounding_box'], # bounding_box
- "task_id": res['task_id'], # task_id
- "task_name": res['task_name'], # task_id
- "detect_id": res['detect_id'], # detect_id
- "detect_time": res['detect_time'], # detect_time
- "detect_num": res['detect_num'],
- "waning_value": res['waning_value'],
- "image_path": res['image_path'], # image_path
- "image_desc_path": res['image_desc_path'], # image_desc_path
- "video_path": res['video_path'],
- "text_vector": res['text_vector'],
- "risk_description": risk_description,
- "suggestion": suggestion,
- "knowledge_id": res['knowledge_id']
- }
- self.collection.delete(f"id == {res['id']}")
- # 淇濆瓨鍒癿ilvus
- image_id = self.collection.insert(data).primary_keys
- data = {
- "id": str(image_id[0]),
- "video_point_id": res['video_point_id'],
- "video_path": res["video_point_name"],
- "zh_desc_class": desc,
- "detect_time": res['detect_time'],
- "image_path": f"{res['image_path']}",
- "task_name": res["task_name"],
- "event_level_name": res["event_level_name"],
- "rtsp_address": f"{res['video_path']}"
- }
- # 璋冪敤rag
- asyncio.run(self.insert_json_data(ragurl, data))
- rag_time = datetime.now() - current_time
- self.logger.info(f"{res['video_point_id']}鎵ц瀹屾瘯锛歿image_id}杩愯缁撴潫鎬讳綋鐢ㄦ椂:{datetime.now() - ks_time},鍥剧墖鎻忚堪鐢ㄦ椂{desc_time},RAG鐢ㄦ椂{rag_time}")
- if is_waning == 1:
- self.logger.info(f"{res['video_point_id']}鎵ц瀹屾瘯锛歿image_id},鍥剧墖鎻忚堪锛歿desc}\n闅愭偅锛歿risk_description}\n寤鸿锛歿suggestion}")
+ self.logger.info(f"{res['video_point_id']}棰勮鎵ц瀹屾瘯锛歿image_id}杩愯缁撴潫鎬讳綋鐢ㄦ椂:{datetime.now() - ks_time}")
+ return None
except Exception as e:
self.logger.info(f"绾跨▼锛氭墽琛屾ā鍨嬭В鏋愭椂鍑洪敊::{e}")
return 0
- def image_desc(self, res_data):
- try:
- model, lock = self._acquire_model()
-
- image = Image.open(res_data['image_desc_path']).convert("RGB").resize((600, 600), Image.Resampling.LANCZOS)
- messages = [
- {
- "role": "user",
- "content": [
- {
- "type": "image",
- },
- {"type": "text", "text": "璇疯缁嗘弿杩板浘鐗囦腑鐨勭洰鏍囦俊鎭強鐗瑰緛銆傝繑鍥炴牸寮忎负鏁存鏂囧瓧鎻忚堪"},
- ],
- }
- ]
- # Preparation for inference
- text = self.processor.apply_chat_template(
- messages, add_generation_prompt=True
- )
- inputs = self.processor(
- text=[text],
- images=[image],
- padding=True,
- return_tensors="pt",
- )
- inputs = inputs.to(model.device)
- with torch.inference_mode(), torch.amp.autocast(device_type=self.device, dtype=torch.float16):
- outputs = model.generate(**inputs,max_new_tokens=200,do_sample=False,num_beams=1,temperature=None,top_p=None,top_k=1,use_cache=True,repetition_penalty=1.0)
- generated_ids = outputs[:, len(inputs.input_ids[0]):]
- image_text = self.processor.batch_decode(
- generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
- )
- image_des = (image_text[0]).strip()
- #self.logger.info(f"{res_data['video_point_id']}:{res_data['id']}:{res_data['detect_time']}:{image_des}")
- return image_des
- except Exception as e:
- self.logger.info(f"绾跨▼锛氭墽琛屽浘鐗囨弿杩版椂鍑洪敊:{e}")
- finally:
- # 4. 閲婃斁妯″瀷
- self._release_model(model)
- torch.cuda.empty_cache()
-
def image_rule(self, res_data):
+ self.logger.info(f"棰勮瑙勫垯瑙勫垯瑙勫垯绛夌骇鍒嗙被灏辨槸鎵撹缂濆灏戠Н鍒�")
try:
model, lock = self._acquire_model()
image = Image.open(res_data['image_desc_path']).convert("RGB").resize((600, 600), Image.Resampling.LANCZOS)
@@ -227,7 +125,7 @@
"role": "user",
"content": [
{"type": "image", "image": image},
- {"type": "text", "text": f"鍥剧墖涓槸鍚︽湁{res_data['waning_value']}?璇峰洖绛攜es鎴杗o"},
+ {"type": "text", "text": f"璇锋娴嬪浘鐗囦腑{res_data['waning_value']}?璇峰洖绛攜es鎴杗o"},
],
}
]
@@ -254,179 +152,19 @@
)
image_des = (image_text[0]).strip()
- return image_des
+ upper_text = image_des.upper()
+ self.logger.info(f"棰勮瑙勫垯瑙勫垯瑙勫垯锛歿upper_text}")
+ if "YES" in upper_text:
+ return 1
+ else:
+ return 0
except Exception as e:
self.logger.info(f"绾跨▼锛氭墽琛屽浘鐗囨弿杩版椂鍑洪敊:{e}")
+ return 0
finally:
# 4. 閲婃斁妯″瀷
self._release_model(model)
torch.cuda.empty_cache()
-
- def get_rule(self,ragurl):
- try:
- rule_text = None
- search_data = {
- "collection_name": "smart_rule",
- "query_text": "",
- "search_mode": "hybrid",
- "limit": 100,
- "weight_dense": 0.7,
- "weight_sparse": 0.3,
- "filter_expr": "",
- "output_fields": ["text"]
- }
- response = requests.post(ragurl + "/search", json=search_data)
- results = response.json().get('results')
- rule_text = ""
- ruleid = 1
- for rule in results:
- if rule['score'] >= 0:
- rule_text = rule_text + str(ruleid) + ". " + rule['entity'].get('text') + ";\n"
- ruleid = ruleid + 1
- # self.logger.info(len(rule_text))
- else:
- self.logger.info(f"绾跨▼锛氭墽琛岃幏鍙栬鍒欐椂鍑洪敊:{response}")
- return rule_text
- except Exception as e:
- self.logger.info(f"绾跨▼锛氭墽琛岃幏鍙栬鍒欐椂鍑洪敊:{e}")
- return None
-
- def image_rule_chat(self, image_des,rule_text, ragurl, rag_mode,max_tokens):
- try:
- content = (
- f"鍥剧墖鎻忚堪鍐呭涓猴細\n{image_des}\n瑙勫垯鍐呭锛歕n{rule_text}銆俓n璇烽獙璇佸浘鐗囨弿杩颁腑鏄惁鏈変笉绗﹀悎瑙勫垯鐨勫唴瀹癸紝涓嶈繘琛屾帹鐞嗗拰think銆傝繑鍥炵粨鏋滄牸寮忎负[xxx绗﹀悎鐨勮鍒檌d]锛屽鏋滄病鏈夎繑鍥瀃]")
- #self.logger.info(len(content))
- search_data = {
- "prompt": "",
- "messages": [
- {
- "role": "user",
- "content": content
- }
- ],
- "llm_name": rag_mode,
- "stream": False,
- "gen_conf": {
- "temperature": 0.7,
- "max_tokens": max_tokens
- }
- }
- response = requests.post(ragurl + "/chat", json=search_data)
- results = response.json().get('data')
- ret = re.sub(r'<think>.*?</think>', '', results, flags=re.DOTALL)
- ret = ret.replace(" ", "").replace("\t", "").replace("\n", "")
- #self.logger.info(f"{rule_text}:{ret}")
- is_waning = 0
- if len(ret) > 2:
- is_waning = 1
- return is_waning
- except Exception as e:
- self.logger.info(f"绾跨▼锛氭墽琛岃鍒欏尮閰嶆椂鍑洪敊:{image_des, rule_text, ragurl, rag_mode,e}")
- return None
-
- # 闅愭偅鎻忚堪
- def image_rule_chat_with_detail(self,filedata, rule_text, ragurl, rag_mode,max_tokens):
- # API璋冪敤
- content = (
- f"瑙勭珷鍒跺害涓猴細[{filedata}]\n杩濆弽鍐呭涓猴細[{rule_text}]\n璇锋煡璇㈣繚鍙嶅唴瀹瑰湪瑙勭珷鍒跺害涓殑瀹夊叏闅愭偅锛屼笉杩涜鎺ㄧ悊鍜宼hink锛岃繑鍥炵畝鐭殑鏂囧瓧淇℃伅")
- # self.logger.info(len(content))
- search_data = {
- "prompt": "",
- "messages": [
- {
- "role": "user",
- "content": content
- }
- ],
- "llm_name": rag_mode,
- "stream": False,
- "gen_conf": {
- "temperature": 0.7,
- "max_tokens": max_tokens
- }
- }
- #self.logger.info(content)
- response = requests.post(ragurl + "/chat", json=search_data)
- # 浠巎son鎻愬彇data瀛楁鍐呭
- ret = response.json()["data"]
- # 绉婚櫎<think>鏍囩鍜屽唴瀹�
- ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
- # 瀛楃涓叉竻鐞�,绉婚櫎绌烘牸,鍒惰〃绗�,鎹㈣绗�,鏄熷彿
- ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
- #print(f"瀹夊叏闅愭偅:{ret}")
- return ret
- #澶勭悊寤鸿
- def image_rule_chat_suggestion(self,filedata, rule_text, ragurl, rag_mode,max_tokens):
- # 璇锋眰鍐呭
- content = (
- f"瑙勭珷鍒跺害涓猴細[{filedata}]\n杩濆弽鍐呭涓猴細[{rule_text}]\n璇锋煡璇㈣繚鍙嶅唴瀹瑰湪瑙勭珷鍒跺害涓殑澶勭悊寤鸿锛屼笉杩涜鎺ㄧ悊鍜宼hink锛岃繑鍥炵畝鐭殑鏂囧瓧淇℃伅")
- response = requests.post(
- # ollama鍦板潃
- url=f"{ragurl}/chat",
- json={
- # 鎸囧畾妯″瀷
- "llm_name": rag_mode,
- "messages": [
- {"role": "user", "content": content}
- ],
- "stream": False, # 鍏抽棴娴佸紡杈撳嚭
- "gen_conf": {
- "temperature": 0.7,
- "max_tokens": max_tokens
- }
- }
- )
- # 浠巎son鎻愬彇data瀛楁鍐呭
- ret = response.json()["data"]
- # 绉婚櫎<think>鏍囩鍜屽唴瀹�
- ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
- # 瀛楃涓叉竻鐞�,绉婚櫎绌烘牸,鍒惰〃绗�,鎹㈣绗�,鏄熷彿
- ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
- #print(f"澶勭悊寤鸿:{ret}")
- return ret
-
- # RAG鏈嶅姟鍙戦�佽姹�,鑾峰彇鐭ヨ瘑搴撳唴瀹�
- def get_filedata(self, searchtext,filter_expr, ragurl):
- search_data = {
- # 鐭ヨ瘑搴撻泦鍚�
- "collection_name": "smart_knowledge",
- # 鏌ヨ鏂囨湰
- "query_text": searchtext,
- # 鎼滅储妯″紡
- "search_mode": "hybrid",
- # 鏈�澶氳繑鍥炵粨鏋�
- "limit": 10,
- # 璋冨瘑鍚戦噺鎼滅储鏉冮噸
- "weight_dense": 0.9,
- # 绋�鐤忓悜閲忔悳绱㈡潈閲�
- "weight_sparse": 0.1,
- # 绌哄瓧绗︿覆
- "filter_expr": f"docnm_kwd in {filter_expr}",
- # 鍙繑鍥� text 瀛楁
- "output_fields": ["text"]
- }
- #print(search_data)
- # 鍚� ragurl + "/search" 绔偣鍙戦�丳OST璇锋眰
- response = requests.post(ragurl + "/search", json=search_data)
- # 浠庡搷搴斾腑鑾峰彇'results'瀛楁
- results = response.json().get('results')
- # 鍒濆鍖� text
- text = ""
- # 閬嶅巻鎵�鏈夌粨鏋滆鍒�(rule)锛屽皢姣忔潯瑙勫垯鐨�'entity'涓殑'text'瀛楁鍙栧嚭.
- for rule in results:
- text = text + rule['entity'].get('text') + ";\n"
- #print(text)
- return text
-
- async def insert_json_data(self, ragurl, data):
- try:
- data = {'collection_name': "smartrag", "data": data, "description": ""}
- requests.post(ragurl + "/insert_json_data", json=data, timeout=(0.3, 0.3))
- #self.logger.info(f"璋冪敤褰曞儚鏈嶅姟:{ragurl, data}")
- except Exception as e:
- #self.logger.info(f"{self._thread_name}绾跨▼锛氳皟鐢ㄥ綍鍍忔椂鍑洪敊:鍦板潃锛歿ragurl}锛歿e}")
- return
-
def _release_semaphore(self, future):
self.semaphore.release()
#self.logger.info(f"閲婃斁绾跨▼ (鍓╀綑绌洪棽: {self.semaphore._value}/{self.max_workers})")
@@ -453,29 +191,4 @@
self.lock_pool[i].release()
break
-
- def remove_duplicate_lines(self,text):
- seen = set()
- result = []
- for line in text.split('銆�'): # 鎸夊彞鍙峰垎鍓�
- if line.strip() and line not in seen:
- seen.add(line)
- result.append(line)
- return '銆�'.join(result)
- def remove_duplicate_lines_d(self,text):
- seen = set()
- result = []
- for line in text.split(','): # 鎸夊彞鍙峰垎鍓�
- if line.strip() and line not in seen:
- seen.add(line)
- result.append(line)
- return '銆�'.join(result)
- def remove_duplicate_lines_n(self,text):
- seen = set()
- result = []
- for line in text.split('\n'): # 鎸夊彞鍙峰垎鍓�
- if line.strip() and line not in seen:
- seen.add(line)
- result.append(line)
- return '銆�'.join(result)
diff --git a/qwen_thread_description.py b/qwen_thread_description.py
new file mode 100644
index 0000000..4dbc849
--- /dev/null
+++ b/qwen_thread_description.py
@@ -0,0 +1,464 @@
+import logging
+import time
+from concurrent.futures import ThreadPoolExecutor
+import threading
+import torch
+from PIL import Image
+from pymilvus import connections, Collection
+from datetime import datetime
+import requests
+import asyncio
+import re
+
+from qwen_vl_utils import process_vision_info
+from transformers import AutoModelForVision2Seq, AutoProcessor
+
+
+class qwen_thread_description:
+ def __init__(self, config,logger):
+ """Create the worker pool, connect to Milvus and load the pool of models.
+
+ Args:
+ config: Object read via ``.get``; the keys used here are "threadnum",
+ "milvusurl", "milvusport", "cuda", "qwendescription" and "qwenaddr".
+ logger: Logger stored on the instance for later use by the workers.
+ """
+ self.config = config
+ self.max_workers = int(config.get("threadnum"))
+ self.executor = ThreadPoolExecutor(max_workers=int(config.get("threadnum")))
+ self.semaphore = threading.Semaphore(int(config.get("threadnum")))
+ self.logger = logger
+
+ # Connect to Milvus.
+ connections.connect("default", host=config.get("milvusurl"), port=config.get("milvusport"))
+ # Load the collection.
+ self.collection = Collection(name="smartobject")
+ self.collection.load()
+ # A missing "cuda" entry or '0' selects plain "cuda"; any other value becomes "cuda:<value>".
+ if config.get('cuda') is None or config.get('cuda') == '0':
+ self.device = f"cuda"
+ else:
+ self.device = f"cuda:{config.get('cuda')}"
+ self.model_pool = []
+ # One lock per model; the pool size comes from "qwendescription", not "threadnum".
+ self.lock_pool = [threading.Lock() for _ in range(int(config.get("qwendescription")))]
+ for i in range(int(config.get("qwendescription"))):
+ model = AutoModelForVision2Seq.from_pretrained(
+ config.get("qwenaddr"),
+ device_map=self.device,
+ trust_remote_code=True,
+ use_safetensors=True,
+ torch_dtype=torch.float16
+
+ ).eval()
+ model = model.to(self.device)
+ self.model_pool.append(model)
+
+ # Single processor shared by all workers (the original comment calls it thread-safe; not verified here).
+ self.processor = AutoProcessor.from_pretrained(config.get("qwenaddr"), use_fast=True)
+
+
+    def submit(self,res_a):
+        """Queue one record for processing, blocking while every worker slot is taken.
+
+        Args:
+            res_a: Record passed through unchanged to ``_wrap_task``.
+
+        Returns:
+            The ``Future`` returned by the executor for the queued task.
+
+        Raises:
+            Whatever ``executor.submit`` raises (e.g. ``RuntimeError`` after
+            shutdown); the worker slot is handed back before re-raising.
+        """
+        # A single blocking acquire covers both the "slot free" and the
+        # "pool full, wait" case.
+        self.semaphore.acquire()
+        try:
+            future = self.executor.submit(self._wrap_task, res_a)
+        except BaseException:
+            # The task never started, so the done-callback will never fire:
+            # release the slot here or it would be lost for good.
+            self.semaphore.release()
+            raise
+        future.add_done_callback(self._release_semaphore)
+        return future
+
+    def _wrap_task(self, res_a):
+        """Run ``tark_do`` for one record with the RAG settings taken from the config.
+
+        Any exception is logged and then re-raised to the caller.
+        """
+        settings = self.config
+        try:
+            rag_args = [settings.get(key) for key in ("ragurl", "ragmode", "max_tokens")]
+            self.tark_do(res_a, *rag_args)
+        except Exception as err:
+            self.logger.info(f"澶勭悊鍑洪敊: {err}")
+            raise
+
+ def tark_do(self,res,ragurl,rag_mode,max_tokens):
+ """Enrich one record with an image description and, when flagged, hazard texts.
+
+ Reads the warning flag from ``res['is_waning']``. If it equals 1, the
+ regulation text is fetched and a hazard description plus a handling
+ suggestion are generated through the RAG service. An image description is
+ then generated, the Milvus row ``res['id']`` is deleted and re-inserted
+ with the new fields, and a summary is posted via ``insert_json_data``.
+
+ Args:
+ res: Record dict; the keys read are the ones used in the body below.
+ ragurl: Base URL handed to the RAG helper methods.
+ rag_mode: Passed to the chat helpers as the model name.
+ max_tokens: Passed to the chat helpers as the generation limit.
+
+ Returns:
+ None on success; 0 when any exception was caught (it is logged).
+ """
+ try:
+ # Defaults; both names are reassigned below before they are used.
+ is_waning = 0
+ is_desc = 2
+ # Start of the overall timing window.
+ ks_time = datetime.now()
+
+ risk_description = ""
+ suggestion = ""
+ # Take the warning flag already stored on the record; no rule matching happens here.
+ is_waning = res['is_waning']
+ self.logger.info(f"棰勮瑙勫垯瑙勫垯瑙勫垯is_waning锛歿is_waning}")
+ # When the record is flagged, generate the hazard description and the handling suggestion.
+ if is_waning == 1:
+ # Fetch the regulation text.
+ filedata = self.get_filedata(res['waning_value'],res['suggestion'], ragurl)
+ # Generate the hazard description.
+ risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+ # Generate the handling suggestion.
+ suggestion = self.image_rule_chat_suggestion(filedata, res['waning_value'], ragurl,rag_mode,max_tokens)
+
+ # desc_time: elapsed since ks_time, i.e. everything above this line.
+ desc_time = datetime.now() - ks_time
+ # current_time: start of the window that rag_time measures further down.
+ current_time = datetime.now()
+ # Generate the image description; a falsy result marks the record with is_desc = 3.
+ desc = self.image_desc(res)
+ if desc:
+ is_desc = 2
+ else:
+ is_desc = 3
+ # Replacement row for Milvus.
+ data = {
+ "event_level_id": res['event_level_id'], # event_level_id
+ "event_level_name": res['event_level_name'], # event_level_id
+ "rule_id": res["rule_id"],
+ "video_point_id": res['video_point_id'], # video_point_id
+ "video_point_name": res['video_point_name'],
+ "is_waning": is_waning,
+ "is_desc": is_desc,
+ "zh_desc_class": desc, # text_vector
+ "bounding_box": res['bounding_box'], # bounding_box
+ "task_id": res['task_id'], # task_id
+ "task_name": res['task_name'], # task_id
+ "detect_id": res['detect_id'], # detect_id
+ "detect_time": res['detect_time'], # detect_time
+ "detect_num": res['detect_num'],
+ "waning_value": res['waning_value'],
+ "image_path": res['image_path'], # image_path
+ "image_desc_path": res['image_desc_path'], # image_desc_path
+ "video_path": res['video_path'],
+ "text_vector": res['text_vector'],
+ "risk_description": risk_description,
+ "suggestion": suggestion,
+ "knowledge_id": res['knowledge_id']
+ }
+ # The old row is removed before the new one is written.
+ # NOTE(review): if the insert fails the record is gone — confirm this is acceptable.
+ self.collection.delete(f"id == {res['id']}")
+ # Save to Milvus.
+ image_id = self.collection.insert(data).primary_keys
+ # Summary payload for the RAG service.
+ # NOTE(review): "video_path" is filled from res["video_point_name"] — confirm this is intended.
+ data = {
+ "id": str(image_id[0]),
+ "video_point_id": res['video_point_id'],
+ "video_path": res["video_point_name"],
+ "zh_desc_class": desc,
+ "detect_time": res['detect_time'],
+ "image_path": f"{res['image_path']}",
+ "task_name": res["task_name"],
+ "event_level_name": res["event_level_name"],
+ "rtsp_address": f"{res['video_path']}"
+ }
+ # Push the summary to the RAG service.
+ asyncio.run(self.insert_json_data(ragurl, data))
+ # rag_time: elapsed since current_time (image description, Milvus write and the RAG insert).
+ rag_time = datetime.now() - current_time
+ self.logger.info(f"{res['video_point_id']}鎵ц瀹屾瘯锛歿image_id}杩愯缁撴潫鎬讳綋鐢ㄦ椂:{datetime.now() - ks_time},RAG鐢ㄦ椂{desc_time},鍥剧墖鎻忚堪鐢ㄦ椂{rag_time}")
+ if is_waning == 1:
+ self.logger.info(f"{res['video_point_id']}鎵ц瀹屾瘯锛歿image_id},鍥剧墖鎻忚堪锛歿desc}\n闅愭偅锛歿risk_description}\n寤鸿锛歿suggestion}")
+ except Exception as e:
+ self.logger.info(f"绾跨▼锛氭墽琛屾ā鍨嬭В鏋愭椂鍑洪敊::{e}")
+ return 0
+
+    def image_desc(self, res_data):
+        """Generate a free-text description of the image at ``res_data['image_desc_path']``.
+
+        The image is converted to RGB, resized to 600x600 and passed with a
+        fixed prompt to a model borrowed from the pool.
+
+        Args:
+            res_data: Record dict; only ``image_desc_path`` is read here.
+
+        Returns:
+            The stripped generated text, or None when any step failed (the
+            error is logged).
+        """
+        model = None
+        try:
+            model, _ = self._acquire_model()
+
+            image = Image.open(res_data['image_desc_path']).convert("RGB").resize((600, 600), Image.Resampling.LANCZOS)
+            messages = [
+                {
+                    "role": "user",
+                    "content": [
+                        {
+                            "type": "image",
+                        },
+                        {"type": "text", "text": "璇疯缁嗘弿杩板浘鐗囦腑鐨勭洰鏍囦俊鎭強鐗瑰緛銆傝繑鍥炴牸寮忎负鏁存鏂囧瓧鎻忚堪"},
+                    ],
+                }
+            ]
+            # Preparation for inference
+            text = self.processor.apply_chat_template(
+                messages, add_generation_prompt=True
+            )
+            inputs = self.processor(
+                text=[text],
+                images=[image],
+                padding=True,
+                return_tensors="pt",
+            )
+            inputs = inputs.to(model.device)
+            # autocast expects a bare device *type* ("cuda"); self.device may
+            # carry an index ("cuda:1"), so reduce it to its type first.
+            autocast_type = torch.device(self.device).type
+            with torch.inference_mode(), torch.amp.autocast(device_type=autocast_type, dtype=torch.float16):
+                outputs = model.generate(**inputs,max_new_tokens=200,do_sample=False,num_beams=1,temperature=0.1)
+            # Drop the prompt tokens so only newly generated text is decoded.
+            generated_ids = outputs[:, len(inputs.input_ids[0]):]
+            image_text = self.processor.batch_decode(
+                generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
+            )
+            image_des = (image_text[0]).strip()
+            return image_des
+        except Exception as e:
+            self.logger.info(f"绾跨▼锛氭墽琛屽浘鐗囨弿杩版椂鍑洪敊:{e}")
+        finally:
+            # Return the model to the pool; skipped when acquisition itself
+            # failed and there is nothing to release.
+            if model is not None:
+                self._release_model(model)
+            torch.cuda.empty_cache()
+
+ def image_rule(self, res_data):
+ """Ask the model a yes/no question about ``res_data['waning_value']`` for one image.
+
+ Args:
+ res_data: Record dict; ``image_desc_path`` and ``waning_value`` are read.
+
+ Returns:
+ 1 when the upper-cased model output contains "YES", otherwise 0.
+ 0 is also returned when an exception was caught (it is logged).
+ """
+ self.logger.info(f"棰勮瑙勫垯瑙勫垯瑙勫垯绛夌骇鍒嗙被灏辨槸鎵撹缂濆灏戠Н鍒�")
+ try:
+ model, lock = self._acquire_model()
+ image = Image.open(res_data['image_desc_path']).convert("RGB").resize((600, 600), Image.Resampling.LANCZOS)
+
+ messages = [
+ {
+ "role": "user",
+ "content": [
+ {"type": "image", "image": image},
+ {"type": "text", "text": f"璇锋娴嬪浘鐗囦腑{res_data['waning_value']}?璇峰洖绛攜es鎴杗o"},
+ ],
+ }
+ ]
+
+ # Preparation for inference
+ text = self.processor.apply_chat_template(
+ messages, tokenize=False, add_generation_prompt=True
+ )
+ image_inputs, video_inputs = process_vision_info(messages)
+ inputs = self.processor(
+ text=[text],
+ images=image_inputs,
+ videos=video_inputs,
+ padding=True,
+ return_tensors="pt",
+ )
+ inputs = inputs.to(model.device)
+
+ with torch.no_grad():
+ outputs = model.generate(**inputs, max_new_tokens=10)
+ # Drop the prompt tokens so only newly generated text is decoded.
+ generated_ids = outputs[:, len(inputs.input_ids[0]):]
+ image_text = self.processor.batch_decode(
+ generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
+ )
+
+ image_des = (image_text[0]).strip()
+ # Substring match on the upper-cased answer: any output containing "YES" counts as a hit.
+ upper_text = image_des.upper()
+ self.logger.info(f"棰勮瑙勫垯瑙勫垯瑙勫垯锛歿upper_text}")
+ if "YES" in upper_text:
+ return 1
+ else:
+ return 0
+ except Exception as e:
+ self.logger.info(f"绾跨▼锛氭墽琛屽浘鐗囨弿杩版椂鍑洪敊:{e}")
+ return 0
+ finally:
+ # Return the model to the pool.
+ self._release_model(model)
+ torch.cuda.empty_cache()
+
+ def get_rule(self,ragurl):
+ """Fetch rule texts from the RAG "/search" endpoint and number them into one string.
+
+ Args:
+ ragurl: Base URL of the RAG service.
+
+ Returns:
+ The concatenated, numbered rule text, or None when an exception was
+ caught (it is logged).
+ """
+ try:
+ rule_text = None
+ search_data = {
+ "collection_name": "smart_rule",
+ "query_text": "",
+ "search_mode": "hybrid",
+ "limit": 100,
+ "weight_dense": 0.7,
+ "weight_sparse": 0.3,
+ "filter_expr": "",
+ "output_fields": ["text"]
+ }
+ response = requests.post(ragurl + "/search", json=search_data)
+ results = response.json().get('results')
+ rule_text = ""
+ ruleid = 1
+ for rule in results:
+ # Only results with a non-negative score are appended, each prefixed with a running number.
+ if rule['score'] >= 0:
+ rule_text = rule_text + str(ruleid) + ". " + rule['entity'].get('text') + ";\n"
+ ruleid = ruleid + 1
+ # NOTE(review): indentation is lost in this view, so it is unclear whether this
+ # `else` pairs with the `if` or with the `for` — confirm against the real file.
+ else:
+ self.logger.info(f"绾跨▼锛氭墽琛岃幏鍙栬鍒欐椂鍑洪敊:{response}")
+ return rule_text
+ except Exception as e:
+ self.logger.info(f"绾跨▼锛氭墽琛岃幏鍙栬鍒欐椂鍑洪敊:{e}")
+ return None
+
+ def image_rule_chat(self, image_des,rule_text, ragurl, rag_mode,max_tokens):
+ """Ask the RAG "/chat" endpoint whether an image description conflicts with the rules.
+
+ Args:
+ image_des: Image description inserted into the prompt.
+ rule_text: Rule text inserted into the prompt.
+ ragurl: Base URL of the RAG service.
+ rag_mode: Sent as "llm_name".
+ max_tokens: Sent as the generation limit.
+
+ Returns:
+ 1 when the cleaned answer is longer than two characters, otherwise 0;
+ None when an exception was caught (it is logged).
+ """
+ try:
+ content = (
+ f"鍥剧墖鎻忚堪鍐呭涓猴細\n{image_des}\n瑙勫垯鍐呭锛歕n{rule_text}銆俓n璇烽獙璇佸浘鐗囨弿杩颁腑鏄惁鏈変笉绗﹀悎瑙勫垯鐨勫唴瀹癸紝涓嶈繘琛屾帹鐞嗗拰think銆傝繑鍥炵粨鏋滄牸寮忎负[xxx绗﹀悎鐨勮鍒檌d]锛屽鏋滄病鏈夎繑鍥瀃]")
+ search_data = {
+ "prompt": "",
+ "messages": [
+ {
+ "role": "user",
+ "content": content
+ }
+ ],
+ "llm_name": rag_mode,
+ "stream": False,
+ "gen_conf": {
+ "temperature": 0.7,
+ "max_tokens": max_tokens
+ }
+ }
+ response = requests.post(ragurl + "/chat", json=search_data)
+ results = response.json().get('data')
+ # Strip any <think>...</think> section, then all spaces, tabs and newlines.
+ ret = re.sub(r'<think>.*?</think>', '', results, flags=re.DOTALL)
+ ret = ret.replace(" ", "").replace("\t", "").replace("\n", "")
+ # More than two remaining characters is treated as a rule hit.
+ is_waning = 0
+ if len(ret) > 2:
+ is_waning = 1
+ return is_waning
+ except Exception as e:
+ self.logger.info(f"绾跨▼锛氭墽琛岃鍒欏尮閰嶆椂鍑洪敊:{image_des, rule_text, ragurl, rag_mode,e}")
+ return None
+
+ # 闅愭偅鎻忚堪
+ def image_rule_chat_with_detail(self,filedata, rule_text, ragurl, rag_mode,max_tokens):
+ """Ask the RAG "/chat" endpoint for a hazard description and return the cleaned answer.
+
+ Args:
+ filedata: Regulation text inserted into the prompt.
+ rule_text: Violation text inserted into the prompt.
+ ragurl: Base URL of the RAG service.
+ rag_mode: Sent as "llm_name".
+ max_tokens: Sent as the generation limit.
+
+ Returns:
+ The "data" field of the response with <think> sections, spaces, tabs,
+ newlines and "**" removed. Request and parsing errors are not caught here.
+ """
+ # Build the chat request.
+ content = (
+ f"瑙勭珷鍒跺害涓猴細[{filedata}]\n杩濆弽鍐呭涓猴細[{rule_text}]\n璇锋煡璇㈣繚鍙嶅唴瀹瑰湪瑙勭珷鍒跺害涓殑瀹夊叏闅愭偅锛屼笉杩涜鎺ㄧ悊鍜宼hink锛岃繑鍥炵畝鐭殑鏂囧瓧淇℃伅")
+ search_data = {
+ "prompt": "",
+ "messages": [
+ {
+ "role": "user",
+ "content": content
+ }
+ ],
+ "llm_name": rag_mode,
+ "stream": False,
+ "gen_conf": {
+ "temperature": 0.7,
+ "max_tokens": max_tokens
+ }
+ }
+ response = requests.post(ragurl + "/chat", json=search_data)
+ # Extract the "data" field from the JSON response.
+ ret = response.json()["data"]
+ # Remove <think> tags together with their content.
+ ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
+ # Clean the string: remove spaces, tabs, newlines and double asterisks.
+ ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
+ return ret
+ #澶勭悊寤鸿
+ def image_rule_chat_suggestion(self,filedata, rule_text, ragurl, rag_mode,max_tokens):
+ """Ask the RAG "/chat" endpoint for a handling suggestion and return the cleaned answer.
+
+ Args:
+ filedata: Regulation text inserted into the prompt.
+ rule_text: Violation text inserted into the prompt.
+ ragurl: Base URL of the RAG service.
+ rag_mode: Sent as "llm_name".
+ max_tokens: Sent as the generation limit.
+
+ Returns:
+ The "data" field of the response with <think> sections, spaces, tabs,
+ newlines and "**" removed. Request and parsing errors are not caught here.
+ """
+ # Prompt text.
+ content = (
+ f"瑙勭珷鍒跺害涓猴細[{filedata}]\n杩濆弽鍐呭涓猴細[{rule_text}]\n璇锋煡璇㈣繚鍙嶅唴瀹瑰湪瑙勭珷鍒跺害涓殑澶勭悊寤鸿锛屼笉杩涜鎺ㄧ悊鍜宼hink锛岃繑鍥炵畝鐭殑鏂囧瓧淇℃伅")
+ response = requests.post(
+ # Chat endpoint address.
+ url=f"{ragurl}/chat",
+ json={
+ # Model to use.
+ "llm_name": rag_mode,
+ "messages": [
+ {"role": "user", "content": content}
+ ],
+ "stream": False, # disable streaming output
+ "gen_conf": {
+ "temperature": 0.7,
+ "max_tokens": max_tokens
+ }
+ }
+ )
+ # Extract the "data" field from the JSON response.
+ ret = response.json()["data"]
+ # Remove <think> tags together with their content.
+ ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
+ # Clean the string: remove spaces, tabs, newlines and double asterisks.
+ ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
+ return ret
+
+ # RAG鏈嶅姟鍙戦�佽姹�,鑾峰彇鐭ヨ瘑搴撳唴瀹�
+    def get_filedata(self, searchtext,filter_expr, ragurl):
+        """Query the RAG "/search" endpoint and join the returned passages into one string.
+
+        Args:
+            searchtext: Query text sent as "query_text".
+            filter_expr: Interpolated verbatim after ``docnm_kwd in`` in the
+                filter expression, so it must already be valid filter syntax.
+            ragurl: Base URL of the RAG service.
+
+        Returns:
+            Every result's ``entity['text']`` followed by ";\\n", concatenated in
+            response order; an empty string when the response carries no results.
+        """
+        search_data = {
+            # Knowledge-base collection to search.
+            "collection_name": "smart_knowledge",
+            "query_text": searchtext,
+            "search_mode": "hybrid",
+            # Maximum number of results returned.
+            "limit": 10,
+            # Weights of the dense and sparse vector search.
+            "weight_dense": 0.9,
+            "weight_sparse": 0.1,
+            "filter_expr": f"docnm_kwd in {filter_expr}",
+            # Only the "text" field is requested.
+            "output_fields": ["text"]
+        }
+        response = requests.post(ragurl + "/search", json=search_data)
+        # A response without a "results" entry is treated as "nothing found"
+        # instead of failing on iteration over None.
+        results = response.json().get('results') or []
+        return "".join(rule['entity'].get('text') + ";\n" for rule in results)
+
+    async def insert_json_data(self, ragurl, data):
+        """Best-effort POST of ``data`` to the RAG "/insert_json_data" endpoint.
+
+        The record is wrapped for the "smartrag" collection and sent with short
+        connect/read timeouts. Every failure is swallowed; nothing is returned.
+        """
+        try:
+            payload = {'collection_name': "smartrag", "data": data, "description": ""}
+            requests.post(ragurl + "/insert_json_data", json=payload, timeout=(0.3, 0.3))
+        except Exception:
+            # Deliberately silent: a failed insert must not break the caller.
+            return None
+        return None
+
+ def _release_semaphore(self, future):
+ """Done-callback registered by ``submit``: free one worker slot.
+
+ Args:
+ future: The finished future; not inspected.
+ """
+ self.semaphore.release()
+
+ def shutdown(self):
+ """Shut the executor down without waiting and ask torch to empty the CUDA cache."""
+ self.executor.shutdown(wait=False)
+ # NOTE(review): `del model` only unbinds the loop variable; self.model_pool still
+ # references every model, so this loop presumably frees nothing — confirm intent.
+ for model in self.model_pool:
+ del model
+ torch.cuda.empty_cache()
+
+ def _acquire_model(self):
+ """Return the first pooled model whose lock can be taken, polling until one is free.
+
+ Returns:
+ A ``(model, lock)`` pair; the lock is held when this method returns.
+ """
+ while True:
+ # Try each model's lock in pool order without blocking.
+ for i, (model, lock) in enumerate(zip(self.model_pool, self.lock_pool)):
+ if lock.acquire(blocking=False):
+ return model, lock
+ time.sleep(0.1) # avoid spinning the CPU while no model is free
+
+    def _release_model(self, model):
+        """Release the lock paired with ``model`` so the model can be reused.
+
+        Args:
+            model: An object previously handed out by ``_acquire_model``.
+                Nothing happens when it is not part of the pool.
+        """
+        for pooled, lock in zip(self.model_pool, self.lock_pool):
+            # Identity, not ==: the question is "is this the same pooled object",
+            # and an overloaded __eq__ must not pick the wrong lock.
+            if pooled is model:
+                lock.release()
+                break
+
+
+    def remove_duplicate_lines(self,text):
+        """Drop blank and repeated segments of ``text``, keeping first occurrences in order.
+
+        The text is split on the sentence separator and the surviving segments
+        are joined with that same separator.
+        """
+        separator = '銆�'
+        unique_segments = dict.fromkeys(
+            segment for segment in text.split(separator) if segment.strip()
+        )
+        return separator.join(unique_segments)
+    def remove_duplicate_lines_d(self,text):
+        """Drop blank and repeated comma-separated segments, keeping first occurrences in order.
+
+        The text is split on ',' while the result is joined with the sentence
+        separator, not with a comma.
+        """
+        unique_segments = dict.fromkeys(
+            segment for segment in text.split(',') if segment.strip()
+        )
+        return '銆�'.join(unique_segments)
+    def remove_duplicate_lines_n(self,text):
+        """Drop blank and repeated lines of ``text``, keeping first occurrences in order.
+
+        The text is split on newlines while the result is joined with the
+        sentence separator, not with a newline.
+        """
+        unique_lines = dict.fromkeys(
+            row for row in text.split('\n') if row.strip()
+        )
+        return '銆�'.join(unique_lines)
+
--
Gitblit v1.8.0