app/api/agent.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
app/api/chat.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
app/api/files.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
app/api/report.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
app/config/config.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
app/config/config.yaml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
app/service/bisheng.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
app/service/difyService.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
app/service/ragflow.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
app/api/agent.py
@@ -6,7 +6,7 @@ from pydantic import BaseModel from sqlalchemy.orm import Session from app.api import Response, get_current_user, ResponseList from app.api import Response, get_current_user, ResponseList, process_files from app.api.user import reset_user_pwd from app.config.config import settings from app.models.agent_model import AgentType, AgentModel @@ -113,15 +113,19 @@ except Exception as e: raise HTTPException(status_code=500, detail=str(e)) elif agent.agent_type == AgentType.BISHENG: is_join = False if agent.name == "报告生成": is_join = True bisheng_service = BishengService(base_url=settings.sgb_base_url) try: token = get_bisheng_token(db, current_user.id) result = await bisheng_service.get_session_log(token, agent_id, conversation_id) combined_logs = [] last_question = None answer_str = "" files = [] for session in result: print(session) # print(session) # 检查 session 是否为 None if session is None: continue @@ -138,19 +142,38 @@ message = message_json['query'] elif 'report_name' in message_json: message = message_json['report_name'] except json.JSONDecodeError: pass # 非 JSON 字符串,继续使用原始 message if session.get('files') and isinstance(session.get('files'), str): try: files = json.loads(session.get('files')) process_files(files, agent_id) except json.JSONDecodeError: pass # 非 JSON 字符串,继续使用原始 message # 检查 message 是否为 None if message is None: continue if is_join: ... if session.get('role') == 'question': last_question = message elif session.get('role') == 'answer': answer_str += message if session.get('role') == 'question': last_question = message elif session.get('role') == 'answer' and last_question: combined_logs.append(last_question + "\n" + message) last_question = None return JSONResponse(status_code=200, content={"code": 200, "data": {"question":"", "answer": "\n".join(combined_logs)}}) else: if session.get('role') == 'question': last_question = message elif session.get('role') == 'answer' and last_question: combined_logs.append({ 'question': last_question, 'answer': message }) last_question = None return JSONResponse(status_code=200, content={"code": 200, "data": combined_logs if combined_logs else [{'question': last_question, 'answer': answer_str, 'files': files}]}) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) elif agent.agent_type == AgentType.BASIC: @@ -208,6 +231,9 @@ "file_url": i.get("content", {}).get("file_url")}] if "images" in i.get("content", {}): tmp_data["images"] = i.get("content", {}).get("images") if "download_url" in i.get("content", {}): tmp_data["download_url"] = i.get("content", {}).get("download_url") else: tmp_data["answer"] = i.get("content") data.append(tmp_data) app/api/chat.py
@@ -235,7 +235,7 @@ excel_url = None if file_name: excel_url = f"/api/files/download/?agent_id=basic_question_talk&file_id={file_name}&file_type=word" result = {"message": output, "type": "message", "file_url": excel_url, "file_name":file_name} result = {"message": output, "type": "message", "file_url": excel_url, "file_name": file_name} try: SessionService(db).update_session(chat_id, message={"role": "assistant", "content": result}) @@ -268,6 +268,7 @@ return None return (f"/api/files/download/?agent_id={agent_id}&file_id={name}" f"&file_type={file_type}") if e_name: excel_url = build_file_url(e_name, 'excel') excel_name = e_name @@ -294,7 +295,7 @@ await websocket.send_json(data) if message_data: try: SessionService(db).update_session(chat_id,message=message_data) SessionService(db).update_session(chat_id, message=message_data) except Exception as e: logger.error(f"Unexpected error when update_session: {e}") except Exception as e: @@ -306,110 +307,214 @@ if agent_type == AgentType.DIFY: dify_service = DifyService(settings.dify_base_url) # token = get_dify_token(db, current_user.id) token = settings.dify_api_token try: async def forward_to_dify(): while True: image_list = [] is_image = False conversation_id = "" receive_message = await websocket.receive_json() print(f"Received from client {chat_id}: {receive_message}") upload_file_id = receive_message.get('upload_file_id', "") question = receive_message.get('message', "") if not question and not image_url: await websocket.send_json({"message": "Invalid request", "type": "error"}) continue try: session = SessionService(db).create_session( chat_id, question, agent_id, AgentType.DIFY, current_user.id ) conversation_id = session.conversation_id except Exception as e: logger.error(e) # complete_response = "" answer_str = "" async for rag_response in dify_service.chat(token, current_user.id, question, upload_file_id, conversation_id): # print("=============================================") # print(rag_response) if agent.type == "imageTalk": token = settings.dify_api_token while True: image_list = [] is_image = False conversation_id = "" receive_message = await websocket.receive_json() print(f"Received from client {chat_id}: {receive_message}") upload_file_id = receive_message.get('upload_file_id', "") question = receive_message.get('message', "") if not question and not image_url: await websocket.send_json({"message": "Invalid request", "type": "error"}) continue try: if rag_response[:5] == "data:": # 如果是,则截取掉前5个字符,并去除首尾空白符 complete_response = rag_response[5:].strip() else: # 否则,保持原样 complete_response = rag_response # complete_response += text session = SessionService(db).create_session( chat_id, question, agent_id, AgentType.DIFY, current_user.id ) conversation_id = session.conversation_id except Exception as e: logger.error(e) # complete_response = "" answer_str = "" async for rag_response in dify_service.chat(token, current_user.id, question, upload_file_id, conversation_id): try: data = json.loads(complete_response) complete_response = "" # data = json_data.get("data") if data.get("event") == "agent_message":# "event": "message_end" if "answer" not in data or not data["answer"]: # 信息过滤 logger.error("非法数据--------------------") # logger.error(data) if rag_response[:5] == "data:": # 如果是,则截取掉前5个字符,并去除首尾空白符 complete_response = rag_response[5:].strip() else: # 否则,保持原样 complete_response = rag_response try: data = json.loads(complete_response) if data.get("event") == "agent_message": # "event": "message_end" if "answer" not in data or not data["answer"]: # 信息过滤 logger.error("非法数据--------------------") # logger.error(data) continue else: # 正常输出 answer = data.get("answer", "") if isinstance(answer, str): if "]+\)' url_image = image_list.pop() new_answer = re.sub(pattern, url_image, answer) answer_str += new_answer else: answer_str += answer elif isinstance(answer, dict): logger.error("未知数据体:0---------------------------------") logger.error(answer) answer_str += answer.get("action_input", "") result = {"message": answer_str, "type": "message"} elif data.get("event") == "message_end": images_url = [] if image_list and not is_image: answer_str += image_list[-1] result = {"message": answer_str, "type": "close"} # , "message_files": images_url try: SessionService(db).update_session(chat_id, message={"role": "assistant", "content": {"answer": answer_str, "images": images_url}}, conversation_id=data.get( "conversation_id")) except Exception as e: logger.error("保存dify的会话异常!") logger.error(e) elif data.get("event") == "message_file": await dify_service.save_images(data.get("url"), data.get("id") + ".png") image_list.append(f"})") # result = {"message": answer_str, "type": "message"} continue else: # 正常输出 answer = data.get("answer", "") if isinstance(answer, str): if "]+\)' url_image = image_list.pop() new_answer = re.sub(pattern, url_image, answer) answer_str += new_answer else: answer_str += answer elif isinstance(answer, dict): logger.error("未知数据体:0---------------------------------") logger.error(answer) answer_str += answer.get("action_input", "") result = {"message": answer_str, "type": "message"} elif data.get("event") == "message_end": images_url = [] # res_msg = await dify_service.get_session_history(token, data.get("conversation_id"), str(current_user.id)) # if len(res_msg) > 0: # message_files = res_msg[-1].get("message_files") # for msg_file in message_files: # await dify_service.save_images(msg_file.get("url"), msg_file.get("id")+".png") # images_url.append(msg_file.get("id")) # result = {"message": answer_str, "type": "close"} # , "message_files": images_url if image_list and not is_image: answer_str += image_list[-1] result = {"message": answer_str, "type": "close"} # , "message_files": images_url try: SessionService(db).update_session(chat_id, message={"role": "assistant", "content": {"answer":answer_str, "images":images_url}},conversation_id=data.get("conversation_id")) except Exception as e: logger.error("保存dify的会话异常!") logger.error(e) elif data.get("event") == "message_file": await dify_service.save_images(data.get("url"), data.get("id") + ".png") image_list.append(f"})") # result = {"message": answer_str, "type": "message"} else: continue await websocket.send_json(result) complete_response = "" except json.JSONDecodeError as e: print(f"Error decoding JSON: {e}") # print(f"Response text: {text}") except Exception as e2: result = {"message": f"内部错误: {e2}", "type": "close"} await websocket.send_json(result) print(f"Error process message of ragflow: {e2}") elif agent.type == "reportWorkflow": print(2323333232) token = settings.dify_workflow_clean while True: receive_message = await websocket.receive_json() print(f"Received from client {chat_id}: {receive_message}") upload_files = receive_message.get('upload_files', []) title = receive_message.get('title', "") workflow_type = receive_message.get('workflow', 1) if not upload_files: await websocket.send_json({"message": "Invalid request", "type": "error"}) continue try: session = SessionService(db).create_session( chat_id, title, agent_id, AgentType.DIFY, current_user.id ) conversation_id = session.conversation_id except Exception as e: logger.error(e) inputs = { "input_files": [] } for file in upload_files: inputs["input_files"].append({ "type": "document", "transfer_method": "local_file", "url": "", "upload_file_id": file }) if workflow_type == 2: inputs["Completion_of_main_indicators"] = title token = settings.dify_workflow_report complete_response = "" async for rag_response in dify_service.workflow(token, current_user.id, inputs): print(rag_response) try: if rag_response[:5] == "data:": # 如果是,则截取掉前5个字符,并去除首尾空白符 complete_response = rag_response[5:].strip() elif "event: ping" in rag_response: continue else: continue await websocket.send_json(result) complete_response = "" except json.JSONDecodeError as e: print(f"Error decoding JSON: {e}") # print(f"Response text: {text}") except Exception as e2: result = {"message": f"内部错误: {e2}", "type": "close"} await websocket.send_json(result) print(f"Error process message of ragflow: {e2}") # 否则,保持原样 complete_response += rag_response try: data = json.loads(complete_response) complete_response = "" if data.get("event") == "node_started" or data.get("event") == "node_finished": # "event": "message_end" if "data" not in data or not data["data"]: # 信息过滤 logger.error("非法数据--------------------") logger.error(data) continue else: # 正常输出 answer = data.get("data", "") if isinstance(answer, str): logger.error("----------------未知数据--------------------") logger.error(data) continue elif isinstance(answer, dict): message = answer.get("title", "") result = {"message": message, "type": "system"} elif data.get("event") == "workflow_finished": answer = data.get("data", "") if isinstance(answer, str): logger.error("----------------未知数据--------------------") logger.error(data) result = {"message": "", "type": "close", "download_url": ""} elif isinstance(answer, dict): download_url = "" outputs = answer.get("outputs", {}) if outputs: message = outputs.get("output", "") download_url = outputs.get("download_url", "") else: message = answer.get("error", "") result = {"message": message, "type": "close", "download_url": download_url} try: SessionService(db).update_session(chat_id, message={"role": "assistant", "content": { "answer": message, "download_url": download_url}}, conversation_id=data.get( "conversation_id")) except Exception as e: logger.error("保存dify的会话异常!") logger.error(e) await websocket.send_json(result) result = {"message": "", "type": "close", "download_url": ""} else: continue try: await websocket.send_json(result) except Exception as e: logger.error(e) logger.error("返回客户端消息异常!") complete_response = "" except json.JSONDecodeError as e: print(f"Error decoding JSON: {e}") # print(f"Response text: {text}") except Exception as e2: result = {"message": f"内部错误: {e2}", "type": "close"} await websocket.send_json(result) print(f"Error process message of ragflow: {e2}") # 启动任务处理客户端消息 tasks = [ app/api/files.py
@@ -92,25 +92,37 @@ elif agent_id == "basic_paper_agent": service = BasicService(base_url=settings.basic_paper_url) result = await service.paper_file_upload(chat_id, file.filename, file_content) # result = await service.paper_file_upload(chat_id, file.filename, file_content) elif agent.agent_type == AgentType.DIFY: file = file[0] # 读取上传的文件内容 try: file_content = await file.read() except Exception as e: return Response(code=400, msg=str(e)) token = settings.dify_api_token dify_service = DifyService(base_url=settings.dify_base_url) try: token = settings.dify_api_token result = await dify_service.upload(token, file.filename, file_content, current_user.id) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if agent.type == "imageTalk": file = file[0] # 读取上传的文件内容 try: file_content = await file.read() except Exception as e: return Response(code=400, msg=str(e)) try: data = await dify_service.upload(token, file.filename, file_content, current_user.id) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) elif agent.type == "reportWorkflow": result = [] for f in file: try: file_content = await f.read() except Exception as e: return Response(code=400, msg=str(e)) try: file_upload = await dify_service.upload(token, f.filename, file_content, current_user.id) result.append(file_upload) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # result["file_name"] = file.filename return Response(code=200, msg="", data=result) return Response(code=200, msg="", data=result) data = {"files": result} return Response(code=200, msg="", data=data) @router.get("/download/", response_model=Response) app/api/report.py
@@ -61,49 +61,55 @@ async def forward_to_client(): is_answer = False while True: message = await service_websocket.recv() print(f"Received from bisheng: {message}") data = json.loads(message) files = data.get("files", []) steps = data.get("intermediate_steps", "") msg = data.get("message", "") category = data.get("category", "") try: message = await service_websocket.recv() # print(f"Received from bisheng: {message}") data = json.loads(message) files = data.get("files", []) steps = data.get("intermediate_steps", "") msg = data.get("message", "") category = data.get("category", "") process_files(files, agent_id) if category == "question" and steps: is_answer = False if not steps: steps = "\n" else: steps = steps + "\n" result = {"message": steps, "type": "stream", "files": files} await websocket.send_json(result) if category == "answer" and not is_answer: if not steps.endswith("\n"): steps += "\n\n" result = {"message": steps, "type": "stream", "files": files} await websocket.send_json(result) if category == "answer" and is_answer: # process_files(files, agent_id) result = {"message": "\n", "type": "stream", "files": files} await websocket.send_json(result) elif data["type"] == "close": # process_files(files, agent_id) result = {"message": "", "type": "close", "files": files} await websocket.send_json(result) elif category == "processing": # process_files(files, agent_id) is_answer = True result = {"message": msg, "type": "stream", "files": files} await websocket.send_json(result) elif files: # process_files(files, agent_id) result = {"message": "", "type": "stream", "files": files} await websocket.send_json(result) if category == "question" and steps: is_answer = False if not steps: steps = "\n" elif category == "system" and steps: result = {"message": steps, "type": "stream", "files": files} await websocket.send_json(result) else: steps =steps + "\n" result = {"message": steps, "type": "stream", "files": files} await websocket.send_json(result) if category == "answer" and not is_answer: process_files(files, agent_id) if not steps.endswith("\n"): steps+= "\n\n" result = {"message": steps, "type": "stream", "files": files} await websocket.send_json(result) if category == "answer" and is_answer: process_files(files, agent_id) result = {"message": "\n", "type": "stream", "files": files} await websocket.send_json(result) elif category == "processing": process_files(files, agent_id) is_answer = True result = {"message": msg, "type": "stream", "files": files} await websocket.send_json(result) elif files: process_files(files, agent_id) result = {"message": "", "type": "stream", "files": files} await websocket.send_json(result) elif data["type"] == "close": process_files(files, agent_id) result = {"message": "", "type": "close", "files": files} await websocket.send_json(result) else: logger.error("-------------------11111111111111--------------------------") logger.error(data) logger.error("-------------------11111111111111--------------------------") logger.error(data) except Exception as e: logger.error(e) await websocket.send_json({"message": "连接异常!", "type": "close", "files": []}) # if len(files) != 0 or (msg and category != "answer") or data["type"] == "close": # if data["type"] == "close": # t = "close" app/config/config.py
@@ -19,6 +19,8 @@ basic_paper_url: str = '' dify_base_url: str = '' dify_api_token: str = '' dify_workflow_clean: str = '' dify_workflow_report: str = '' postgresql_database_url: str = '' def __init__(self, **kwargs): # Check if all required fields are provided and set them app/config/config.yaml
@@ -17,4 +17,6 @@ basic_paper_url: http://192.168.20.231:8000 dify_base_url: http://192.168.20.116 dify_api_token: app-YmOAMDsPpDDlqryMHnc9TzTO postgresql_database_url: postgresql+asyncpg://kong:kongpass@192.168.20.119:5432/kong postgresql_database_url: postgresql+asyncpg://kong:kongpass@192.168.20.119:5432/kong dify_workflow_clean: app-OpF0drPu0XcgqcekQpT4FA8a dify_workflow_report: app-0MAkdFWqh9zxwmU69O0BFU1s app/service/bisheng.py
@@ -94,14 +94,7 @@ pass if not name: name = item.get("flow_name") return name def process_name_report(item): # logger.error("-----------------------process_name-------------------------------------") # logger.error(item) name = item.get("flow_name", "报告生成") return name return name[:50] result = [ { @@ -126,16 +119,15 @@ async with httpx.AsyncClient() as client: response = await client.get(url, headers=headers) response.raise_for_status() # print(response.text) data = self._check_response(response) session_log = [ { "message": message.get("intermediate_steps", ""), "intermediate_steps": message.get("message", ""), "role": message.get("category"), "message":message.get("message", "") if message.get("message", "") else message.get("intermediate_steps", ""), "files": message.get("files", ""), "role": "question" if message.get("category") == "question" and message.get("message", "") else "answer", "ts": message.get("create_time") } for message in data for message in data if message.get("category") != "system" ] # 把session_log 按ts 升序排序 app/service/difyService.py
@@ -154,6 +154,33 @@ f.write(response.content) async def workflow(self, token: str, user_id: int, inputs: dict): target_url = f"{self.base_url}/v1/workflows/run" data = { "inputs": inputs, "response_mode": "streaming", "user": str(user_id), "files":[] } async with httpx.AsyncClient(timeout=1800) as client: headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {token}' } async with client.stream("POST", target_url, data=json.dumps(data), headers=headers) as response: if response.status_code == 200: try: async for answer in response.aiter_text(): # print(f"response of ragflow chat: {answer}") yield answer except GeneratorExit as e: print(e) return else: yield f"Error: {response.status_code}" if __name__ == "__main__": app/service/ragflow.py
@@ -214,13 +214,14 @@ if __name__ == "__main__": async def a(): a = RagflowService("http://192.168.20.119:11080") b = await a.get_knowledge_list("ImY3ZTZlZWQwYTY2NTExZWY5ZmFiMDI0MmFjMTMwMDA2Ig.Zzxwmw.uI_HAWzOkipQuga1aeQtoeIc3IM", 1, 10) print(b) import asyncio asyncio.run(a()) # async def a(): # a = RagflowService("http://192.168.20.119:11080") # b = await a.get_knowledge_list("ImY3ZTZlZWQwYTY2NTExZWY5ZmFiMDI0MmFjMTMwMDA2Ig.Zzxwmw.uI_HAWzOkipQuga1aeQtoeIc3IM", 1, # 10) # print(b) # # import asyncio # # asyncio.run(a()) password = RagflowCrypto(settings.PUBLIC_KEY, settings.PRIVATE_KEY).encrypt("123456") print(password)