From 4e301c1fcf4a0b6e2e797acaa3169d5ff4fe92f5 Mon Sep 17 00:00:00 2001 From: zhaoqingang <zhaoqg0118@163.com> Date: 星期三, 08 一月 2025 13:50:52 +0800 Subject: [PATCH] sse --- app/service/v2/app_driver/chat_dialog.py | 50 ++++++++++++ app/service/v2/chat.py | 60 ++++++++++++-- app/config/const.py | 6 + app/models/v2/session_model.py | 82 ++++++------------- app/api/v2/chat.py | 6 app/config/agent_base_url.py | 1 6 files changed, 135 insertions(+), 70 deletions(-) diff --git a/app/api/v2/chat.py b/app/api/v2/chat.py index 95d2962..037b0ef 100644 --- a/app/api/v2/chat.py +++ b/app/api/v2/chat.py @@ -10,6 +10,6 @@ chat1_router = APIRouter() -@chat1_router.get("/chat_dialog") -async def api_chat_dialog(dialog: ChatDialogData, db: Session = Depends(get_db), current_user: UserModel = Depends(get_current_user)): - return StreamingResponse(await service_chat_dialog(dialog.chatId ,dialog.question, dialog.sessionId), media_type="text/event-stream") \ No newline at end of file +@chat1_router.post("/chat_dialog") +async def api_chat_dialog(dialog: ChatDialogData, db: Session = Depends(get_db)): # current_user: UserModel = Depends(get_current_user) + return StreamingResponse(service_chat_dialog(db, dialog.chatId ,dialog.question, dialog.sessionId, 1), media_type="text/event-stream") \ No newline at end of file diff --git a/app/config/agent_base_url.py b/app/config/agent_base_url.py new file mode 100644 index 0000000..6d903d2 --- /dev/null +++ b/app/config/agent_base_url.py @@ -0,0 +1 @@ +RG_CHAT_DIALOG= "/api/v1/chats/{}/completions" \ No newline at end of file diff --git a/app/config/const.py b/app/config/const.py index 6165148..6e6f8a3 100644 --- a/app/config/const.py +++ b/app/config/const.py @@ -37,4 +37,8 @@ RESOURCE_STATUS_DELETE = "2" RESOURCE_STATUS_ON = "1" -RESOURCE_STATUS_OFF = "0" \ No newline at end of file +RESOURCE_STATUS_OFF = "0" + + +### +max_chunk_size =5000 \ No newline at end of file diff --git a/app/models/v2/session_model.py b/app/models/v2/session_model.py index 94cf32b..aa85f19 100644 --- a/app/models/v2/session_model.py +++ b/app/models/v2/session_model.py @@ -11,9 +11,11 @@ from app.models.agent_model import AgentType from app.models.base_model import Base + def current_time(): tz = pytz.timezone('Asia/Shanghai') return datetime.now(tz) + class ChatSessionModel(Base): __tablename__ = "chat_sessions" @@ -22,18 +24,17 @@ # Index('idx_username', 'username'), # ) - id = Column(Integer, primary_key=True) + id = Column(String(36), primary_key=True) name = Column(String(255)) agent_id = Column(String(255)) agent_type = Column(Integer) # 鐩墠鍙瓨basic鐨勶紝ragflow鍜宐isheng鐨勮皟鎺ュ彛鑾峰彇 create_date = Column(DateTime, default=current_time) # 鍒涘缓鏃堕棿锛岄粯璁ゅ�间负褰撳墠鏃跺尯鏃堕棿 update_date = Column(DateTime, default=current_time, onupdate=current_time, index=True) # 鏇存柊鏃堕棿锛岄粯璁ゅ�间负褰撳墠鏃跺尯鏃堕棿锛屾洿鏂版椂鑷姩鏇存柊 - tenant_id = Column(Integer) # 鍒涘缓浜� - message = Column(TEXT) # 璇存槑 - reference = Column(TEXT) # 璇存槑 - conversation_id = Column(String(64)) - session_id = Column(String(36), index=True) - chat_mode = Column(Integer) + tenant_id = Column(Integer, index=True) # 鍒涘缓浜� + message = Column(TEXT) + reference = Column(TEXT) + conversation_id = Column(String(36), index=True) + event_type = Column(String(16)) # to_dict 鏂规硶 def to_dict(self): @@ -64,8 +65,10 @@ msg = json.loads(self.message) msg.append(message) except Exception as e: + print(e) return self.message = json.dumps(msg) + class ChatDialogData(BaseModel): @@ -74,45 +77,34 @@ chatId: str - class ChatSessionDao: def __init__(self, db: Session): self.db = db - def create_session(self, session_id: str, name: str, agent_id: str, agent_type: int, user_id: int, message: str,reference:str) -> ChatSessionModel: + async def create_session(self, session_id: str, **kwargs) -> ChatSessionModel: new_session = ChatSessionModel( id=session_id, - name=name[0:255], - agent_id=agent_id, - agent_type=agent_type, create_date=current_time(), update_date=current_time(), - tenant_id=user_id, - message=message, - reference=reference, + **kwargs ) + new_session.message = json.dumps([new_session.message]) self.db.add(new_session) self.db.commit() self.db.refresh(new_session) return new_session - def get_session_by_id(self, session_id: str) -> Type[ChatSessionModel] | None: + async def get_session_by_id(self, session_id: str) -> ChatSessionModel | None: session = self.db.query(ChatSessionModel).filter_by(id=session_id).first() - if session and session.message is None: - session.message = '[]' return session - def update_session_by_id(self, session_id: str, **kwargs) -> Type[ChatSessionModel] | None: - session = self.get_session_by_id(session_id) + async def update_session_by_id(self, session_id: str, session, message: dict) -> ChatSessionModel | None: + if not session: + session = await self.get_session_by_id(session_id) if session: - if "message" in kwargs: - session.add_message(kwargs["message"]) - # 鏇挎崲鍏朵粬瀛楁 - for key, value in kwargs.items(): - if key != "message": - setattr(session, key, value) - session.update_date = current_time() try: + session.add_message(message) + session.update_date = current_time() self.db.commit() self.db.refresh(session) except Exception as e: @@ -120,36 +112,16 @@ self.db.rollback() return session - def create_session(self, session_id: str, name: str, agent_id: str, agent_type: AgentType, user_id: int) -> ChatSessionModel: - existing_session = self.get_session_by_id(session_id) + async def update_or_insert_by_id(self, session_id: str, **kwargs) -> ChatSessionModel: + existing_session = await self.get_session_by_id(session_id) if existing_session: - existing_session.add_message({"role": "user", "content": name}) - existing_session.update_date = current_time() - self.db.commit() - self.db.refresh(existing_session) - return existing_session + return await self.update_session_by_id(session_id, existing_session, kwargs.get("message")) - new_session = ChatSessionModel( - id=session_id, - name=name[0:50], - agent_id=agent_id, - agent_type=agent_type, - tenant_id=user_id, - message=json.dumps([{"role": "user", "content": name}]) - ) - self.db.add(new_session) - self.db.commit() - self.db.refresh(new_session) - return new_session + existing_session = await self.create_session(session_id, **kwargs) + return existing_session - def delete_session(self, session_id: str) -> None: - """ - 鍒犻櫎浼氳瘽璁板綍銆� - - 鍙傛暟: - session_id (str): 浼氳瘽ID銆� - """ - session = self.get_session_by_id(session_id) + async def delete_session(self, session_id: str) -> None: + session = await self.get_session_by_id(session_id) if session: self.db.delete(session) - self.db.commit() \ No newline at end of file + self.db.commit() diff --git a/app/service/v2/app_driver/chat_dialog.py b/app/service/v2/app_driver/chat_dialog.py index 9a7bf88..27807fd 100644 --- a/app/service/v2/app_driver/chat_dialog.py +++ b/app/service/v2/app_driver/chat_dialog.py @@ -1,3 +1,6 @@ +import json + +from Log import logger from app.service.v2.app_driver.chat_base import ChatBase @@ -16,7 +19,50 @@ async def chat_completions(self, url, data, headers): + complete_response = "" + async for line in self.http_stream(url, data, headers): + # logger.error(line) + if line.startswith("data:"): + complete_response = line.strip("data:").strip() + else: + complete_response += line.strip() + try: + json_data = json.loads(complete_response) + # 澶勭悊 JSON 鏁版嵁 + # print(json_data) + complete_response = "" + yield json_data - async for rag_response in self.http_stream(url, data, headers): + except json.JSONDecodeError as e: + logger.info("Invalid JSON data------------------") + # print(e) - yield rag_response + + + + + + +if __name__ == "__main__": + async def aa(): + chat_id = "6b8ee426c67511efb1510242ac1b0006" + token = "ragflow-YzMzE1NDRjYzMyZjExZWY5ZjkxMDI0Mm" + base_url = "http://192.168.20.116:11080" + url = f"{base_url}/api/v1/chats/{chat_id}/completions" + chat = ChatDialog(token) + data = { + "question": "鐢电綉鎶�鏈�荤粨300瀛�", + "stream": True, + "session_id": "9969c152cce411ef8a140242ac1b0002" + } + headers = { + 'Content-Type': 'application/json', + 'Authorization': f"Bearer {token}" + } + async for ans in chat.chat_completions(url, data, headers): + print(ans) + + + import asyncio + + asyncio.run(aa()) diff --git a/app/service/v2/chat.py b/app/service/v2/chat.py index 136a113..982ed43 100644 --- a/app/service/v2/chat.py +++ b/app/service/v2/chat.py @@ -1,13 +1,18 @@ import json +from Log import logger +from app.config.agent_base_url import RG_CHAT_DIALOG +from app.config.config import settings +from app.config.const import max_chunk_size +from app.models.v2.session_model import ChatSessionDao from app.service.v2.app_driver.chat_dialog import ChatDialog -async def service_chat_dialog(chat_id:str, question: str, session_id: str): +async def service_chat_dialog(db, chat_id:str, question: str, session_id: str, user_id): token = "ragflow-YzMzE1NDRjYzMyZjExZWY5ZjkxMDI0Mm" - url = f"/api/v1/chats/{chat_id}/completions" + url = settings.fwr_base_url+RG_CHAT_DIALOG.format(chat_id) chat = ChatDialog(token) - data = { + request_data = { "question": question, "stream": True, "session_id": session_id @@ -17,12 +22,49 @@ 'Authorization': f"Bearer {token}" } try: - for ans in chat.chat_completions(url, data, headers): - - yield "data:" + json.dumps({"code": 0, "message": "", "data": ans}, ensure_ascii=False) + "\n\n" - ChatSessionModel.update_by_id(conv.id, conv.to_dict()) + await ChatSessionDao(db).update_or_insert_by_id( + session_id=session_id, + name=question[:255], + agent_id=chat_id, + agent_type=1, + tenant_id=user_id, + message={"role": "user", "content": question}, + conversation_id=session_id, + event_type="message" + ) except Exception as e: - yield "data:" + json.dumps({"code": 500, "message": str(e), + logger.error(e) + try: + message = {"role": "assistant","answer":"", "reference": {}} + async for ans in chat.chat_completions(url, request_data, headers): + if ans.get("code", None) == 102: + error = ans.get("message", "璇疯緭鍏ヤ綘鐨勯棶棰橈紒") + data = {"answer":error} + event = "message" + else: + if isinstance(ans.get("data"), bool) and ans.get("data") is True: + data = {} + event = "message_end" + else: + data = ans.get("data", {}) + message = ans.get("data", {}) + event = "message" + message_str = "data: " + json.dumps({"event": event, "data": data}, ensure_ascii=False) + "\n\n" + for i in range(0, len(message_str), max_chunk_size): + chunk = message_str[i:i + max_chunk_size] + # print(chunk) + yield chunk # 鍙戦�佸垎鍧楁秷鎭� + await ChatSessionDao(db).update_session_by_id( + session_id=session_id, + session=None, + message=message + ) + except Exception as e: + logger.error(e) + yield "data: " + json.dumps({"message": "message", "data": {"answer": "**ERROR**: " + str(e), "reference": []}}, ensure_ascii=False) + "\n\n" - yield "data:" + json.dumps({"code": 0, "message": "", "data": True}, ensure_ascii=False) + "\n\n" \ No newline at end of file + + yield "data: " + json.dumps({"message": "message_end", + "data": {}}, + ensure_ascii=False) + "\n\n" \ No newline at end of file -- Gitblit v1.8.0