From f95f801f35aa201cbaffd7d881c07edc9398b570 Mon Sep 17 00:00:00 2001
From: zhaoqingang <zhaoqg0118@163.com>
Date: 星期一, 03 三月 2025 16:03:51 +0800
Subject: [PATCH] 增加外接知识库中转接口

---
 app/service/ragflow.py             |    4 
 app/service/service_token.py       |    7 +
 app/api/chat.py                    |    3 
 app/models/v2/mindmap.py           |   12 +++
 app/api/v2/chat.py                 |   28 ++++++
 app/config/agent_base_url.py       |    1 
 app/models/v2/chat.py              |   13 +++
 app/service/auth.py                |    2 
 app/service/v2/chat.py             |   80 +++++++++++++++++++
 app/config/env_conf/config119.yaml |   12 +-
 app/service/dialog.py              |   32 +++++++
 app/task/fetch_agent.py            |   11 ++
 app/api/dialog.py                  |   14 ++
 main.py                            |    2 
 app/models/dialog_model.py         |    2 
 app/api/__init__.py                |    8 +
 app/config/env_conf/menu_conf.json |    4 
 17 files changed, 209 insertions(+), 26 deletions(-)

diff --git a/app/api/__init__.py b/app/api/__init__.py
index 09538a7..2a679e8 100644
--- a/app/api/__init__.py
+++ b/app/api/__init__.py
@@ -3,7 +3,7 @@
 
 import jwt
 # from cryptography.fernet import Fernet
-from fastapi import FastAPI, Depends, HTTPException
+from fastapi import FastAPI, Depends, HTTPException, Header
 from fastapi.security import OAuth2PasswordBearer
 from passlib.context import CryptContext
 from pydantic import BaseModel
@@ -116,6 +116,12 @@
                 # 璁板綍寮傚父淇℃伅锛屼絾缁х画澶勭悊鍏朵粬鏂囦欢
                 print(f"Error processing file URL: {e}")
 
+def get_api_key(authorization: str = Header(...)):
+    if not authorization.startswith("Bearer "):
+        raise HTTPException(status_code=401, detail="Invalid Authorization header format.")
+    return authorization.split(" ")[1]
+
+
 
 if __name__=="__main__":
 
diff --git a/app/api/chat.py b/app/api/chat.py
index 16003e2..d4b6dae 100644
--- a/app/api/chat.py
+++ b/app/api/chat.py
@@ -71,11 +71,12 @@
                     chat_history = message.get('chatHistory', [])
                     message["role"] = "user"
                     if len(chat_history) == 0:
+                        print("----------------------", token)
                         chat_history = await ragflow_service.get_session_history(token, chat_id)
                         if len(chat_history) == 0:
                             chat_history = await ragflow_service.set_session(token, agent_id,
                                                                              message, chat_id, True)
-                            # print("chat_history------------------------", chat_history)
+                            print("chat_history------------------------", chat_history)
                             if len(chat_history) == 0:
                                 result = {"message": "鍐呴儴閿欒锛氬垱寤轰細璇濆け璐�", "type": "close"}
                                 await websocket.send_json(result)
diff --git a/app/api/dialog.py b/app/api/dialog.py
index 814932a..b11ef36 100644
--- a/app/api/dialog.py
+++ b/app/api/dialog.py
@@ -6,7 +6,7 @@
 from app.models.base_model import get_db
 from app.models.user_model import UserModel
 from app.service.dialog import get_dialog_list, create_dialog_service, update_dialog_status_service, \
-    delete_dialog_service, update_dialog_icon_service, get_dialog_manage_list
+    delete_dialog_service, update_dialog_icon_service, get_dialog_manage_list, sync_dialog_service
 
 dialog_router = APIRouter()
 
@@ -62,7 +62,7 @@
 
 @dialog_router.put("/update_icon", response_model=Response)
 async def change_dialog_icon(dialog: dialogDataUpdate, current_user: UserModel = Depends(get_current_user), db=Depends(get_db)):
