import asyncio
import json

from Log import logger

from app.config.agent_base_url import DF_CHAT_AGENT
from app.config.config import settings
from app.config.const import message_error, message_event, complex_knowledge_chat
from app.models import ComplexChatSessionDao, ChatData
from app.service.v2.app_driver.chat_agent import ChatAgent
from app.service.v2.chat import get_chat_token


async def _collect_answer(chat, url, payload, headers):
    """Concatenate the ``answer`` chunks streamed by ``chat.chat_completions``.

    Chunks whose ``event`` equals ``message_event`` contribute their
    ``answer`` text; any other event is ignored.

    Returns:
        The joined answer text, or ``None`` as soon as a chunk whose
        ``event`` equals ``message_error`` is seen.

    Exceptions raised by the stream are propagated so callers can keep their
    own log-and-return handling.
    """
    parts = []
    async for ans in chat.chat_completions(url, payload, headers):
        event = ans.get("event")
        if event == message_error:
            return None
        if event == message_event:
            parts.append(ans.get("answer", ""))
    return "".join(parts)


def _extract_fenced_block(text, language):
    """Return the content of the first ``` fenced block in *text*.

    A leading *language* tag (e.g. ``markdown`` or ``json``) is removed as a
    prefix, followed by any leading line breaks.  When *text* contains no
    fence at all it is returned unchanged.
    """
    parts = text.split("```")
    if len(parts) < 2:
        return text
    body = parts[1]
    # str.lstrip(chars) strips a *character set*, not a prefix, so the tag
    # has to be removed explicitly to avoid eating real content.
    if body.startswith(language):
        body = body[len(language):]
    return body.lstrip("\r\n")


async def service_chat_mindmap_v1(db, message_id, message, mindmap_chat_id, user_id):
    """Build (or extend) the markdown mind map stored for a chat session.

    The session is looked up by ``message_id``.  If it already has a mind
    map, ``message`` is first sent to the session's own chat agent and the
    answer becomes the mind-map query; otherwise ``session.content`` is used.
    The query is then sent to the agent identified by ``mindmap_chat_id``,
    the fenced markdown block is extracted from the reply and, when a mind
    map already existed, merged into it under the ``- {message}`` node.
    The result is persisted with ``update_mindmap_by_id``.

    Returns:
        ``{"mindmap": <markdown>}`` on success, or an empty dict when the
        session is missing, the agent reports an error event, or a request
        raises.
    """
    res = {}
    complex_log = ComplexChatSessionDao(db)
    session = await complex_log.get_session_by_id(message_id)
    if not session:
        return res

    token = await get_chat_token(db, session.chat_id)
    chat = ChatAgent()
    url = settings.dify_base_url + DF_CHAT_AGENT

    if session.mindmap:
        chat_request = json.loads(session.query)
        try:
            mindmap_query = await _collect_answer(
                chat,
                url,
                await chat.request_data(message, session.conversation_id, str(user_id), ChatData(),
                                        chat_request.get("files", [])),
                await chat.get_headers(token),
            )
        except Exception as e:
            logger.error(e)
            return res
        if mindmap_query is None:
            return res
    else:
        mindmap_query = session.content

    try:
        token = await get_chat_token(db, mindmap_chat_id)
        mindmap_str = await _collect_answer(
            chat,
            url,
            await chat.request_data(mindmap_query, "", str(user_id), ChatData()),
            await chat.get_headers(token),
        )
    except Exception as e:
        logger.error(e)
        return res
    if mindmap_str is None:
        return res

    # A reply without any ``` fence is used as-is instead of raising
    # IndexError on the missing second split element.
    mindmap_str = _extract_fenced_block(mindmap_str, "markdown")
    if session.mindmap:
        node_list = await mindmap_to_merge(session.mindmap, mindmap_str, f"- {message}")
        mindmap_str = "\n".join(node_list)
    res["mindmap"] = mindmap_str
    await complex_log.update_mindmap_by_id(message_id, mindmap_str)
    return res


