From f95f801f35aa201cbaffd7d881c07edc9398b570 Mon Sep 17 00:00:00 2001
From: zhaoqingang <zhaoqg0118@163.com>
Date: 星期一, 03 三月 2025 16:03:51 +0800
Subject: [PATCH] 增加外接知识库中转接口
---
app/service/ragflow.py | 4
app/service/service_token.py | 7 +
app/api/chat.py | 3
app/models/v2/mindmap.py | 12 +++
app/api/v2/chat.py | 28 ++++++
app/config/agent_base_url.py | 1
app/models/v2/chat.py | 13 +++
app/service/auth.py | 2
app/service/v2/chat.py | 80 +++++++++++++++++++
app/config/env_conf/config119.yaml | 12 +-
app/service/dialog.py | 32 +++++++
app/task/fetch_agent.py | 11 ++
app/api/dialog.py | 14 ++
main.py | 2
app/models/dialog_model.py | 2
app/api/__init__.py | 8 +
app/config/env_conf/menu_conf.json | 4
17 files changed, 209 insertions(+), 26 deletions(-)
diff --git a/app/api/__init__.py b/app/api/__init__.py
index 09538a7..2a679e8 100644
--- a/app/api/__init__.py
+++ b/app/api/__init__.py
@@ -3,7 +3,7 @@
import jwt
# from cryptography.fernet import Fernet
-from fastapi import FastAPI, Depends, HTTPException
+from fastapi import FastAPI, Depends, HTTPException, Header
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from pydantic import BaseModel
@@ -116,6 +116,12 @@
# 璁板綍寮傚父淇℃伅锛屼絾缁х画澶勭悊鍏朵粬鏂囦欢
print(f"Error processing file URL: {e}")
+def get_api_key(authorization: str = Header(...)):
+ if not authorization.startswith("Bearer "):
+ raise HTTPException(status_code=401, detail="Invalid Authorization header format.")
+ return authorization.split(" ")[1]
+
+
if __name__=="__main__":
diff --git a/app/api/chat.py b/app/api/chat.py
index 16003e2..d4b6dae 100644
--- a/app/api/chat.py
+++ b/app/api/chat.py
@@ -71,11 +71,12 @@
chat_history = message.get('chatHistory', [])
message["role"] = "user"
if len(chat_history) == 0:
+ print("----------------------", token)
chat_history = await ragflow_service.get_session_history(token, chat_id)
if len(chat_history) == 0:
chat_history = await ragflow_service.set_session(token, agent_id,
message, chat_id, True)
- # print("chat_history------------------------", chat_history)
+ print("chat_history------------------------", chat_history)
if len(chat_history) == 0:
result = {"message": "鍐呴儴閿欒锛氬垱寤轰細璇濆け璐�", "type": "close"}
await websocket.send_json(result)
diff --git a/app/api/dialog.py b/app/api/dialog.py
index 814932a..b11ef36 100644
--- a/app/api/dialog.py
+++ b/app/api/dialog.py
@@ -6,7 +6,7 @@
from app.models.base_model import get_db
from app.models.user_model import UserModel
from app.service.dialog import get_dialog_list, create_dialog_service, update_dialog_status_service, \
- delete_dialog_service, update_dialog_icon_service, get_dialog_manage_list
+ delete_dialog_service, update_dialog_icon_service, get_dialog_manage_list, sync_dialog_service
dialog_router = APIRouter()
@@ -62,7 +62,7 @@
@dialog_router.put("/update_icon", response_model=Response)
async def change_dialog_icon(dialog: dialogDataUpdate, current_user: UserModel = Depends(get_current_user), db=Depends(get_db)):
- is_create = await update_dialog_icon_service(db, dialog.id, dialog.icon)
+ is_create = await update_dialog_icon_service(db, dialog.id, dialog.icon, dialog.name, dialog.description)
if not is_create:
return Response(code=500, msg="dialog update failure", data={})
return Response(code=200, msg="dialog update success", data={})
@@ -72,4 +72,12 @@
async def dialog_list_api(dialog:dialogList,
current_user: UserModel = Depends(get_current_user),
db=Depends(get_db)):
- return Response(code=200, msg="", data=await get_dialog_manage_list(db, current_user.id, dialog.keyword, dialog.label, dialog.status, dialog.pageSize, dialog.current, dialog.mode))
\ No newline at end of file
+ return Response(code=200, msg="", data=await get_dialog_manage_list(db, current_user.id, dialog.keyword, dialog.label, dialog.status, dialog.pageSize, dialog.current, dialog.mode))
+
+
+@dialog_router.get("/sync", response_model=Response)
+async def sync_dialog_api(dialogId: str, current_user: UserModel = Depends(get_current_user), db=Depends(get_db)):
+ is_create = await sync_dialog_service(db, dialogId)
+ if not is_create:
+ return Response(code=500, msg="dialog update failure", data={})
+ return Response(code=200, msg="dialog update success", data={})
\ No newline at end of file
diff --git a/app/api/v2/chat.py b/app/api/v2/chat.py
index a09ab43..6f78896 100644
--- a/app/api/v2/chat.py
+++ b/app/api/v2/chat.py
@@ -7,15 +7,16 @@
from starlette.responses import StreamingResponse, Response
from werkzeug.http import HTTP_STATUS_CODES
-from app.api import get_current_user
+from app.api import get_current_user, get_api_key
from app.config.const import dialog_chat, advanced_chat, base_chat, agent_chat, workflow_chat, basic_chat, \
smart_message_error, http_400, http_500, http_200
from app.models import UserModel
from app.models.base_model import get_db
+from app.models.v2.chat import RetrievalRequest
from app.models.v2.session_model import ChatData
from app.service.v2.chat import service_chat_dialog, get_chat_info, service_chat_basic, \
service_chat_workflow, service_chat_parameters, service_chat_sessions, service_chat_upload, \
- service_chat_sessions_list, service_chat_session_log
+ service_chat_sessions_list, service_chat_session_log, service_chunk_retrieval, service_base_chunk_retrieval
chat_router_v2 = APIRouter()
@@ -36,6 +37,7 @@
media_type="text/event-stream")
if not session_id:
session = await service_chat_sessions(db, chatId, dialog.query)
+ print(session)
if not session or session.get("code") != 0:
error_msg = json.dumps(
{"message": smart_message_error, "error": "\n**ERROR**: chat agent error", "status": http_500})
@@ -123,4 +125,24 @@
@chat_router_v2.get("/chat/session_log")
async def api_chat_sessions(sessionId:str, current_user: UserModel = Depends(get_current_user), db: Session = Depends(get_db)): # current_user: UserModel = Depends(get_current_user)
data = await service_chat_session_log(db, sessionId)
- return Response(data, media_type="application/json", status_code=http_200)
\ No newline at end of file
+ return Response(data, media_type="application/json", status_code=http_200)
+
+
+
+# @chat_router_v2.post("/conversation/mindmap")
+# async def api_conversation_mindmap(chatId:str, current:int=1, current_user: UserModel = Depends(get_current_user), db: Session = Depends(get_db)): # current_user: UserModel = Depends(get_current_user)
+# data = await service_chat_sessions_list(db, chatId, current, pageSize, current_user.id, keyword)
+# return Response(data, media_type="application/json", status_code=http_200)
+
+
+
+@chat_router_v2.post("/multi/retrieval")
+async def retrieve_chunks(request_data: RetrievalRequest, api_key: str = Depends(get_api_key)):
+ records = await service_chunk_retrieval(request_data.query, request_data.retrieval_setting.top_k, request_data.retrieval_setting.score_threshold, api_key)
+ return {"records": records}
+
+
+@chat_router_v2.post("/retrieval")
+async def retrieve_chunks(request_data: RetrievalRequest, api_key: str = Depends(get_api_key)):
+ records = await service_base_chunk_retrieval(request_data.query,request_data.knowledge_id, request_data.retrieval_setting.top_k, request_data.retrieval_setting.score_threshold, api_key)
+ return {"records": records}
diff --git a/app/config/agent_base_url.py b/app/config/agent_base_url.py
index 3e4c48f..48a35f1 100644
--- a/app/config/agent_base_url.py
+++ b/app/config/agent_base_url.py
@@ -6,6 +6,7 @@
RG_APP_TOKEN_LIST= "/v1/system/token_list"
RG_USER_LOGIN= "/v1/user/login"
RG_PING= "/v1/system/version"
+RG_ORIGINAL_URL = "/api/v1/retrieval"
### ----------
DF_CHAT_AGENT= "/v1/chat-messages"
diff --git a/app/config/env_conf/config116.yaml b/app/config/env_conf/config119.yaml
similarity index 82%
rename from app/config/env_conf/config116.yaml
rename to app/config/env_conf/config119.yaml
index 5bdca69..297cfab 100644
--- a/app/config/env_conf/config116.yaml
+++ b/app/config/env_conf/config119.yaml
@@ -1,8 +1,8 @@
secret_key: your-secret-key
-sgb_base_url: http://192.168.20.116:13001
-sgb_websocket_url: ws://192.168.20.116:13001
+sgb_base_url: http://192.168.20.119:13001
+sgb_websocket_url: ws://192.168.20.119:13001
fwr_base_url: http://192.168.20.116:11080
-database_url: mysql+pymysql://root:rag_gateway@192.168.20.116:23306/rag_gateway
+database_url: mysql+pymysql://root:infini_rag_flow@192.168.20.119:5455/rag_basic
sgb_db_url: mysql+pymysql://root:1234@192.168.20.119:13306/bisheng
fwr_db_url: mysql+pymysql://root:infini_rag_flow@192.168.20.116:15455/rag_flow
PUBLIC_KEY: |
@@ -15,12 +15,12 @@
PASSWORD_KEY: VKinqB-8XMrwCLLrcf_PyHyo12_4PVKvWzaHjNFions=
basic_base_url: http://192.168.20.231:8000
basic_paper_url: http://192.168.20.231:8000
-dify_base_url: http://192.168.20.116
+dify_base_url: http://192.168.20.119:13002
dify_api_token: app-YmOAMDsPpDDlqryMHnc9TzTO
-postgresql_database_url: postgresql+asyncpg://kong:kongpass@192.168.20.116:15433/kong
+postgresql_database_url: postgresql+asyncpg://kong:kongpass@192.168.20.119:5432/kong
dify_workflow_clean: app-OpF0drPu0XcgqcekQpT4FA8a
dify_workflow_report: app-0MAkdFWqh9zxwmU69O0BFU1s
-dify_database_url: postgresql+psycopg2://postgres:difyai123456@192.168.20.116:15432/dify
+dify_database_url: postgresql+psycopg2://postgres:difyai123456@192.168.20.119:15432/dify
diff --git a/app/config/env_conf/menu_conf.json b/app/config/env_conf/menu_conf.json
index abd265d..9418fb8 100644
--- a/app/config/env_conf/menu_conf.json
+++ b/app/config/env_conf/menu_conf.json
@@ -27,8 +27,8 @@
"rank": 105,
"dialog": [
{
- "id": "6b8ee426c67511efb1510242ac1b0006",
- "chat_id": "6b8ee426c67511efb1510242ac1b0006",
+ "id": "ee0a3e38f5c211efb7600242ac1a0006",
+ "chat_id": "ee0a3e38f5c211efb7600242ac1a0006",
"chat_type": "knowledgeQA",
"agentType": 1
}
diff --git a/app/models/dialog_model.py b/app/models/dialog_model.py
index 2abb060..50a8f8e 100644
--- a/app/models/dialog_model.py
+++ b/app/models/dialog_model.py
@@ -83,6 +83,8 @@
id: str
status: Optional[str] = "1"
icon: Optional[str] = ""
+ name: Optional[str] = ""
+ description: Optional[str] = None
class dialogList(BaseModel):
diff --git a/app/models/v2/chat.py b/app/models/v2/chat.py
new file mode 100644
index 0000000..3abe304
--- /dev/null
+++ b/app/models/v2/chat.py
@@ -0,0 +1,13 @@
+from pydantic import BaseModel
+
+
+
+class RetrievalSetting(BaseModel):
+ top_k: int
+ score_threshold: float
+
+
+class RetrievalRequest(BaseModel):
+ knowledge_id: str
+ query: str
+ retrieval_setting: RetrievalSetting
\ No newline at end of file
diff --git a/app/models/v2/mindmap.py b/app/models/v2/mindmap.py
new file mode 100644
index 0000000..cead556
--- /dev/null
+++ b/app/models/v2/mindmap.py
@@ -0,0 +1,12 @@
+import json
+from typing import Optional, Type, List
+from pydantic import BaseModel
+
+
+
+
+class ChatData(BaseModel):
+ sessionId: Optional[str] = ""
+
+ class Config:
+ extra = 'allow' # 鍏佽鍏朵粬鍔ㄦ�佸瓧娈�
\ No newline at end of file
diff --git a/app/service/auth.py b/app/service/auth.py
index dcf1ebb..ffdece9 100644
--- a/app/service/auth.py
+++ b/app/service/auth.py
@@ -22,7 +22,7 @@
SECRET_KEY = settings.secret_key
ALGORITHM = "HS256"
-ACCESS_TOKEN_EXPIRE_MINUTES = 3000
+ACCESS_TOKEN_EXPIRE_MINUTES = 24*60
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
diff --git a/app/service/dialog.py b/app/service/dialog.py
index 3b965f2..34e4711 100644
--- a/app/service/dialog.py
+++ b/app/service/dialog.py
@@ -5,13 +5,14 @@
from app.config.agent_base_url import DF_CHAT_PARAMETERS, DF_CHAT_API_KEY
from app.config.config import settings
-from app.config.const import Dialog_STATSU_DELETE, DF_TYPE, Dialog_STATSU_ON, workflow_server
+from app.config.const import Dialog_STATSU_DELETE, DF_TYPE, Dialog_STATSU_ON, workflow_server, RG_TYPE
from app.models import KnowledgeModel, GroupModel, DialogModel, ConversationModel, group_dialog_table, LabelWorkerModel, \
LabelModel, ApiTokenModel
from app.models.user_model import UserModel, UserTokenModel
from Log import logger
from app.service.v2.app_driver.chat_data import ChatBaseApply
from app.service.v2.chat import get_chat_token, add_chat_token, get_app_token
+from app.task.fetch_agent import get_one_from_ragflow_dialog
async def get_dialog_list(db, user_id, keyword, label, status, page_size, page_index):
@@ -173,9 +174,14 @@
return True
-async def update_dialog_icon_service(db, dialog_id, icon):
+async def update_dialog_icon_service(db, dialog_id, icon, name, description):
+ update = {"icon": icon, "update_date": datetime.now()}
+ if name:
+ update["name"] = name
+ if description or description == "":
+ update["description"] = description
try:
- db.query(DialogModel).filter_by(id=dialog_id).update({"icon": icon, "update_date": datetime.now()})
+ db.query(DialogModel).filter_by(id=dialog_id).update(update)
db.commit()
except Exception as e:
logger.error(e)
@@ -228,3 +234,23 @@
r["user"] = user_dict.get(r["user_id"], {})
r["label"] = label_dict.get(r["id"], [])
return {"total": total, "rows": rows}
+
+
+
+async def sync_dialog_service(db, dialog_id):
+ dialog = db.query(DialogModel).filter(DialogModel.id == dialog_id).first()
+ if dialog and dialog.dialog_type == RG_TYPE:
+ try:
+ app_dialog = get_one_from_ragflow_dialog(dialog_id)
+ if app_dialog:
+ dialog.name = app_dialog["name"]
+ dialog.description = app_dialog["description"]
+ dialog.update_date = datetime.now()
+ db.add(dialog)
+ db.commit()
+ db.refresh(dialog)
+ except Exception as e:
+ logger.error(e)
+ db.rollback()
+ return False
+ return True
diff --git a/app/service/ragflow.py b/app/service/ragflow.py
index f0e6fd8..45961a6 100644
--- a/app/service/ragflow.py
+++ b/app/service/ragflow.py
@@ -19,7 +19,7 @@
return {}
data = response.json()
- ret_code = data.get("retcode")
+ ret_code = data.get("retcode", data.get("code"))
if ret_code == 401:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -136,6 +136,8 @@
}
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=data)
+ print(response.status_code)
+ print(response.text)
data = self._handle_response(response)
return [
{
diff --git a/app/service/service_token.py b/app/service/service_token.py
index 6383b6a..ccbe489 100644
--- a/app/service/service_token.py
+++ b/app/service/service_token.py
@@ -1,7 +1,7 @@
from Log import logger
from app.config.config import settings
-from app.config.const import BISHENG, RAGFLOW, DIFY
-from app.models import UserModel, UserAppModel
+from app.config.const import BISHENG, RAGFLOW, DIFY, chat_server
+from app.models import UserModel, UserAppModel, UserTokenModel
from app.models.token_model import TokenModel
from app.service.auth import UserAppDao
from app.service.bisheng import BishengService
@@ -25,13 +25,14 @@
async def get_ragflow_token(db, user_id: int):
# token = await UserAppDao(db).get_data_by_id(user_id, RAGFLOW)
token = db.query(TokenModel).filter(TokenModel.user_id == user_id).first()
+ token = db.query(UserTokenModel).filter(UserTokenModel.id == chat_server).first()
if not token:
token = await UserAppDao(db).get_data_by_id(user_id, RAGFLOW)
if not token:
return None
return token.access_token
else:
- return token.ragflow_token
+ return token.access_token
async def get_dify_token(db, user_id: int):
diff --git a/app/service/v2/chat.py b/app/service/v2/chat.py
index a24f88d..33634a9 100644
--- a/app/service/v2/chat.py
+++ b/app/service/v2/chat.py
@@ -1,11 +1,13 @@
+import asyncio
import io
import json
import fitz
+from fastapi import HTTPException
from Log import logger
from app.config.agent_base_url import RG_CHAT_DIALOG, DF_CHAT_AGENT, DF_CHAT_PARAMETERS, RG_CHAT_SESSIONS, \
- DF_CHAT_WORKFLOW, DF_UPLOAD_FILE
+ DF_CHAT_WORKFLOW, DF_UPLOAD_FILE, RG_ORIGINAL_URL
from app.config.config import settings
from app.config.const import *
from app.models import DialogModel, ApiTokenModel, UserTokenModel
@@ -258,6 +260,7 @@
async def service_chat_sessions(db, chat_id, name):
token = await get_chat_token(db, rg_api_token)
+ # print(token)
if not token:
return {}
url = settings.fwr_base_url + RG_CHAT_SESSIONS.format(chat_id)
@@ -343,3 +346,78 @@
text = await read_word(file)
return await get_str_token(text)
+
+
+async def service_chunk_retrieval(query, top_k, similarity_threshold, api_key):
+ print(query)
+
+ try:
+ request_data = json.loads(query)
+ except json.JSONDecodeError as e:
+ fixed_json = query.replace("'", '"')
+ print("Fixed JSON:", fixed_json)
+ request_data = json.loads(fixed_json)
+ payload = {
+ "question": request_data.get("query", ""),
+ "dataset_ids": request_data.get("dataset_ids", []),
+ "page_size": top_k,
+ "similarity_threshold": similarity_threshold
+ }
+ url = settings.fwr_base_url + RG_ORIGINAL_URL
+ # url = "http://192.168.20.116:11080/" + RG_ORIGINAL_URL
+ chat = ChatBaseApply()
+ response = await chat.chat_post(url, payload, await chat.get_headers(api_key))
+ if not response:
+ raise HTTPException(status_code=500, detail="鏈嶅姟寮傚父锛�")
+ print(response)
+ records = [
+ {
+ "content": chunk["content"],
+ "score": chunk["similarity"],
+ "title": chunk.get("document_keyword", "Unknown Document"),
+ "metadata": {"document_id": chunk["document_id"],
+ "path": f"{settings.fwr_base_url}/document/{chunk['document_id']}?ext={chunk.get('document_keyword').split('.')[-1]}&prefix=document",
+ 'highlight': chunk.get("highlight") , "image_id": chunk.get("image_id"), "positions": chunk.get("positions"),}
+ }
+ for chunk in response.get("data", {}).get("chunks", [])
+ ]
+ return records
+
+async def service_base_chunk_retrieval(query, knowledge_id, top_k, similarity_threshold, api_key):
+ # request_data = json.loads(query)
+ payload = {
+ "question": query,
+ "dataset_ids": [knowledge_id],
+ "page_size": top_k,
+ "similarity_threshold": similarity_threshold
+ }
+ url = settings.fwr_base_url + RG_ORIGINAL_URL
+ # url = "http://192.168.20.116:11080/" + RG_ORIGINAL_URL
+ chat = ChatBaseApply()
+ response = await chat.chat_post(url, payload, await chat.get_headers(api_key))
+ if not response:
+ raise HTTPException(status_code=500, detail="鏈嶅姟寮傚父锛�")
+ records = [
+ {
+ "content": chunk["content"],
+ "score": chunk["similarity"],
+ "title": chunk.get("document_keyword", "Unknown Document"),
+ "metadata": {"document_id": chunk["document_id"]}
+ }
+ for chunk in response.get("data", {}).get("chunks", [])
+ ]
+ return records
+
+
+
+if __name__ == "__main__":
+ q = json.dumps({"query": "璁惧", "dataset_ids": ["fc68db52f43111efb94a0242ac120004"]})
+ top_k = 2
+ similarity_threshold = 0.5
+ api_key = "ragflow-Y4MGYwY2JlZjM2YjExZWY4ZWU5MDI0Mm"
+ # a = service_chunk_retrieval(q, top_k, similarity_threshold, api_key)
+ # print(a)
+ async def a():
+ b = await service_chunk_retrieval(q, top_k, similarity_threshold, api_key)
+ print(b)
+ asyncio.run(a())
\ No newline at end of file
diff --git a/app/task/fetch_agent.py b/app/task/fetch_agent.py
index fde6d3a..a5a7bfb 100644
--- a/app/task/fetch_agent.py
+++ b/app/task/fetch_agent.py
@@ -404,6 +404,17 @@
db.close()
+def get_one_from_ragflow_dialog(dialog_id):
+ db = SessionRagflow()
+ try:
+ row = db.query(Dialog.id, Dialog.name, Dialog.description, Dialog.status, Dialog.tenant_id) \
+ .filter(Dialog.id==dialog_id).first()
+ return {"id": row[0], "name": row[1], "description": row[2], "status": str(row[3]),
+ "user_id": str(row[4])} if row else {}
+ finally:
+ db.close()
+
+
def sync_knowledge():
db = SessionLocal()
diff --git a/main.py b/main.py
index 05494b9..baad0e8 100644
--- a/main.py
+++ b/main.py
@@ -44,7 +44,7 @@
# sync_agents()
- await sync_default_data()
+ # await sync_default_data() # Todo
sync_agents_v2() # 鏅鸿兘浣�
sync_knowledge() # 鐭ヨ瘑搴�
sync_resources_from_json()
--
Gitblit v1.8.0