From 0fb0c2c61ef6f48089b941f0e185e51288fe01b0 Mon Sep 17 00:00:00 2001
From: shidong <shidong@jhsoft.cc>
Date: Tue, 8 Jul 2025 19:07:04 +0800
Subject: [PATCH] #2025/7/8 #新增了读取知识库ID
---
qwen_thread_batch.py | 203 ++++++++++++++++++++++++++++++++++++++------------
1 file changed, 152 insertions(+), 51 deletions(-)
diff --git a/qwen_thread_batch.py b/qwen_thread_batch.py
index c4f497c..1027151 100644
--- a/qwen_thread_batch.py
+++ b/qwen_thread_batch.py
@@ -13,7 +13,7 @@
import re
from logging.handlers import RotatingFileHandler
-from transformers import AutoModelForVision2Seq, AutoProcessor, BitsAndBytesConfig
+from transformers import AutoModelForVision2Seq, AutoProcessor
class qwen_thread_batch:
@@ -30,19 +30,13 @@
self.config = config
self.model_pool = []
self.lock_pool = [threading.Lock() for _ in range(max_workers)]
- quant_config = BitsAndBytesConfig(
- load_in_4bit=True,
- bnb_4bit_compute_dtype=torch.float16,
- bnb_4bit_quant_type="nf4",
- bnb_4bit_use_double_quant=True
- )
for i in range(max_workers):
model = AutoModelForVision2Seq.from_pretrained(
model_path,
- device_map="cuda:1",
+ device_map=f"cuda:{config.get('cuda')}",
trust_remote_code=True,
- quantization_config=quant_config,
- use_flash_attention_2=True,
+ use_safetensors=True,
+ torch_dtype=torch.float16
).eval()
self.model_pool.append(model)
@@ -89,20 +83,34 @@
raise
def tark_do(self,res_a,ragurl,rag_mode,max_tokens):
- try :
- current_time = datetime.now()
+ try:
# 1. 浠庨泦鍚圓鑾峰彇鍚戦噺鍜屽厓鏁版嵁
is_waning = 0
+ is_desc = 2
+
+ # 鐢熸垚鍥剧墖鎻忚堪
desc_list = self.image_desc(res_a)
+ risk_description = ""
+ suggestion = ""
if desc_list:
- for desc,res in zip(desc_list,res_a):
+ for desc, res in zip(desc_list, res_a):
+ # 鍥剧墖鎻忚堪鐢熸垚鎴愬姛
if desc:
- # rule_text = self.get_rule(ragurl)
- is_waning = self.image_rule_chat(desc,res['waning_value'],ragurl,rag_mode,max_tokens)
is_desc = 2
+ # 璋冪敤瑙勫垯鍖归厤鏂规硶,鍒ゆ柇鏄惁棰勮
+ is_waning = self.image_rule_chat(desc, res['waning_value'], ragurl,rag_mode,max_tokens)
+ # 濡傛灉棰勮,鍒欑敓鎴愰殣鎮f弿杩板拰澶勭悊寤鸿
+ if is_waning == 1:
+ # 鑾峰彇瑙勭珷鍒跺害鏁版嵁
+ filedata = self.get_filedata(res['waning_value'], ragurl)
+ # 鐢熸垚闅愭偅鎻忚堪
+ risk_description = self.image_rule_chat_with_detail(filedata, res['waning_value'], ragurl, rag_mode)
+ # 鐢熸垚澶勭悊寤鸿
+ suggestion = self.image_rule_chat_suggestion(res['waning_value'], ragurl, rag_mode)
else:
- is_waning = 0
is_desc = 3
+
+ # 鏁版嵁缁�
data = {
"id": res['id'],
"event_level_id": res['event_level_id'], # event_level_id
@@ -123,39 +131,45 @@
"image_path": res['image_path'], # image_path
"image_desc_path": res['image_desc_path'], # image_desc_path
"video_path": res['video_path'],
- "text_vector": res['text_vector']
+ "text_vector": res['text_vector'],
+ "risk_description": risk_description,
+ "suggestion": suggestion,
+ "knowledge_id": res['knowledge_id']
}
+
# 淇濆瓨鍒癿ilvus
image_id = self.collection.upsert(data).primary_keys
- #self.logger.info(f"{res['id']}--{image_id}:{desc}")
- if is_desc == 2:
- data = {
- "id": str(image_id[0]),
- "video_point_id": res['video_point_id'],
- "video_path": res["video_point_name"],
- "zh_desc_class": desc,
- "detect_time": res['detect_time'],
- "image_path": f"{res['image_path']}",
- "task_name": res["task_name"],
- "event_level_name": res["event_level_name"],
- "rtsp_address": f"{res['video_path']}"
- }
- # 璋冪敤rag
- asyncio.run(self.insert_json_data(ragurl, data))
- self.logger.info(f"澶勭悊瀹屾瘯:{datetime.now() - current_time}:{len(res_a)}")
+ logging.info(image_id)
+ data = {
+ "id": str(image_id[0]),
+ "video_point_id": res['video_point_id'],
+ "video_path": res["video_point_name"],
+ "zh_desc_class": desc,
+ "detect_time": res['detect_time'],
+ "image_path": f"{res['image_path']}",
+ "task_name": res["task_name"],
+ "event_level_name": res["event_level_name"],
+ "rtsp_address": f"{res['video_path']}"
+ }
+ # 璋冪敤rag
+ asyncio.run(self.insert_json_data(ragurl, data))
except Exception as e:
- self.logger.info(f"绾跨▼锛氭墽琛屾ā鍨嬭В鏋愭椂鍑洪敊:浠诲姟锛歿e}")
+ self.logger.info(f"绾跨▼锛氭墽琛屾ā鍨嬭В鏋愭椂鍑洪敊::{e}")
return 0
def image_desc(self, res_data):
try:
model, lock = self._acquire_model()
image_data = []
- for res in res_data:
- # 2. 澶勭悊鍥惧儚
- image = Image.open(f"{res['image_desc_path']}").convert("RGB") # 鏇挎崲涓烘偍鐨勫浘鐗囪矾寰�
- image = image.resize((448, 448), Image.Resampling.LANCZOS) # 楂樿川閲忕缉鏀�
- image_data.append(image)
+ # 1. 骞惰鍔犺浇鍥惧儚
+ def _load_image(path):
+ return Image.open(path).convert("RGB").resize((448, 448), Image.Resampling.LANCZOS)
+
+ with ThreadPoolExecutor(max_workers=4) as executor:
+ image_data = list(executor.map(
+ _load_image,
+ [res['image_desc_path'] for res in res_data]
+ ))
messages = [
{
@@ -178,17 +192,9 @@
padding=True,
return_tensors="pt",
)
- inputs = inputs.to("cuda:1")
- current_time = datetime.now()
+ inputs = inputs.to(model.device)
with torch.inference_mode():
- outputs = model.generate(**inputs,
- max_new_tokens=50,
- do_sample=False,
- temperature=0.7,
- top_k=40,
- num_beams=1,
- repetition_penalty= 1.1
- )
+ outputs = model.generate(**inputs,max_new_tokens=100)
generated_ids = outputs[:, len(inputs.input_ids[0]):]
image_text = self.processor.batch_decode(
generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
@@ -235,9 +241,9 @@
def image_rule_chat(self, image_des,rule_text, ragurl, rag_mode,max_tokens):
try:
+
content = (
f"鍥剧墖鎻忚堪鍐呭涓猴細\n{image_des}\n瑙勫垯鍐呭锛歕n{rule_text}銆俓n璇烽獙璇佸浘鐗囨弿杩颁腑鏄惁鏈夌鍚堣鍒欑殑鍐呭锛屼笉杩涜鎺ㄧ悊鍜宼hink銆傝繑鍥炵粨鏋滄牸寮忎负[xxx绗﹀悎鐨勮鍒檌d]锛屽鏋滄病鏈夎繑鍥瀃]")
- # self.logger.info(content)
#self.logger.info(len(content))
search_data = {
"prompt": "",
@@ -256,7 +262,6 @@
}
response = requests.post(ragurl + "/chat", json=search_data)
results = response.json().get('data')
- #self.logger.info(len(results))
# self.logger.info(results)
ret = re.sub(r'<think>.*?</think>', '', results, flags=re.DOTALL)
ret = ret.replace(" ", "").replace("\t", "").replace("\n", "")
@@ -268,6 +273,102 @@
self.logger.info(f"绾跨▼锛氭墽琛岃鍒欏尮閰嶆椂鍑洪敊:{image_des, rule_text, ragurl, rag_mode,e}")
return None
+ # 闅愭偅鎻忚堪
+ def image_rule_chat_with_detail(self,filedata, rule_text, ollama_url, ollama_mode="qwen2.5vl:3b"):
+
+ # API璋冪敤
+ response = requests.post(
+ # ollama鍦板潃
+ url=f"{ollama_url}/chat",
+ json={
+ "prompt":"",
+ # 璇锋眰鍐呭
+ "messages": [
+ {
+ "role": "user",
+ "content": f"璇锋牴鎹绔犲埗搴{filedata}]\n鏌ユ壘[{rule_text}]鐨勫畨鍏ㄩ殣鎮f弿杩帮紝涓嶈繘琛屾帹鐞嗗拰think銆傝繑鍥炰俊鎭皬浜�800瀛�"
+ }
+ ],
+ # 鎸囧畾妯″瀷
+ "llm_name": "qwen3:8b",
+ "stream": False, # 鍏抽棴娴佸紡杈撳嚭
+ "gen_conf": {
+ "temperature": 0.7, # 鎺у埗鐢熸垚闅忔満鎬�
+ "max_tokens": 800 # 鏈�澶ц緭鍑洪暱搴�
+ }
+ }
+ )
+ # 浠巎son鎻愬彇data瀛楁鍐呭
+ ret = response.json()["data"]
+ #result = response.json()
+ #ret = result.get("data") or result.get("message", {}).get("content", "")
+ # 绉婚櫎<think>鏍囩鍜屽唴瀹�
+ ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
+ # 瀛楃涓叉竻鐞�,绉婚櫎绌烘牸,鍒惰〃绗�,鎹㈣绗�,鏄熷彿
+ ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
+ print(ret)
+ return ret
+ #澶勭悊寤鸿
+ def image_rule_chat_suggestion(self, rule_text, ollama_url, ollama_mode="qwen2.5vl:3b"):
+ self.logger.info("----------------------------------------------------------------")
+ # 璇锋眰鍐呭
+ content = (
+ f"璇锋牴鎹繚瑙勫唴瀹筟{rule_text}]\n杩涜杩斿洖澶勭悊杩濊寤鸿锛屼笉杩涜鎺ㄧ悊鍜宼hink銆傝繑鍥炵簿鍑嗕俊鎭�")
+ response = requests.post(
+ # ollama鍦板潃
+ url=f"{ollama_url}/chat",
+ json={
+ # 鎸囧畾妯″瀷
+ "llm_name": "qwen3:8b",
+ "messages": [
+ {"role": "user", "content": content}
+ ],
+ "stream": False # 鍏抽棴娴佸紡杈撳嚭
+ }
+ )
+ # 浠巎son鎻愬彇data瀛楁鍐呭
+ ret = response.json()["data"]
+ #result = response.json()
+ #ret = result.get("data") or result.get("message", {}).get("content", "")
+ # 绉婚櫎<think>鏍囩鍜屽唴瀹�
+ ret = re.sub(r'<think>.*?</think>', '', ret, flags=re.DOTALL)
+ # 瀛楃涓叉竻鐞�,绉婚櫎绌烘牸,鍒惰〃绗�,鎹㈣绗�,鏄熷彿
+ ret = ret.replace(" ", "").replace("\t", "").replace("\n", "").replace("**","")
+ print(ret)
+ return ret
+
+ # RAG鏈嶅姟鍙戦�佽姹�,鑾峰彇鐭ヨ瘑搴撳唴瀹�
+ def get_filedata(self, searchtext, ragurl):
+ search_data = {
+ # 鐭ヨ瘑搴撻泦鍚�
+ "collection_name": "smart_knowledge",
+ # 鏌ヨ鏂囨湰
+ "query_text": searchtext,
+ # 鎼滅储妯″紡
+ "search_mode": "hybrid",
+ # 鏈�澶氳繑鍥炵粨鏋�
+ "limit": 100,
+ # 璋冨瘑鍚戦噺鎼滅储鏉冮噸
+ "weight_dense": 0.7,
+ # 绋�鐤忓悜閲忔悳绱㈡潈閲�
+ "weight_sparse": 0.3,
+ # 绌哄瓧绗︿覆
+ "filter_expr": "",
+ # 鍙繑鍥� text 瀛楁
+ "output_fields": ["text"]
+ }
+ # 鍚� ragurl + "/search" 绔偣鍙戦�丳OST璇锋眰
+ response = requests.post(ragurl + "/search", json=search_data)
+ # 浠庡搷搴斾腑鑾峰彇'results'瀛楁
+ results = response.json().get('results')
+ # 鍒濆鍖� text
+ text = ""
+ # 閬嶅巻鎵�鏈夌粨鏋滆鍒�(rule)锛屽皢姣忔潯瑙勫垯鐨�'entity'涓殑'text'瀛楁鍙栧嚭.
+ for rule in results:
+ text = text + rule['entity'].get('text') + ";\n"
+
+ return text
+
async def insert_json_data(self, ragurl, data):
try:
data = {'collection_name': "smartrag", "data": data, "description": ""}
--
Gitblit v1.8.0