|
import threading
|
import time as time_sel
|
from typing import Dict
|
import qwen_task
|
import requests
|
import os
|
import torch
|
import logging
|
from transformers import AutoProcessor, AutoModelForVision2Seq
|
from pymilvus import connections, Collection
|
from logging.handlers import RotatingFileHandler
|
import get_mem
|
|
|
class ThreadPool:
|
def __init__(self):
|
#读取配置文件
|
self.config = {}
|
with open('./conf.txt', 'r', encoding='utf-8') as file:
|
for line in file:
|
# 去除每行的首尾空白字符(包括换行符)
|
line = line.strip()
|
# 跳过空行
|
if not line:
|
continue
|
# 分割键和值
|
if '=' in line:
|
key, value = line.split('=', 1)
|
# 去除键和值的首尾空白字符
|
key = key.strip()
|
value = value.strip()
|
# 将键值对添加到字典中
|
self.config[key] = value
|
# 配置日志
|
# 确保日志目录存在
|
log_dir = "logs"
|
os.makedirs(log_dir, exist_ok=True)
|
self.threads: Dict[str, threading.Thread] = {}
|
self.lock = threading.Lock()
|
|
self.device = "cuda" if torch.cuda.is_available() else "cpu"
|
# 初始化qwenvl检测模型
|
# 指定本地路径
|
model_path = "./Qwen2.5-VL-3B-Instruct-AWQ"
|
# 加载模型
|
self.qwen_model = AutoModelForVision2Seq.from_pretrained(
|
model_path,
|
device_map="auto"
|
).eval()
|
# 加载处理器
|
self.qwen_tokenizer = AutoProcessor.from_pretrained(model_path,use_fast=True)
|
|
# 初始化Milvus集合
|
connections.connect("default", host=self.config.get("milvusurl"), port=self.config.get("milvusport"))
|
# 加载集合
|
self.collection = Collection(name="smartobject")
|
self.collection.load()
|
|
#是否更新
|
self._isupdate = False
|
|
# 初始化共享内存
|
get_mem.smem_init()
|
|
# 配置日志
|
logging.basicConfig(
|
level=logging.INFO,
|
format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s: %(message)s',
|
datefmt='%Y-%m-%d %H:%M:%S',
|
handlers=[
|
# 按大小轮转的日志文件(最大10MB,保留3个备份)
|
RotatingFileHandler(
|
filename=os.path.join(log_dir, 'start_log.log'),
|
maxBytes=10 * 1024 * 1024, # 10MB
|
backupCount=3,
|
encoding='utf-8'
|
),
|
# 同时输出到控制台
|
logging.StreamHandler()
|
]
|
)
|
|
#启动线程
|
def safe_start(self, target_func, camera_id):
|
"""线程安全启动方法"""
|
def wrapped():
|
thread_name = threading.current_thread().name
|
try:
|
target_func(camera_id)
|
except Exception as e:
|
logging.error(f"线程异常: {str(e)}", exc_info=True)
|
|
with self.lock: # 确保线程安全创建
|
t = threading.Thread(
|
target=wrapped,
|
daemon=True # 设置为守护线程
|
)
|
t.start()
|
self.threads[camera_id] = t
|
return t
|
|
# 启动线程任务
|
def worker(self, camera_id):
|
# 初始化
|
detect = qwen_task.detect_tasks()
|
detect._thread_name = f"{camera_id}" # 设置名称
|
detect.init_logging(f"{camera_id}")
|
# 初始化检测模型
|
detect.device = self.device
|
# 初始化qwen模型
|
detect.qwen_tokenizer = self.qwen_tokenizer
|
detect.qwen_model = self.qwen_model
|
|
# 初始化Milvus集合
|
detect.collection = self.collection
|
|
# 设置线程启动状态
|
detect._running = True
|
try:
|
# 读取共享内存中的图片
|
image_id_list = [458671636370971032]
|
for image_id in image_id_list:
|
logging.info(f"读取图像成功: {image_id}")
|
image_id = detect.tark_do(image_id,self.config.get("filesavepath"),self.config.get("ragurl"),self.config.get("max_tokens"))
|
logging.info(f"处理图像成功: {image_id}")
|
except Exception as e:
|
logging.info(f"{detect._thread_name}线程错误:{e}")
|
|
#调用是否需要更新
|
def isUpdate(self):
|
try:
|
# 定义请求的 URL
|
url = self.config.get("isupdateurl")
|
# 发送 GET 请求
|
response = requests.get(url)
|
|
# 检查响应状态码
|
if response.status_code == 200:
|
data = response.json().get("data")
|
if data.get("isChange") == 1:
|
return True
|
else:
|
return False
|
except Exception as e:
|
logging.info(f"调用是否需要更新时出错:URL:{self.config.get('isupdateurl')}:{e}")
|
return False
|
|
#修改是否更新状态
|
def update_status(self):
|
try:
|
# 更新状态
|
url = self.config.get("updatestatusurl")
|
# 发送 GET 请求
|
response = requests.post(url)
|
# 检查响应状态码
|
if response.status_code == 200:
|
return True
|
else:
|
return False
|
except Exception as e:
|
logging.info(f"修改是否更新状态时出错:URL:{self.config.get('updatestatusurl')}:{e}")
|
return False
|
|
def shutdown_all(self) -> None:
|
"""清理所有线程"""
|
with self.lock:
|
for camera_id, thread in list(self.threads.items()):
|
if thread.is_alive():
|
thread.join(timeout=1)
|
del self.threads[camera_id]
|
|
#获取任务
|
def getTaskconf(self,isupdate):
|
try:
|
# 定义请求的 URL
|
url = self.config.get("gettaskconfurl")
|
# 发送 GET 请求
|
response = requests.get(url)
|
# 检查响应状态码
|
if response.status_code == 200:
|
data = response.json()
|
if isupdate:
|
# 更新状态
|
self.update_status()
|
return data.get("data")
|
else:
|
return []
|
except Exception as e:
|
logging.info(f"调用获取任务时出错:URL:{self.config.get('gettaskconfurl')}:{e}")
|
return []
|
|
# 使用示例
|
if __name__ == "__main__":
|
pool = ThreadPool()
|
is_init = True
|
camera_data = pool.getTaskconf(False)
|
while True:
|
try:
|
pool._isupdate = False # 是否更新数据
|
# 是否需要更新任务数据
|
if pool.isUpdate():
|
# 获取摄像机任务
|
camera_data = pool.getTaskconf(True)
|
pool._isupdate = True # 更新数据
|
|
if is_init:
|
if camera_data:
|
for camera in camera_data:
|
thread = pool.threads.get(camera.get("camera_id"))
|
if not thread:
|
logging.info(f"开始创建{camera.get('camera_id')}线程")
|
pool.safe_start(pool.worker, camera.get('camera_id'))
|
logging.info(f"{camera.get('camera_id')}线程创建完毕")
|
|
if pool._isupdate:
|
logging.info(f"更新线程开始")
|
pool.shutdown_all()
|
if camera_data:
|
for camera in camera_data:
|
thread = pool.threads.get(camera.get("camera_id"))
|
if not thread:
|
logging.info(f"开始创建{camera.get('camera_id')}线程")
|
pool.safe_start(pool.worker, camera.get('camera_id'))
|
logging.info(f"{camera.get('camera_id')}线程创建完毕")
|
|
logging.info(f"更新线程结束")
|
|
is_init = False
|
time_sel.sleep(1)
|
except Exception as e:
|
logging.info(f"主线程未知错误:{e}")
|