From b7ad7ae90cd832281afe6d743776ad87f1e56e07 Mon Sep 17 00:00:00 2001
From: zhaoqingang <zhaoqg0118@163.com>
Date: 星期五, 03 一月 2025 16:55:47 +0800
Subject: [PATCH] 工作流日志
---
app/api/chat.py | 385 +++++++++++++++++++++++++++++++++---------------------
1 files changed, 237 insertions(+), 148 deletions(-)
diff --git a/app/api/chat.py b/app/api/chat.py
index 3c8e9c7..eab8801 100644
--- a/app/api/chat.py
+++ b/app/api/chat.py
@@ -436,6 +436,10 @@
title_number = receive_message.get('title_number', 8)
title_style = receive_message.get('title_style', "")
title_query = receive_message.get('title_query', "")
+ is_clean = receive_message.get('is_clean', 0)
+ file_type = receive_message.get('file_type', 1)
+ max_token = receive_message.get('max_tokens', 100000)
+ tokens = receive_message.get('tokens', 0)
if upload_files:
title_query = "start"
# if not upload_files:
@@ -447,7 +451,9 @@
title if title else title_query,
agent_id,
AgentType.DIFY,
- current_user.id
+ current_user.id,
+ {"role": "user", "content": title if title else title_query, "type": workflow_type, "is_clean":is_clean},
+ workflow_type
)
conversation_id = session.conversation_id
except Exception as e:
@@ -456,14 +462,25 @@
}
files = []
for file in upload_files:
- files.append({
- "type": "document",
- "transfer_method": "local_file",
- "url": "",
- "upload_file_id": file
- })
+ if file_type == 1:
+ files.append({
+ "type": "document",
+ "transfer_method": "local_file",
+ "url": "",
+ "upload_file_id": file
+ })
+ else:
+ files.append({
+ "type": "document",
+ "transfer_method": "remote_url",
+ "url": file,
+ "upload_file_id": ""
+ })
+ inputs_list = []
if workflow_type == 1:
inputs["input_files"] = files
+ inputs["Completion_of_main_indicators"] = title
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
elif workflow_type == 2:
inputs["file_list"] = files
inputs["Completion_of_main_indicators"] = title
@@ -472,7 +489,8 @@
if not token:
await websocket.send_json(
{"message": "Invalid token document_to_report", "type": "error"})
- elif workflow_type == 3:
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
+ elif workflow_type == 3 and is_clean == 0 and tokens < max_token:
inputs["file_list"] = files
inputs["number_of_title"] = title_number
inputs["title_style"] = title_style
@@ -480,171 +498,242 @@
if not token:
await websocket.send_json(
{"message": "Invalid token document_to_title", "type": "error"})
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
+ elif workflow_type == 3 and is_clean == 1 or tokens >= max_token:
+ inputs["input_files"] = files
+ inputs["Completion_of_main_indicators"] = title
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": 1})
+ inputs1 = {}
+ inputs1["file_list"] = files
+ inputs1["number_of_title"] = title_number
+ inputs1["title_style"] = title_style
+ token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_TITLE)
+ if not token:
+ await websocket.send_json(
+ {"message": "Invalid token document_to_report", "type": "error"})
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": 3})
- complete_response = ""
- if workflow_type == 1 or workflow_type == 2:
- async for rag_response in dify_service.workflow(token, current_user.id, inputs):
- # print(rag_response)
- try:
- if rag_response[:5] == "data:":
- # 濡傛灉鏄紝鍒欐埅鍙栨帀鍓�5涓瓧绗︼紝骞跺幓闄ら灏剧┖鐧界
- complete_response = rag_response[5:].strip()
- elif "event: ping" in rag_response:
- continue
- else:
- # 鍚﹀垯锛屼繚鎸佸師鏍�
- complete_response += rag_response
+ # print(inputs_list)
+ for input in inputs_list:
+ i = input["inputs"]
+ if "file_list" in i:
+ i["file_list"] = files
+ node_list = []
+ complete_response = ""
+ workflow_list = []
+ workflow_dict = {}
+ if input["workflow_type"] == 1 or input["workflow_type"] == 2:
+ async for rag_response in dify_service.workflow(input["token"], current_user.id, i):
+ # print(rag_response)
try:
- data = json.loads(complete_response)
- complete_response = ""
- if data.get("event") == "node_started" or data.get("event") == "node_finished": # "event": "message_end"
- if "data" not in data or not data["data"]: # 淇℃伅杩囨护
- logger.error("闈炴硶鏁版嵁--------------------")
- logger.error(data)
- continue
- else: # 姝e父杈撳嚭
+ if rag_response[:5] == "data:":
+ # 濡傛灉鏄紝鍒欐埅鍙栨帀鍓�5涓瓧绗︼紝骞跺幓闄ら灏剧┖鐧界
+ complete_response = rag_response[5:].strip()
+ elif "event: ping" in rag_response:
+ continue
+ else:
+ # 鍚﹀垯锛屼繚鎸佸師鏍�
+ complete_response += rag_response
+ try:
+ data = json.loads(complete_response)
+ # print(data)
+ node_list.append(data)
+ complete_response = ""
+ if data.get("event") == "node_started": # "event": "message_end"
+
+ if "data" not in data or not data["data"]: # 淇℃伅杩囨护
+ logger.error("闈炴硶鏁版嵁--------------------")
+ logger.error(data)
+ continue
+ else: # 姝e父杈撳嚭
+ answer = data.get("data", "")
+ if isinstance(answer, str):
+ logger.error("----------------鏈煡鏁版嵁--------------------")
+ logger.error(data)
+ continue
+ elif isinstance(answer, dict):
+
+ message = answer.get("title", "")
+
+ result = {"message": message, "type": "system"}
+ elif data.get("event") == "node_finished":
+ workflow_list.append({
+ "title": data.get("data", {}).get("title", ""),
+ "status": data.get("data", {}).get("status", ""),
+ "created_at":data.get("data", {}).get("created_at", 0),
+ "finished_at":data.get("data", {}).get("finished_at", 0),
+ "node_type":data.get("data", {}).get("node_type", 0),
+ "elapsed_time":data.get("data", {}).get("elapsed_time", 0),
+ "error":data.get("data", {}).get("error", ""),
+ })
+ elif data.get("event") == "workflow_finished":
answer = data.get("data", "")
if isinstance(answer, str):
logger.error("----------------鏈煡鏁版嵁--------------------")
logger.error(data)
- continue
+ result = {"message": "", "type": "close", "download_url": ""}
elif isinstance(answer, dict):
+ download_url = ""
+ outputs = answer.get("outputs", {})
+ if outputs:
+ message = outputs.get("output", "")
+ download_url = outputs.get("download_url", "")
+ else:
+ message = answer.get("error", "")
+ if download_url:
+ files = [{
+ "type": "document",
+ "transfer_method": "remote_url",
+ "url": download_url,
+ "upload_file_id": ""
+ }]
+ workflow_dict = {
+ "node_data": workflow_list,
+ "total_tokens": answer.get("total_tokens", 0),
+ "created_at": answer.get("created_at", 0),
+ "finished_at": answer.get("finished_at", 0),
+ "status": answer.get("status", ""),
+ "error": answer.get("error", ""),
+ "elapsed_time": answer.get("elapsed_time", 0)
+ }
+ result = {"message": message, "type": "message", "download_url": download_url}
+ try:
+ SessionService(db).update_session(chat_id,
+ message={"role": "assistant",
+ "content": {
+ "answer": message,
+ "node_list": node_list,
+ "download_url": download_url}},
+ conversation_id=data.get(
+ "conversation_id"))
+ except Exception as e:
+ logger.error("淇濆瓨dify鐨勪細璇濆紓甯革紒")
+ logger.error(e)
+ try:
+ await websocket.send_json(result)
+ except Exception as e:
+ logger.error(e)
+ logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
- message = answer.get("title", "")
+ result = {"message": "", "type": "close", "workflow": workflow_dict}
- result = {"message": message, "type": "system"}
- elif data.get("event") == "workflow_finished":
- answer = data.get("data", "")
- if isinstance(answer, str):
- logger.error("----------------鏈煡鏁版嵁--------------------")
- logger.error(data)
- result = {"message": "", "type": "close", "download_url": ""}
- elif isinstance(answer, dict):
- download_url = ""
- outputs = answer.get("outputs", {})
- if outputs:
- message = outputs.get("output", "")
- download_url = outputs.get("download_url", "")
- else:
- message = answer.get("error", "")
- result = {"message": message, "type": "message", "download_url": download_url}
+ else:
+ continue
+ try:
+ await websocket.send_json(result)
+ except Exception as e:
+ logger.error(e)
+ logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
+ complete_response = ""
+ except json.JSONDecodeError as e:
+ print(f"Error decoding JSON: {e}")
+ # print(f"Response text: {text}")
+ except Exception as e2:
+ result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
+ await websocket.send_json(result)
+ print(f"Error process message of ragflow: {e2}")
+ elif input["workflow_type"] == 3:
+ image_list = []
+ # print(inputs)
+ complete_response = ""
+ async for rag_response in dify_service.chat(input["token"], current_user.id, title_query, [],
+ conversation_id, i):
+ # print(rag_response)
+ try:
+ if rag_response[:5] == "data:":
+ # 濡傛灉鏄紝鍒欐埅鍙栨帀鍓�5涓瓧绗︼紝骞跺幓闄ら灏剧┖鐧界
+ complete_response = rag_response[5:].strip()
+ elif "event: ping" in rag_response:
+ continue
+ else:
+ # 鍚﹀垯锛屼繚鎸佸師鏍�
+ complete_response += rag_response
+ try:
+ data = json.loads(complete_response)
+ node_list.append(data)
+ complete_response = ""
+ if data.get("event") == "node_started": # "event": "message_end"
+ if "data" not in data or not data["data"]: # 淇℃伅杩囨护
+ logger.error("闈炴硶鏁版嵁--------------------")
+ logger.error(data)
+ continue
+ else: # 姝e父杈撳嚭
+ answer = data.get("data", "")
+ if isinstance(answer, str):
+ logger.error("----------------鏈煡鏁版嵁--------------------")
+ logger.error(data)
+ continue
+ elif isinstance(answer, dict):
+
+ message = answer.get("title", "")
+
+ result = {"message": message, "type": "system"}
+ elif data.get("event") == "node_finished":
+ workflow_list.append({
+ "title": data.get("data", {}).get("title", ""),
+ "status": data.get("data", {}).get("status", ""),
+ "created_at":data.get("data", {}).get("created_at", 0),
+ "finished_at":data.get("data", {}).get("finished_at", 0),
+ "node_type":data.get("data", {}).get("node_type", 0),
+ "elapsed_time":data.get("data", {}).get("elapsed_time", 0),
+ "error":data.get("data", {}).get("error", ""),
+ })
+ elif data.get("event") == "message":
+ message = data.get("answer", "")
+ # try:
+ # msg_dict = json.loads(answer)
+ # message = msg_dict.get("output", "")
+ # except Exception as e:
+ # print(e)
+ # continue
+ result = {"message": message, "type": "message",
+ "download_url": ""}
try:
SessionService(db).update_session(chat_id,
message={"role": "assistant",
"content": {
"answer": message,
- "download_url": download_url}},
+ "node_list": node_list,
+ "download_url": ""}},
conversation_id=data.get(
"conversation_id"))
except Exception as e:
logger.error("淇濆瓨dify鐨勪細璇濆紓甯革紒")
logger.error(e)
- try:
- await websocket.send_json(result)
- except Exception as e:
- logger.error(e)
- logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
- result = {"message": "", "type": "close", "download_url": ""}
-
-
- else:
- continue
- try:
- await websocket.send_json(result)
- except Exception as e:
- logger.error(e)
- logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
- complete_response = ""
- except json.JSONDecodeError as e:
- print(f"Error decoding JSON: {e}")
- # print(f"Response text: {text}")
- except Exception as e2:
- result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
- await websocket.send_json(result)
- print(f"Error process message of ragflow: {e2}")
- elif workflow_type == 3:
- image_list = []
- # print(inputs)
- complete_response = ""
- async for rag_response in dify_service.chat(token, current_user.id, title_query, [],
- conversation_id, inputs):
- print(rag_response)
- try:
- if rag_response[:5] == "data:":
- # 濡傛灉鏄紝鍒欐埅鍙栨帀鍓�5涓瓧绗︼紝骞跺幓闄ら灏剧┖鐧界
- complete_response = rag_response[5:].strip()
- elif "event: ping" in rag_response:
- continue
- else:
- # 鍚﹀垯锛屼繚鎸佸師鏍�
- complete_response += rag_response
- try:
- data = json.loads(complete_response)
- complete_response = ""
- if data.get("event") == "node_started" or data.get(
- "event") == "node_finished": # "event": "message_end"
- if "data" not in data or not data["data"]: # 淇℃伅杩囨护
- logger.error("闈炴硶鏁版嵁--------------------")
- logger.error(data)
+ # try:
+ # await websocket.send_json(result)
+ # except Exception as e:
+ # logger.error(e)
+ # logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
+ elif data.get("event") == "workflow_finished":
+ workflow_dict = {
+ "node_data": workflow_list,
+ "total_tokens": answer.get("total_tokens", 0),
+ "created_at": answer.get("created_at", 0),
+ "finished_at": answer.get("finished_at", 0),
+ "status": answer.get("status", ""),
+ "error": answer.get("error", ""),
+ "elapsed_time": answer.get("elapsed_time", 0)
+ }
+ elif data.get("event") == "message_end":
+ result = {"message": "", "type": "close", "workflow": workflow_dict}
+ else:
continue
- else: # 姝e父杈撳嚭
- answer = data.get("data", "")
- if isinstance(answer, str):
- logger.error("----------------鏈煡鏁版嵁--------------------")
- logger.error(data)
- continue
- elif isinstance(answer, dict):
-
- message = answer.get("title", "")
-
- result = {"message": message, "type": "system"}
- elif data.get("event") == "message":
- message = data.get("answer", "")
- # try:
- # msg_dict = json.loads(answer)
- # message = msg_dict.get("output", "")
- # except Exception as e:
- # print(e)
- # continue
- result = {"message": message, "type": "message",
- "download_url": ""}
try:
- SessionService(db).update_session(chat_id,
- message={"role": "assistant",
- "content": {
- "answer": message,
- "download_url": ""}},
- conversation_id=data.get(
- "conversation_id"))
+ await websocket.send_json(result)
except Exception as e:
- logger.error("淇濆瓨dify鐨勪細璇濆紓甯革紒")
logger.error(e)
- # try:
- # await websocket.send_json(result)
- # except Exception as e:
- # logger.error(e)
- # logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
-
- elif data.get("event") == "message_end":
- result = {"message": "", "type": "close", "download_url": ""}
- else:
- continue
- try:
- await websocket.send_json(result)
- except Exception as e:
- logger.error(e)
- logger.error("dify杩斿洖瀹㈡埛绔秷鎭紓甯�!")
- complete_response = ""
- except json.JSONDecodeError as e:
- print(f"Error decoding JSON: {e}")
- # print(f"Response text: {text}")
- except Exception as e2:
- result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
- await websocket.send_json(result)
- print(f"Error process message of ragflow: {e2}")
+ logger.error("dify杩斿洖瀹㈡埛绔秷鎭紓甯�!")
+ complete_response = ""
+ except json.JSONDecodeError as e:
+ print(f"Error decoding JSON: {e}")
+ # print(f"Response text: {text}")
+ except Exception as e2:
+ result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
+ await websocket.send_json(result)
+ print(f"Error process message of ragflow: {e2}")
elif agent.type == "documentIa":
- print(122112)
token = DfTokenDao(db).get_token_by_id(DOCUMENT_IA_QUESTIONS)
# print(token)
if not token:
--
Gitblit v1.8.0