From 2bed9e0f6fc8fd1971cc6861bc3f731534c021ae Mon Sep 17 00:00:00 2001
From: zhaoqingang <zhaoqg0118@163.com>
Date: 星期二, 11 二月 2025 11:16:42 +0800
Subject: [PATCH] tmp
---
app/models/session_model.py | 2
app/api/chat.py | 649 +++++++++++++++++++++++++++--------------------------
app/service/session.py | 9
app/api/agent.py | 31 ++
app/config/env_conf/menu_conf.json | 19 -
5 files changed, 366 insertions(+), 344 deletions(-)
diff --git a/app/api/agent.py b/app/api/agent.py
index 0fb5f10..fd31c96 100644
--- a/app/api/agent.py
+++ b/app/api/agent.py
@@ -233,6 +233,7 @@
tmp_data["question"] = i.get("content")
elif i.get("role") == "assistant":
if isinstance(i.get("content"), dict):
+ content = i.get("content", {})
tmp_data["answer"] = i.get("content", {}).get("answer")
if "file_name" in i.get("content", {}):
tmp_data["files"] = [{"file_name": i.get("content", {}).get("file_name"),
@@ -242,6 +243,36 @@
if "download_url" in i.get("content", {}):
tmp_data["download_url"] = i.get("content", {}).get("download_url")
+ if "node_list" in content:
+ node_dict = {
+ "node_data": [],
+ # {"title": "鍘婚櫎鍐椾綑", # 鑺傜偣鍚嶇О "status": "succeeded", # 鑺傜偣鐘舵��"created_at": 1735817337, # 寮�濮嬫椂闂�"finished_at": 1735817337, # 缁撴潫鏃堕棿"error": "" # 閿欒鏃ュ織}
+ "total_tokens": 0, # 鑺辫垂token鏁�
+ "created_at": 0, # 寮�濮嬫椂闂�
+ "finished_at": 0, # 缁撴潫鏃堕棿
+ "elapsed_time": 0, # 缁撴潫鏃堕棿
+ "status": "succeeded", # 宸ヤ綔娴佺姸鎬�
+ "error": "", # 閿欒鏃ュ織
+ }
+ for node in content["node_list"]:
+ if node.get("event") == "node_finished":
+ node_dict["node_data"].append({
+ "title": node.get("data", {}).get("title", ""),
+ "status": node.get("data", {}).get("status", ""),
+ "created_at": node.get("data", {}).get("created_at", 0),
+ "finished_at": node.get("data", {}).get("finished_at", 0),
+ "node_type": node.get("data", {}).get("node_type", 0),
+ "elapsed_time": node.get("data", {}).get("elapsed_time", 0),
+ "error": node.get("data", {}).get("error", ""),
+ })
+ elif node.get("event") == "workflow_finished":
+ node_dict["total_tokens"] = node.get("data", {}).get("total_tokens", 0)
+ node_dict["created_at"] = node.get("data", {}).get("created_at", 0)
+ node_dict["finished_at"] = node.get("data", {}).get("finished_at", 0)
+ node_dict["status"] = node.get("data", {}).get("status", "")
+ node_dict["error"] = node.get("data", {}).get("error", "")
+ node_dict["elapsed_time"] = node.get("data", {}).get("elapsed_time", 0)
+ tmp_data["workflow"] = node_dict
else:
tmp_data["answer"] = i.get("content")
data.append(tmp_data)
diff --git a/app/api/chat.py b/app/api/chat.py
index 2241161..4d480bd 100644
--- a/app/api/chat.py
+++ b/app/api/chat.py
@@ -1,6 +1,6 @@
import json
import re
-import uuid
+from copy import deepcopy
from fastapi import WebSocket, WebSocketDisconnect, APIRouter, Depends
import asyncio
@@ -230,7 +230,8 @@
question,
agent_id,
AgentType.BASIC,
- current_user.id
+ current_user.id,
+ {"role": "user", "content": question}
)
except Exception as e:
logger.error(e)
@@ -344,14 +345,13 @@
question,
agent_id,
AgentType.DIFY,
- current_user.id
+ current_user.id,
+ {"role": "user", "content": question}
)
conversation_id = session.conversation_id
except Exception as e:
logger.error(e)
# complete_response = ""
-
- answer_str = ""
files = []
if upload_file_id:
files.append({
@@ -360,6 +360,7 @@
"url": "",
"upload_file_id": upload_file_id
})
+ answer_str = ""
async for rag_response in dify_service.chat(token, current_user.id, question, files,
conversation_id, {}):
# print(rag_response)
@@ -430,28 +431,35 @@
print(f"Error process message of ragflow: {e2}")
elif chat_type == "reportWorkflow":
- token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_CLEANING)
- if not token:
- await websocket.send_json({"message": "Invalid token document_to_cleaning", "type": "error"})
while True:
receive_message = await websocket.receive_json()
print(f"Received from client {chat_id}: {receive_message}")
upload_files = receive_message.get('upload_files', [])
title = receive_message.get('title', "")
- workflow_type = receive_message.get('workflow', 1)
sub_titles = receive_message.get('sub_titles', "")
+ workflow_type = receive_message.get('workflow', 1)
title_number = receive_message.get('title_number', 8)
title_style = receive_message.get('title_style', "")
title_query = receive_message.get('title_query', "")
+ is_clean = receive_message.get('is_clean', 0)
+ file_type = receive_message.get('file_type', 1)
+ max_token = receive_message.get('max_tokens', 100000)
+ tokens = receive_message.get('tokens', 0)
if upload_files:
title_query = "start"
+ # if not upload_files:
+ # await websocket.send_json({"message": "Invalid request", "type": "error"})
+ # continue
try:
session = SessionService(db).create_session(
chat_id,
- title,
+ title if title else title_query,
agent_id,
AgentType.DIFY,
- current_user.id
+ current_user.id,
+ {"role": "user", "content": title if title else title_query, "type": workflow_type,
+ "is_clean": is_clean},
+ workflow_type
)
conversation_id = session.conversation_id
except Exception as e:
@@ -460,23 +468,40 @@
}
files = []
for file in upload_files:
- files.append({
- "type": "document",
- "transfer_method": "local_file",
- "url": "",
- "upload_file_id": file
- })
+ if file_type == 1:
+ files.append({
+ "type": "document",
+ "transfer_method": "local_file",
+ "url": "",
+ "upload_file_id": file
+ })
+ else:
+ files.append({
+ "type": "document",
+ "transfer_method": "remote_url",
+ "url": file,
+ "upload_file_id": ""
+ })
+ inputs_list = []
+ is_next = 0
if workflow_type == 1:
+ token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_CLEANING)
+ if not token:
+ await websocket.send_json(
+ {"message": "Invalid token document_to_cleaning", "type": "error"})
inputs["input_files"] = files
- if workflow_type == 2:
+ inputs["Completion_of_main_indicators"] = title
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
+ elif workflow_type == 2:
inputs["file_list"] = files
inputs["Completion_of_main_indicators"] = title
inputs["sub_titles"] = sub_titles
token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_REPORT_TITLE)
if not token:
await websocket.send_json(
- {"message": "Invalid token document_to_cleaning", "type": "error"})
- elif workflow_type == 3:
+ {"message": "Invalid token document_to_report", "type": "error"})
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
+ elif workflow_type == 3 and is_clean == 0 and tokens < max_token:
inputs["file_list"] = files
inputs["number_of_title"] = title_number
inputs["title_style"] = title_style
@@ -484,29 +509,93 @@
if not token:
await websocket.send_json(
{"message": "Invalid token document_to_title", "type": "error"})
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
+ elif workflow_type == 3 and is_clean == 1 or tokens >= max_token:
+ token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_CLEANING)
+ if not token:
+ await websocket.send_json(
+ {"message": "Invalid token document_to_cleaning", "type": "error"})
+ inputs["input_files"] = files
+ inputs["Completion_of_main_indicators"] = title
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": 1})
+ inputs1 = {}
+ inputs1["file_list"] = files
+ inputs1["number_of_title"] = title_number
+ inputs1["title_style"] = title_style
+ token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_TITLE)
+ if not token:
+ await websocket.send_json(
+ {"message": "Invalid token document_to_report", "type": "error"})
+ inputs_list.append({"inputs": inputs1, "token": token, "workflow_type": 3})
- complete_response = ""
- if workflow_type == 1 or workflow_type == 2:
- async for rag_response in dify_service.workflow(token, current_user.id, inputs):
- # print(rag_response)
- try:
- if rag_response[:5] == "data:":
- # 濡傛灉鏄紝鍒欐埅鍙栨帀鍓�5涓瓧绗︼紝骞跺幓闄ら灏剧┖鐧界
- complete_response = rag_response[5:].strip()
- elif "event: ping" in rag_response:
- continue
- else:
- # 鍚﹀垯锛屼繚鎸佸師鏍�
- complete_response += rag_response
+ # print(inputs_list)
+ for idx, input in enumerate(inputs_list):
+ # print(input)
+ if idx < len(inputs_list) - 1:
+ is_next = 1
+ else:
+ is_next = 0
+ i = input["inputs"]
+ if "file_list" in i:
+ i["file_list"] = files
+ # print(i)
+ node_list = []
+ complete_response = ""
+ workflow_list = []
+ workflow_dict = {}
+ if input["workflow_type"] == 1 or input["workflow_type"] == 2:
+ async for rag_response in dify_service.workflow(input["token"], current_user.id, i):
+ # print(rag_response)
try:
- data = json.loads(complete_response)
- complete_response = ""
- if data.get("event") == "node_started" or data.get("event") == "node_finished": # "event": "message_end"
- if "data" not in data or not data["data"]: # 淇℃伅杩囨护
- logger.error("闈炴硶鏁版嵁--------------------")
- logger.error(data)
- continue
- else: # 姝e父杈撳嚭
+ if rag_response[:5] == "data:":
+ # 濡傛灉鏄紝鍒欐埅鍙栨帀鍓�5涓瓧绗︼紝骞跺幓闄ら灏剧┖鐧界
+ complete_response = rag_response[5:].strip()
+ elif "event: ping" in rag_response:
+ continue
+ else:
+ # 鍚﹀垯锛屼繚鎸佸師鏍�
+ complete_response += rag_response
+ try:
+ data = json.loads(complete_response)
+ # print(data)
+ node_data = deepcopy(data)
+ if "data" in node_data:
+ if "outputs" in node_data["data"]:
+ node_data["data"]["outputs"] = {}
+ if "inputs" in node_data["data"]:
+ node_data["data"]["inputs"] = {}
+ # print(node_data)
+ node_list.append(node_data)
+
+ complete_response = ""
+ if data.get("event") == "node_started": # "event": "message_end"
+
+ if "data" not in data or not data["data"]: # 淇℃伅杩囨护
+ logger.error("闈炴硶鏁版嵁--------------------")
+ logger.error(data)
+ continue
+ else: # 姝e父杈撳嚭
+ answer = data.get("data", "")
+ if isinstance(answer, str):
+ logger.error("----------------鏈煡鏁版嵁--------------------")
+ logger.error(data)
+ continue
+ elif isinstance(answer, dict):
+
+ message = answer.get("title", "")
+
+ result = {"message": message, "type": "system",
+ "workflow": {"node_data": workflow_list}}
+ elif data.get("event") == "node_finished":
+ workflow_list.append({
+ "title": data.get("data", {}).get("title", ""),
+ "status": data.get("data", {}).get("status", ""),
+ "created_at": data.get("data", {}).get("created_at", 0),
+ "finished_at": data.get("data", {}).get("finished_at", 0),
+ "node_type": data.get("data", {}).get("node_type", 0),
+ "elapsed_time": data.get("data", {}).get("elapsed_time", 0),
+ "error": data.get("data", {}).get("error", ""),
+ })
answer = data.get("data", "")
if isinstance(answer, str):
logger.error("----------------鏈煡鏁版嵁--------------------")
@@ -515,136 +604,210 @@
elif isinstance(answer, dict):
message = answer.get("title", "")
+ if answer.get("status") == "failed":
+ message = answer.get("error", "")
+ result = {"message": message, "type": "system",
+ "workflow": {"node_data": workflow_list}}
- result = {"message": message, "type": "system"}
- elif data.get("event") == "workflow_finished":
- answer = data.get("data", "")
- if isinstance(answer, str):
- logger.error("----------------鏈煡鏁版嵁--------------------")
- logger.error(data)
- result = {"message": "", "type": "close", "download_url": ""}
- elif isinstance(answer, dict):
- download_url = ""
- outputs = answer.get("outputs", {})
- if outputs:
- message = outputs.get("output", "")
- download_url = outputs.get("download_url", "")
- else:
- message = answer.get("error", "")
+ elif data.get("event") == "workflow_finished":
+ answer = data.get("data", "")
+ if isinstance(answer, str):
+ logger.error("----------------鏈煡鏁版嵁--------------------")
+ logger.error(data)
+ result = {"message": "", "type": "close", "download_url": "",
+ "is_next": is_next}
+ elif isinstance(answer, dict):
+ download_url = ""
+ outputs = answer.get("outputs", {})
+ if outputs:
+ message = outputs.get("output", "")
+ download_url = outputs.get("download_url", "")
+ else:
+ message = answer.get("error", "")
+ if download_url:
+ files = [{
+ "type": "document",
+ "transfer_method": "remote_url",
+ "url": download_url,
+ "upload_file_id": ""
+ }]
+ workflow_dict = {
+ "node_data": workflow_list,
+ "total_tokens": answer.get("total_tokens", 0),
+ "created_at": answer.get("created_at", 0),
+ "finished_at": answer.get("finished_at", 0),
+ "status": answer.get("status", ""),
+ "error": answer.get("error", ""),
+ "elapsed_time": answer.get("elapsed_time", 0)
+ }
+ result = {"message": message, "type": "message",
+ "download_url": download_url, "workflow": workflow_dict}
+ try:
+ SessionService(db).update_session(chat_id,
+ message={"role": "assistant",
+ "content": {
+ "answer": message,
+ "node_list": node_list,
+ "download_url": download_url}},
+ conversation_id=data.get(
+ "conversation_id"))
+ node_list = []
+ except Exception as e:
+ logger.error("淇濆瓨dify鐨勪細璇濆紓甯革紒")
+ logger.error(e)
+ try:
+ await websocket.send_json(result)
+ except Exception as e:
+ logger.error(e)
+ logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
- result = {"message": message, "type": "message", "download_url": download_url}
+ result = {"message": "", "type": "close", "workflow": workflow_dict,
+ "is_next": is_next, "download_url": download_url}
+
+
+ else:
+ continue
+ try:
+ await websocket.send_json(result)
+ except Exception as e:
+ logger.error(e)
+ logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
+ complete_response = ""
+
+ except json.JSONDecodeError as e:
+ print(f"Error decoding JSON: {e}")
+ # print(f"Response text: {text}")
+ except Exception as e2:
+ result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
+ await websocket.send_json(result)
+ print(f"Error process message of ragflow: {e2}")
+ elif input["workflow_type"] == 3:
+ image_list = []
+ # print(inputs)
+ complete_response = ""
+ answer_str = ""
+ async for rag_response in dify_service.chat(input["token"], current_user.id,
+ title_query, [],
+ conversation_id, i):
+ # print(rag_response)
+ try:
+ if rag_response[:5] == "data:":
+ # 濡傛灉鏄紝鍒欐埅鍙栨帀鍓�5涓瓧绗︼紝骞跺幓闄ら灏剧┖鐧界
+ complete_response = rag_response[5:].strip()
+ elif "event: ping" in rag_response:
+ continue
+ else:
+ # 鍚﹀垯锛屼繚鎸佸師鏍�
+ complete_response += rag_response
+ try:
+ data = json.loads(complete_response)
+ node_data = deepcopy(data)
+ if "data" in node_data:
+ if "outputs" in node_data["data"]:
+ node_data["data"]["outputs"] = {}
+ if "inputs" in node_data["data"]:
+ node_data["data"]["inputs"] = {}
+ # print(node_data)
+ node_list.append(node_data)
+ complete_response = ""
+ if data.get("event") == "node_started": # "event": "message_end"
+ if "data" not in data or not data["data"]: # 淇℃伅杩囨护
+ logger.error("闈炴硶鏁版嵁--------------------")
+ logger.error(data)
+ continue
+ else: # 姝e父杈撳嚭
+ answer = data.get("data", "")
+ if isinstance(answer, str):
+ logger.error("----------------鏈煡鏁版嵁--------------------")
+ logger.error(data)
+ continue
+ elif isinstance(answer, dict):
+
+ message = answer.get("title", "")
+
+ result = {"message": message, "type": "system",
+ "workflow": {"node_data": workflow_list}}
+ elif data.get("event") == "node_finished":
+ workflow_list.append({
+ "title": data.get("data", {}).get("title", ""),
+ "status": data.get("data", {}).get("status", ""),
+ "created_at": data.get("data", {}).get("created_at", 0),
+ "finished_at": data.get("data", {}).get("finished_at", 0),
+ "node_type": data.get("data", {}).get("node_type", 0),
+ "elapsed_time": data.get("data", {}).get("elapsed_time", 0),
+ "error": data.get("data", {}).get("error", ""),
+ })
+
+ answer = data.get("data", "")
+ if isinstance(answer, str):
+ logger.error("----------------鏈煡鏁版嵁--------------------")
+ logger.error(data)
+ continue
+ elif isinstance(answer, dict):
+
+ message = answer.get("title", "")
+ if answer.get("status") == "failed":
+ message = answer.get("error", "")
+ result = {"message": message, "type": "system",
+ "workflow": {"node_data": workflow_list}}
+ elif data.get("event") == "message":
+ answer_str = data.get("answer", "")
+ # try:
+ # msg_dict = json.loads(answer)
+ # message = msg_dict.get("output", "")
+ # except Exception as e:
+ # print(e)
+ # continue
+ result = {"message": answer_str, "type": "message",
+ "download_url": "", "workflow": {"node_data": workflow_list}}
+
+ # try:
+ # await websocket.send_json(result)
+ # except Exception as e:
+ # logger.error(e)
+ # logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
+ elif data.get("event") == "workflow_finished":
+ workflow_dict = {
+ "node_data": workflow_list,
+ "total_tokens": data.get("data", {}).get("total_tokens", 0),
+ "created_at": data.get("data", {}).get("created_at", 0),
+ "finished_at": data.get("data", {}).get("finished_at", 0),
+ "status": data.get("data", {}).get("status", ""),
+ "error": data.get("data", {}).get("error", ""),
+ "elapsed_time": data.get("data", {}).get("elapsed_time", 0)
+ }
try:
SessionService(db).update_session(chat_id,
message={"role": "assistant",
"content": {
- "answer": message,
- "download_url": download_url}},
+ "answer": answer_str,
+ "node_list": node_list,
+ "download_url": ""}},
conversation_id=data.get(
"conversation_id"))
+ node_list = []
except Exception as e:
logger.error("淇濆瓨dify鐨勪細璇濆紓甯革紒")
logger.error(e)
- await websocket.send_json(result)
- result = {"message": "", "type": "close", "download_url": ""}
-
-
- else:
- continue
- try:
- await websocket.send_json(result)
- except Exception as e:
- logger.error(e)
- logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
- complete_response = ""
- except json.JSONDecodeError as e:
- print(f"Error decoding JSON: {e}")
- # print(f"Response text: {text}")
- except Exception as e2:
- result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
- await websocket.send_json(result)
- print(f"Error process message of ragflow: {e2}")
- elif workflow_type == 3:
- image_list = []
- # print(inputs)
- complete_response = ""
- async for rag_response in dify_service.chat(token, current_user.id, title_query, [],
- conversation_id, inputs):
- print(rag_response)
- try:
- if rag_response[:5] == "data:":
- # 濡傛灉鏄紝鍒欐埅鍙栨帀鍓�5涓瓧绗︼紝骞跺幓闄ら灏剧┖鐧界
- complete_response = rag_response[5:].strip()
- elif "event: ping" in rag_response:
- continue
- else:
- # 鍚﹀垯锛屼繚鎸佸師鏍�
- complete_response += rag_response
- try:
- data = json.loads(complete_response)
- complete_response = ""
- if data.get("event") == "node_started" or data.get(
- "event") == "node_finished": # "event": "message_end"
- if "data" not in data or not data["data"]: # 淇℃伅杩囨护
- logger.error("闈炴硶鏁版嵁--------------------")
- logger.error(data)
+ elif data.get("event") == "message_end":
+ result = {"message": "", "type": "close", "workflow": workflow_dict,
+ "is_next": is_next}
+ else:
continue
- else: # 姝e父杈撳嚭
- answer = data.get("data", "")
- if isinstance(answer, str):
- logger.error("----------------鏈煡鏁版嵁--------------------")
- logger.error(data)
- continue
- elif isinstance(answer, dict):
-
- message = answer.get("title", "")
-
- result = {"message": message, "type": "system"}
- elif data.get("event") == "message":
- message = data.get("answer", "")
- # try:
- # msg_dict = json.loads(answer)
- # message = msg_dict.get("output", "")
- # except Exception as e:
- # print(e)
- # continue
- result = {"message": message, "type": "message",
- "download_url": ""}
try:
- SessionService(db).update_session(chat_id,
- message={"role": "assistant",
- "content": {
- "answer": message,
- "download_url": ""}},
- conversation_id=data.get(
- "conversation_id"))
+ await websocket.send_json(result)
except Exception as e:
- logger.error("淇濆瓨dify鐨勪細璇濆紓甯革紒")
logger.error(e)
- # try:
- # await websocket.send_json(result)
- # except Exception as e:
- # logger.error(e)
- # logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
-
- elif data.get("event") == "message_end":
- result = {"message": "", "type": "close", "download_url": ""}
- else:
- continue
- try:
- await websocket.send_json(result)
- except Exception as e:
- logger.error(e)
- logger.error("dify杩斿洖瀹㈡埛绔秷鎭紓甯�!")
- complete_response = ""
- except json.JSONDecodeError as e:
- print(f"Error decoding JSON: {e}")
- # print(f"Response text: {text}")
- except Exception as e2:
- result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
- await websocket.send_json(result)
- print(f"Error process message of ragflow: {e2}")
+ logger.error("dify杩斿洖瀹㈡埛绔秷鎭紓甯�!")
+ complete_response = ""
+ except json.JSONDecodeError as e:
+ print(f"Error decoding JSON: {e}")
+ # print(f"Response text: {text}")
+ except Exception as e2:
+ result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
+ await websocket.send_json(result)
+ print(f"Error process message of ragflow: {e2}")
elif chat_type == "documentIa":
- # print(122112)
token = DfTokenDao(db).get_token_by_id(DOCUMENT_IA_QUESTIONS)
# print(token)
if not token:
@@ -666,7 +829,8 @@
question,
agent_id,
AgentType.DIFY,
- current_user.id
+ current_user.id,
+ {"role": "user", "content": question}
)
conversation_id = session.conversation_id
except Exception as e:
@@ -685,162 +849,6 @@
complete_response = ""
async for rag_response in dify_service.chat(token, current_user.id, question, files,
conversation_id, {}):
- print(rag_response)
- try:
- if rag_response[:5] == "data:":
- # 濡傛灉鏄紝鍒欐埅鍙栨帀鍓�5涓瓧绗︼紝骞跺幓闄ら灏剧┖鐧界
- complete_response = rag_response[5:].strip()
- elif "event: ping" in rag_response:
- continue
- else:
- # 鍚﹀垯锛屼繚鎸佸師鏍�
- complete_response += rag_response
- try:
- data = json.loads(complete_response)
- if data.get("event") == "node_started" or data.get(
- "event") == "node_finished": # "event": "message_end"
- if "data" not in data or not data["data"]: # 淇℃伅杩囨护
- logger.error("闈炴硶鏁版嵁--------------------")
- logger.error(data)
- continue
- else: # 姝e父杈撳嚭
- answer = data.get("data", "")
- if isinstance(answer, str):
- logger.error("----------------鏈煡鏁版嵁--------------------")
- logger.error(data)
- continue
- elif isinstance(answer, dict):
-
- message = answer.get("title", "")
- if answer.get("status") == "failed":
- message = answer.get("error")
-
- result = {"message": message, "type": "system"}
- # continue
- elif data.get("event") == "message": # "event": "message_end"
- # 姝e父杈撳嚭
- answer = data.get("answer", "")
- result = {"message": answer, "type": "stream"}
- elif data.get("event") == "error":
- answer = data.get("message", "")
- result = {"message": answer, "type": "system"}
- elif data.get("event") == "workflow_finished":
- answer = data.get("data", "")
- if isinstance(answer, str):
- logger.error("----------------鏈煡鏁版嵁--------------------")
- logger.error(data)
- # result = {"message": "", "type": "close", "download_url": ""}
- elif isinstance(answer, dict):
- download_url = ""
- outputs = answer.get("outputs", {})
- if outputs:
- message = outputs.get("answer", "")
- # download_url = outputs.get("download_url", "")
- else:
- message = answer.get("error", "")
-
- result = {"message": message, "type": "system",
- "download_url": download_url}
- try:
- SessionService(db).update_session(chat_id,
- message={"role": "assistant",
- "content": {
- "answer": message,
- "download_url": download_url}},
- conversation_id=data.get(
- "conversation_id"))
- except Exception as e:
- logger.error("淇濆瓨dify鐨勪細璇濆紓甯革紒")
- logger.error(e)
- # await websocket.send_json(result)
- # continue
- elif data.get("event") == "message_end":
- result = {"message": "", "type": "close"}
-
- else:
- continue
- try:
- await websocket.send_json(result)
- except Exception as e:
- logger.error(e)
- logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
- complete_response = ""
- except json.JSONDecodeError as e:
- print(f"Error decoding JSON: {e}")
- # print(f"Response text: {text}")
- except Exception as e2:
- result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
- await websocket.send_json(result)
- print(f"Error process message of ragflow: {e2}")
- elif chat_type == "paperTalk":
- token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_PAPER)
- # print(token)
- if not token:
- await websocket.send_json({"message": "Invalid token", "type": "error"})
-
- while True:
- conversation_id = ""
- inputs = {}
- # print(4343)
- receive_message = await websocket.receive_json()
- print(f"Received from client {chat_id}: {receive_message}")
- if "difficulty" in receive_message:
- inputs["Question_Difficulty"] = receive_message["difficulty"]
- if "is_paper" in receive_message:
- inputs["Generate_test_paper"] = receive_message["is_paper"]
- if "single_choice" in receive_message:
- inputs["Multiple_choice_questions"] = receive_message["single_choice"]
- if "gap_filling" in receive_message:
- inputs["Fill_in_blank"] = receive_message["gap_filling"]
- if "true_or_false" in receive_message:
- inputs["true_or_false"] = receive_message["true_or_false"]
- if "multiple_choice" in receive_message:
- inputs["Multiple_Choice"] = receive_message["multiple_choice"]
- if "easy_question" in receive_message:
- inputs["Short_Answer_Questions"] = receive_message["easy_question"]
- if "case_questions" in receive_message:
- inputs["Case_Questions"] = receive_message["case_questions"]
- if "key_words" in receive_message:
- inputs["key_words"] = receive_message["key_words"]
- upload_files = receive_message.get('upload_files', [])
- question = receive_message.get('message', "")
- session_log = SessionService(db).get_session_by_id(chat_id)
- if not session_log and not upload_files:
- await websocket.send_json({"message": "闇�瑕佷笂浼犳枃妗o紒", "type": "error"})
- continue
- try:
- session = SessionService(db).create_session(
- chat_id,
- question if question else "寮�濮嬪嚭棰�",
- agent_id,
- AgentType.DIFY,
- current_user.id
- )
- conversation_id = session.conversation_id
- except Exception as e:
- logger.error(e)
- # complete_response = ""
-
- files = []
- for fileId in upload_files:
- files.append({
- "type": "document",
- "transfer_method": "local_file",
- "url": "",
- "upload_file_id": fileId
- })
- if files:
- inputs["upload_files"] = files
- # print(inputs)
- if not question and not inputs:
- await websocket.send_json({"message": "Invalid request", "type": "error"})
- continue
-
- if not question:
- question = "寮�濮嬪嚭棰�"
- complete_response = ""
- async for rag_response in dify_service.chat(token, current_user.id, question, files,
- conversation_id, inputs):
# print(rag_response)
try:
if rag_response[:5] == "data:":
@@ -853,7 +861,6 @@
complete_response += rag_response
try:
data = json.loads(complete_response)
- # print(data)
if data.get("event") == "node_started" or data.get(
"event") == "node_finished": # "event": "message_end"
if "data" not in data or not data["data"]: # 淇℃伅杩囨护
@@ -871,14 +878,11 @@
message = answer.get("title", "")
result = {"message": message, "type": "system"}
- # continue
+ continue
elif data.get("event") == "message": # "event": "message_end"
- # 姝e父杈撳嚭
+ # 姝e父杈撳嚭
answer = data.get("answer", "")
result = {"message": answer, "type": "stream"}
- elif data.get("event") == "error":
- answer = data.get("message", "")
- result = {"message": answer, "type": "system"}
elif data.get("event") == "workflow_finished":
answer = data.get("data", "")
if isinstance(answer, str):
@@ -890,12 +894,12 @@
outputs = answer.get("outputs", {})
if outputs:
message = outputs.get("answer", "")
- download_url = outputs.get("download_url", "")
+ # download_url = outputs.get("download_url", "")
else:
message = answer.get("error", "")
- result = {"message": message, "type": "system",
- "download_url": download_url}
+ # result = {"message": message, "type": "message",
+ # "download_url": download_url}
try:
SessionService(db).update_session(chat_id,
message={"role": "assistant",
@@ -908,7 +912,7 @@
logger.error("淇濆瓨dify鐨勪細璇濆紓甯革紒")
logger.error(e)
# await websocket.send_json(result)
- # continue
+ continue
elif data.get("event") == "message_end":
result = {"message": "", "type": "close"}
@@ -927,7 +931,6 @@
result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
await websocket.send_json(result)
print(f"Error process message of ragflow: {e2}")
-
# 鍚姩浠诲姟澶勭悊瀹㈡埛绔秷鎭�
tasks = [
diff --git a/app/config/env_conf/menu_conf.json b/app/config/env_conf/menu_conf.json
index bf4fd22..13715d0 100644
--- a/app/config/env_conf/menu_conf.json
+++ b/app/config/env_conf/menu_conf.json
@@ -1,23 +1,6 @@
{
"data": [
{
- "id": 10,
- "title": "鎶ュ憡鐢熸垚",
- "icon": "2",
- "img": "/src/assets/index/2.png",
- "desc": "鍩轰簬鎮ㄥ垱寤虹殑鎶ュ憡鏍煎紡鍜岀煡璇嗗簱涓殑鏂囨。鍐呭锛屽揩閫熺敓鎴愬畾鍒舵姤鍛婏紝骞舵敮鎸佷竴閿笅杞姐��",
- "describe": "鍩轰簬鎮ㄥ垱寤虹殑鎶ュ憡鏍煎紡鍜岀煡璇嗗簱涓殑鏂囨。鍐呭锛屽揩閫熺敓鎴愬畾鍒舵姤鍛婏紝骞舵敮鎸佷竴閿笅杞姐��",
- "rank": 100,
- "dialog": [
- {
- "id": "80ee430a-e396-48c4-a12c-7c7cdf5eda51",
- "chat_id": "80ee430a-e396-48c4-a12c-7c7cdf5eda51",
- "chat_type": "report",
- "agentType": 2
- }
- ]
- },
- {
"id": 1,
"title": "鎶ヨ〃鍚堝苟",
"icon": "6",
@@ -155,7 +138,7 @@
},
{
"id": 9,
- "title": "鏂囨。鎶ュ憡",
+ "title": "",
"icon": "2",
"img": "/src/assets/index/2.png",
"desc": "鍩轰簬鎮ㄥ垱寤虹殑鎶ュ憡鏍煎紡鍜屼笂浼犵殑鏂囨。鍐呭锛屽揩閫熺敓鎴愬畾鍒舵姤鍛婏紝骞舵敮鎸佷竴閿笅杞姐��",
diff --git a/app/models/session_model.py b/app/models/session_model.py
index 87f63b7..5765a44 100644
--- a/app/models/session_model.py
+++ b/app/models/session_model.py
@@ -24,6 +24,7 @@
tenant_id = Column(Integer) # 鍒涘缓浜�
message = Column(TEXT) # 璇存槑
conversation_id = Column(String(64))
+ workflow = Column(Integer, default=0)
# to_dict 鏂规硶
def to_dict(self):
@@ -32,6 +33,7 @@
'name': self.name,
'agent_type': self.agent_type,
'agent_id': self.agent_id,
+ 'workflow': self.workflow if self.workflow else 0,
'create_date': self.create_date.strftime("%Y-%m-%d %H:%M:%S"),
'update_date': self.update_date.strftime("%Y-%m-%d %H:%M:%S"),
}
diff --git a/app/service/session.py b/app/service/session.py
index e00bd3e..dca20a1 100644
--- a/app/service/session.py
+++ b/app/service/session.py
@@ -12,7 +12,7 @@
def __init__(self, db: Session):
self.db = db
- def create_session(self, session_id: str, name: str, agent_id: str, agent_type: AgentType, user_id: int) -> Type[
+ def create_session(self, session_id: str, name: str, agent_id: str, agent_type: AgentType, user_id: int, message: dict = None, workflow_type: int = 0) -> Type[
SessionModel] | SessionModel:
"""
鍒涘缓涓�涓柊鐨勪細璇濊褰曘��
@@ -28,7 +28,8 @@
"""
existing_session = self.get_session_by_id(session_id)
if existing_session:
- existing_session.add_message({"role": "user", "content": name})
+ # existing_session.add_message({"role": "user", "content": name})
+ existing_session.add_message(message)
existing_session.update_date = current_time()
self.db.commit()
self.db.refresh(existing_session)
@@ -40,7 +41,9 @@
agent_id=agent_id,
agent_type=agent_type,
tenant_id=user_id,
- message=json.dumps([{"role": "user", "content": name}])
+ # message=json.dumps([{"role": "user", "content": name}])
+ workflow = workflow_type,
+ message = json.dumps([message])
)
self.db.add(new_session)
self.db.commit()
--
Gitblit v1.8.0