From 72a8a0a1ad6b79b8e9fb2facef121f9b5d584666 Mon Sep 17 00:00:00 2001
From: xuyonghao <898441624@qq.com>
Date: 星期六, 08 二月 2025 10:56:30 +0800
Subject: [PATCH] 报表合并历史记录功能
---
app/api/chat.py | 129 +++++++++++++++++++++++++++++++-----------
1 files changed, 94 insertions(+), 35 deletions(-)
diff --git a/app/api/chat.py b/app/api/chat.py
index eab8801..5914353 100644
--- a/app/api/chat.py
+++ b/app/api/chat.py
@@ -1,6 +1,7 @@
import json
import re
import uuid
+from copy import deepcopy
from fastapi import WebSocket, WebSocketDisconnect, APIRouter, Depends
import asyncio
@@ -423,9 +424,7 @@
print(f"Error process message of ragflow: {e2}")
elif agent.type == "reportWorkflow":
- token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_CLEANING)
- if not token:
- await websocket.send_json({"message": "Invalid token document_to_cleaning", "type": "error"})
+
while True:
receive_message = await websocket.receive_json()
print(f"Received from client {chat_id}: {receive_message}")
@@ -477,7 +476,12 @@
"upload_file_id": ""
})
inputs_list = []
+ is_next = 0
if workflow_type == 1:
+ token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_CLEANING)
+ if not token:
+ await websocket.send_json(
+ {"message": "Invalid token document_to_cleaning", "type": "error"})
inputs["input_files"] = files
inputs["Completion_of_main_indicators"] = title
inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
@@ -500,6 +504,10 @@
{"message": "Invalid token document_to_title", "type": "error"})
inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
elif workflow_type == 3 and is_clean == 1 or tokens >= max_token:
+ token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_CLEANING)
+ if not token:
+ await websocket.send_json(
+ {"message": "Invalid token document_to_cleaning", "type": "error"})
inputs["input_files"] = files
inputs["Completion_of_main_indicators"] = title
inputs_list.append({"inputs": inputs, "token": token, "workflow_type": 1})
@@ -511,13 +519,19 @@
if not token:
await websocket.send_json(
{"message": "Invalid token document_to_report", "type": "error"})
- inputs_list.append({"inputs": inputs, "token": token, "workflow_type": 3})
+ inputs_list.append({"inputs": inputs1, "token": token, "workflow_type": 3})
# print(inputs_list)
- for input in inputs_list:
+ for idx, input in enumerate(inputs_list):
+ # print(input)
+ if idx < len(inputs_list)-1:
+ is_next = 1
+ else:
+ is_next = 0
i = input["inputs"]
if "file_list" in i:
i["file_list"] = files
+ # print(i)
node_list = []
complete_response = ""
workflow_list = []
@@ -537,7 +551,15 @@
try:
data = json.loads(complete_response)
# print(data)
- node_list.append(data)
+ node_data = deepcopy(data)
+ if "data" in node_data:
+ if "outputs" in node_data["data"]:
+ node_data["data"]["outputs"] = {}
+ if "inputs" in node_data["data"]:
+ node_data["data"]["inputs"] = {}
+ # print(node_data)
+ node_list.append(node_data)
+
complete_response = ""
if data.get("event") == "node_started": # "event": "message_end"
@@ -555,7 +577,8 @@
message = answer.get("title", "")
- result = {"message": message, "type": "system"}
+
+ result = {"message": message, "type": "system", "workflow":{"node_data": workflow_list}}
elif data.get("event") == "node_finished":
workflow_list.append({
"title": data.get("data", {}).get("title", ""),
@@ -566,12 +589,24 @@
"elapsed_time":data.get("data", {}).get("elapsed_time", 0),
"error":data.get("data", {}).get("error", ""),
})
+ answer = data.get("data", "")
+ if isinstance(answer, str):
+ logger.error("----------------鏈煡鏁版嵁--------------------")
+ logger.error(data)
+ continue
+ elif isinstance(answer, dict):
+
+ message = answer.get("title", "")
+ if answer.get("status") == "failed":
+ message = answer.get("error", "")
+ result = {"message": message, "type": "system", "workflow":{"node_data": workflow_list}}
+
elif data.get("event") == "workflow_finished":
answer = data.get("data", "")
if isinstance(answer, str):
logger.error("----------------鏈煡鏁版嵁--------------------")
logger.error(data)
- result = {"message": "", "type": "close", "download_url": ""}
+ result = {"message": "", "type": "close", "download_url": "", "is_next": is_next}
elif isinstance(answer, dict):
download_url = ""
outputs = answer.get("outputs", {})
@@ -596,7 +631,7 @@
"error": answer.get("error", ""),
"elapsed_time": answer.get("elapsed_time", 0)
}
- result = {"message": message, "type": "message", "download_url": download_url}
+ result = {"message": message, "type": "message", "download_url": download_url, "workflow":workflow_dict}
try:
SessionService(db).update_session(chat_id,
message={"role": "assistant",
@@ -606,6 +641,7 @@
"download_url": download_url}},
conversation_id=data.get(
"conversation_id"))
+ node_list = []
except Exception as e:
logger.error("淇濆瓨dify鐨勪細璇濆紓甯革紒")
logger.error(e)
@@ -615,7 +651,7 @@
logger.error(e)
logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
- result = {"message": "", "type": "close", "workflow": workflow_dict}
+ result = {"message": "", "type": "close", "workflow": workflow_dict, "is_next": is_next, "download_url": download_url}
else:
@@ -626,6 +662,7 @@
logger.error(e)
logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
complete_response = ""
+
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
# print(f"Response text: {text}")
@@ -637,6 +674,7 @@
image_list = []
# print(inputs)
complete_response = ""
+ answer_str = ""
async for rag_response in dify_service.chat(input["token"], current_user.id, title_query, [],
conversation_id, i):
# print(rag_response)
@@ -651,7 +689,14 @@
complete_response += rag_response
try:
data = json.loads(complete_response)
- node_list.append(data)
+ node_data = deepcopy(data)
+ if "data" in node_data:
+ if "outputs" in node_data["data"]:
+ node_data["data"]["outputs"] = {}
+ if "inputs" in node_data["data"]:
+ node_data["data"]["inputs"] = {}
+ # print(node_data)
+ node_list.append(node_data)
complete_response = ""
if data.get("event") == "node_started": # "event": "message_end"
if "data" not in data or not data["data"]: # 淇℃伅杩囨护
@@ -668,7 +713,7 @@
message = answer.get("title", "")
- result = {"message": message, "type": "system"}
+ result = {"message": message, "type": "system", "workflow":{"node_data": workflow_list}}
elif data.get("event") == "node_finished":
workflow_list.append({
"title": data.get("data", {}).get("title", ""),
@@ -679,45 +724,59 @@
"elapsed_time":data.get("data", {}).get("elapsed_time", 0),
"error":data.get("data", {}).get("error", ""),
})
+
+ answer = data.get("data", "")
+ if isinstance(answer, str):
+ logger.error("----------------鏈煡鏁版嵁--------------------")
+ logger.error(data)
+ continue
+ elif isinstance(answer, dict):
+
+ message = answer.get("title", "")
+ if answer.get("status") == "failed":
+ message = answer.get("error", "")
+ result = {"message": message, "type": "system", "workflow":{"node_data": workflow_list}}
elif data.get("event") == "message":
- message = data.get("answer", "")
+ answer_str = data.get("answer", "")
# try:
# msg_dict = json.loads(answer)
# message = msg_dict.get("output", "")
# except Exception as e:
# print(e)
# continue
- result = {"message": message, "type": "message",
- "download_url": ""}
- try:
- SessionService(db).update_session(chat_id,
- message={"role": "assistant",
- "content": {
- "answer": message,
- "node_list": node_list,
- "download_url": ""}},
- conversation_id=data.get(
- "conversation_id"))
- except Exception as e:
- logger.error("淇濆瓨dify鐨勪細璇濆紓甯革紒")
- logger.error(e)
+ result = {"message": answer_str, "type": "message",
+ "download_url": "", "workflow": {"node_data": workflow_list}}
+
# try:
# await websocket.send_json(result)
# except Exception as e:
# logger.error(e)
# logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
- elif data.get("event") == "workflow_finished":
+ elif data.get("event") == "workflow_finished":
workflow_dict = {
"node_data": workflow_list,
- "total_tokens": answer.get("total_tokens", 0),
- "created_at": answer.get("created_at", 0),
- "finished_at": answer.get("finished_at", 0),
- "status": answer.get("status", ""),
- "error": answer.get("error", ""),
- "elapsed_time": answer.get("elapsed_time", 0)
+ "total_tokens": data.get("data", {}).get("total_tokens", 0),
+ "created_at": data.get("data", {}).get("created_at", 0),
+ "finished_at": data.get("data", {}).get("finished_at", 0),
+ "status": data.get("data", {}).get("status", ""),
+ "error": data.get("data", {}).get("error", ""),
+ "elapsed_time": data.get("data", {}).get("elapsed_time", 0)
}
+ try:
+ SessionService(db).update_session(chat_id,
+ message={"role": "assistant",
+ "content": {
+ "answer": answer_str,
+ "node_list": node_list,
+ "download_url": ""}},
+ conversation_id=data.get(
+ "conversation_id"))
+ node_list = []
+ except Exception as e:
+ logger.error("淇濆瓨dify鐨勪細璇濆紓甯革紒")
+ logger.error(e)
elif data.get("event") == "message_end":
- result = {"message": "", "type": "close", "workflow": workflow_dict}
+ result = {"message": "", "type": "close", "workflow": workflow_dict, "is_next": is_next}
else:
continue
try:
--
Gitblit v1.8.0