From dc478b065693dd24e4cae719186d6aafb2d24f6d Mon Sep 17 00:00:00 2001
From: zhaoqingang <zhaoqg0118@163.com>
Date: 星期一, 25 十一月 2024 10:05:10 +0800
Subject: [PATCH] difyq 接入
---
app/models/agent_model.py | 1
app/models/session_model.py | 1
app/config/config.py | 2
app/api/chat.py | 88 +++++++++++++++++
app/config/config.yaml | 4
app/service/difyService.py | 153 ++++++++++++++++++++++++++++++
app/service/session.py | 1
app/api/agent.py | 24 ++++
app/api/files.py | 17 +++
9 files changed, 290 insertions(+), 1 deletions(-)
diff --git a/app/api/agent.py b/app/api/agent.py
index 5842350..5e72c15 100644
--- a/app/api/agent.py
+++ b/app/api/agent.py
@@ -161,6 +161,30 @@
data.append(tmp_data)
return JSONResponse(status_code=200, content={"code": 200, "data": data})
+ elif agent.agent_type == AgentType.DIFY:
+ data = []
+ session = db.query(SessionModel).filter(SessionModel.id == conversation_id).first()
+ if session:
+ tmp_data = {}
+ for i in session.log_to_json().get("message", []):
+ if i.get("role") == "user":
+ tmp_data["question"] = i.get("content")
+ elif i.get("role") == "assistant":
+ if isinstance(i.get("content"), dict):
+ tmp_data["answer"] = i.get("content", {}).get("answer")
+ if "file_name" in i.get("content", {}):
+ tmp_data["files"] = [{"file_name": i.get("content", {}).get("file_name"),
+ "file_url": i.get("content", {}).get("file_url")}]
+ else:
+ tmp_data["answer"] = i.get("content")
+ data.append(tmp_data)
+ tmp_data = {}
+
+ if tmp_data:
+ data.append(tmp_data)
+
+ return JSONResponse(status_code=200, content={"code": 200, "data": data})
+
else:
return JSONResponse(status_code=200, content={"code": 200, "log": "Unsupported agent type"})
diff --git a/app/api/chat.py b/app/api/chat.py
index 076950e..4359eab 100644
--- a/app/api/chat.py
+++ b/app/api/chat.py
@@ -14,6 +14,7 @@
from app.models.user_model import UserModel
from app.service.dialog import update_session_history
from app.service.basic import BasicService
+from app.service.difyService import DifyService
from app.service.ragflow import RagflowService
from app.service.service_token import get_bisheng_token, get_ragflow_token
from app.service.session import SessionService
@@ -286,6 +287,93 @@
finally:
await websocket.close()
print(f"Client {agent_id} disconnected")
+ if agent_type == AgentType.DIFY:
+ dify_service = DifyService(settings.dify_base_url)
+ # token = get_dify_token(db, current_user.id)
+ token = settings.dify_api_token
+ try:
+ async def forward_to_dify():
+ while True:
+ conversation_id = ""
+ receive_message = await websocket.receive_json()
+ print(f"Received from client {chat_id}: {receive_message}")
+ upload_file_id = receive_message.get('upload_file_id', [])
+ question = receive_message.get('message', "")
+ if not question and not image_url:
+ await websocket.send_json({"message": "Invalid request", "type": "error"})
+ continue
+ try:
+ session = SessionService(db).create_session(
+ chat_id,
+ question,
+ agent_id,
+ AgentType.DIFY,
+ current_user.id
+ )
+ conversation_id = session.conversation_id
+ except Exception as e:
+ logger.error(e)
+ complete_response = ""
+ async for rag_response in dify_service.chat(token, chat_id, question, upload_file_id, conversation_id):
+ try:
+ if rag_response[:5] == "data:":
+ # 濡傛灉鏄紝鍒欐埅鍙栨帀鍓�5涓瓧绗︼紝骞跺幓闄ら灏剧┖鐧界
+ text = rag_response[5:].strip()
+ else:
+ # 鍚﹀垯锛屼繚鎸佸師鏍�
+ text = rag_response
+ complete_response += text
+ try:
+ data = json.loads(complete_response)
+ # data = json_data.get("data")
+ if "answer" not in data: # 淇℃伅杩囨护
+ continue
+ else: # 姝e父杈撳嚭
+ answer = data.get("answer", "")
+
+ result = {"message": answer, "type": "message"}
+ try:
+ SessionService(db).update_session(chat_id,
+ message={"role": "assistant", "content": data, "conversation_id": data.get("conversation_id")})
+ except Exception as e:
+ logger.error(e)
+ await websocket.send_json(result)
+ complete_response = ""
+ except json.JSONDecodeError as e:
+ print(f"Error decoding JSON: {e}")
+ # print(f"Response text: {text}")
+ except Exception as e2:
+ result = {"message": f"鍐呴儴閿欒锛� {e2}", "type": "close"}
+ await websocket.send_json(result)
+ print(f"Error process message of ragflow: {e2}")
+ try:
+ dialog_chat_history = await ragflow_service.get_session_history(token, chat_id, 1)
+ await update_session_history(db, dialog_chat_history, current_user.id)
+ except Exception as e:
+ logger.error(e)
+ logger.error("-----------------淇濆瓨ragflow鐨勫巻鍙蹭細璇濆紓甯�-----------------")
+
+ # 鍚姩浠诲姟澶勭悊瀹㈡埛绔秷鎭�
+ tasks = [
+ asyncio.create_task(forward_to_dify())
+ ]
+ await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+ except WebSocketDisconnect as e1:
+ print(f"Client {chat_id} disconnected: {e1}")
+ await websocket.close()
+ except Exception as e:
+ print(f"Exception occurred: {e}")
+
+ finally:
+ print("Cleaning up resources of ragflow")
+ # 鍙栨秷鎵�鏈変换鍔�
+ for task in tasks:
+ if not task.done():
+ task.cancel()
+ try:
+ await task
+ except asyncio.CancelledError:
+ pass
else:
ret = {"message": "Agent not found", "type": "close"}
await websocket.send_json(ret)
diff --git a/app/api/files.py b/app/api/files.py
index 8faa011..dd9166a 100644
--- a/app/api/files.py
+++ b/app/api/files.py
@@ -15,6 +15,7 @@
from app.models.user_model import UserModel
from app.service.basic import BasicService
from app.service.bisheng import BishengService
+from app.service.difyService import DifyService
from app.service.ragflow import RagflowService
from app.service.service_token import get_ragflow_token, get_bisheng_token
import urllib.parse
@@ -93,6 +94,22 @@
service = BasicService(base_url=settings.basic_paper_url)
result = await service.paper_file_upload(chat_id, file.filename, file_content)
+ elif agent.agent_type == AgentType.DIFY:
+ file = file[0]
+ # 璇诲彇涓婁紶鐨勬枃浠跺唴瀹�
+ try:
+ file_content = await file.read()
+ except Exception as e:
+ return Response(code=400, msg=str(e))
+ dify_service = DifyService(base_url=settings.dify_base_url)
+ try:
+ token = get_bisheng_token(db, current_user.id)
+ result = await dify_service.upload(token, file.filename, file_content)
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=str(e))
+ result["file_name"] = file.filename
+ return Response(code=200, msg="", data=result)
+
return Response(code=200, msg="", data=result)
diff --git a/app/config/config.py b/app/config/config.py
index 271c588..3c97edc 100644
--- a/app/config/config.py
+++ b/app/config/config.py
@@ -17,6 +17,8 @@
PASSWORD_KEY: str
basic_base_url: str = ''
basic_paper_url: str = ''
+ dify_base_url: str = ''
+ dify_api_token: str = ''
def __init__(self, **kwargs):
# Check if all required fields are provided and set them
for field in self.__annotations__.keys():
diff --git a/app/config/config.yaml b/app/config/config.yaml
index 1c6842c..f8a53b0 100644
--- a/app/config/config.yaml
+++ b/app/config/config.yaml
@@ -14,4 +14,6 @@
fetch_fwr_agent: 鐭ヨ瘑闂瓟,鏅鸿兘闂瓟
PASSWORD_KEY: VKinqB-8XMrwCLLrcf_PyHyo12_4PVKvWzaHjNFions=
basic_base_url: http://192.168.20.231:8000
-basic_paper_url: http://192.168.20.231:8000
\ No newline at end of file
+basic_paper_url: http://192.168.20.231:8000
+dify_base_url: http://192.168.20.116
+dify_api_token: app-YmOAMDsPpDDlqryMHnc9TzTO
\ No newline at end of file
diff --git a/app/models/agent_model.py b/app/models/agent_model.py
index fbb2f45..f914cdd 100644
--- a/app/models/agent_model.py
+++ b/app/models/agent_model.py
@@ -9,6 +9,7 @@
RAGFLOW = 1
BISHENG = 2
BASIC = 3
+ DIFY = 4
class AgentModel(Base):
diff --git a/app/models/session_model.py b/app/models/session_model.py
index 9536471..fd513d2 100644
--- a/app/models/session_model.py
+++ b/app/models/session_model.py
@@ -17,6 +17,7 @@
update_date = Column(DateTime, default=current_time, onupdate=current_time) # 鏇存柊鏃堕棿锛岄粯璁ゅ�间负褰撳墠鏃跺尯鏃堕棿锛屾洿鏂版椂鑷姩鏇存柊
tenant_id = Column(Integer) # 鍒涘缓浜�
message = Column(TEXT) # 璇存槑
+ conversation_id = Column(String(64))
# to_dict 鏂规硶
def to_dict(self):
diff --git a/app/service/difyService.py b/app/service/difyService.py
new file mode 100644
index 0000000..6b6a59a
--- /dev/null
+++ b/app/service/difyService.py
@@ -0,0 +1,153 @@
+import json
+from datetime import datetime
+
+import httpx
+from typing import Union, Dict, List
+from fastapi import HTTPException
+from starlette import status
+from watchdog.observers.fsevents2 import message
+
+# from Log import logger
+from app.config.config import settings
+from app.utils.rsa_crypto import RagflowCrypto
+
+
+class DifyService:
+ def __init__(self, base_url: str):
+ self.base_url = base_url
+
+ def _handle_response(self, response: httpx.Response) -> Union[Dict, List]:
+ if response.status_code != 200:
+ return {}
+
+ data = response.json()
+ ret_code = data.get("retcode")
+ if ret_code == 401:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="鐧诲綍杩囨湡",
+ )
+ if ret_code != 0:
+ return {}
+
+ # 妫�鏌ヨ繑鍥炵殑鏁版嵁绫诲瀷
+ if isinstance(data.get("data"), dict):
+ return data.get("data", {})
+ elif isinstance(data.get("data"), list):
+ return data.get("data", [])
+ else:
+ return {}
+
+ async def register(self, username: str, password: str):
+ password = RagflowCrypto(settings.PUBLIC_KEY, settings.PRIVATE_KEY).encrypt(password)
+ async with httpx.AsyncClient() as client:
+ response = await client.post(
+ f"{self.base_url}/v1/user/register",
+ headers={'Content-Type': 'application/json'},
+ json={"nickname": username, "email": f"{username}@example.com", "password": password}
+ )
+ if response.status_code != 200:
+ raise Exception(f"Ragflow registration failed: {response.text}")
+ return self._handle_response(response)
+
+ async def login(self, username: str, password: str) -> str:
+ password = RagflowCrypto(settings.PUBLIC_KEY, settings.PRIVATE_KEY).encrypt(password)
+ async with httpx.AsyncClient() as client:
+ response = await client.post(
+ f"{self.base_url}/v1/user/login",
+ headers={'Content-Type': 'application/json'},
+ json={"email": f"{username}@example.com", "password": password}
+ )
+ if response.status_code != 200:
+ raise Exception(f"Ragflow login failed: {response.text}")
+ authorization = response.headers.get('Authorization')
+ if not authorization:
+ raise Exception("Authorization header not found in response")
+ return authorization
+
+ async def chat(self, token: str, chat_id: str, message: str, upload_file_id: str, conversation_id: str):
+
+ target_url = f"{self.base_url}/v1/chat-messages"
+ files = [
+ {
+ "type": "image",
+ "transfer_method": "remote_url",
+ "url": "https://cloud.dify.ai/logo/logo-site.png",
+ "upload_file_id":""
+ }
+ ]
+ if upload_file_id:
+ files[0]["transfer_method"] = "local_file"
+ files[0]["upload_file_id"] = upload_file_id
+ data = {
+ "inputs": {},
+ "query": message,
+ "response_mode": "streaming",
+ "conversation_id": conversation_id,
+ "user": chat_id,
+ "files": files
+ }
+
+ async with httpx.AsyncClient(timeout=300.0) as client:
+ headers = {
+ 'Content-Type': 'application/json',
+ 'Authorization': f'Bearer {token}'
+ }
+ async with client.stream("POST", target_url, data=json.dumps(data), headers=headers) as response:
+ if response.status_code == 200:
+ try:
+ async for answer in response.aiter_text():
+ print(f"response of ragflow chat: {answer}")
+ yield answer
+ except GeneratorExit as e:
+ print(e)
+ return
+ else:
+ yield f"Error: {response.status_code}"
+
+
+
+ async def get_session_history(self, token: str, chat_id: str, is_all: int=0):
+ url = f"{self.base_url}/v1/conversation/get?conversation_id={chat_id}"
+ headers = {"Authorization": token}
+ async with httpx.AsyncClient() as client:
+ response = await client.get(url, headers=headers)
+ data = self._handle_response(response)
+ # print("----------------data----------------------:", data)
+ if is_all:
+ return data
+ return data.get("message", [])
+
+ async def upload(self, token: str, filename: str, file: bytes) -> dict:
+ url = f"{self.base_url}/console/api/files/upload"
+ headers = {
+ 'Content-Type': 'application/json',
+ 'Authorization': f'Bearer {token}'
+ }
+
+ # 鍒涘缓琛ㄥ崟鏁版嵁锛屽寘鍚枃浠�
+ files = {"file": (filename, file)}
+ async with httpx.AsyncClient() as client:
+ response = await client.post(url, headers=headers, files=files)
+ data = self._handle_response(response)
+ # file_path = data.get("file_path", "")
+ result = {
+ "file_path": data
+ }
+
+ return result
+
+
+
+
+if __name__ == "__main__":
+ async def a():
+ a = DifyService("http://192.168.20.119:11080")
+ b = await a.get_knowledge_list("ImY3ZTZlZWQwYTY2NTExZWY5ZmFiMDI0MmFjMTMwMDA2Ig.Zzxwmw.uI_HAWzOkipQuga1aeQtoeIc3IM", 1,
+ 10)
+ print(b)
+
+ import asyncio
+
+ asyncio.run(a())
+
diff --git a/app/service/session.py b/app/service/session.py
index e43d8aa..e00bd3e 100644
--- a/app/service/session.py
+++ b/app/service/session.py
@@ -88,6 +88,7 @@
self.db.commit()
self.db.refresh(session)
except Exception as e:
+ logger.error(e)
self.db.rollback()
return session
--
Gitblit v1.8.0