-    is_create = await update_dialog_icon_service(db, dialog.id, dialog.icon)
+    is_create = await update_dialog_icon_service(db, dialog.id, dialog.icon, dialog.name, dialog.description)
     if not is_create:
         return Response(code=500, msg="dialog update failure", data={})
     return Response(code=200, msg="dialog update success", data={})
@@ -72,4 +72,12 @@
 async def dialog_list_api(dialog:dialogList,
                       current_user: UserModel = Depends(get_current_user),
                       db=Depends(get_db)):
-    return Response(code=200, msg="", data=await get_dialog_manage_list(db, current_user.id, dialog.keyword, dialog.label, dialog.status, dialog.pageSize, dialog.current, dialog.mode))
\ No newline at end of file
+    return Response(code=200, msg="", data=await get_dialog_manage_list(db, current_user.id, dialog.keyword, dialog.label, dialog.status, dialog.pageSize, dialog.current, dialog.mode))
+
+
+@dialog_router.get("/sync", response_model=Response)
+async def sync_dialog_api(dialogId: str, current_user: UserModel = Depends(get_current_user), db=Depends(get_db)):
+    is_create = await sync_dialog_service(db, dialogId)
+    if not is_create:
+        return Response(code=500, msg="dialog update failure", data={})
+    return Response(code=200, msg="dialog update success", data={})
\ No newline at end of file
diff --git a/app/api/v2/chat.py b/app/api/v2/chat.py
index a09ab43..6f78896 100644
--- a/app/api/v2/chat.py
+++ b/app/api/v2/chat.py
@@ -7,15 +7,16 @@
 from starlette.responses import StreamingResponse, Response
 from werkzeug.http import HTTP_STATUS_CODES
 
-from app.api import get_current_user
+from app.api import get_current_user, get_api_key
 from app.config.const import dialog_chat, advanced_chat, base_chat, agent_chat, workflow_chat, basic_chat, \
     smart_message_error, http_400, http_500, http_200
 from app.models import UserModel
 from app.models.base_model import get_db
+from app.models.v2.chat import RetrievalRequest
 from app.models.v2.session_model import ChatData
 from app.service.v2.chat import service_chat_dialog, get_chat_info, service_chat_basic, \
     service_chat_workflow, service_chat_parameters, service_chat_sessions, service_chat_upload, \
-    service_chat_sessions_list, service_chat_session_log
+    service_chat_sessions_list, service_chat_session_log, service_chunk_retrieval, service_base_chunk_retrieval
 
 chat_router_v2 = APIRouter()
 
@@ -36,6 +37,7 @@
                                  media_type="text/event-stream")
     if not session_id:
         session = await service_chat_sessions(db, chatId, dialog.query)
+        print(session)
         if not session or session.get("code") != 0:
             error_msg = json.dumps(
                 {"message": smart_message_error, "error": "\n**ERROR**: chat agent error", "status": http_500})
@@ -123,4 +125,24 @@
 @chat_router_v2.get("/chat/session_log")
 async def api_chat_sessions(sessionId:str, current_user: UserModel = Depends(get_current_user), db: Session = Depends(get_db)): #  current_user: UserModel = Depends(get_current_user)
     data = await service_chat_session_log(db, sessionId)
-    return Response(data, media_type="application/json", status_code=http_200)
\ No newline at end of file
+    return Response(data, media_type="application/json", status_code=http_200)
+
+
+
+# @chat_router_v2.post("/conversation/mindmap")
+# async def api_conversation_mindmap(chatId:str, current:int=1, current_user: UserModel = Depends(get_current_user), db: Session = Depends(get_db)): #  current_user: UserModel = Depends(get_current_user)
+#     data = await service_chat_sessions_list(db, chatId, current, pageSize, current_user.id, keyword)
+#     return Response(data, media_type="application/json", status_code=http_200)
+
+
+
+@chat_router_v2.post("/multi/retrieval")
+async def retrieve_chunks(request_data: RetrievalRequest, api_key: str = Depends(get_api_key)):
+    records = await service_chunk_retrieval(request_data.query, request_data.retrieval_setting.top_k, request_data.retrieval_setting.score_threshold, api_key)
+    return {"records": records}
+
+
+@chat_router_v2.post("/retrieval")
+async def retrieve_chunks(request_data: RetrievalRequest, api_key: str = Depends(get_api_key)):
+    records = await service_base_chunk_retrieval(request_data.query,request_data.knowledge_id, request_data.retrieval_setting.top_k, request_data.retrieval_setting.score_threshold, api_key)
+    return {"records": records}
diff --git a/app/config/agent_base_url.py b/app/config/agent_base_url.py
index 3e4c48f..48a35f1 100644
--- a/app/config/agent_base_url.py
+++ b/app/config/agent_base_url.py
@@ -6,6 +6,7 @@
 RG_APP_TOKEN_LIST= "/v1/system/token_list"
 RG_USER_LOGIN= "/v1/user/login"
 RG_PING= "/v1/system/version"
