zhaoqingang
2025-02-25 f21bffe8c2d6d58e15b416f93ed0edbe4078f38c
app/api/chat.py
@@ -1,18 +1,29 @@
import json
import uuid
import re
from copy import deepcopy
from fastapi import WebSocket, WebSocketDisconnect, APIRouter, Depends
import asyncio
import websockets
from sqlalchemy.orm import Session
from Log import logger
from app.api import get_current_user_websocket
from app.config.config import settings
from app.config.const import IMAGE_TO_TEXT, DOCUMENT_TO_REPORT, DOCUMENT_TO_CLEANING, DOCUMENT_IA_QUESTIONS, \
    DOCUMENT_TO_REPORT_TITLE, DOCUMENT_TO_TITLE, DOCUMENT_TO_PAPER, DOCUMENT_IA_QUESTIONS_DS, \
    DOCUMENT_IA_QUESTIONS_EQUIPMENT
from app.models import MenuCapacityModel
from app.models.agent_model import AgentModel, AgentType
from app.models.base_model import get_db
from app.models.user_model import UserModel
from app.service.v2.api_token import DfTokenDao
from app.service.dialog import update_session_history
from app.service.basic import BasicService
from app.service.difyService import DifyService
from app.service.ragflow import RagflowService
from app.service.service_token import get_bisheng_token, get_ragflow_token
from app.service.session import SessionService
router = APIRouter()
@@ -27,25 +38,34 @@
    tasks = []
    await websocket.accept()
    print(f"Client {agent_id} connected")
    agent = db.query(AgentModel).filter(AgentModel.id == agent_id).first()
    agent = db.query(MenuCapacityModel).filter(MenuCapacityModel.chat_id == agent_id).first()
    if not agent:
        print("Agent not found")
        agent = db.query(AgentModel).filter(AgentModel.id == agent_id).first()
        agent_type = agent.agent_type
        chat_type = agent.type
    else:
        agent_type = agent.capacity_type
        chat_type = agent.chat_type
    # print(agent_type)
    # print(chat_type)
    if not agent:
        ret = {"message": "Agent not found", "type": "close"}
        await websocket.send_json(ret)
        return
    agent_type = agent.agent_type
    if chat_id == "" or chat_id == "0":
        ret = {"message": "Chat ID not found", "type": "close"}
        await websocket.send_json(ret)
        return
    # print(agent_type)
    # print(chat_type)
    if agent_type == AgentType.RAGFLOW:
        ragflow_service = RagflowService(settings.fwr_base_url)
        token = get_ragflow_token(db, current_user.id)
        token = await get_ragflow_token(db, current_user.id)
        try:
            async def forward_to_ragflow():
                while True:
                    is_new = False
                    message = await websocket.receive_json()
                    print(f"Received from client {chat_id}: {message}")
                    chat_history = message.get('chatHistory', [])
@@ -53,7 +73,6 @@
                    if len(chat_history) == 0:
                        chat_history = await ragflow_service.get_session_history(token, chat_id)
                        if len(chat_history) == 0:
                            is_new = True
                            chat_history = await ragflow_service.set_session(token, agent_id,
                                                                             message, chat_id, True)
                            # print("chat_history------------------------", chat_history)
