From f95f801f35aa201cbaffd7d881c07edc9398b570 Mon Sep 17 00:00:00 2001 From: zhaoqingang <zhaoqg0118@163.com> Date: 星期一, 03 三月 2025 16:03:51 +0800 Subject: [PATCH] 增加外接知识库中转接口 --- app/service/ragflow.py | 4 app/service/service_token.py | 7 + app/api/chat.py | 3 app/models/v2/mindmap.py | 12 +++ app/api/v2/chat.py | 28 ++++++ app/config/agent_base_url.py | 1 app/models/v2/chat.py | 13 +++ app/service/auth.py | 2 app/service/v2/chat.py | 80 +++++++++++++++++++ app/config/env_conf/config119.yaml | 12 +- app/service/dialog.py | 32 +++++++ app/task/fetch_agent.py | 11 ++ app/api/dialog.py | 14 ++ main.py | 2 app/models/dialog_model.py | 2 app/api/__init__.py | 8 + app/config/env_conf/menu_conf.json | 4 17 files changed, 209 insertions(+), 26 deletions(-) diff --git a/app/api/__init__.py b/app/api/__init__.py index 09538a7..2a679e8 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -3,7 +3,7 @@ import jwt # from cryptography.fernet import Fernet -from fastapi import FastAPI, Depends, HTTPException +from fastapi import FastAPI, Depends, HTTPException, Header from fastapi.security import OAuth2PasswordBearer from passlib.context import CryptContext from pydantic import BaseModel @@ -116,6 +116,12 @@ # 璁板綍寮傚父淇℃伅锛屼絾缁х画澶勭悊鍏朵粬鏂囦欢 print(f"Error processing file URL: {e}") +def get_api_key(authorization: str = Header(...)): + if not authorization.startswith("Bearer "): + raise HTTPException(status_code=401, detail="Invalid Authorization header format.") + return authorization.split(" ")[1] + + if __name__=="__main__": diff --git a/app/api/chat.py b/app/api/chat.py index 16003e2..d4b6dae 100644 --- a/app/api/chat.py +++ b/app/api/chat.py @@ -71,11 +71,12 @@ chat_history = message.get('chatHistory', []) message["role"] = "user" if len(chat_history) == 0: + print("----------------------", token) chat_history = await ragflow_service.get_session_history(token, chat_id) if len(chat_history) == 0: chat_history = await ragflow_service.set_session(token, agent_id, message, chat_id, True) - # print("chat_history------------------------", chat_history) + print("chat_history------------------------", chat_history) if len(chat_history) == 0: result = {"message": "鍐呴儴閿欒锛氬垱寤轰細璇濆け璐�", "type": "close"} await websocket.send_json(result) diff --git a/app/api/dialog.py b/app/api/dialog.py index 814932a..b11ef36 100644 --- a/app/api/dialog.py +++ b/app/api/dialog.py @@ -6,7 +6,7 @@ from app.models.base_model import get_db from app.models.user_model import UserModel from app.service.dialog import get_dialog_list, create_dialog_service, update_dialog_status_service, \ - delete_dialog_service, update_dialog_icon_service, get_dialog_manage_list + delete_dialog_service, update_dialog_icon_service, get_dialog_manage_list, sync_dialog_service dialog_router = APIRouter() @@ -62,7 +62,7 @@ @dialog_router.put("/update_icon", response_model=Response) async def change_dialog_icon(dialog: dialogDataUpdate, current_user: UserModel = Depends(get_current_user), db=Depends(get_db)): - is_create = await update_dialog_icon_service(db, dialog.id, dialog.icon) + is_create = await update_dialog_icon_service(db, dialog.id, dialog.icon, dialog.name, dialog.description) if not is_create: return Response(code=500, msg="dialog update failure", data={}) return Response(code=200, msg="dialog update success", data={}) @@ -72,4 +72,12 @@ async def dialog_list_api(dialog:dialogList, current_user: UserModel = Depends(get_current_user), db=Depends(get_db)): - return Response(code=200, msg="", data=await get_dialog_manage_list(db, current_user.id, dialog.keyword, dialog.label, dialog.status, dialog.pageSize, dialog.current, dialog.mode)) \ No newline at end of file + return Response(code=200, msg="", data=await get_dialog_manage_list(db, current_user.id, dialog.keyword, dialog.label, dialog.status, dialog.pageSize, dialog.current, dialog.mode)) + + +@dialog_router.get("/sync", response_model=Response) +async def sync_dialog_api(dialogId: str, current_user: UserModel = Depends(get_current_user), db=Depends(get_db)): + is_create = await sync_dialog_service(db, dialogId) + if not is_create: + return Response(code=500, msg="dialog update failure", data={}) + return Response(code=200, msg="dialog update success", data={}) \ No newline at end of file diff --git a/app/api/v2/chat.py b/app/api/v2/chat.py index a09ab43..6f78896 100644 --- a/app/api/v2/chat.py +++ b/app/api/v2/chat.py @@ -7,15 +7,16 @@ from starlette.responses import StreamingResponse, Response from werkzeug.http import HTTP_STATUS_CODES -from app.api import get_current_user +from app.api import get_current_user, get_api_key from app.config.const import dialog_chat, advanced_chat, base_chat, agent_chat, workflow_chat, basic_chat, \ smart_message_error, http_400, http_500, http_200 from app.models import UserModel from app.models.base_model import get_db +from app.models.v2.chat import RetrievalRequest from app.models.v2.session_model import ChatData from app.service.v2.chat import service_chat_dialog, get_chat_info, service_chat_basic, \ service_chat_workflow, service_chat_parameters, service_chat_sessions, service_chat_upload, \ - service_chat_sessions_list, service_chat_session_log + service_chat_sessions_list, service_chat_session_log, service_chunk_retrieval, service_base_chunk_retrieval chat_router_v2 = APIRouter() @@ -36,6 +37,7 @@ media_type="text/event-stream") if not session_id: session = await service_chat_sessions(db, chatId, dialog.query) + print(session) if not session or session.get("code") != 0: error_msg = json.dumps( {"message": smart_message_error, "error": "\n**ERROR**: chat agent error", "status": http_500}) @@ -123,4 +125,24 @@ @chat_router_v2.get("/chat/session_log") async def api_chat_sessions(sessionId:str, current_user: UserModel = Depends(get_current_user), db: Session = Depends(get_db)): # current_user: UserModel = Depends(get_current_user) data = await service_chat_session_log(db, sessionId) - return Response(data, media_type="application/json", status_code=http_200) \ No newline at end of file + return Response(data, media_type="application/json", status_code=http_200) + + + +# @chat_router_v2.post("/conversation/mindmap") +# async def api_conversation_mindmap(chatId:str, current:int=1, current_user: UserModel = Depends(get_current_user), db: Session = Depends(get_db)): # current_user: UserModel = Depends(get_current_user) +# data = await service_chat_sessions_list(db, chatId, current, pageSize, current_user.id, keyword) +# return Response(data, media_type="application/json", status_code=http_200) + + + +@chat_router_v2.post("/multi/retrieval") +async def retrieve_chunks(request_data: RetrievalRequest, api_key: str = Depends(get_api_key)): + records = await service_chunk_retrieval(request_data.query, request_data.retrieval_setting.top_k, request_data.retrieval_setting.score_threshold, api_key) + return {"records": records} + + +@chat_router_v2.post("/retrieval") +async def retrieve_chunks(request_data: RetrievalRequest, api_key: str = Depends(get_api_key)): + records = await service_base_chunk_retrieval(request_data.query,request_data.knowledge_id, request_data.retrieval_setting.top_k, request_data.retrieval_setting.score_threshold, api_key) + return {"records": records} diff --git a/app/config/agent_base_url.py b/app/config/agent_base_url.py index 3e4c48f..48a35f1 100644 --- a/app/config/agent_base_url.py +++ b/app/config/agent_base_url.py @@ -6,6 +6,7 @@ RG_APP_TOKEN_LIST= "/v1/system/token_list" RG_USER_LOGIN= "/v1/user/login" RG_PING= "/v1/system/version" +RG_ORIGINAL_URL = "/api/v1/retrieval" ### ---------- DF_CHAT_AGENT= "/v1/chat-messages" diff --git a/app/config/env_conf/config116.yaml b/app/config/env_conf/config119.yaml similarity index 82% rename from app/config/env_conf/config116.yaml rename to app/config/env_conf/config119.yaml index 5bdca69..297cfab 100644 --- a/app/config/env_conf/config116.yaml +++ b/app/config/env_conf/config119.yaml @@ -1,8 +1,8 @@ secret_key: your-secret-key -sgb_base_url: http://192.168.20.116:13001 -sgb_websocket_url: ws://192.168.20.116:13001 +sgb_base_url: http://192.168.20.119:13001 +sgb_websocket_url: ws://192.168.20.119:13001 fwr_base_url: http://192.168.20.116:11080 -database_url: mysql+pymysql://root:rag_gateway@192.168.20.116:23306/rag_gateway +database_url: mysql+pymysql://root:infini_rag_flow@192.168.20.119:5455/rag_basic sgb_db_url: mysql+pymysql://root:1234@192.168.20.119:13306/bisheng fwr_db_url: mysql+pymysql://root:infini_rag_flow@192.168.20.116:15455/rag_flow PUBLIC_KEY: | @@ -15,12 +15,12 @@ PASSWORD_KEY: VKinqB-8XMrwCLLrcf_PyHyo12_4PVKvWzaHjNFions= basic_base_url: http://192.168.20.231:8000 basic_paper_url: http://192.168.20.231:8000 -dify_base_url: http://192.168.20.116 +dify_base_url: http://192.168.20.119:13002 dify_api_token: app-YmOAMDsPpDDlqryMHnc9TzTO -postgresql_database_url: postgresql+asyncpg://kong:kongpass@192.168.20.116:15433/kong +postgresql_database_url: postgresql+asyncpg://kong:kongpass@192.168.20.119:5432/kong dify_workflow_clean: app-OpF0drPu0XcgqcekQpT4FA8a dify_workflow_report: app-0MAkdFWqh9zxwmU69O0BFU1s -dify_database_url: postgresql+psycopg2://postgres:difyai123456@192.168.20.116:15432/dify +dify_database_url: postgresql+psycopg2://postgres:difyai123456@192.168.20.119:15432/dify diff --git a/app/config/env_conf/menu_conf.json b/app/config/env_conf/menu_conf.json index abd265d..9418fb8 100644 --- a/app/config/env_conf/menu_conf.json +++ b/app/config/env_conf/menu_conf.json @@ -27,8 +27,8 @@ "rank": 105, "dialog": [ { - "id": "6b8ee426c67511efb1510242ac1b0006", - "chat_id": "6b8ee426c67511efb1510242ac1b0006", + "id": "ee0a3e38f5c211efb7600242ac1a0006", + "chat_id": "ee0a3e38f5c211efb7600242ac1a0006", "chat_type": "knowledgeQA", "agentType": 1 } diff --git a/app/models/dialog_model.py b/app/models/dialog_model.py index 2abb060..50a8f8e 100644 --- a/app/models/dialog_model.py +++ b/app/models/dialog_model.py @@ -83,6 +83,8 @@ id: str status: Optional[str] = "1" icon: Optional[str] = "" + name: Optional[str] = "" + description: Optional[str] = None class dialogList(BaseModel): diff --git a/app/models/v2/chat.py b/app/models/v2/chat.py new file mode 100644 index 0000000..3abe304 --- /dev/null +++ b/app/models/v2/chat.py @@ -0,0 +1,13 @@ +from pydantic import BaseModel + + + +class RetrievalSetting(BaseModel): + top_k: int + score_threshold: float + + +class RetrievalRequest(BaseModel): + knowledge_id: str + query: str + retrieval_setting: RetrievalSetting \ No newline at end of file diff --git a/app/models/v2/mindmap.py b/app/models/v2/mindmap.py new file mode 100644 index 0000000..cead556 --- /dev/null +++ b/app/models/v2/mindmap.py @@ -0,0 +1,12 @@ +import json +from typing import Optional, Type, List +from pydantic import BaseModel + + + + +class ChatData(BaseModel): + sessionId: Optional[str] = "" + + class Config: + extra = 'allow' # 鍏佽鍏朵粬鍔ㄦ�佸瓧娈� \ No newline at end of file diff --git a/app/service/auth.py b/app/service/auth.py index dcf1ebb..ffdece9 100644 --- a/app/service/auth.py +++ b/app/service/auth.py @@ -22,7 +22,7 @@ SECRET_KEY = settings.secret_key ALGORITHM = "HS256" -ACCESS_TOKEN_EXPIRE_MINUTES = 3000 +ACCESS_TOKEN_EXPIRE_MINUTES = 24*60 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") diff --git a/app/service/dialog.py b/app/service/dialog.py index 3b965f2..34e4711 100644 --- a/app/service/dialog.py +++ b/app/service/dialog.py @@ -5,13 +5,14 @@ from app.config.agent_base_url import DF_CHAT_PARAMETERS, DF_CHAT_API_KEY from app.config.config import settings -from app.config.const import Dialog_STATSU_DELETE, DF_TYPE, Dialog_STATSU_ON, workflow_server +from app.config.const import Dialog_STATSU_DELETE, DF_TYPE, Dialog_STATSU_ON, workflow_server, RG_TYPE from app.models import KnowledgeModel, GroupModel, DialogModel, ConversationModel, group_dialog_table, LabelWorkerModel, \ LabelModel, ApiTokenModel from app.models.user_model import UserModel, UserTokenModel from Log import logger from app.service.v2.app_driver.chat_data import ChatBaseApply from app.service.v2.chat import get_chat_token, add_chat_token, get_app_token +from app.task.fetch_agent import get_one_from_ragflow_dialog async def get_dialog_list(db, user_id, keyword, label, status, page_size, page_index): @@ -173,9 +174,14 @@ return True -async def update_dialog_icon_service(db, dialog_id, icon): +async def update_dialog_icon_service(db, dialog_id, icon, name, description): + update = {"icon": icon, "update_date": datetime.now()} + if name: + update["name"] = name + if description or description == "": + update["description"] = description try: - db.query(DialogModel).filter_by(id=dialog_id).update({"icon": icon, "update_date": datetime.now()}) + db.query(DialogModel).filter_by(id=dialog_id).update(update) db.commit() except Exception as e: logger.error(e) @@ -228,3 +234,23 @@ r["user"] = user_dict.get(r["user_id"], {}) r["label"] = label_dict.get(r["id"], []) return {"total": total, "rows": rows} + + + +async def sync_dialog_service(db, dialog_id): + dialog = db.query(DialogModel).filter(DialogModel.id == dialog_id).first() + if dialog and dialog.dialog_type == RG_TYPE: + try: + app_dialog = get_one_from_ragflow_dialog(dialog_id) + if app_dialog: + dialog.name = app_dialog["name"] + dialog.description = app_dialog["description"] + dialog.update_date = datetime.now() + db.add(dialog) + db.commit() + db.refresh(dialog) + except Exception as e: + logger.error(e) + db.rollback() + return False + return True diff --git a/app/service/ragflow.py b/app/service/ragflow.py index f0e6fd8..45961a6 100644 --- a/app/service/ragflow.py +++ b/app/service/ragflow.py @@ -19,7 +19,7 @@ return {} data = response.json() - ret_code = data.get("retcode") + ret_code = data.get("retcode", data.get("code")) if ret_code == 401: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, @@ -136,6 +136,8 @@ } async with httpx.AsyncClient() as client: response = await client.post(url, headers=headers, json=data) + print(response.status_code) + print(response.text) data = self._handle_response(response) return [ { diff --git a/app/service/service_token.py b/app/service/service_token.py index 6383b6a..ccbe489 100644 --- a/app/service/service_token.py +++ b/app/service/service_token.py @@ -1,7 +1,7 @@ from Log import logger from app.config.config import settings -from app.config.const import BISHENG, RAGFLOW, DIFY -from app.models import UserModel, UserAppModel +from app.config.const import BISHENG, RAGFLOW, DIFY, chat_server +from app.models import UserModel, UserAppModel, UserTokenModel from app.models.token_model import TokenModel from app.service.auth import UserAppDao from app.service.bisheng import BishengService @@ -25,13 +25,14 @@ async def get_ragflow_token(db, user_id: int): # token = await UserAppDao(db).get_data_by_id(user_id, RAGFLOW) token = db.query(TokenModel).filter(TokenModel.user_id == user_id).first() + token = db.query(UserTokenModel).filter(UserTokenModel.id == chat_server).first() if not token: token = await UserAppDao(db).get_data_by_id(user_id, RAGFLOW) if not token: return None return token.access_token else: - return token.ragflow_token + return token.access_token async def get_dify_token(db, user_id: int): diff --git a/app/service/v2/chat.py b/app/service/v2/chat.py index a24f88d..33634a9 100644 --- a/app/service/v2/chat.py +++ b/app/service/v2/chat.py @@ -1,11 +1,13 @@ +import asyncio import io import json import fitz +from fastapi import HTTPException from Log import logger from app.config.agent_base_url import RG_CHAT_DIALOG, DF_CHAT_AGENT, DF_CHAT_PARAMETERS, RG_CHAT_SESSIONS, \ - DF_CHAT_WORKFLOW, DF_UPLOAD_FILE + DF_CHAT_WORKFLOW, DF_UPLOAD_FILE, RG_ORIGINAL_URL from app.config.config import settings from app.config.const import * from app.models import DialogModel, ApiTokenModel, UserTokenModel @@ -258,6 +260,7 @@ async def service_chat_sessions(db, chat_id, name): token = await get_chat_token(db, rg_api_token) + # print(token) if not token: return {} url = settings.fwr_base_url + RG_CHAT_SESSIONS.format(chat_id) @@ -343,3 +346,78 @@ text = await read_word(file) return await get_str_token(text) + + +async def service_chunk_retrieval(query, top_k, similarity_threshold, api_key): + print(query) + + try: + request_data = json.loads(query) + except json.JSONDecodeError as e: + fixed_json = query.replace("'", '"') + print("Fixed JSON:", fixed_json) + request_data = json.loads(fixed_json) + payload = { + "question": request_data.get("query", ""), + "dataset_ids": request_data.get("dataset_ids", []), + "page_size": top_k, + "similarity_threshold": similarity_threshold + } + url = settings.fwr_base_url + RG_ORIGINAL_URL + # url = "http://192.168.20.116:11080/" + RG_ORIGINAL_URL + chat = ChatBaseApply() + response = await chat.chat_post(url, payload, await chat.get_headers(api_key)) + if not response: + raise HTTPException(status_code=500, detail="鏈嶅姟寮傚父锛�") + print(response) + records = [ + { + "content": chunk["content"], + "score": chunk["similarity"], + "title": chunk.get("document_keyword", "Unknown Document"), + "metadata": {"document_id": chunk["document_id"], + "path": f"{settings.fwr_base_url}/document/{chunk['document_id']}?ext={chunk.get('document_keyword').split('.')[-1]}&prefix=document", + 'highlight': chunk.get("highlight") , "image_id": chunk.get("image_id"), "positions": chunk.get("positions"),} + } + for chunk in response.get("data", {}).get("chunks", []) + ] + return records + +async def service_base_chunk_retrieval(query, knowledge_id, top_k, similarity_threshold, api_key): + # request_data = json.loads(query) + payload = { + "question": query, + "dataset_ids": [knowledge_id], + "page_size": top_k, + "similarity_threshold": similarity_threshold + } + url = settings.fwr_base_url + RG_ORIGINAL_URL + # url = "http://192.168.20.116:11080/" + RG_ORIGINAL_URL + chat = ChatBaseApply() + response = await chat.chat_post(url, payload, await chat.get_headers(api_key)) + if not response: + raise HTTPException(status_code=500, detail="鏈嶅姟寮傚父锛�") + records = [ + { + "content": chunk["content"], + "score": chunk["similarity"], + "title": chunk.get("document_keyword", "Unknown Document"), + "metadata": {"document_id": chunk["document_id"]} + } + for chunk in response.get("data", {}).get("chunks", []) + ] + return records + + + +if __name__ == "__main__": + q = json.dumps({"query": "璁惧", "dataset_ids": ["fc68db52f43111efb94a0242ac120004"]}) + top_k = 2 + similarity_threshold = 0.5 + api_key = "ragflow-Y4MGYwY2JlZjM2YjExZWY4ZWU5MDI0Mm" + # a = service_chunk_retrieval(q, top_k, similarity_threshold, api_key) + # print(a) + async def a(): + b = await service_chunk_retrieval(q, top_k, similarity_threshold, api_key) + print(b) + asyncio.run(a()) \ No newline at end of file diff --git a/app/task/fetch_agent.py b/app/task/fetch_agent.py index fde6d3a..a5a7bfb 100644 --- a/app/task/fetch_agent.py +++ b/app/task/fetch_agent.py @@ -404,6 +404,17 @@ db.close() +def get_one_from_ragflow_dialog(dialog_id): + db = SessionRagflow() + try: + row = db.query(Dialog.id, Dialog.name, Dialog.description, Dialog.status, Dialog.tenant_id) \ + .filter(Dialog.id==dialog_id).first() + return {"id": row[0], "name": row[1], "description": row[2], "status": str(row[3]), + "user_id": str(row[4])} if row else {} + finally: + db.close() + + def sync_knowledge(): db = SessionLocal() diff --git a/main.py b/main.py index 05494b9..baad0e8 100644 --- a/main.py +++ b/main.py @@ -44,7 +44,7 @@ # sync_agents() - await sync_default_data() + # await sync_default_data() # Todo sync_agents_v2() # 鏅鸿兘浣� sync_knowledge() # 鐭ヨ瘑搴� sync_resources_from_json() -- Gitblit v1.8.0