+RG_ORIGINAL_URL = "/api/v1/retrieval"
 
 ### ----------
 DF_CHAT_AGENT= "/v1/chat-messages"
diff --git a/app/config/env_conf/config116.yaml b/app/config/env_conf/config119.yaml
similarity index 82%
rename from app/config/env_conf/config116.yaml
rename to app/config/env_conf/config119.yaml
index 5bdca69..297cfab 100644
--- a/app/config/env_conf/config116.yaml
+++ b/app/config/env_conf/config119.yaml
@@ -1,8 +1,8 @@
 secret_key: your-secret-key
-sgb_base_url: http://192.168.20.116:13001
-sgb_websocket_url: ws://192.168.20.116:13001
+sgb_base_url: http://192.168.20.119:13001
+sgb_websocket_url: ws://192.168.20.119:13001
 fwr_base_url: http://192.168.20.116:11080
-database_url: mysql+pymysql://root:rag_gateway@192.168.20.116:23306/rag_gateway
+database_url: mysql+pymysql://root:infini_rag_flow@192.168.20.119:5455/rag_basic
 sgb_db_url: mysql+pymysql://root:1234@192.168.20.119:13306/bisheng
 fwr_db_url: mysql+pymysql://root:infini_rag_flow@192.168.20.116:15455/rag_flow
 PUBLIC_KEY: |
@@ -15,12 +15,12 @@
 PASSWORD_KEY: VKinqB-8XMrwCLLrcf_PyHyo12_4PVKvWzaHjNFions=
 basic_base_url: http://192.168.20.231:8000
 basic_paper_url: http://192.168.20.231:8000
-dify_base_url: http://192.168.20.116
+dify_base_url: http://192.168.20.119:13002
 dify_api_token: app-YmOAMDsPpDDlqryMHnc9TzTO
-postgresql_database_url: postgresql+asyncpg://kong:kongpass@192.168.20.116:15433/kong
+postgresql_database_url: postgresql+asyncpg://kong:kongpass@192.168.20.119:5432/kong
 dify_workflow_clean: app-OpF0drPu0XcgqcekQpT4FA8a
 dify_workflow_report: app-0MAkdFWqh9zxwmU69O0BFU1s
-dify_database_url: postgresql+psycopg2://postgres:difyai123456@192.168.20.116:15432/dify
+dify_database_url: postgresql+psycopg2://postgres:difyai123456@192.168.20.119:15432/dify
 
 
 