@@ -99,8 +118,12 @@
                            result = {"message": f"内部错误: {e2}", "type": "close"}
                            await websocket.send_json(result)
                            print(f"Error process message of ragflow: {e2}")
                    dialog_chat_history = await ragflow_service.get_session_history(token, chat_id, 1)
                    await update_session_history(db, dialog_chat_history, current_user.id, is_new)
                    try:
                        dialog_chat_history = await ragflow_service.get_session_history(token, chat_id, 1)
                        await update_session_history(db, dialog_chat_history, current_user.id)
                    except Exception as e:
                        logger.error(e)
                        logger.error("-----------------保存ragflow的历史会话异常-----------------")
            # 启动任务处理客户端消息
            tasks = [
@@ -125,7 +148,7 @@
                        pass
    elif agent_type == AgentType.BISHENG:
        token = get_bisheng_token(db, current_user.id)
        token = await get_bisheng_token(db, current_user.id)
        service_uri = f"{settings.sgb_websocket_url}/api/v1/assistant/chat/{agent_id}?t=&chat_id={chat_id}"
        headers = {'cookie': f"access_token_cookie={token};"}
@@ -196,6 +219,803 @@
                            await task
                        except asyncio.CancelledError:
                            pass
    elif agent_type == AgentType.BASIC:
        try:
            service = BasicService(base_url=settings.basic_base_url)
            while True:
                # 接收前端消息
                message = await websocket.receive_json()
                question = message.get("message")
                try:
                    SessionService(db).create_session(
                        chat_id,
                        question,
                        agent_id,
                        AgentType.BASIC,
                        current_user.id,
                        {"role": "user", "content": question}
                    )
                except Exception as e:
                    logger.error(e)
                if not question:
                    await websocket.send_json({"message": "Invalid request", "type": "error"})
                    continue
                # logger.error(agent.type)
                if chat_type == "questionTalk":
                    try:
                        data = await service.questions_talk(question, chat_id)
                        output = data.get("output", "")
                        file_name = data.get("filename", "")
                        excel_url = None
                        if file_name:
                            excel_url = f"/api/files/download/?agent_id=basic_question_talk&file_id={file_name}&file_type=word"
                        result = {"message": output, "type": "message", "file_url": excel_url, "file_name": file_name}
                        try:
                            SessionService(db).update_session(chat_id,
                                                              message={"role": "assistant", "content": result})
                        except Exception as e:
                            logger.error(e)
                        logger.error("-----------------返回数据--------------------")
                        await websocket.send_json(result)
                    except Exception as e2:
                        result = {"message": f"内部错误: {e2}", "type": "close"}
                        logger.error(str(e2))
                        logger.error(f"Error process message of basic chuti agent: {e2}")
                        await websocket.send_json(result)
                else:
                    message_data = {}
                    logger.error("---------------------excel_talk-----------------------------")
                    excel_url = ""
                    image_url = ""
                    image_name = ""
                    excel_name = ""
                    async for data in service.excel_talk(question, chat_id):
                        # logger.error(data)
                        output = data.get("output", "")
                        e_name = data.get("excel_name", "")
                        i_name = data.get("image_name", "")
                        def build_file_url(name, file_type):
                            if not name:
                                return None
                            return (f"/api/files/download/?agent_id={agent_id}&file_id={name}"
                                    f"&file_type={file_type}")
                        if e_name:
                            excel_url = build_file_url(e_name, 'excel')
                            excel_name = e_name
                        if i_name:
                            image_url = build_file_url(i_name, 'image')
                            image_name = i_name
                        if data["type"] == "message":
                            message_data = {
                                "content": output,
                                "excel_url": excel_url,
                                "image_url": image_url,
                                "image_name": image_name,
                                "excel_name": excel_name,
                                "sql": data.get("sql", ""),
                                "code": data.get("code", ""),
                                "e": data.get("e", ""),
                                "role": "assistant"}
                        # 发送结果给客户端
                        # data["type"] = "message"
                        data["message"] = output
                        data["excel_url"] = excel_url
                        data["image_url"] = image_url
                        await websocket.send_json(data)
                    if message_data:
                        try:
                            SessionService(db).update_session(chat_id, message=message_data)
                        except Exception as e:
                            logger.error(f"Unexpected error when update_session: {e}")
        except Exception as e:
            logger.error(e)
            await websocket.send_json({"message": "出现错误!", "type": "error"})
        finally:
            await websocket.close()
            print(f"Client {agent_id} disconnected")
    if agent_type == AgentType.DIFY:
        dify_service = DifyService(settings.dify_base_url)
        # token = get_dify_token(db, current_user.id)
        try:
            async def forward_to_dify():
                if chat_type == "imageTalk":
                    token = DfTokenDao(db).get_token_by_id(IMAGE_TO_TEXT)
                    if not token:
                        await websocket.send_json({"message": "Invalid token", "type": "error"})
                    while True:
                        image_list = []
                        is_image = False
                        conversation_id = ""
                        receive_message = await websocket.receive_json()
                        print(f"Received from client {chat_id}: {receive_message}")
                        upload_file_id = receive_message.get('upload_file_id', "")
                        question = receive_message.get('message', "")
                        if not question and not image_url:
                            await websocket.send_json({"message": "Invalid request", "type": "error"})
                            continue
                        try:
                            session = SessionService(db).create_session(
                                chat_id,
                                question,
                                agent_id,
                                AgentType.DIFY,
                                current_user.id,
                                {"role": "user", "content": question}
                            )
                            conversation_id = session.conversation_id
                        except Exception as e:
                            logger.error(e)
                        # complete_response = ""
                        files = []
                        if upload_file_id:
                            files.append({
                                "type": "image",
                                "transfer_method": "local_file",
                                "url": "",
                                "upload_file_id": upload_file_id
                            })
                        answer_str = ""
                        async for rag_response in dify_service.chat(token, current_user.id, question, files,
                                                                    conversation_id, {}):
                            # print(rag_response)
                            try:
                                if rag_response[:5] == "data:":
                                    # 如果是,则截取掉前5个字符,并去除首尾空白符
                                    complete_response = rag_response[5:].strip()
                                else:
                                    # 否则,保持原样
                                    complete_response = rag_response
                                try:
                                    data = json.loads(complete_response)
                                    if data.get("event") == "agent_message":  # "event": "message_end"
                                        if "answer" not in data or not data["answer"]:  # 信息过滤
                                            logger.error("非法数据--------------------")
                                            # logger.error(data)
                                            continue
                                        else:  # 正常输出
                                            answer = data.get("answer", "")
                                            if isinstance(answer, str):
                                                if "![](https://res.stepfun.com/" in answer and image_list:
                                                    is_image = True
                                                    pattern = r'!\[\] *\(https://res\.stepfun\.com/image_gen/[^)]+\)'
                                                    url_image = image_list.pop()
                                                    new_answer = re.sub(pattern, url_image, answer)
                                                    answer_str += new_answer
                                                else:
                                                    answer_str += answer
                                            elif isinstance(answer, dict):
                                                logger.error("未知数据体:0---------------------------------")
                                                logger.error(answer)
                                                answer_str += answer.get("action_input", "")
                                            result = {"message": answer_str, "type": "message"}
                                    elif data.get("event") == "message_end":
                                        images_url = []
                                        if image_list and not is_image:
                                            answer_str += image_list[-1]
                                        result = {"message": answer_str,
                                                  "type": "close"}  # , "message_files": images_url
                                        try:
                                            SessionService(db).update_session(chat_id,
                                                                              message={"role": "assistant",
                                                                                       "content": {"answer": answer_str,
                                                                                                   "images": images_url}},
                                                                              conversation_id=data.get(
                                                                                  "conversation_id"))
                                        except Exception as e:
                                            logger.error("保存dify的会话异常!")
                                            logger.error(e)
                                    elif data.get("event") == "message_file":
                                        await  dify_service.save_images(data.get("url"), data.get("id") + ".png")
                                        image_list.append(f"![](/api/files/image/{data.get('id')})")
                                        # result = {"message": answer_str, "type": "message"}
                                        continue
                                    else:
                                        continue
                                    await websocket.send_json(result)
                                    complete_response = ""
                                except json.JSONDecodeError as e:
                                    print(f"Error decoding JSON: {e}")
                                    # print(f"Response text: {text}")
                            except Exception as e2:
                                result = {"message": f"内部错误: {e2}", "type": "close"}
                                await websocket.send_json(result)
                                print(f"Error process message of ragflow: {e2}")
                elif chat_type == "reportWorkflow":
                    while True:
                        receive_message = await websocket.receive_json()
                        print(f"Received from client {chat_id}: {receive_message}")
                        upload_files = receive_message.get('upload_files', [])
                        upload_filenames = receive_message.get('upload_filenames', [])
                        title = receive_message.get('title', "")
                        sub_titles = receive_message.get('sub_titles', "")
                        workflow_type = receive_message.get('workflow', 1)
                        title_number = receive_message.get('title_number', 8)
                        title_style = receive_message.get('title_style', "")
                        title_query = receive_message.get('title_query', "")
                        is_clean = receive_message.get('is_clean', 0)
                        file_type = receive_message.get('file_type', 1)
                        max_token = receive_message.get('max_tokens', 100000)
                        tokens = receive_message.get('tokens', 0)
                        if upload_files:
                            title_query = "start"
                        # if not upload_files:
                        # await websocket.send_json({"message": "Invalid request", "type": "error"})
                        # continue
                        try:
                            session = SessionService(db).create_session(
                                chat_id,
                                title if title else title_query,
                                agent_id,
                                AgentType.DIFY,
                                current_user.id,
                                {"role": "user", "content": title if title else title_query, "type": workflow_type,
                                 "is_clean": is_clean, "upload_filenames":upload_filenames},
                                workflow_type
                            )
                            conversation_id = session.conversation_id
                        except Exception as e:
                            logger.error(e)
                        inputs = {
                        }
                        files = []
                        for file in upload_files:
                            if file_type == 1:
                                files.append({
                                    "type": "document",
                                    "transfer_method": "local_file",
                                    "url": "",
                                    "upload_file_id": file
                                })
                            else:
                                files.append({
                                    "type": "document",
                                    "transfer_method": "remote_url",
                                    "url": file,
                                    "upload_file_id": ""
                                })
                        inputs_list = []
                        is_next = 0
                        if workflow_type == 1:
                            token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_CLEANING)
                            if not token:
                                await websocket.send_json(
                                    {"message": "Invalid token document_to_cleaning", "type": "error"})
                            inputs["input_files"] = files
                            inputs["Completion_of_main_indicators"] = title
                            inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
                        elif workflow_type == 2:
                            inputs["file_list"] = files
                            inputs["Completion_of_main_indicators"] = title
                            inputs["sub_titles"] = sub_titles
                            token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_REPORT_TITLE)
                            if not token:
                                await websocket.send_json(
                                    {"message": "Invalid token document_to_report", "type": "error"})
                            inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
                        elif workflow_type == 3 and is_clean == 0 and tokens < max_token:
                            inputs["file_list"] = files
                            inputs["number_of_title"] = title_number
                            inputs["title_style"] = title_style
                            token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_TITLE)
                            if not token:
                                await websocket.send_json(
                                    {"message": "Invalid token document_to_title", "type": "error"})
                            inputs_list.append({"inputs": inputs, "token": token, "workflow_type": workflow_type})
                        elif workflow_type == 3 and is_clean == 1 or tokens >= max_token:
                            token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_CLEANING)
                            if not token:
                                await websocket.send_json(
                                    {"message": "Invalid token document_to_cleaning", "type": "error"})
                            inputs["input_files"] = files
                            inputs["Completion_of_main_indicators"] = title
                            inputs_list.append({"inputs": inputs, "token": token, "workflow_type": 1})
                            inputs1 = {}
                            inputs1["file_list"] = files
                            inputs1["number_of_title"] = title_number
                            inputs1["title_style"] = title_style
                            token = DfTokenDao(db).get_token_by_id(DOCUMENT_TO_TITLE)
                            if not token:
                                await websocket.send_json(
                                    {"message": "Invalid token document_to_report", "type": "error"})
                            inputs_list.append({"inputs": inputs1, "token": token, "workflow_type": 3})
                        # print(inputs_list)
                        for idx, input in enumerate(inputs_list):
                            # print(input)
                            if idx < len(inputs_list) - 1:
                                is_next = 1
                            else:
                                is_next = 0
                            i = input["inputs"]
                            if "file_list" in i:
                                i["file_list"] = files
                            # print(i)
                            node_list = []
                            complete_response = ""
                            workflow_list = []
                            workflow_dict = {}
                            if input["workflow_type"] == 1 or input["workflow_type"] == 2:
                                async for rag_response in dify_service.workflow(input["token"], current_user.id, i):
                                    # print(rag_response)
                                    try:
                                        if rag_response[:5] == "data:":
                                            # 如果是,则截取掉前5个字符,并去除首尾空白符
                                            complete_response = rag_response[5:].strip()
                                        elif "event: ping" in rag_response:
                                            continue
                                        else:
                                            # 否则,保持原样
                                            complete_response += rag_response
                                        try:
                                            data = json.loads(complete_response)
                                            # print(data)
                                            node_data = deepcopy(data)
                                            if "data" in node_data:
                                                if "outputs" in node_data["data"]:
                                                    node_data["data"]["outputs"] = {}
                                                if "inputs" in node_data["data"]:
                                                    node_data["data"]["inputs"] = {}
                                            # print(node_data)
                                            node_list.append(node_data)
                                            complete_response = ""
                                            if data.get("event") == "node_started":  # "event": "message_end"
                                                if "data" not in data or not data["data"]:  # 信息过滤
                                                    logger.error("非法数据--------------------")
                                                    logger.error(data)
                                                    continue
                                                else:  # 正常输出
                                                    answer = data.get("data", "")
                                                    if isinstance(answer, str):
                                                        logger.error("----------------未知数据--------------------")
                                                        logger.error(data)
                                                        continue
                                                    elif isinstance(answer, dict):
                                                        message = answer.get("title", "")
                                                    result = {"message": message, "type": "system",
                                                              "workflow": {"node_data": workflow_list}}
                                            elif data.get("event") == "node_finished":
                                                workflow_list.append({
                                                    "title": data.get("data", {}).get("title", ""),
                                                    "status": data.get("data", {}).get("status", ""),
                                                    "created_at": data.get("data", {}).get("created_at", 0),
                                                    "finished_at": data.get("data", {}).get("finished_at", 0),
                                                    "node_type": data.get("data", {}).get("node_type", 0),
                                                    "elapsed_time": data.get("data", {}).get("elapsed_time", 0),
                                                    "error": data.get("data", {}).get("error", ""),
                                                })
                                                answer = data.get("data", "")
                                                if isinstance(answer, str):
                                                    logger.error("----------------未知数据--------------------")
                                                    logger.error(data)
                                                    continue
                                                elif isinstance(answer, dict):
                                                    message = answer.get("title", "")
                                                    if answer.get("status") == "failed":
                                                        message = answer.get("error", "")
                                                        result = {"message": message, "type": "system",
                                                                  "workflow": {"node_data": workflow_list}}
                                            elif data.get("event") == "workflow_finished":
                                                answer = data.get("data", "")
                                                if isinstance(answer, str):
                                                    logger.error("----------------未知数据--------------------")
                                                    logger.error(data)
                                                    result = {"message": "", "type": "close", "download_url": "",
                                                              "is_next": is_next}
                                                elif isinstance(answer, dict):
                                                    download_url = ""
                                                    outputs = answer.get("outputs", {})
                                                    if outputs:
                                                        message = outputs.get("output", "")
                                                        download_url = outputs.get("download_url", "")
                                                    else:
                                                        message = answer.get("error", "")
                                                    if download_url:
                                                        files = [{
                                                            "type": "document",
                                                            "transfer_method": "remote_url",
                                                            "url": download_url,
                                                            "upload_file_id": ""
                                                        }]
                                                    workflow_dict = {
                                                        "node_data": workflow_list,
                                                        "total_tokens": answer.get("total_tokens", 0),
                                                        "created_at": answer.get("created_at", 0),
                                                        "finished_at": answer.get("finished_at", 0),
                                                        "status": answer.get("status", ""),
                                                        "error": answer.get("error", ""),
                                                        "elapsed_time": answer.get("elapsed_time", 0)
                                                    }
                                                    result = {"message": message, "type": "message",
                                                              "download_url": download_url, "workflow": workflow_dict}
                                                    try:
                                                        SessionService(db).update_session(chat_id,
                                                                                          message={"role": "assistant",
                                                                                                   "content": {
                                                                                                       "answer": message,
                                                                                                       "node_list": node_list,
                                                                                                       "download_url": download_url}},
                                                                                          conversation_id=data.get(
                                                                                              "conversation_id"))
                                                        node_list = []
                                                    except Exception as e:
                                                        logger.error("保存dify的会话异常!")
                                                        logger.error(e)
                                                    try:
                                                        await websocket.send_json(result)
                                                    except Exception as e:
                                                        logger.error(e)
                                                        logger.error("返回客户端消息异常!")
                                                    result = {"message": "", "type": "close", "workflow": workflow_dict,
                                                              "is_next": is_next, "download_url": download_url}
                                            else:
                                                continue
                                            try:
                                                await websocket.send_json(result)
                                            except Exception as e:
                                                logger.error(e)
                                                logger.error("返回客户端消息异常!")
                                            complete_response = ""
                                        except json.JSONDecodeError as e:
                                            print(f"Error decoding JSON: {e}")
                                            # print(f"Response text: {text}")
                                    except Exception as e2:
                                        result = {"message": f"内部错误: {e2}", "type": "close"}
                                        await websocket.send_json(result)
                                        print(f"Error process message of ragflow: {e2}")
                            elif input["workflow_type"] == 3:
                                image_list = []
                                # print(inputs)
                                complete_response = ""
                                answer_str = ""
                                async for rag_response in dify_service.chat(input["token"], current_user.id,
                                                                            title_query, [],
                                                                            conversation_id, i):
                                    # print(rag_response)
                                    try:
                                        if rag_response[:5] == "data:":
                                            # 如果是,则截取掉前5个字符,并去除首尾空白符
                                            complete_response = rag_response[5:].strip()
                                        elif "event: ping" in rag_response:
                                            continue
                                        else:
                                            # 否则,保持原样
                                            complete_response += rag_response
                                        try:
                                            data = json.loads(complete_response)
                                            node_data = deepcopy(data)
                                            if "data" in node_data:
                                                if "outputs" in node_data["data"]:
                                                    node_data["data"]["outputs"] = {}
                                                if "inputs" in node_data["data"]:
                                                    node_data["data"]["inputs"] = {}
                                            # print(node_data)
                                            node_list.append(node_data)
                                            complete_response = ""
                                            if data.get("event") == "node_started":  # "event": "message_end"
                                                if "data" not in data or not data["data"]:  # 信息过滤
                                                    logger.error("非法数据--------------------")
                                                    logger.error(data)
                                                    continue
                                                else:  # 正常输出
                                                    answer = data.get("data", "")
                                                    if isinstance(answer, str):
                                                        logger.error("----------------未知数据--------------------")
                                                        logger.error(data)
                                                        continue
                                                    elif isinstance(answer, dict):
                                                        message = answer.get("title", "")
                                                    result = {"message": message, "type": "system",
                                                              "workflow": {"node_data": workflow_list}}
                                            elif data.get("event") == "node_finished":
                                                workflow_list.append({
                                                    "title": data.get("data", {}).get("title", ""),
                                                    "status": data.get("data", {}).get("status", ""),
                                                    "created_at": data.get("data", {}).get("created_at", 0),
                                                    "finished_at": data.get("data", {}).get("finished_at", 0),
                                                    "node_type": data.get("data", {}).get("node_type", 0),
                                                    "elapsed_time": data.get("data", {}).get("elapsed_time", 0),
                                                    "error": data.get("data", {}).get("error", ""),
                                                })
                                                answer = data.get("data", "")
                                                if isinstance(answer, str):
                                                    logger.error("----------------未知数据--------------------")
                                                    logger.error(data)
                                                    continue
                                                elif isinstance(answer, dict):
                                                    message = answer.get("title", "")
                                                    if answer.get("status") == "failed":
                                                        message = answer.get("error", "")
                                                    result = {"message": message, "type": "system",
                                                              "workflow": {"node_data": workflow_list}}
                                            elif data.get("event") == "message":
                                                answer_str = data.get("answer", "")
                                                # try:
                                                #     msg_dict = json.loads(answer)
                                                #     message = msg_dict.get("output",  "")
                                                # except Exception as e:
                                                #     print(e)
                                                #     continue
                                                result = {"message": answer_str, "type": "message",
                                                          "download_url": "", "workflow": {"node_data": workflow_list}}
                                                # try:
                                                #     await websocket.send_json(result)
                                                # except Exception as e:
                                                #     logger.error(e)
                                                #     logger.error("返回客户端消息异常!")
                                            elif data.get("event") == "workflow_finished":
                                                workflow_dict = {
                                                    "node_data": workflow_list,
                                                    "total_tokens": data.get("data", {}).get("total_tokens", 0),
                                                    "created_at": data.get("data", {}).get("created_at", 0),
                                                    "finished_at": data.get("data", {}).get("finished_at", 0),
                                                    "status": data.get("data", {}).get("status", ""),
                                                    "error": data.get("data", {}).get("error", ""),
                                                    "elapsed_time": data.get("data", {}).get("elapsed_time", 0)
                                                }
                                                try:
                                                    SessionService(db).update_session(chat_id,
                                                                                      message={"role": "assistant",
                                                                                               "content": {
                                                                                                   "answer": answer_str,
                                                                                                   "node_list": node_list,
                                                                                                   "download_url": ""}},
                                                                                      conversation_id=data.get(
                                                                                          "conversation_id"))
                                                    node_list = []
                                                except Exception as e:
                                                    logger.error("保存dify的会话异常!")
                                                    logger.error(e)
                                            elif data.get("event") == "message_end":
                                                result = {"message": "", "type": "close", "workflow": workflow_dict,
                                                          "is_next": is_next}
                                            else:
                                                continue
                                            try:
                                                await websocket.send_json(result)
                                            except Exception as e:
                                                logger.error(e)
                                                logger.error("dify返回客户端消息异常!")
                                            complete_response = ""
                                        except json.JSONDecodeError as e:
                                            print(f"Error decoding JSON: {e}")
                                            # print(f"Response text: {text}")
                                    except Exception as e2:
                                        result = {"message": f"内部错误: {e2}", "type": "close"}
                                        await websocket.send_json(result)
                                        print(f"Error process message of ragflow: {e2}")
                elif chat_type == "documentIa" or chat_type == "documentIaDs" or chat_type == "documentIaEq":
                    token_dict = {
                        "documentIa": DOCUMENT_IA_QUESTIONS,
                        "documentIaDs": DOCUMENT_IA_QUESTIONS_DS,
                        "documentIaEq": DOCUMENT_IA_QUESTIONS_EQUIPMENT,
                    }
                    token = DfTokenDao(db).get_token_by_id(token_dict[chat_type])
                    # print(token)
                    node_list = []
                    workflow_list = []
                    workflow_dict = {}
                    if not token:
                        await websocket.send_json({"message": "Invalid token", "type": "error"})
                    while True:
                        conversation_id = ""
                        # print(4343)
                        receive_message = await websocket.receive_json()
                        print(f"Received from client {chat_id}: {receive_message}")
                        upload_file_id = receive_message.get('upload_file_id', [])
                        upload_filenames = receive_message.get('upload_filenames', [])
                        question = receive_message.get('message', "")
                        if not question and not image_url:
                            await websocket.send_json({"message": "Invalid request", "type": "error"})
                            continue
                        try:
                            session = SessionService(db).create_session(
                                chat_id,
                                question,
                                agent_id,
                                AgentType.DIFY,
                                current_user.id,
                                {"role": "user", "content": question, "upload_filenames": upload_filenames}
                            )
                            conversation_id = session.conversation_id
                        except Exception as e:
                            logger.error(e)
                        # complete_response = ""
                        files = []
                        for fileId in upload_file_id:
                            files.append({
                                "type": "document",
                                "transfer_method": "local_file",
                                "url": "",
                                "upload_file_id": fileId
                            })
                        answer_str = ""
                        complete_response = ""
                        async for rag_response in dify_service.chat(token, current_user.id, question, files,
                                                                    conversation_id, {}):
                            # print(rag_response)
                            try:
                                if rag_response[:5] == "data:":
                                    # 如果是,则截取掉前5个字符,并去除首尾空白符
                                    complete_response = rag_response[5:].strip()
                                elif "event: ping" in rag_response:
                                    continue
                                else:
                                    # 否则,保持原样
                                    complete_response += rag_response
                                try:
                                    data = json.loads(complete_response)
                                    node_data = deepcopy(data)
                                    if "data" in node_data:
                                        if "outputs" in node_data["data"]:
                                            node_data["data"]["outputs"] = {}
                                        if "inputs" in node_data["data"]:
                                            node_data["data"]["inputs"] = {}
                                    # print(node_data)
                                    node_list.append(node_data)
                                    if data.get("event") == "node_started":
                                        if "data" not in data or not data["data"]:  # 信息过滤
                                            logger.error("非法数据--------------------")
                                            logger.error(data)
                                            continue
                                        else:  # 正常输出
                                            answer = data.get("data", "")
                                            if isinstance(answer, str):
                                                logger.error("----------------未知数据--------------------")
                                                logger.error(data)
                                                continue
                                            elif isinstance(answer, dict):
                                                if answer.get("status") == "failed":
                                                    message = answer.get("error", "")
                                                else:
                                                    message = answer.get("title", "")
                                                result = {"message": message, "type": "system", "workflow": {"node_data": workflow_list}}
                                    elif data.get("event") == "node_finished":
                                        workflow_list.append({
                                            "title": data.get("data", {}).get("title", ""),
                                            "status": data.get("data", {}).get("status", ""),
                                            "created_at": data.get("data", {}).get("created_at", 0),
                                            "finished_at": data.get("data", {}).get("finished_at", 0),
                                            "node_type": data.get("data", {}).get("node_type", 0),
                                            "elapsed_time": data.get("data", {}).get("elapsed_time", 0),
                                            "error": data.get("data", {}).get("error", ""),
                                        })
                                        if "data" not in data or not data["data"]:  # 信息过滤
                                            logger.error("非法数据--------------------")
                                            logger.error(data)
                                            continue
                                        else:  # 正常输出
                                            answer = data.get("data", "")
                                            if isinstance(answer, str):
                                                logger.error("----------------未知数据--------------------")
                                                logger.error(data)
                                                continue
                                            elif isinstance(answer, dict):
                                                if answer.get("status") == "failed":
                                                    message = answer.get("error", "")
                                                else:
                                                    message = answer.get("title", "")
                                                result = {"message": message, "type": "system",
                                                          "workflow": {"node_data": workflow_list}}
                                    elif data.get("event") == "message":  # "event": "message_end"
                                        # 正常输出
                                        answer = data.get("answer", "")
                                        result = {"message": answer, "type": "stream", "workflow": {"node_data": workflow_list}}
                                    elif data.get("event") == "error":  # "event": "message_end"
                                        # 正常输出
                                        answer = data.get("message", "")
                                        result = {"message": answer, "type": "system", "workflow": {"node_data": workflow_list}}
                                    elif data.get("event") == "workflow_finished":
                                        answer = data.get("data", "")
                                        if isinstance(answer, str):
                                            logger.error("----------------未知数据--------------------")
                                            logger.error(data)
                                            result = {"message": "", "type": "close", "download_url": ""}
                                        elif isinstance(answer, dict):
                                            download_url = ""
                                            outputs = answer.get("outputs", {})
                                            if outputs:
                                                message = outputs.get("answer", "")
                                                # download_url = outputs.get("download_url", "")
                                            else:
                                                message = answer.get("error", "")
                                            # result = {"message": message, "type": "message",
                                            #           "download_url": download_url}
                                            try:
                                                SessionService(db).update_session(chat_id,
                                                                                  message={"role": "assistant",
                                                                                           "content": {
                                                                                               "answer": message,
                                                                                               "node_list": node_list,
                                                                                               "download_url": download_url}},
                                                                                  conversation_id=data.get(
                                                                                      "conversation_id"))
                                                node_list = []
                                            except Exception as e:
                                                logger.error("保存dify的会话异常!")
                                                logger.error(e)
                                            # await websocket.send_json(result)
                                        workflow_dict = {
                                            "node_data": workflow_list,
                                            "total_tokens": data.get("data", {}).get("total_tokens", 0),
                                            "created_at": data.get("data", {}).get("created_at", 0),
                                            "finished_at": data.get("data", {}).get("finished_at", 0),
                                            "status": data.get("data", {}).get("status", ""),
                                            "error": data.get("data", {}).get("error", ""),
                                            "elapsed_time": data.get("data", {}).get("elapsed_time", 0)
                                        }
                                        continue
                                    elif data.get("event") == "message_end":
                                        result = {"message": "", "type": "close","workflow": workflow_dict}
                                    else:
                                        continue
                                    try:
                                        await websocket.send_json(result)
                                    except Exception as e:
                                        logger.error(e)
                                        logger.error("返回客户端消息异常!")
                                    complete_response = ""
                                except json.JSONDecodeError as e:
                                    print(f"Error decoding JSON: {e}")
                                    # print(f"Response text: {text}")
                            except Exception as e2:
                                result = {"message": f"内部错误: {e2}", "type": "close"}
                                await websocket.send_json(result)
                                print(f"Error process message of ragflow: {e2}")
            # 启动任务处理客户端消息
            tasks = [
                asyncio.create_task(forward_to_dify())
            ]
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        except WebSocketDisconnect as e1:
            print(f"Client {chat_id} disconnected: {e1}")
            await websocket.close()
        except Exception as e:
            print(f"Exception occurred: {e}")
        finally:
            print("Cleaning up resources of ragflow")
            # 取消所有任务
            for task in tasks:
                if not task.done():
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass
    else:
        ret = {"message": "Agent not found", "type": "close"}
        await websocket.send_json(ret)