import io from typing import Optional, List import requests from fastapi import Depends, APIRouter, HTTPException, UploadFile, File, Query, Form from pydantic import BaseModel from sqlalchemy.orm import Session from starlette.responses import StreamingResponse from werkzeug.utils import send_file from app.api import Response, get_current_user, ResponseList from app.config.config import settings from app.models.agent_model import AgentType, AgentModel from app.models.base_model import get_db from app.models.user_model import UserModel from app.service.basic import BasicService from app.service.bisheng import BishengService from app.service.v2.api_token import DfTokenDao from app.service.difyService import DifyService from app.service.ragflow import RagflowService from app.service.service_token import get_ragflow_token, get_bisheng_token import urllib.parse router = APIRouter() @router.post("/upload/{agent_id}", response_model=Response) async def upload_files( agent_id: str, file: List[UploadFile] = File(...), # 修改这里,接受文件列表 chat_id: str = Query(None, description="The ID of the chat"), db: Session = Depends(get_db), current_user: UserModel = Depends(get_current_user) ): agent = db.query(AgentModel).filter(AgentModel.id == agent_id).first() if not agent: return Response(code=404, msg="Agent not found") # 检查 agent 类型,确定是否允许上传多个文件 if agent.agent_type in [AgentType.RAGFLOW, AgentType.BISHENG]: if len(file) > 1: return Response(code=400, msg="这个智能体只支持传单个文件") if agent.agent_type == AgentType.RAGFLOW or agent.agent_type == AgentType.BISHENG: file = file[0] # 读取上传的文件内容 try: file_content = await file.read() except Exception as e: return Response(code=400, msg=str(e)) if agent.agent_type == AgentType.RAGFLOW: token = await get_ragflow_token(db, current_user.id) ragflow_service = RagflowService(base_url=settings.fwr_base_url) # 查询会话是否存在,不存在先创建会话 history = await ragflow_service.get_session_history(token, chat_id) if len(history) == 0: message = {"role": "user", "message": file.filename} await ragflow_service.set_session(token, agent_id, message, chat_id, True) doc_ids = await ragflow_service.upload_and_parse(token, chat_id, file.filename, file_content) # 对于多文件,可能需要收集所有doc_ids return Response(code=200, msg="", data={"doc_ids": doc_ids, "file_name": file.filename}) elif agent.agent_type == AgentType.BISHENG: bisheng_service = BishengService(base_url=settings.sgb_base_url) try: token = await get_bisheng_token(db, current_user.id) result = await bisheng_service.upload(token, file.filename, file_content) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) result["file_name"] = file.filename return Response(code=200, msg="", data=result) elif agent.agent_type == AgentType.BASIC: if agent_id == "basic_excel_talk": # 处理单个文件的情况 file_list = file if len(file) == 1: # and agent.agent_type != AgentType.BASIC file_list = [file[0]] # 如果只有一个文件,确保它是一个列表 service = BasicService(base_url=settings.basic_base_url) # 遍历file_list,存到files 列表中 files = [] for item in file_list: file_content = await item.read() files.append(('files', (item.filename, file_content, 'application/octet-stream'))) result = await service.excel_talk_upload(chat_id, files) if not result: return Response(code=400, msg="上传文件出错") return Response(code=200, msg="", data=result) elif agent_id == "basic_paper_agent": ... # service = BasicService(base_url=settings.basic_paper_url) # result = await service.paper_file_upload(chat_id, file.filename, file_content) elif agent.agent_type == AgentType.DIFY: dify_service = DifyService(base_url=settings.dify_base_url) if agent.type == "imageTalk": token = DfTokenDao(db).get_token_by_id("image_and_text_conversion") if not token: raise HTTPException(status_code=500, detail="获取token失败,image_and_text_conversion!") file = file[0] # 读取上传的文件内容 try: file_content = await file.read() except Exception as e: return Response(code=400, msg=str(e)) try: data = await dify_service.upload(token, file.filename, file_content, current_user.id) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) elif agent.type == "reportWorkflow": token = DfTokenDao(db).get_token_by_id("document_to_report") if not token: raise HTTPException(status_code=500, detail="获取token失败,document_to_report!") result = [] for f in file: try: file_content = await f.read() except Exception as e: return Response(code=400, msg=str(e)) try: file_upload = await dify_service.upload(token, f.filename, file_content, current_user.id) result.append(file_upload) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # result["file_name"] = file.filename data = {"files": result} return Response(code=200, msg="", data=data) @router.get("/download/", response_model=Response) async def download_file( url: Optional[str] = Query(None, description="URL of the file to download for bisheng"), agent_id: str = Query(..., description="Agent ID"), doc_id: Optional[str] = Query(None, description="Optional doc id for ragflow agents"), doc_name: Optional[str] = Query(None, description="Optional doc name for ragflow agents"), file_id: Optional[str] = Query(None, description="Optional file id for basic agents"), file_type: Optional[str] = Query(None, description="Optional file type for basic agents"), db: Session = Depends(get_db) ): agent = db.query(AgentModel).filter(AgentModel.id == agent_id).first() if not agent: return Response(code=404, msg="Agent not found") if agent.agent_type == AgentType.BISHENG: url = urllib.parse.unquote(url) # 从 URL 中提取文件名 parsed_url = urllib.parse.urlparse(url) filename = urllib.parse.unquote(parsed_url.path.split('/')[-1]) url = url.replace("http://minio:9000", settings.sgb_base_url) elif agent.agent_type == AgentType.RAGFLOW: if not doc_id: return Response(code=400, msg="doc_id is required") url = f"{settings.fwr_base_url}/v1/document/get/{doc_id}" filename = doc_name elif agent.agent_type == AgentType.BASIC: if agent_id == "basic_excel_talk": return await download_basic_file(file_id, file_type) elif agent_id == "basic_question_talk": return await download_basic_file(file_id, file_type) else: return Response(code=400, msg="Unsupported agent type") try: # 发送GET请求获取文件内容 response = requests.get(url, stream=True) response.raise_for_status() # 检查请求是否成功 # 返回流式响应 return StreamingResponse( response.iter_content(chunk_size=1024), media_type="application/octet-stream", headers={"Content-Disposition": f"attachment; filename*=utf-8''{urllib.parse.quote(filename)}"} ) except Exception as e: raise HTTPException(status_code=400, detail=f"Error downloading file: {e}") async def download_basic_file(file_id: str, file_type: str): service = BasicService(base_url=settings.basic_base_url) if not file_type or not file_id: return Response(code=400, msg="file_type and file_id is required") if file_type == "image": content, filename, mimetype = await service.excel_talk_image_download(file_id) return StreamingResponse( io.BytesIO(content), media_type=mimetype, headers={"Content-Disposition": f"attachment; filename={filename}"} ) elif file_type == "excel": content, filename, mimetype = await service.excel_talk_excel_download(file_id) return StreamingResponse( io.BytesIO(content), media_type=mimetype, headers={"Content-Disposition": f"attachment; filename={filename}"} ) elif file_type == "word": content, filename, mimetype = await service.questions_talk_word_download(file_id) return StreamingResponse( io.BytesIO(content), media_type=mimetype, headers={"Content-Disposition": f"attachment; filename={filename}"} ) else: return Response(code=400, msg="Unsupported file type") @router.get("/image/{imageId}", response_model=Response) async def download_image_file(imageId: str, db=Depends(get_db)): file_path = f"app/images/{imageId}.png" def generate(): with open(file_path, "rb") as file: while True: data = file.read(1048576) # 读取1MB if not data: break yield data return StreamingResponse(generate(), media_type="application/octet-stream")