From 6bac1630e5af5890a6922bdc624e591eb19a12eb Mon Sep 17 00:00:00 2001 From: zhaoqingang <zhaoqg0118@163.com> Date: 星期四, 13 三月 2025 18:36:07 +0800 Subject: [PATCH] 知识库对接rg --- app/config/env_conf/default_agent_conf.json | 4 app/service/v2/app_driver/chat_dialog.py | 13 ++ app/service/v2/chat.py | 185 ++++++++++++++++++++++++------------ app/service/v2/initialize_data.py | 50 ++++++--- app/models/v2/chat.py | 4 5 files changed, 168 insertions(+), 88 deletions(-) diff --git a/app/config/env_conf/default_agent_conf.json b/app/config/env_conf/default_agent_conf.json index a37e185..1829e05 100644 --- a/app/config/env_conf/default_agent_conf.json +++ b/app/config/env_conf/default_agent_conf.json @@ -141,14 +141,14 @@ ], "complex": [ { - "id": "c703ad73-faed-4fab-b00c-717766ad71dd", + "id": "9992ba72fff111ef8d890242ac120004", "name": "鐭ヨ瘑搴撲笓瀹�", "description": "鐭ヨ瘑搴撲笓瀹�", "icon": "intellFrame4", "chat_mode": 3, "parameters": { }, - "dialogType": "4", + "dialogType": "1", "mode": "complex-chat" }, { diff --git a/app/models/v2/chat.py b/app/models/v2/chat.py index 7aed562..2945e87 100644 --- a/app/models/v2/chat.py +++ b/app/models/v2/chat.py @@ -6,7 +6,7 @@ from sqlalchemy import Column, Integer, String, BigInteger, ForeignKey, DateTime, Text, TEXT from sqlalchemy.orm import Session -from app.config.const import Dialog_STATSU_DELETE +from app.config.const import Dialog_STATSU_DELETE, Dialog_STATSU_ON from app.models.base_model import Base from app.utils.common import current_time @@ -138,7 +138,7 @@ return [i.id for i in session_list] async def get_complex_chat_by_mode(self, chat_mode: int) -> ComplexChatModel | None: - session = self.db.query(ComplexChatModel).filter(ComplexChatModel.chat_mode==chat_mode, ComplexChatModel.status!=Dialog_STATSU_DELETE).first() + session = self.db.query(ComplexChatModel).filter(ComplexChatModel.chat_mode==chat_mode, ComplexChatModel.status==Dialog_STATSU_ON).first() return session diff --git a/app/service/v2/app_driver/chat_dialog.py b/app/service/v2/app_driver/chat_dialog.py index bfbe645..9aa750e 100644 --- a/app/service/v2/app_driver/chat_dialog.py +++ b/app/service/v2/app_driver/chat_dialog.py @@ -22,6 +22,8 @@ yield json_data except json.JSONDecodeError as e: + # print(e) + # print(complete_response) logger.info("Invalid JSON data------------------") # print(e) @@ -43,9 +45,14 @@ "session_id": session_id } - - - + @staticmethod + async def complex_request_data(question, dataset_ids, session_id=""): + return { + "question": question, + "stream": True, + "session_id": session_id, + "kb_ids": dataset_ids + } if __name__ == "__main__": diff --git a/app/service/v2/chat.py b/app/service/v2/chat.py index 96672a3..38683a8 100644 --- a/app/service/v2/chat.py +++ b/app/service/v2/chat.py @@ -451,6 +451,9 @@ node_data = [] if not query_data: query_data = {} + # print(node_data) + # print("--------------------------------------------------------") + # print(query_data) try: complex_log = ComplexChatSessionDao(db) if not conversation_id: @@ -485,20 +488,20 @@ async def service_complex_chat(db, chat_id, mode, user_id, chat_request: ChatDataRequest): answer_event = "" answer_agent = "" + answer_dialog = "" answer_workflow = "" download_url = "" message_id = "" task_id = "" error = "" node_list = [] + reference= {} conversation_id = "" query_data = chat_request.to_dict() new_message_id = str(uuid.uuid4()) inputs = {"is_deep": chat_request.isDeep} files = chat_request.files - if chat_request.chatMode == complex_knowledge_chat: - inputs["query_json"] = json.dumps({"query": chat_request.query, "dataset_ids": chat_request.knowledgeId}) - elif chat_request.chatMode == complex_content_optimization_chat: + if chat_request.chatMode == complex_content_optimization_chat: inputs["type"] = chat_request.optimizeType elif chat_request.chatMode == complex_dialog_chat: if not files and chat_request.parentId: @@ -512,69 +515,125 @@ "error": "\n**ERROR**: 鍒涘缓浼氳瘽澶辫触锛�", "status": http_500}, ensure_ascii=False) + "\n\n" return + query_data["parentId"] = new_message_id try: - token = await get_chat_token(db, chat_id) - chat, url = await get_chat_object(mode) - async for ans in chat.chat_completions(url, - await chat.complex_request_data(chat_request.query, conversation_id, str(user_id), files=files, inputs=inputs), - await chat.get_headers(token)): - # print(ans) - data = {} - status = http_200 - conversation_id = ans.get("conversation_id") - task_id = ans.get("task_id") - if ans.get("event") == message_error: - error = ans.get("message", "鍙傛暟寮傚父锛�") - status = http_400 - event = smart_message_error - elif ans.get("event") == message_agent: - data = {"answer": ans.get("answer", ""), "id": ans.get("message_id", "")} - answer_agent += ans.get("answer", "") - message_id = ans.get("message_id", "") - event = smart_message_stream - elif ans.get("event") == message_event: - data = {"answer": ans.get("answer", ""), "id": ans.get("message_id", "")} - answer_event += ans.get("answer", "") - message_id = ans.get("message_id", "") - event = smart_message_stream - elif ans.get("event") == message_file: - data = {"url": ans.get("url", ""), "id": ans.get("id", ""), - "type": ans.get("type", "")} - files.append(data) - event = smart_message_file - elif ans.get("event") in [workflow_started, node_started, node_finished]: - data = ans.get("data", {}) - data["inputs"] = await data_process(data.get("inputs", {})) - data["outputs"] = await data_process(data.get("outputs", {})) - data["files"] = await data_process(data.get("files", [])) - data["process_data"] = "" - if data.get("status") == "failed": - status = http_500 - error = data.get("error", "") - node_list.append(ans) - event = [smart_workflow_started, smart_node_started, smart_node_finished][ - [workflow_started, node_started, node_finished].index(ans.get("event"))] - elif ans.get("event") == workflow_finished: - data = ans.get("data", {}) - answer_workflow = data.get("outputs", {}).get("output", data.get("outputs", {}).get("answer")) - download_url = data.get("outputs", {}).get("download_url") - event = smart_workflow_finished - if data.get("status") == "failed": - status = http_500 - error = data.get("error", "") - node_list.append(ans) + if chat_request.chatMode == complex_knowledge_chat: + if not conversation_id: + session = await service_chat_sessions(db, chat_id, chat_request.query) + # print(session) + if not session or session.get("code") != 0: + yield "data: " + json.dumps( + {"message": smart_message_error, "error": "\n**ERROR**: chat agent error", "status": http_500}) + return + conversation_id = session.get("data", {}).get("id") + token = await get_chat_token(db, rg_api_token) + url = settings.fwr_base_url + RG_CHAT_DIALOG.format(chat_id) + chat = ChatDialog() + try: + async for ans in chat.chat_completions(url, await chat.complex_request_data(chat_request.query, chat_request.knowledgeId, conversation_id), + await chat.get_headers(token)): + data = {} + error = "" + status = http_200 + if ans.get("code", None) == 102: + error = ans.get("message", "error锛�") + status = http_400 + event = smart_message_error + else: + if isinstance(ans.get("data"), bool) and ans.get("data") is True: + event = smart_message_end + else: + data = ans.get("data", {}) + # conversation_id = data.get("session_id", "") + if "session_id" in data: + del data["session_id"] + data["prompt"] = "" + if not message_id: + message_id = data.get("id", "") + answer_dialog = data.get("answer", "") + reference = data.get("reference", {}) + event = smart_message_cover + message_str = "data: " + json.dumps( + {"event": event, "data": data, "error": error, "status": status, "message_id":message_id, + "parent_id": new_message_id, + "session_id": chat_request.sessionId}, + ensure_ascii=False) + "\n\n" + for i in range(0, len(message_str), max_chunk_size): + chunk = message_str[i:i + max_chunk_size] + # print(chunk) + yield chunk # 鍙戦�佸垎鍧楁秷鎭� + except Exception as e: - elif ans.get("event") == message_end: - event = smart_message_end - else: - continue + logger.error(e) + try: + yield "data: " + json.dumps({"message": smart_message_error, + "error": "\n**ERROR**: " + str(e), "status": http_500}, + ensure_ascii=False) + "\n\n" + except: + ... + else: + token = await get_chat_token(db, chat_id) + chat, url = await get_chat_object(mode) + async for ans in chat.chat_completions(url, + await chat.complex_request_data(chat_request.query, conversation_id, str(user_id), files=files, inputs=inputs), + await chat.get_headers(token)): + # print(ans) + data = {} + status = http_200 + conversation_id = ans.get("conversation_id") + task_id = ans.get("task_id") + if ans.get("event") == message_error: + error = ans.get("message", "鍙傛暟寮傚父锛�") + status = http_400 + event = smart_message_error + elif ans.get("event") == message_agent: + data = {"answer": ans.get("answer", ""), "id": ans.get("message_id", "")} + answer_agent += ans.get("answer", "") + message_id = ans.get("message_id", "") + event = smart_message_stream + elif ans.get("event") == message_event: + data = {"answer": ans.get("answer", ""), "id": ans.get("message_id", "")} + answer_event += ans.get("answer", "") + message_id = ans.get("message_id", "") + event = smart_message_stream + elif ans.get("event") == message_file: + data = {"url": ans.get("url", ""), "id": ans.get("id", ""), + "type": ans.get("type", "")} + files.append(data) + event = smart_message_file + elif ans.get("event") in [workflow_started, node_started, node_finished]: + data = ans.get("data", {}) + data["inputs"] = await data_process(data.get("inputs", {})) + data["outputs"] = await data_process(data.get("outputs", {})) + data["files"] = await data_process(data.get("files", [])) + data["process_data"] = "" + if data.get("status") == "failed": + status = http_500 + error = data.get("error", "") + node_list.append(ans) + event = [smart_workflow_started, smart_node_started, smart_node_finished][ + [workflow_started, node_started, node_finished].index(ans.get("event"))] + elif ans.get("event") == workflow_finished: + data = ans.get("data", {}) + answer_workflow = data.get("outputs", {}).get("output", data.get("outputs", {}).get("answer")) + download_url = data.get("outputs", {}).get("download_url") + event = smart_workflow_finished + if data.get("status") == "failed": + status = http_500 + error = data.get("error", "") + node_list.append(ans) - yield "data: " + json.dumps( - {"event": event, "data": data, "error": error, "status": status, "task_id": task_id, "message_id":message_id, - "parent_id": new_message_id, - "session_id": chat_request.sessionId}, - ensure_ascii=False) + "\n\n" + elif ans.get("event") == message_end: + event = smart_message_end + else: + continue + + yield "data: " + json.dumps( + {"event": event, "data": data, "error": error, "status": status, "task_id": task_id, "message_id":message_id, + "parent_id": new_message_id, + "session_id": chat_request.sessionId}, + ensure_ascii=False) + "\n\n" except Exception as e: logger.error(e) @@ -591,7 +650,7 @@ # "node_list": node_list, "task_id": task_id, "id": message_id, # "error": error}, conversation_id) if message_id: - await add_complex_log(db, message_id, chat_id, chat_request.sessionId, chat_request.chatMode, answer_event or answer_agent or answer_workflow or error, user_id, mode, DF_TYPE, 2, conversation_id, node_data=node_list, query_data=query_data) + await add_complex_log(db, message_id, chat_id, chat_request.sessionId, chat_request.chatMode, answer_event or answer_agent or answer_workflow or answer_dialog or error, user_id, mode, DF_TYPE, 2, conversation_id, node_data=node_list or reference, query_data=query_data) async def service_complex_upload(db, chat_id, file, user_id): files = [] diff --git a/app/service/v2/initialize_data.py b/app/service/v2/initialize_data.py index eacdefa..9fb5792 100644 --- a/app/service/v2/initialize_data.py +++ b/app/service/v2/initialize_data.py @@ -161,8 +161,9 @@ except Exception as e: print(e) db.rollback() - + now_complex_list = [] for agent in complex_list: + now_complex_list.append(agent["id"]) dialog = db.query(ComplexChatModel).filter(ComplexChatModel.id == agent["id"]).first() if dialog: try: @@ -185,6 +186,15 @@ except Exception as e: print(e) db.rollback() + + for i in db.query(ComplexChatModel).filter(ComplexChatModel.status == "1").all(): + if i.id not in now_complex_list: + try: + db.query(ComplexChatModel).filter(ComplexChatModel.id==i.id).update(({"status": "0"})) + db.commit() + except: + ... + async def user_update_app(userid, db): @@ -391,24 +401,28 @@ for i in complex_list: user_token = db.query(ApiTokenModel).filter(ApiTokenModel.app_id == i.id).first() if not user_token: - chat = ChatBaseApply() - url = settings.dify_base_url + DF_CHAT_API_KEY.format(i.id) - access_token = await get_app_token(db, workflow_server) - param = await chat.chat_get(url, {}, await chat.get_headers(access_token)) - if param and param.get("data"): - token = param.get("data", [{}])[0].get("token") - token_id = param.get("data", [{}])[0].get("id") - # dialog.parameters = json.dumps(param) - else: - param = await chat.chat_post(url, {}, await chat.get_headers(access_token)) - if param: - token = param.get("token") - token_id = param.get("id") + try: + chat = ChatBaseApply() + url = settings.dify_base_url + DF_CHAT_API_KEY.format(i.id) + access_token = await get_app_token(db, workflow_server) + param = await chat.chat_get(url, {}, await chat.get_headers(access_token)) + if param and param.get("data"): + token = param.get("data", [{}])[0].get("token") + token_id = param.get("data", [{}])[0].get("id") + # dialog.parameters = json.dumps(param) + else: + param = await chat.chat_post(url, {}, await chat.get_headers(access_token)) + if param: + token = param.get("token") + token_id = param.get("id") - if token: - db.add(ApiTokenModel(id=token_id, app_id=i.id, type="app", token=token)) - db.commit() - print("df_api_token: 鏇存柊鎴愬姛锛�") + if token: + db.add(ApiTokenModel(id=token_id, app_id=i.id, type="app", token=token)) + db.commit() + print("df_api_token: 鏇存柊鎴愬姛锛�") + except Exception as e: + print(e) + except Exception as e: print(e) -- Gitblit v1.8.0