| | |
| | | from pymilvus import connections, Collection |
| | | from logging.handlers import RotatingFileHandler |
| | | import get_mem |
| | | from qwen_thread_description import qwen_thread_description |
| | | |
| | | |
| | | class ThreadPool: |
| | | def __init__(self): |
| | |
| | | # NOTE(review): lines are elided above in this view; self.config and self.logger used below are presumably assigned there -- confirm. |
| | | # Load the "smartobject" collection |
| | | self.collection = Collection(name="smartobject") |
| | | self.collection.load() |
| | | # Create the qwen thread pool |
| | | self.pool = qwen_thread(self.config,self.logger) |
| | | # Create the qwen thread pool -- warning pool |
| | | self.poolWarning = qwen_thread(self.config,self.logger) |
| | | # Create the qwen thread pool -- image descriptions, etc. |
| | | self.poolDescription= qwen_thread_description(self.config, self.logger) |
| | | # Update flag; starts out False |
| | | self._isupdate = False |
| | | |
| | |
| | | self.threads[camera_id] = t |
| | | return t |
| | | |
| | | # 启动线程任务 |
| | | # 启动线程任务--预警 |
def workerWarning(self, camera_id):
    """Poll Milvus for this camera's unprocessed rows and queue them for warning.

    Runs forever. On each pass it queries ``self.collection`` for rows with
    ``is_desc == 0`` and ``video_point_id == camera_id``, takes the row with
    the largest ``id``, deletes it, re-inserts a copy with ``is_waning`` reset
    to 0 and ``is_desc`` set to 4 (the "warning in progress" state), and then
    submits the queried row -- carrying the newly assigned primary key -- to
    ``self.poolWarning``.

    Args:
        camera_id: Value matched against the ``video_point_id`` field.

    Returns:
        Never returns. Any ``Exception`` raised during a pass is logged with
        its traceback and the loop carries on after a short pause.
    """
    # Fields fetched from Milvus; everything except the primary key is copied
    # into the re-inserted row.
    output_fields = (
        "id", "zh_desc_class", "text_vector", "bounding_box",
        "video_point_name", "task_id", "task_name", "event_level_id",
        "event_level_name", "video_point_id", "detect_num", "is_waning",
        "is_desc", "waning_value", "rule_id", "detect_id", "knowledge_id",
        "suggestion", "risk_description", "detect_time", "image_path",
        "image_desc_path", "video_path",
    )
    copied_fields = tuple(name for name in output_fields if name != "id")
    # The filter does not change between passes, so build it once.
    expr = f"is_desc == 0 and video_point_id=={camera_id}"

    while True:
        try:
            pending = self.collection.query(
                expr=expr,
                output_fields=list(output_fields),
                consistency_level="Strong",
                # NOTE(review): confirm the installed pymilvus honours these
                # two arguments; the newest row is also selected client-side
                # below, so correctness does not depend on them.
                order_by_field="id",
                order_by_type="desc",
            )
            if pending:
                newest = max(pending, key=itemgetter("id"))
                # NOTE(review): delete-then-insert is not atomic; if the
                # insert fails the row is gone -- confirm this is acceptable.
                self.collection.delete(f"id == {newest['id']}")

                row = {name: newest[name] for name in copied_fields}
                row["is_waning"] = 0
                row["is_desc"] = 4  # "warning in progress" state

                # Save to Milvus and hand the row on under its new key.
                new_keys = self.collection.insert(row).primary_keys
                newest["id"] = new_keys[0]
                self.poolWarning.submit(newest)
        except Exception as e:
            logging.exception(f"{camera_id}线程错误:{e}")
        # Pause on every pass -- including after an error -- so a persistent
        # failure cannot turn this loop into a busy spin.
        time_sel.sleep(0.01)
