From b7ad7ae90cd832281afe6d743776ad87f1e56e07 Mon Sep 17 00:00:00 2001
From: zhaoqingang <zhaoqg0118@163.com>
Date: Fri, 03 Jan 2025 16:55:47 +0800
Subject: [PATCH] Workflow logging
---
 app/api/agent.py            |  49 +++++++-
 app/api/chat.py             | 247 ++++++++++++++++++++++++----------------
 app/config/config.py        |   1 -
 app/models/session_model.py |   2 +
 app/service/session.py      |   9 +
 5 files changed, 197 insertions(+), 111 deletions(-)
diff --git a/app/api/agent.py b/app/api/agent.py
index 22ce8ef..786eb2f 100644
--- a/app/api/agent.py
+++ b/app/api/agent.py
@@ -223,17 +223,50 @@
for i in session.log_to_json().get("message", []):
if i.get("role") == "user":
tmp_data["question"] = i.get("content")
+                    if "type" in i:
+                        tmp_data["type"] = i.get("type")
elif i.get("role") == "assistant":
if isinstance(i.get("content"), dict):
- tmp_data["answer"] = i.get("content", {}).get("answer")
- if "file_name" in i.get("content", {}):
- tmp_data["files"] = [{"file_name": i.get("content", {}).get("file_name"),
- "file_url": i.get("content", {}).get("file_url")}]
+ content = i.get("content", {})
+ tmp_data["answer"] = content.get("answer")
+ if "file_name" in content:
+ tmp_data["files"] = [{"file_name": content.get("file_name"),
+ "file_url": content.get("file_url")}]
if "images" in i.get("content", {}):
- tmp_data["images"] = i.get("content", {}).get("images")
+ tmp_data["images"] = content.get("images")
- if "download_url" in i.get("content", {}):
- tmp_data["download_url"] = i.get("content", {}).get("download_url")
+ if "download_url" in content:
+ tmp_data["download_url"] = content.get("download_url")
+
+ if "node_list" in content:
+ node_dict = {
+                            "node_data": [],  # entries like {"title": <node name>, "status": <node status>, "created_at": <start time>, "finished_at": <end time>, "error": <error log>}
+                            "total_tokens": 0,  # tokens consumed
+                            "created_at": 0,  # start time
+                            "finished_at": 0,  # end time
+                            "status": "succeeded",  # workflow status
+                            "error": "",  # error log
+ }
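+                    # replay the stored Dify workflow events: node_finished events become per-node rows, workflow_finished fills in the overall summary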
+ for node in content["node_list"]:
+ if node.get("event") == "node_finished":
+ node_dict["node_data"].append({
+ "title": node.get("data", {}).get("title", ""),
+ "status": node.get("data", {}).get("status", ""),
+ "created_at":node.get("data", {}).get("created_at", 0),
+ "finished_at":node.get("data", {}).get("finished_at", 0),
+ "node_type":node.get("data", {}).get("node_type", 0),
+ "elapsed_time":node.get("data", {}).get("elapsed_time", 0),
+ "error":node.get("data", {}).get("error", ""),
+ })
+ elif node.get("event") == "workflow_finished":
+ node_dict["total_tokens"] = node.get("data", {}).get("total_tokens", 0)
+ node_dict["created_at"] = node.get("data", {}).get("created_at", 0)
+ node_dict["finished_at"] = node.get("data", {}).get("finished_at", 0)
+ node_dict["status"] = node.get("data", {}).get("status", "")
+ node_dict["error"] = node.get("data", {}).get("error", "")
+ node_dict["elapsed_time"] = node.get("data", {}).get("elapsed_time", 0)
+ tmp_data["workflow"] = node_dict
+
else:
tmp_data["answer"] = i.get("content")
data.append(tmp_data)
@@ -255,3 +288,5 @@
return Response(code=404, msg="Agent not found")
return Response(code=200, msg="", data={"chat_id": uuid.uuid4().hex})
+
+
diff --git a/app/api/chat.py b/app/api/chat.py
index 2993b2a..eab8801 100644
--- a/app/api/chat.py
+++ b/app/api/chat.py
@@ -451,7 +451,9 @@
title if title else title_query,
agent_id,
AgentType.DIFY,
- current_user.id
+ current_user.id,
+ {"role": "user", "content": title if title else title_query, "type": workflow_type, "is_clean":is_clean},
+ workflow_type
)
conversation_id = session.conversation_id
except Exception as e:
@@ -475,12 +477,11 @@
"upload_file_id": ""
})
inputs_list = []
- token_list = []
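+            # each inputs_list entry bundles the workflow inputs with its Dify token and workflow_type so the dispatch loop below can run them in order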
if workflow_type == 1:
inputs["input_files"] = files
- inputs_list.append(inputs)
- token_list.append(token)
- elif workflow_type == 2 and is_clean == 0:
+ inputs["Completion_of_main_indicators"] = title
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
+ elif workflow_type == 2:
inputs["file_list"] = files
inputs["Completion_of_main_indicators"] = title
inputs["sub_titles"] = sub_titles
@@ -488,9 +489,8 @@
if not token:
await websocket.send_json(
{"message": "Invalid token document_to_report", "type": "error"})
- inputs_list.append(inputs)
- token_list.append(token)
- elif workflow_type == 3:
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
+ elif workflow_type == 3 and is_clean == 0 and tokens < max_token:
inputs["file_list"] = files
inputs["number_of_title"] = title_number
inputs["title_style"] = title_style
@@ -498,28 +498,32 @@
if not token:
await websocket.send_json(
{"message": "Invalid token document_to_title", "type": "error"})
- # inputs_list.append(inputs)
- # token_list.append(token)
- elif workflow_type == 2 and is_clean == 1:
- # inputs["input_files"] = files
- inputs_list.append(inputs)
- token_list.append(token)
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
+            elif workflow_type == 3 and (is_clean == 1 or tokens >= max_token):
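+                # is_clean == 1 or the document exceeds the token budget: run the type-1 workflow on the raw files first, then queue the document_to_title workflow (type 3) built from inputs1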
+ inputs["input_files"] = files
+ inputs["Completion_of_main_indicators"] = title
+ inputs_list.append({"inputs": inputs, "token": token, "workflow_type": 1})
inputs1 = {}
- # inputs1["file_list"] = files
- inputs1["Completion_of_main_indicators"] = title
- inputs1["sub_titles"] = sub_titles
- token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_REPORT_TITLE)
+ inputs1["file_list"] = files
+ inputs1["number_of_title"] = title_number
+ inputs1["title_style"] = title_style
+ token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_TITLE)
if not token:
await websocket.send_json(
{"message": "Invalid token document_to_report", "type": "error"})
- inputs_list.append(inputs1)
- token_list.append(token)
+                inputs_list.append({"inputs": inputs1, "token": token, "workflow_type": 3})
- complete_response = ""
- if workflow_type == 1 or workflow_type == 2:
- for inputs in inputs_list:
- inputs["input_files"] = files
- async for rag_response in dify_service.workflow(token, current_user.id, inputs):
+ # print(inputs_list)
+ for input in inputs_list:
+ i = input["inputs"]
+ if "file_list" in i:
+ i["file_list"] = files
+ node_list = []
+ complete_response = ""
+ workflow_list = []
+ workflow_dict = {}
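+            # node_list keeps every raw Dify event for persistence; workflow_list/workflow_dict accumulate the summarized workflow log returned to the client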
+ if input["workflow_type"] == 1 or input["workflow_type"] == 2:
+ async for rag_response in dify_service.workflow(input["token"], current_user.id, i):
# print(rag_response)
try:
if rag_response[:5] == "data:":
@@ -532,8 +536,11 @@
complete_response += rag_response
try:
data = json.loads(complete_response)
+ # print(data)
+ node_list.append(data)
complete_response = ""
- if data.get("event") == "node_started" or data.get("event") == "node_finished": # "event": "message_end"
+ if data.get("event") == "node_started": # "event": "message_end"
+
if "data" not in data or not data["data"]: # 淇℃伅杩囨护
logger.error("闈炴硶鏁版嵁--------------------")
logger.error(data)
@@ -549,6 +556,16 @@
message = answer.get("title", "")
result = {"message": message, "type": "system"}
+ elif data.get("event") == "node_finished":
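+                            # one row per finished node: title, status, timing and any error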
+ workflow_list.append({
+ "title": data.get("data", {}).get("title", ""),
+ "status": data.get("data", {}).get("status", ""),
+ "created_at":data.get("data", {}).get("created_at", 0),
+ "finished_at":data.get("data", {}).get("finished_at", 0),
+ "node_type":data.get("data", {}).get("node_type", 0),
+ "elapsed_time":data.get("data", {}).get("elapsed_time", 0),
+ "error":data.get("data", {}).get("error", ""),
+ })
elif data.get("event") == "workflow_finished":
answer = data.get("data", "")
if isinstance(answer, str):
@@ -570,12 +587,22 @@
"url": download_url,
"upload_file_id": ""
}]
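+                            # overall workflow summary (token usage, timing, status) attached to the final close frame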
+ workflow_dict = {
+ "node_data": workflow_list,
+ "total_tokens": answer.get("total_tokens", 0),
+ "created_at": answer.get("created_at", 0),
+ "finished_at": answer.get("finished_at", 0),
+ "status": answer.get("status", ""),
+ "error": answer.get("error", ""),
+ "elapsed_time": answer.get("elapsed_time", 0)
+ }
result = {"message": message, "type": "message", "download_url": download_url}
try:
SessionService(db).update_session(chat_id,
message={"role": "assistant",
"content": {
"answer": message,
+ "node_list": node_list,
"download_url": download_url}},
conversation_id=data.get(
"conversation_id"))
@@ -587,7 +614,8 @@
except Exception as e:
logger.error(e)
logger.error("杩斿洖瀹㈡埛绔秷鎭紓甯�!")
- result = {"message": "", "type": "close", "download_url": ""}
+
+ result = {"message": "", "type": "close", "workflow": workflow_dict}
else:
@@ -605,88 +633,107 @@
result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
await websocket.send_json(result)
print(f"Error process message of ragflow: {e2}")
- elif workflow_type == 3:
- image_list = []
- # print(inputs)
- complete_response = ""
- async for rag_response in dify_service.chat(token, current_user.id, title_query, [],
- conversation_id, inputs):
- print(rag_response)
- try:
- if rag_response[:5] == "data:":
-                        # if so, drop the leading "data:" prefix (5 chars) and strip surrounding whitespace
- complete_response = rag_response[5:].strip()
- elif "event: ping" in rag_response:
- continue
- else:
-                        # otherwise, keep accumulating the raw text
- complete_response += rag_response
+ elif input["workflow_type"] == 3:
+ image_list = []
+ # print(inputs)
+ complete_response = ""
+ async for rag_response in dify_service.chat(input["token"], current_user.id, title_query, [],
+ conversation_id, i):
+ # print(rag_response)
try:
- data = json.loads(complete_response)
- complete_response = ""
- if data.get("event") == "node_started" or data.get(
- "event") == "node_finished": # "event": "message_end"
-                    if "data" not in data or not data["data"]:  # filter out invalid payloads
-                        logger.error("invalid data--------------------")
- logger.error(data)
- continue
-                    else:  # normal output
- answer = data.get("data", "")
- if isinstance(answer, str):
-                            logger.error("----------------unknown data--------------------")
+ if rag_response[:5] == "data:":
+                        # if so, drop the leading "data:" prefix (5 chars) and strip surrounding whitespace
+ complete_response = rag_response[5:].strip()
+ elif "event: ping" in rag_response:
+ continue
+ else:
+                        # otherwise, keep accumulating the raw text
+ complete_response += rag_response
+ try:
+ data = json.loads(complete_response)
+ node_list.append(data)
+ complete_response = ""
+ if data.get("event") == "node_started": # "event": "message_end"
+                            if "data" not in data or not data["data"]:  # filter out invalid payloads
+                                logger.error("invalid data--------------------")
logger.error(data)
continue
- elif isinstance(answer, dict):
+                            else:  # normal output
+ answer = data.get("data", "")
+ if isinstance(answer, str):
+                                    logger.error("----------------unknown data--------------------")
+ logger.error(data)
+ continue
+ elif isinstance(answer, dict):
- message = answer.get("title", "")
+ message = answer.get("title", "")
- result = {"message": message, "type": "system"}
- elif data.get("event") == "message":
- message = data.get("answer", "")
- # try:
- # msg_dict = json.loads(answer)
- # message = msg_dict.get("output", "")
- # except Exception as e:
- # print(e)
- # continue
- result = {"message": message, "type": "message",
- "download_url": ""}
+ result = {"message": message, "type": "system"}
+ elif data.get("event") == "node_finished":
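+                                # one row per finished node, mirroring the workflow branch above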
+ workflow_list.append({
+ "title": data.get("data", {}).get("title", ""),
+ "status": data.get("data", {}).get("status", ""),
+ "created_at":data.get("data", {}).get("created_at", 0),
+ "finished_at":data.get("data", {}).get("finished_at", 0),
+ "node_type":data.get("data", {}).get("node_type", 0),
+ "elapsed_time":data.get("data", {}).get("elapsed_time", 0),
+ "error":data.get("data", {}).get("error", ""),
+ })
+ elif data.get("event") == "message":
+ message = data.get("answer", "")
+ # try:
+ # msg_dict = json.loads(answer)
+ # message = msg_dict.get("output", "")
+ # except Exception as e:
+ # print(e)
+ # continue
+ result = {"message": message, "type": "message",
+ "download_url": ""}
+ try:
+ SessionService(db).update_session(chat_id,
+ message={"role": "assistant",
+ "content": {
+ "answer": message,
+ "node_list": node_list,
+ "download_url": ""}},
+ conversation_id=data.get(
+ "conversation_id"))
+ except Exception as e:
+                                    logger.error("Failed to save the dify session!")
+ logger.error(e)
+ # try:
+ # await websocket.send_json(result)
+ # except Exception as e:
+ # logger.error(e)
+                            #     logger.error("Failed to send the message to the client!")
+ elif data.get("event") == "workflow_finished":
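+                                # summary only; the close frame carrying this workflow log is sent on message_end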
+ workflow_dict = {
+ "node_data": workflow_list,
+ "total_tokens": answer.get("total_tokens", 0),
+ "created_at": answer.get("created_at", 0),
+ "finished_at": answer.get("finished_at", 0),
+ "status": answer.get("status", ""),
+ "error": answer.get("error", ""),
+ "elapsed_time": answer.get("elapsed_time", 0)
+ }
+ elif data.get("event") == "message_end":
+ result = {"message": "", "type": "close", "workflow": workflow_dict}
+ else:
+ continue
try:
- SessionService(db).update_session(chat_id,
- message={"role": "assistant",
- "content": {
- "answer": message,
- "download_url": ""}},
- conversation_id=data.get(
- "conversation_id"))
+ await websocket.send_json(result)
except Exception as e:
-                                logger.error("Failed to save the dify session!")
logger.error(e)
- # try:
- # await websocket.send_json(result)
- # except Exception as e:
- # logger.error(e)
-                            #     logger.error("Failed to send the message to the client!")
-
- elif data.get("event") == "message_end":
- result = {"message": "", "type": "close", "download_url": ""}
- else:
- continue
- try:
- await websocket.send_json(result)
- except Exception as e:
- logger.error(e)
-                            logger.error("Failed to send the dify message to the client!")
- complete_response = ""
- except json.JSONDecodeError as e:
- print(f"Error decoding JSON: {e}")
- # print(f"Response text: {text}")
- except Exception as e2:
-                    result = {"message": f"Internal error: {e2}", "type": "close"}
- await websocket.send_json(result)
- print(f"Error process message of ragflow: {e2}")
+                            logger.error("Failed to send the dify message to the client!")
+ complete_response = ""
+ except json.JSONDecodeError as e:
+ print(f"Error decoding JSON: {e}")
+ # print(f"Response text: {text}")
+ except Exception as e2:
+                    result = {"message": f"Internal error: {e2}", "type": "close"}
+ await websocket.send_json(result)
+ print(f"Error process message of ragflow: {e2}")
elif agent.type == "documentIa":
- print(122112)
token = DfTokenDao(db).get_token_by_id(DOCUMENT_IA_QUESTIONS)
# print(token)
if not token:
diff --git a/app/config/config.py b/app/config/config.py
index e925d4b..c5a10a0 100644
--- a/app/config/config.py
+++ b/app/config/config.py
@@ -23,7 +23,6 @@
dify_workflow_clean: str = ''
dify_workflow_report: str = ''
postgresql_database_url: str = ''
- max_report_tokens: int = 100000
def __init__(self, **kwargs):
        # replace the IP address in the config
host_ip = os.getenv('HOST_IP', '127.0.0.1')
diff --git a/app/models/session_model.py b/app/models/session_model.py
index fd513d2..fd8bec6 100644
--- a/app/models/session_model.py
+++ b/app/models/session_model.py
@@ -18,6 +18,7 @@
    tenant_id = Column(Integer)  # creator
    message = Column(TEXT)  # notes
conversation_id = Column(String(64))
+ # workflow = Column(Integer, default=0)
    # to_dict method
def to_dict(self):
@@ -26,6 +27,7 @@
'name': self.name,
'agent_type': self.agent_type,
'agent_id': self.agent_id,
+ # 'workflow': self.workflow,
'create_date': self.create_date.strftime("%Y-%m-%d %H:%M:%S"),
'update_date': self.update_date.strftime("%Y-%m-%d %H:%M:%S"),
}
diff --git a/app/service/session.py b/app/service/session.py
index e00bd3e..bf96515 100644
--- a/app/service/session.py
+++ b/app/service/session.py
@@ -12,7 +12,7 @@
def __init__(self, db: Session):
self.db = db
- def create_session(self, session_id: str, name: str, agent_id: str, agent_type: AgentType, user_id: int) -> Type[
+    def create_session(self, session_id: str, name: str, agent_id: str, agent_type: AgentType, user_id: int, message: dict = None, workflow_type: int = 0) -> Type[
SessionModel] | SessionModel:
"""
        Create a new session record.
@@ -26,9 +26,11 @@
        Returns:
            SessionModel: the newly created session model instance; returns None if the session ID already exists.
"""
+ if not message:
+ message = {"role": "user", "content": name}
existing_session = self.get_session_by_id(session_id)
if existing_session:
- existing_session.add_message({"role": "user", "content": name})
+ existing_session.add_message(message)
existing_session.update_date = current_time()
self.db.commit()
self.db.refresh(existing_session)
@@ -40,7 +42,8 @@
agent_id=agent_id,
agent_type=agent_type,
tenant_id=user_id,
- message=json.dumps([{"role": "user", "content": name}])
+ # workflow=workflow_type,
+ message=json.dumps([message])
)
self.db.add(new_session)
self.db.commit()
--
Gitblit v1.8.0