From b7ad7ae90cd832281afe6d743776ad87f1e56e07 Mon Sep 17 00:00:00 2001
From: zhaoqingang <zhaoqg0118@163.com>
Date: 星期五, 03 一月 2025 16:55:47 +0800
Subject: [PATCH] 工作流日志
---
app/api/chat.py | 247 +++++++++++++++++++++++++++++--------------------
1 files changed, 147 insertions(+), 100 deletions(-)
diff --git a/app/api/chat.py b/app/api/chat.py
index 2993b2a..eab8801 100644
--- a/app/api/chat.py
+++ b/app/api/chat.py
@@ -451,7 +451,9 @@
title if title else title_query,
agent_id,
AgentType.DIFY,
- current_user.id
+ current_user.id,
+ {"role": "user", "content": title if title else title_query, "type": workflow_type, "is_clean":is_clean},
+ workflow_type
)
conversation_id = session.conversation_id
except Exception as e:
@@ -475,12 +477,11 @@
"upload_file_id": ""
})
inputs_list = []
- token_list = []
if workflow_type == 1:
inputs["input_files"] = files
- inputs_list.append(inputs)
- token_list.append(token)
- elif workflow_type == 2 and is_clean == 0:
+ inputs["Completion_of_main_indicators"] = title
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
+ elif workflow_type == 2:
inputs["file_list"] = files
inputs["Completion_of_main_indicators"] = title
inputs["sub_titles"] = sub_titles
@@ -488,9 +489,8 @@
if not token:
await websocket.send_json(
{"message": "Invalid token document_to_report", "type": "error"})
- inputs_list.append(inputs)
- token_list.append(token)
- elif workflow_type == 3:
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
+ elif workflow_type == 3 and is_clean == 0 and tokens < max_token:
inputs["file_list"] = files
inputs["number_of_title"] = title_number
inputs["title_style"] = title_style
@@ -498,28 +498,32 @@
if not token:
await websocket.send_json(
{"message": "Invalid token document_to_title", "type": "error"})
- # inputs_list.append(inputs)
- # token_list.append(token)
- elif workflow_type == 2 and is_clean == 1:
- # inputs["input_files"] = files
- inputs_list.append(inputs)
- token_list.append(token)
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
+ elif workflow_type == 3 and is_clean == 1 or tokens >= max_token:
+ inputs["input_files"] = files
+ inputs["Completion_of_main_indicators"] = title
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": 1})
inputs1 = {}
- # inputs1["file_list"] = files
- inputs1["Completion_of_main_indicators"] = title
- inputs1["sub_titles"] = sub_titles
- token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_REPORT_TITLE)
+ inputs1["file_list"] = files
+ inputs1["number_of_title"] = title_number
+ inputs1["title_style"] = title_style
+ token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_TITLE)
if not token:
await websocket.send_json(
{"message": "Invalid token document_to_report", "type": "error"})
- inputs_list.append(inputs1)
- token_list.append(token)
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": 3})
- complete_response = ""
- if workflow_type == 1 or workflow_type == 2:
- for inputs in inputs_list:
- inputs["input_files"] = files
- async for rag_response in dify_service.workflow(token, current_user.id, inputs):
+ # print(inputs_list)
+ for input in inputs_list:
+ i = input["inputs"]
+ if "file_list" in i:
+ i["file_list"] = files
+ node_list = []
+ complete_response = ""
+ workflow_list = []
+ workflow_dict = {}
+ if input["workflow_type"] == 1 or input["workflow_type"] == 2:
+ async for rag_response in dify_service.workflow(input["token"], current_user.id, i):
# print(rag_response)
try:
if rag_response[:5] == "data:":
@@ -532,8 +536,11 @@
complete_response += rag_response
try:
data = json.loads(complete_response)
+ # print(data)
+ node_list.append(data)
complete_response = ""
- if data.get("event") == "node_started" or data.get("event") == "node_finished": # "event": "message_end"
+ if data.get("event") == "node_started": # "event": "message_end"
+
if "data" not in data or not data["data"]: # 淇℃伅杩囨护
logger.error("闈炴硶鏁版嵁--------------------")
logger.error(data)
@@ -549,6 +556,16 @@
message = answer.get("title", "")
result = {"message": message, "type": "system"}
+ elif data.get("event") == "node_finished":
+ workflow_list.append({
+ "title": data.get("data", {}).get("title", ""),
+ "status": data.get("data", {}).get("status", ""),
+ "created_at":data.get("data", {}).get("created_at", 0),
+ "finished_at":data.get("data", {}).get("finished_at", 0),
+ "node_type":data.get("data", {}).get("node_type", 0),
+ "elapsed_time":data.get("data", {}).get("elapsed_time", 0),
+ "error":data.get("data", {}).get("error", ""),
+ })
elif data.get("event") == "workflow_finished":
answer = data.get("data", "")
if isinstance(answer, str):
@@ -570,12 +587,22 @@
"url": download_url,
"upload_file_id": ""
}]
+ workflow_dict = {
+ "node_data": workflow_list,
+ "total_tokens": answer.get("total_tokens", 0),
+ "created_at": answer.get("created_at", 0),
+ "finished_at": answer.get("finished_at", 0),
+ "status": answer.get("status", ""),
+ "error": answer.get("error", ""),
+ "elapsed_time": answer.get("elapsed_time", 0)
+ }
result = {"message": message, "type": "message", "download_url": download_url}
try:
SessionService(db).update_session(chat_id,
message={"role": "assistant",
"content": {
"answer": message,
+ "node_list": node_list,
"download_url": download_url}},
conversation_id=data.get(
"conversation_id"))
@@ -587,7 +614,8 @@
except Exception as e:
logger.error(e)
logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
- result = {"message": "", "type": "close", "download_url": ""}
+
+ result = {"message": "", "type": "close", "workflow": workflow_dict}
else:
@@ -605,88 +633,107 @@
result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
await websocket.send_json(result)
print(f"Error process message of ragflow: {e2}")
- elif workflow_type == 3:
- image_list = []
- # print(inputs)
- complete_response = ""
- async for rag_response in dify_service.chat(token, current_user.id, title_query, [],
- conversation_id, inputs):
- print(rag_response)
- try:
- if rag_response[:5] == "data:":
- # 濡傛灉鏄紝鍒欐埅鍙栨帀鍓�5涓瓧绗︼紝骞跺幓闄ら灏剧┖鐧界
- complete_response = rag_response[5:].strip()
- elif "event: ping" in rag_response:
- continue
- else:
- # 鍚﹀垯锛屼繚鎸佸師鏍�
- complete_response += rag_response
+ elif input["workflow_type"] == 3:
+ image_list = []
+ # print(inputs)
+ complete_response = ""
+ async for rag_response in dify_service.chat(input["token"], current_user.id, title_query, [],
+ conversation_id, i):
+ # print(rag_response)
try:
- data = json.loads(complete_response)
- complete_response = ""
- if data.get("event") == "node_started" or data.get(
- "event") == "node_finished": # "event": "message_end"
- if "data" not in data or not data["data"]: # 淇℃伅杩囨护
- logger.error("闈炴硶鏁版嵁--------------------")
- logger.error(data)
- continue
- else: # 姝e父杈撳嚭
- answer = data.get("data", "")
- if isinstance(answer, str):
- logger.error("----------------鏈煡鏁版嵁--------------------")
+ if rag_response[:5] == "data:":
+ # 濡傛灉鏄紝鍒欐埅鍙栨帀鍓�5涓瓧绗︼紝骞跺幓闄ら灏剧┖鐧界
+ complete_response = rag_response[5:].strip()
+ elif "event: ping" in rag_response:
+ continue
+ else:
+ # 鍚﹀垯锛屼繚鎸佸師鏍�
+ complete_response += rag_response
+ try:
+ data = json.loads(complete_response)
+ node_list.append(data)
+ complete_response = ""
+ if data.get("event") == "node_started": # "event": "message_end"
+ if "data" not in data or not data["data"]: # 淇℃伅杩囨护
+ logger.error("闈炴硶鏁版嵁--------------------")
logger.error(data)
continue
- elif isinstance(answer, dict):
+ else: # 姝e父杈撳嚭
+ answer = data.get("data", "")
+ if isinstance(answer, str):
+ logger.error("----------------鏈煡鏁版嵁--------------------")
+ logger.error(data)
+ continue
+ elif isinstance(answer, dict):
- message = answer.get("title", "")
+ message = answer.get("title", "")
- result = {"message": message, "type": "system"}
- elif data.get("event") == "message":
- message = data.get("answer", "")
- # try:
- # msg_dict = json.loads(answer)
- # message = msg_dict.get("output", "")
- # except Exception as e:
- # print(e)
- # continue
- result = {"message": message, "type": "message",
- "download_url": ""}
+ result = {"message": message, "type": "system"}
+ elif data.get("event") == "node_finished":
+ workflow_list.append({
+ "title": data.get("data", {}).get("title", ""),
+ "status": data.get("data", {}).get("status", ""),
+ "created_at":data.get("data", {}).get("created_at", 0),
+ "finished_at":data.get("data", {}).get("finished_at", 0),
+ "node_type":data.get("data", {}).get("node_type", 0),
+ "elapsed_time":data.get("data", {}).get("elapsed_time", 0),
+ "error":data.get("data", {}).get("error", ""),
+ })
+ elif data.get("event") == "message":
+ message = data.get("answer", "")
+ # try:
+ # msg_dict = json.loads(answer)
+ # message = msg_dict.get("output", "")
+ # except Exception as e:
+ # print(e)
+ # continue
+ result = {"message": message, "type": "message",
+ "download_url": ""}
+ try:
+ SessionService(db).update_session(chat_id,
+ message={"role": "assistant",
+ "content": {
+ "answer": message,
+ "node_list": node_list,
+ "download_url": ""}},
+ conversation_id=data.get(
+ "conversation_id"))
+ except Exception as e:
+ logger.error("淇濆瓨dify鐨勪細璇濆紓甯革紒")
+ logger.error(e)
+ # try:
+ # await websocket.send_json(result)
+ # except Exception as e:
+ # logger.error(e)
+ # logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
+ elif data.get("event") == "workflow_finished":
+ workflow_dict = {
+ "node_data": workflow_list,
+ "total_tokens": answer.get("total_tokens", 0),
+ "created_at": answer.get("created_at", 0),
+ "finished_at": answer.get("finished_at", 0),
+ "status": answer.get("status", ""),
+ "error": answer.get("error", ""),
+ "elapsed_time": answer.get("elapsed_time", 0)
+ }
+ elif data.get("event") == "message_end":
+ result = {"message": "", "type": "close", "workflow": workflow_dict}
+ else:
+ continue
try:
- SessionService(db).update_session(chat_id,
- message={"role": "assistant",
- "content": {
- "answer": message,
- "download_url": ""}},
- conversation_id=data.get(
- "conversation_id"))
+ await websocket.send_json(result)
except Exception as e:
- logger.error("淇濆瓨dify鐨勪細璇濆紓甯革紒")
logger.error(e)
- # try:
- # await websocket.send_json(result)
- # except Exception as e:
- # logger.error(e)
- # logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
-
- elif data.get("event") == "message_end":
- result = {"message": "", "type": "close", "download_url": ""}
- else:
- continue
- try:
- await websocket.send_json(result)
- except Exception as e:
- logger.error(e)
- logger.error("dify杩斿洖瀹㈡埛绔秷鎭紓甯�!")
- complete_response = ""
- except json.JSONDecodeError as e:
- print(f"Error decoding JSON: {e}")
- # print(f"Response text: {text}")
- except Exception as e2:
- result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
- await websocket.send_json(result)
- print(f"Error process message of ragflow: {e2}")
+ logger.error("dify杩斿洖瀹㈡埛绔秷鎭紓甯�!")
+ complete_response = ""
+ except json.JSONDecodeError as e:
+ print(f"Error decoding JSON: {e}")
+ # print(f"Response text: {text}")
+ except Exception as e2:
+ result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
+ await websocket.send_json(result)
+ print(f"Error process message of ragflow: {e2}")
elif agent.type == "documentIa":
- print(122112)
token = DfTokenDao(db).get_token_by_id(DOCUMENT_IA_QUESTIONS)
# print(token)
if not token:
--
Gitblit v1.8.0