| | | |
| | | # 启动线程任务--生成图片描述等 |
| | | def worker(self, camera_id): |
| | | # Polling loop: queries self.collection for this camera's rows, re-writes the row with the largest id, and submits it to a qwen pool; loops forever. |
| | | # NOTE(review): this view interleaves what look like two revisions of several lines (two expr= arguments, duplicated dict keys, upsert followed by insert, two submit calls) and elides others (the end of the query call, part of the data dict) -- reconcile against the real file before relying on it. |
| | | while True: |
| | | try: |
| | | res_a = self.collection.query( |
| | | expr=f"is_desc == 0 and video_point_id=={camera_id}", |
| | | expr=f"is_desc == 5 and video_point_id=={camera_id}", |
| | | output_fields=["id", "zh_desc_class", "text_vector", "bounding_box", "video_point_name", "task_id", |
| | | "task_name", "event_level_id", "event_level_name", |
| | | "video_point_id", "detect_num", "is_waning", "is_desc", "waning_value", "rule_id", |
| | | "detect_id","knowledge_id","suggestion", |
| | | "detect_id","knowledge_id","suggestion","risk_description", |
| | | "detect_time", "image_path", "image_desc_path", "video_path"], |
| | | consistency_level="Strong", |
| | | order_by_field="id", # sort by the id field |
| | |
| | | # Read the image from shared memory |
| | | # image_id = get_mem.smem_read_frame_qianwen(camera_id) |
| | | if len(res_a) > 0: |
| | | sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True) |
| | | res = sorted_results[0] |
| | | #sorted_results = sorted(res_a, key=itemgetter("id"), reverse=True) |
| | | #res = sorted_results[0] |
| | | res = max(res_a, key=itemgetter("id")) |
| | | self.collection.delete(f"id == {res['id']}") |
| | | # Build the row to write back |
| | | data = { |
| | | "id": res['id'], |
| | | "event_level_id": res['event_level_id'], # event_level_id |
| | | "event_level_name": res['event_level_name'], # event_level_name |
| | | "rule_id": res["rule_id"], |
| | | "video_point_id": res['video_point_id'], # video_point_id |
| | | "video_point_name": res['video_point_name'], |
| | | "is_waning": 0, |
| | | "is_waning": res['is_waning'], |
| | | "is_desc": 1, |
| | | "zh_desc_class": res['zh_desc_class'], # zh_desc_class |
| | | "zh_desc_class": res['zh_desc_class'], |
| | | "bounding_box": res['bounding_box'], # bounding_box |
| | | "task_id": res['task_id'], # task_id |
| | | "task_name": res['task_name'], # task_name |
| | |
| | | "image_desc_path": res['image_desc_path'], # image_desc_path |
| | | "video_path": res['video_path'], |
| | | "text_vector": res['text_vector'], |
| | | "knowledge_id": res['knowledge_id'], |
| | | "risk_description": res['risk_description'], |
| | | "suggestion": res['suggestion'], |
| | | "knowledge_id": res['knowledge_id'] |
| | | } |
| | | # logging.info(f"读取图像成功: {res['id']}") |
| | | # Save to Milvus |
| | | image_id = self.collection.upsert(data).primary_keys |
| | | image_id = self.collection.insert(data).primary_keys |
| | | res['id'] = image_id[0] |
| | | self.pool.submit(res) |
| | | self.poolDescription.submit(res) |
| | | time_sel.sleep(0.01) |
| | | except Exception as e: |
| | | logging.info(f"{camera_id}线程错误:{e}") |
| | | time_sel.sleep(0.01) |
| | | |
| | | |
| | | #调用是否需要更新 |
| | | def isUpdate(self): |
| | |
| | | |
def shutdown_all(self) -> None:
    """Join and forget every tracked worker thread.

    While holding ``self.lock``, waits up to one second for each thread in
    ``self.threads`` that is still alive, then removes its entry whether or
    not the join completed, so the mapping is empty afterwards.

    NOTE(review): the worker loops visible in this file never exit, so a
    join on them would be expected to time out -- confirm the threads are
    daemonic or otherwise stopped elsewhere.
    """
    with self.lock:
        # Iterate over a snapshot because entries are deleted inside the loop.
        for camera_id, thread in list(self.threads.items()):
            if thread.is_alive():
                thread.join(timeout=1)
            del self.threads[camera_id]
| | | |
| | | |
| | | #获取任务 |
| | | def getTaskconf(self,isupdate): |
| | |
| | | thread = pool.threads.get(camera.get("camera_id")) |
| | | if not thread: |
| | | logging.info(f"开始创建{camera.get('camera_id')}线程") |
| | | #先对数据进行预警 |
| | | pool.safe_start(pool.workerWarning, camera.get('camera_id')) |
| | | #在生成图片描述、处理建议等信息 |
| | | pool.safe_start(pool.worker, camera.get('camera_id')) |
| | | logging.info(f"{camera.get('camera_id')}线程创建完毕") |
| | | |
| | |
| | | thread = pool.threads.get(camera.get("camera_id")) |
| | | if not thread: |
| | | logging.info(f"开始创建{camera.get('camera_id')}线程") |
| | | pool.safe_start(pool.workerWarning, camera.get('camera_id')) |
| | | pool.safe_start(pool.worker, camera.get('camera_id')) |
| | | logging.info(f"{camera.get('camera_id')}线程创建完毕") |
| | | |