zhangqian
2024-10-16 3fc9f4f33cf90610c71a1de7b00db0f82b988e98
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import json
 
from fastapi import WebSocket, WebSocketDisconnect, APIRouter, Depends, HTTPException, Query
import asyncio
import websockets
from sqlalchemy.orm import Session
from app.api import get_current_user_websocket, ResponseList, get_current_user
from app.config.config import settings
from app.models.agent_model import AgentModel, AgentType
from app.models.base_model import get_db
from app.models.user_model import UserModel
from app.service.bisheng import BishengService
from app.service.token import get_bisheng_token
 
# Router that collects the WebSocket and HTTP endpoints declared in this module.
router = APIRouter()
 
 
async def _send_close_and_disconnect(websocket: WebSocket, message: str) -> None:
    """Send a ``{"message": ..., "type": "close"}`` frame, then close the socket."""
    await websocket.send_json({"message": message, "type": "close"})
    await websocket.close()


@router.websocket("/ws/{agent_id}/{chat_id}")
async def report_chat(websocket: WebSocket,
                      agent_id: str,
                      chat_id: str,
                      current_user: UserModel = Depends(get_current_user_websocket),
                      db: Session = Depends(get_db)):
    """Relay a chat session between the client WebSocket and the Bisheng WebSocket.

    The client connection is accepted first so that validation failures can be
    reported as a JSON frame of the form ``{"message": ..., "type": "close"}``
    before the socket is closed. Once validation passes, a second WebSocket is
    opened to ``settings.bisheng_websocket_url`` and two tasks run concurrently:

    * client -> Bisheng: every JSON message gets ``flow_id`` and ``chat_id``
      set and is forwarded as a JSON string;
    * Bisheng -> client: a message is forwarded only when it carries files,
      a non-empty ``intermediate_steps`` value, or has ``type == "close"``.

    The relay stops as soon as either direction finishes (normally because one
    peer disconnected); the other task is cancelled and the client socket is
    closed if it is still open.

    Args:
        websocket: The client WebSocket connection.
        agent_id: ID of the agent; also used as the Bisheng flow id.
        chat_id: ID of the chat session; ``""`` and ``"0"`` are rejected.
        current_user: Authenticated user resolved by the dependency.
        db: Database session resolved by the dependency.
    """
    # The handshake must be completed before any frame can be sent, including
    # the error frames produced by the validation below.
    await websocket.accept()

    agent = db.query(AgentModel).filter(AgentModel.id == agent_id).first()
    if not agent:
        await _send_close_and_disconnect(websocket, "Agent not found")
        return
    if chat_id == "" or chat_id == "0":
        await _send_close_and_disconnect(websocket, "Chat ID not found")
        return
    if agent.agent_type != AgentType.BISHENG:
        await _send_close_and_disconnect(websocket, "Agent error")
        return

    token = get_bisheng_token(db, current_user.id)
    service_uri = f"{settings.bisheng_websocket_url}/api/v1/chat/{agent_id}?type=L1&t=&chat_id={chat_id}"
    headers = {'cookie': f"access_token_cookie={token};"}
    print(f"Client {agent_id} connected")

    client_disconnected = False

    # NOTE(review): `extra_headers` is the keyword of the legacy websockets
    # client; newer releases of the library name it `additional_headers` --
    # confirm against the pinned websockets version.
    async with websockets.connect(service_uri, extra_headers=headers) as service_websocket:

        # Forward messages sent by the client to Bisheng.
        async def forward_to_service():
            while True:
                message = await websocket.receive_json()
                print(f"Received from client, {chat_id}: {message}")
                # Stamp the message with the flow id (the agent id) and chat id.
                message['flow_id'] = agent_id
                message['chat_id'] = chat_id
                await service_websocket.send(json.dumps(message))
                print(f"Forwarded to bisheng: {message}")

        # Listen for messages from Bisheng and forward the relevant ones.
        async def forward_to_client():
            while True:
                message = await service_websocket.recv()
                print(f"Received from bisheng: {message}")
                data = json.loads(message)
                # A missing or null "files" entry is treated as "no files".
                files = data.get("files") or []
                steps = data.get("intermediate_steps", "")
                # `.get` so that frames without a "type" key are skipped
                # instead of killing the relay with a KeyError.
                is_close = data.get("type") == "close"
                if files or steps != "" or is_close:
                    result = {
                        "step_message": steps,
                        "type": "close" if is_close else "stream",
                        "files": files,
                    }
                    await websocket.send_json(result)
                    print(f"Forwarded to client, {chat_id}: {result}")

        # Run both directions concurrently; stop when either one ends.
        tasks = [
            asyncio.create_task(forward_to_service()),
            asyncio.create_task(forward_to_client()),
        ]
        try:
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Cancel whatever is still running (a no-op for finished tasks)
            # and wait for it, so no task outlives the upstream connection.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)

        # Exceptions raised inside a task do not propagate to this coroutine,
        # so the outcome of each finished task is inspected explicitly.
        for task in done:
            if task.cancelled():
                continue
            error = task.exception()
            if error is None:
                continue
            if isinstance(error, WebSocketDisconnect):
                client_disconnected = True
                print(f"Client {chat_id} disconnected")
            elif isinstance(error, websockets.ConnectionClosed):
                print(f"Bisheng connection closed, {chat_id}: {error}")
            else:
                print(f"Relay error, {chat_id}: {error!r}")

    if not client_disconnected:
        try:
            await websocket.close()
        except RuntimeError:
            # The socket was already closed; nothing left to do.
            pass
 
 
@router.get("/variables/list", response_model=ResponseList)
async def get_variables(
    agent_id: str = Query(..., description="The ID of the agent"),
    db: Session = Depends(get_db),
    current_user: UserModel = Depends(get_current_user),
):
    """Return the variable list of an agent as reported by the Bisheng service.

    Args:
        agent_id: ID of the agent whose variables are requested.
        db: Database session resolved by the dependency.
        current_user: Authenticated user resolved by the dependency.

    Returns:
        ``ResponseList`` with ``code=200`` and the service result in ``data``,
        or ``ResponseList`` with ``code=404`` when no agent has this ID.

    Raises:
        HTTPException: Status 500 with the error text as detail when fetching
            the token or calling the service fails. An ``HTTPException``
            raised inside those calls is propagated unchanged.
    """
    agent = db.query(AgentModel).filter(AgentModel.id == agent_id).first()
    if not agent:
        return ResponseList(code=404, msg="Agent not found")

    bisheng_service = BishengService(base_url=settings.bisheng_base_url)
    try:
        token = get_bisheng_token(db, current_user.id)
        result = await bisheng_service.variable_list(token, agent_id)
    except HTTPException:
        # Keep the original status code instead of rewriting it to a 500.
        raise
    except Exception as e:
        # Chain the cause so the original traceback stays available in logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
    return ResponseList(code=200, msg="", data=result)