async def service_chat_mindmap(db, message_id, message, mindmap_chat_id, user_id):
    """Build (or extend) the JSON mind map stored for a chat session.

    Same flow as ``service_chat_mindmap_v1`` but the mind-map agent is
    expected to reply with JSON (``{"title": ..., "items": [...]}``).  When
    the session already has a mind map the new tree is merged into it with
    ``mindmap_merge_dict``.  The JSON string is persisted with
    ``update_mindmap_by_id`` and its markdown rendering is returned.

    Returns:
        ``{"mindmap": <markdown>}`` on success, or an empty dict when the
        session is missing, the agent reports an error event, a request
        raises, or merging/rendering fails.
    """
    res = {}
    complex_log = ComplexChatSessionDao(db)
    session = await complex_log.get_session_by_id(message_id)
    if not session:
        return res

    token = await get_chat_token(db, session.chat_id)
    chat = ChatAgent()
    url = settings.dify_base_url + DF_CHAT_AGENT

    if session.mindmap:
        chat_request = json.loads(session.query)
        inputs = {"is_deep": chat_request.get("isDeep", 1)}
        if session.chat_mode == complex_knowledge_chat:
            inputs["query_json"] = json.dumps(
                {"query": chat_request.get("query", ""), "dataset_ids": chat_request.get("knowledgeId", [])})
        try:
            mindmap_query = await _collect_answer(
                chat,
                url,
                await chat.complex_request_data(message, session.conversation_id, str(user_id),
                                                files=chat_request.get("files", []), inputs=inputs),
                await chat.get_headers(token),
            )
        except Exception as e:
            logger.error(e)
            return res
        if mindmap_query is None:
            return res
    else:
        mindmap_query = session.content

    try:
        token = await get_chat_token(db, mindmap_chat_id)
        mindmap_str = await _collect_answer(
            chat,
            url,
            await chat.complex_request_data(mindmap_query, "", str(user_id)),
            await chat.get_headers(token),
        )
    except Exception as e:
        logger.error(e)
        return res
    if mindmap_str is None:
        return res

    mindmap_str = _extract_fenced_block(mindmap_str, "json")
    mindmap_str = mindmap_str.replace("\n", "")

    try:
        # The merge parses agent-produced JSON, so it is guarded the same way
        # as rendering: log and return the (empty) result on failure.
        if session.mindmap:
            mindmap_str = await mindmap_merge_dict(session.mindmap, mindmap_str, message)
        res["mindmap"] = await mindmap_join_str(mindmap_str)
    except Exception as e:
        logger.error(e)
        return res
    # NOTE(review): mindmap_str is persisted even when rendering produced ""
    # (unparseable JSON) - confirm this is intended.
    await complex_log.update_mindmap_by_id(message_id, mindmap_str)
    return res


async def mindmap_merge_dict(parent, child, target_node):
    """Attach the children of *child* to matching leaf nodes of *parent*.

    Args:
        parent: JSON string of the existing tree (``title`` / ``items``).
        child: JSON string of the new sub-tree; its ``items`` become the
            children of the target.  A falsy value means "nothing to merge".
        target_node: Title to look for.  Only nodes without an ``items`` key
            are candidates; every matching leaf receives the children.

    Returns:
        The merged tree serialised with ``json.dumps``.

    Raises:
        json.JSONDecodeError: if *parent* or a non-empty *child* is not
            valid JSON.
    """
    parent_dict = json.loads(parent)
    child_items = json.loads(child).get("items") if child else None
    if not child_items:
        # Nothing to attach; the original raised NameError/KeyError here.
        return json.dumps(parent_dict)

    pending = [parent_dict]
    while pending:
        node = pending.pop()
        if "items" in node:
            pending.extend(node["items"])
        elif node.get("title") == target_node:
            node["items"] = child_items
    return json.dumps(parent_dict)


async def mindmap_join_str(mindmap_json):
    """Render a JSON mind map as markdown.

    The root becomes ``# title``, its children ``## title``; deeper levels
    become ``- title`` bullets indented by two spaces per extra level.

    Returns:
        The markdown text, or ``""`` when *mindmap_json* cannot be parsed
        (the error is logged).
    """
    try:
        root = json.loads(mindmap_json)
    except Exception as e:
        logger.error(e)
        return ""

    lines = []

    def join_node(node, level):
        title = node["title"]
        if level <= 2:
            lines.append(f"{'#' * level} {title}\n")
        else:
            lines.append(f"{' ' * ((level - 3) * 2)}- {title}\n")
        for item in node.get("items", []):
            join_node(item, level + 1)

    join_node(root, 1)
    return "".join(lines)