diff --git a/app/config/env_conf/menu_conf.json b/app/config/env_conf/menu_conf.json
index abd265d..9418fb8 100644
--- a/app/config/env_conf/menu_conf.json
+++ b/app/config/env_conf/menu_conf.json
@@ -27,8 +27,8 @@
       "rank": 105,
       "dialog": [
         {
-          "id": "6b8ee426c67511efb1510242ac1b0006",
-          "chat_id": "6b8ee426c67511efb1510242ac1b0006",
+          "id": "ee0a3e38f5c211efb7600242ac1a0006",
+          "chat_id": "ee0a3e38f5c211efb7600242ac1a0006",
           "chat_type": "knowledgeQA",
           "agentType": 1
         }
diff --git a/app/models/dialog_model.py b/app/models/dialog_model.py
index 2abb060..50a8f8e 100644
--- a/app/models/dialog_model.py
+++ b/app/models/dialog_model.py
@@ -83,6 +83,8 @@
     id: str
     status: Optional[str] = "1"
     icon: Optional[str] = ""
+    name: Optional[str] = ""
+    description: Optional[str] = None
 
 
 class dialogList(BaseModel):
diff --git a/app/models/v2/chat.py b/app/models/v2/chat.py
new file mode 100644
index 0000000..3abe304
--- /dev/null
+++ b/app/models/v2/chat.py
@@ -0,0 +1,13 @@
+from pydantic import BaseModel
+
+
+
+class RetrievalSetting(BaseModel):
+    top_k: int
+    score_threshold: float
+
+
+class RetrievalRequest(BaseModel):
+    knowledge_id: str
+    query: str
+    retrieval_setting: RetrievalSetting
\ No newline at end of file
diff --git a/app/models/v2/mindmap.py b/app/models/v2/mindmap.py
new file mode 100644
index 0000000..cead556
--- /dev/null
+++ b/app/models/v2/mindmap.py
@@ -0,0 +1,12 @@
+import json
+from typing import Optional, Type, List
+from pydantic import BaseModel
+
+
+
+
+class ChatData(BaseModel):
+    sessionId: Optional[str] = ""
+
+    class Config:
+        extra = 'allow'  # 鍏佽鍏朵粬鍔ㄦ�佸瓧娈�
\ No newline at end of file
diff --git a/app/service/auth.py b/app/service/auth.py
index dcf1ebb..ffdece9 100644
--- a/app/service/auth.py
+++ b/app/service/auth.py
@@ -22,7 +22,7 @@
 
 SECRET_KEY = settings.secret_key
 ALGORITHM = "HS256"
-ACCESS_TOKEN_EXPIRE_MINUTES = 3000
+ACCESS_TOKEN_EXPIRE_MINUTES = 24*60
 
 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
 
diff --git a/app/service/dialog.py b/app/service/dialog.py
index 3b965f2..34e4711 100644
--- a/app/service/dialog.py
+++ b/app/service/dialog.py
@@ -5,13 +5,14 @@
 
 from app.config.agent_base_url import DF_CHAT_PARAMETERS, DF_CHAT_API_KEY
 from app.config.config import settings
-from app.config.const import Dialog_STATSU_DELETE, DF_TYPE, Dialog_STATSU_ON, workflow_server
+from app.config.const import Dialog_STATSU_DELETE, DF_TYPE, Dialog_STATSU_ON, workflow_server, RG_TYPE
 from app.models import KnowledgeModel, GroupModel, DialogModel, ConversationModel, group_dialog_table, LabelWorkerModel, \
     LabelModel, ApiTokenModel
 from app.models.user_model import UserModel, UserTokenModel
 from Log import logger
 from app.service.v2.app_driver.chat_data import ChatBaseApply
 from app.service.v2.chat import get_chat_token, add_chat_token, get_app_token
+from app.task.fetch_agent import get_one_from_ragflow_dialog
 
 
 async def get_dialog_list(db, user_id, keyword, label, status, page_size, page_index):
@@ -173,9 +174,14 @@
     return True
 
 
-async def update_dialog_icon_service(db, dialog_id, icon):
+async def update_dialog_icon_service(db, dialog_id, icon, name, description):
+    update = {"icon": icon, "update_date": datetime.now()}
+    if name:
+        update["name"] = name
+    if description or description == "":
+        update["description"] = description
     try:
-        db.query(DialogModel).filter_by(id=dialog_id).update({"icon": icon, "update_date": datetime.now()})
+        db.query(DialogModel).filter_by(id=dialog_id).update(update)
         db.commit()
     except Exception as e:
         logger.error(e)
@@ -228,3 +234,23 @@
         r["user"] = user_dict.get(r["user_id"], {})
         r["label"] = label_dict.get(r["id"], [])
     return {"total": total, "rows": rows}
+
+
+
+async def sync_dialog_service(db, dialog_id):
+    dialog = db.query(DialogModel).filter(DialogModel.id == dialog_id).first()
+    if dialog and dialog.dialog_type == RG_TYPE:
+        try:
+            app_dialog = get_one_from_ragflow_dialog(dialog_id)
+            if app_dialog:
+                dialog.name = app_dialog["name"]
+                dialog.description = app_dialog["description"]
+                dialog.update_date = datetime.now()
+                db.add(dialog)
+                db.commit()
+                db.refresh(dialog)
+        except Exception as e:
+            logger.error(e)
+            db.rollback()
+            return False
+        return True
diff --git a/app/service/ragflow.py b/app/service/ragflow.py
index f0e6fd8..45961a6 100644
--- a/app/service/ragflow.py
+++ b/app/service/ragflow.py
@@ -19,7 +19,7 @@
             return {}
 
         data = response.json()
-        ret_code = data.get("retcode")
+        ret_code = data.get("retcode", data.get("code"))
         if ret_code == 401:
             raise HTTPException(
                 status_code=status.HTTP_401_UNAUTHORIZED,
@@ -136,6 +136,8 @@
         }
         async with httpx.AsyncClient() as client:
             response = await client.post(url, headers=headers, json=data)
+            print(response.status_code)
+            print(response.text)
             data = self._handle_response(response)
             return [
                 {
diff --git a/app/service/service_token.py b/app/service/service_token.py
index 6383b6a..ccbe489 100644
--- a/app/service/service_token.py
+++ b/app/service/service_token.py
@@ -1,7 +1,7 @@
 from Log import logger
 from app.config.config import settings
-from app.config.const import BISHENG, RAGFLOW, DIFY
-from app.models import UserModel, UserAppModel
+from app.config.const import BISHENG, RAGFLOW, DIFY, chat_server
+from app.models import UserModel, UserAppModel, UserTokenModel
 from app.models.token_model import TokenModel
 from app.service.auth import UserAppDao
 from app.service.bisheng import BishengService
@@ -25,13 +25,14 @@
 async def get_ragflow_token(db, user_id: int):
     # token = await UserAppDao(db).get_data_by_id(user_id, RAGFLOW)
     token = db.query(TokenModel).filter(TokenModel.user_id == user_id).first()
+    token = db.query(UserTokenModel).filter(UserTokenModel.id == chat_server).first()
     if not token:
         token = await UserAppDao(db).get_data_by_id(user_id, RAGFLOW)
         if not token:
             return None
         return token.access_token
     else:
-        return token.ragflow_token
+        return token.access_token
 
 
 async def get_dify_token(db, user_id: int):
diff --git a/app/service/v2/chat.py b/app/service/v2/chat.py
index a24f88d..33634a9 100644
--- a/app/service/v2/chat.py
+++ b/app/service/v2/chat.py
@@ -1,11 +1,13 @@
+import asyncio
 import io
 import json
 
 import fitz
+from fastapi import HTTPException
 
 from Log import logger
 from app.config.agent_base_url import RG_CHAT_DIALOG, DF_CHAT_AGENT, DF_CHAT_PARAMETERS, RG_CHAT_SESSIONS, \
-    DF_CHAT_WORKFLOW, DF_UPLOAD_FILE
+    DF_CHAT_WORKFLOW, DF_UPLOAD_FILE, RG_ORIGINAL_URL
 from app.config.config import settings
 from app.config.const import *
 from app.models import DialogModel, ApiTokenModel, UserTokenModel
@@ -258,6 +260,7 @@
 
 async def service_chat_sessions(db, chat_id, name):
     token = await get_chat_token(db, rg_api_token)
+    # print(token)
     if not token:
         return {}
     url = settings.fwr_base_url + RG_CHAT_SESSIONS.format(chat_id)
@@ -343,3 +346,78 @@
         text = await read_word(file)
 
     return await get_str_token(text)
+
+
+async def service_chunk_retrieval(query, top_k, similarity_threshold, api_key):
+    print(query)
+
+    try:
+        request_data = json.loads(query)
+    except json.JSONDecodeError as e:
+        fixed_json = query.replace("'", '"')
+        print("Fixed JSON:", fixed_json)
+        request_data = json.loads(fixed_json)
+    payload = {
+        "question": request_data.get("query", ""),
+        "dataset_ids": request_data.get("dataset_ids", []),
+        "page_size": top_k,
+        "similarity_threshold": similarity_threshold
+    }
+    url = settings.fwr_base_url + RG_ORIGINAL_URL
+    # url = "http://192.168.20.116:11080/" + RG_ORIGINAL_URL
+    chat = ChatBaseApply()
+    response = await  chat.chat_post(url, payload, await chat.get_headers(api_key))
+    if not response:
+        raise HTTPException(status_code=500, detail="鏈嶅姟寮傚父锛�")
+    print(response)
+    records = [
+        {
+            "content": chunk["content"],
+            "score": chunk["similarity"],
+            "title": chunk.get("document_keyword", "Unknown Document"),
+            "metadata": {"document_id": chunk["document_id"],
+                         "path": f"{settings.fwr_base_url}/document/{chunk['document_id']}?ext={chunk.get('document_keyword').split('.')[-1]}&prefix=document",
+                         'highlight': chunk.get("highlight") , "image_id":  chunk.get("image_id"), "positions": chunk.get("positions"),}
+        }
+        for chunk in response.get("data", {}).get("chunks", [])
+    ]
+    return records
+
+async def service_base_chunk_retrieval(query, knowledge_id, top_k, similarity_threshold, api_key):
+    # request_data = json.loads(query)
+    payload = {
+        "question": query,
+        "dataset_ids": [knowledge_id],
+        "page_size": top_k,
+        "similarity_threshold": similarity_threshold
+    }
+    url = settings.fwr_base_url + RG_ORIGINAL_URL
+    # url = "http://192.168.20.116:11080/" + RG_ORIGINAL_URL
+    chat = ChatBaseApply()
+    response = await chat.chat_post(url, payload, await chat.get_headers(api_key))
+    if not response:
+        raise HTTPException(status_code=500, detail="鏈嶅姟寮傚父锛�")
+    records = [
+        {
+            "content": chunk["content"],
+            "score": chunk["similarity"],
+            "title": chunk.get("document_keyword", "Unknown Document"),
+            "metadata": {"document_id": chunk["document_id"]}
+        }
+        for chunk in response.get("data", {}).get("chunks", [])
+    ]
+    return records
+
+
+
+if __name__ == "__main__":
+    q = json.dumps({"query": "璁惧", "dataset_ids": ["fc68db52f43111efb94a0242ac120004"]})
+    top_k = 2
+    similarity_threshold = 0.5
+    api_key = "ragflow-Y4MGYwY2JlZjM2YjExZWY4ZWU5MDI0Mm"
+    # a = service_chunk_retrieval(q, top_k, similarity_threshold, api_key)
+    # print(a)
+    async def a():
+        b = await service_chunk_retrieval(q, top_k, similarity_threshold, api_key)
+        print(b)
+    asyncio.run(a())
\ No newline at end of file
diff --git a/app/task/fetch_agent.py b/app/task/fetch_agent.py
index fde6d3a..a5a7bfb 100644
--- a/app/task/fetch_agent.py
+++ b/app/task/fetch_agent.py
@@ -404,6 +404,17 @@
         db.close()
 
 
+def get_one_from_ragflow_dialog(dialog_id):
+    db = SessionRagflow()
+    try:
+        row = db.query(Dialog.id, Dialog.name, Dialog.description, Dialog.status, Dialog.tenant_id) \
+            .filter(Dialog.id==dialog_id).first()
+        return {"id": row[0], "name": row[1], "description": row[2], "status": str(row[3]),
+                "user_id": str(row[4])} if row else {}
+    finally:
+        db.close()
+
+
 def sync_knowledge():
     db = SessionLocal()
 
diff --git a/main.py b/main.py
index 05494b9..baad0e8 100644
--- a/main.py
+++ b/main.py
@@ -44,7 +44,7 @@
     # sync_agents()
 
 
-    await sync_default_data()
+    # await sync_default_data()  # Todo
     sync_agents_v2() # 鏅鸿兘浣�
     sync_knowledge() # 鐭ヨ瘑搴�
     sync_resources_from_json()

--
Gitblit v1.8.0