async def mindmap_to_merge(parent, child, target_node):
    """Splice a markdown sub-tree into a markdown mind map.

    Args:
        parent: Existing mind map, one node per line.
        child: New markdown sub-tree.  Its first line is replaced by
            *target_node*; heading lines (``#``, ``##``, ...) are turned into
            ``-`` bullets, other non-empty lines are copied.
        target_node: Line suffix identifying the node to expand, e.g.
            ``"- some title"``.

    Returns:
        The list of lines of the merged mind map: the first parent line that
        ends with *target_node* is replaced by the converted child lines,
        each prefixed with that line's leading width.
    """
    parent_list = parent.split("\n")
    child_list = child.split("\n")
    child_list[0] = target_node

    # NOTE(review): when no parent line ends with target_node the first
    # parent line (index 0, indent 0) is replaced - behaviour kept from the
    # original; confirm this fallback is intended.
    level = 0
    index = 0
    for i, node in enumerate(parent_list):
        if node.endswith(target_node):
            level = len(node) - len(target_node)
            index = i
            break

    new_node_list = [" " * level + target_node]
    tmp_level = 0
    for line in child_list[1:]:
        if not line:
            continue
        stripped = line.lstrip()
        if stripped.startswith("#"):
            # Count only the leading '#' run so a '#' inside a title
            # (e.g. "C#") no longer changes the depth or truncates the text.
            title = stripped.lstrip("#")
            tmp_level = len(stripped) - len(title) - 1
            new_node_list.append(" " * (level + tmp_level) + "-" + title)
        else:
            new_node_list.append(" " * (level + tmp_level) + line)
    return parent_list[:index] + new_node_list + parent_list[index + 1:]


if __name__ == '__main__':
    a = (
        '{ "title": "全生命周期管理", "items": [ '
        '{ "title": "设备规划与采购", "items": [ '
        '{ "title": "需求分析与选型" ,"items": [{"title": "rererer"}, {"title": "trtrtrtrt"}] }, '
        '{ "title": "供应商选择与合同管理" } ] }, '
        '{ "title": "设备安装与调试", "items": [ { "title": "安装规范" }, { "title": "调试测试" } ] }, '
        '{ "title": "设备使用", "items": [ { "title": "操作培训" }, { "title": "操作规程与记录" } ] }, '
        '{ "title": "设备维护与维修", "items": [ { "title": "定期维护" }, { "title": "故障诊断" }, { "title": "备件管理" } ] }, '
        '{ "title": "设备更新与改造", "items": [ { "title": "技术评估" }, { "title": "更新计划" }, { "title": "改造方案" } ] }, '
        '{ "title": "设备报废", "items": [ { "title": "报废评估" }, { "title": "报废处理" } ] }, '
        '{ "title": "信息化管理", "items": [ { "title": "设备管理系统" }, { "title": "数据分析" }, { "title": "远程监控" } ] }, '
        '{ "title": "安全管理", "items": [ { "title": "安全培训" }, { "title": "安全检查" }, { "title": "应急预案" } ] }, '
        '{ "title": "环境保护", "items": [ { "title": "环保设备" }, { "title": "废物处理" }, { "title": "节能减排" } ] }, '
        '{ "title": "具体实践案例", "items": [ { "title": "高压开关设备润滑脂选用研究" }, { "title": "环保型 C4 混气 GIS 设备运维技术研究" } ] }, '
        '{ "title": "总结", "items": [ { "title": "提高运营效率和竞争力" } ] } ]}'
    )
    # mindmap_merge_dict is a coroutine function; it must be driven by an
    # event loop, otherwise only the coroutine object is printed.
    b = asyncio.run(mindmap_merge_dict(a, "", "设备规划与采购"))
    print(b)