"""Excel merge endpoints.

Exposes three routes on ``router``:

* ``POST /excel/upload`` - store the caller's ``.xlsx`` files in a per-user
  source directory.
* ``WS /ws/excel`` - run the merge over the uploaded files and report the
  result to the client.
* ``GET /download/{file_full_name}`` - download a generated workbook.
"""
import contextlib
import logging
import os
import random
import shutil
import string
from typing import Optional

from fastapi import (
    APIRouter,
    BackgroundTasks,
    Depends,
    File,
    Form,
    Request,
    UploadFile,
    WebSocket,
    WebSocketDisconnect,
)
from fastapi.responses import JSONResponse, FileResponse
from sqlalchemy.orm import Session

from app.api import get_current_user, get_current_user_websocket, Response
from app.models import UserModel, AgentType
from app.models.base_model import get_db
from app.service.session import SessionService
from app.utils.excelmerge.conformity import run_conformity

logger = logging.getLogger(__name__)

router = APIRouter()

ALLOWED_EXTENSIONS = {'xlsx'}
EXCEL_FILES_PATH = 'data/output'
SOURCE_FILES_PATH = 'data/source'


def _safe_filename(filename: Optional[str]) -> str:
    """Reduce a client-supplied file name to its final path component.

    Both ``/`` and ``\\`` are treated as separators so that a name such as
    ``../../etc/passwd`` or ``..\\secret.xlsx`` cannot escape the target
    directory. Returns ``""`` for an empty or ``None`` name.
    """
    if not filename:
        return ""
    return os.path.basename(filename.replace("\\", "/"))


def allowed_file(filename: str) -> bool:
    """Return True when *filename* has an extension in ALLOWED_EXTENSIONS.

    The comparison is case-insensitive. Empty or ``None`` names are rejected.
    """
    if not filename or '.' not in filename:
        return False
    return filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def create_dir_if_not_exists(path: str):
    """Create *path* (including parents) unless it already exists."""
    # exist_ok avoids the check-then-create race between concurrent requests.
    os.makedirs(path, exist_ok=True)


def clear_directory(path: str) -> dict:
    """Remove every entry directly inside *path*.

    Files and symlinks are unlinked, sub-directories are removed recursively.
    A failure on one entry is logged and does not stop the remaining entries
    from being removed.

    Returns:
        ``{"message": ...}`` when everything was removed, ``{"error": ...}``
        when at least one entry could not be removed.
    """
    failed = False
    for filename in os.listdir(path):
        file_path = os.path.join(path, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except OSError:
            logger.exception("Failed to remove %s", file_path)
            failed = True
    if failed:
        return {"error": "清空出错"}
    return {"message": "目录已清空"}


def user_file_path(userid: str, path: str) -> str:
    """Return the per-user sub-directory of *path* for *userid*."""
    return os.path.join(path, userid)


def generate_db_id(prefix: str = "me") -> str:
    """Return *prefix* followed by 13 random ASCII letters/digits.

    Uses ``random``, so the value is an identifier, not a secret token.
    """
    random_part = ''.join(random.choices(string.ascii_letters + string.digits, k=13))
    return prefix + random_part


def db_create_session(db: Session, user_id: str, message: str, upload_filenames: list):
    """Create a session record for an Excel-merge request.

    Args:
        db: SQLAlchemy session handed to ``SessionService``.
        user_id: Id of the requesting user; converted with ``int()``.
        message: The user's message, stored as the first conversation entry.
        upload_filenames: File names the client reported for this request.

    Returns:
        Whatever ``SessionService.create_session`` returns.
    """
    db_id = generate_db_id()
    first_message = {
        "role": "user",
        "content": message,
        "upload_filenames": upload_filenames,
    }
    session = SessionService(db).create_session(
        db_id,
        message,
        "basic_excel_merge",
        AgentType.BASIC,
        int(user_id),
        first_message,
    )
    return session


@router.post('/excel/upload', response_model=Response)
async def upload_file(files: list[UploadFile] = File(...),
                      current_user: UserModel = Depends(get_current_user)):
    """Replace the current user's uploaded source files with *files*.

    Every file must have an allowed extension. The request is validated in
    full before the user's source directory is cleared, so a rejected upload
    leaves the previously uploaded files untouched.

    Returns:
        ``Response`` with ``code`` 200 on success or 400 on invalid input.
    """
    user_id = str(current_user.id)
    if not any(file.filename for file in files):
        return Response(code=400, msg="没有文件部分", data={})
    if not user_id:
        return Response(code=400, msg="缺少参数user_id", data={})

    # Client-supplied names are untrusted: keep only the base name so a
    # crafted name cannot write outside the user's directory.
    safe_names = [_safe_filename(file.filename) for file in files]
    if not all(allowed_file(name) for name in safe_names):
        return Response(code=400, msg="不允许的文件类型", data={})

    user_source = user_file_path(user_id, SOURCE_FILES_PATH)
    user_excel = EXCEL_FILES_PATH
    create_dir_if_not_exists(user_source)
    create_dir_if_not_exists(user_excel)
    clear_directory(user_source)

    for file, name in zip(files, safe_names):
        save_path = os.path.join(user_source, name)
        with open(save_path, 'wb') as buffer:
            shutil.copyfileobj(file.file, buffer)

    return Response(code=200, msg="上传成功", data={})


# Example URL: ws://localhost:9201/api/document/ws/excel
@router.websocket("/ws/excel")
async def ws_excel(websocket: WebSocket,
                   current_user: UserModel = Depends(get_current_user_websocket),
                   db: Session = Depends(get_db)):
    """Merge the user's uploaded workbooks on request over a WebSocket.

    The client sends JSON objects. A message whose ``"message"`` field is
    ``"合并Excel"`` triggers ``run_conformity`` on the user's source
    directory; on success the download link is sent back, a session record is
    stored, and the socket stays open for further requests. Any other
    message, a failed merge, or an unexpected error is reported to the client
    and ends the connection.
    """
    await websocket.accept()
    user_id = str(current_user.id)
    user_source = user_file_path(user_id, SOURCE_FILES_PATH)
    user_excel = EXCEL_FILES_PATH
    create_dir_if_not_exists(user_source)
    create_dir_if_not_exists(user_excel)

    while True:
        try:
            receive_message = await websocket.receive_json()
            command = receive_message.get("message")

            if command != "合并Excel":
                logger.info("Received data: %s", command)
                await websocket.send_json({"error": "未知指令", "data": str(command)})
                await websocket.close()
                # The socket is closed; receiving again would raise.
                return

            upload_filenames = receive_message.get('upload_filenames', [])
            merge_file = run_conformity(user_source, user_excel)
            if merge_file is None:
                await websocket.send_json({"error": "合并失败", "type": "stream", "files": []})
                await websocket.close()
                return

            await websocket.send_json({
                "type": "stream",
                "files": [
                    {
                        "file_name": "Excel",
                        "file_url": f"./api/document/download/{merge_file}.xlsx?file_type=excel",
                    }
                ]
            })
            await websocket.send_json({
                "message": "合并成功",
                "type": "close",
            })

            # Create the session record.
            session = db_create_session(db, user_id, command, upload_filenames)
            # Append the assistant's reply to the session record.
            if session:
                session_id = session.id
                new_message = {
                    "role": "assistant",
                    "content": {
                        "message": "\u5408\u5e76\u6210\u529f",
                        "type": "message",
                        "file_name": "Excel",
                        # NOTE(review): this URL has no leading "." unlike the
                        # one sent over the socket above - confirm intended.
                        "file_url": f"/api/document/download/{merge_file}.xlsx?file_type=excel"
                    }
                }
                session_service = SessionService(db)
                session_service.update_session(session_id, message=new_message)
        except WebSocketDisconnect:
            # Client went away; nothing left to report.
            return
        except Exception as e:
            logger.exception("Excel merge over websocket failed")
            # The socket may already be closed or broken; reporting the error
            # is best-effort.
            with contextlib.suppress(RuntimeError, WebSocketDisconnect):
                await websocket.send_json({"error": str(e)})
                await websocket.close()
            return


@router.get("/download/{file_full_name}")
async def download_file(file_full_name: str):
    """Send a generated workbook from EXCEL_FILES_PATH as ``Excel.xlsx``.

    Only the base name of *file_full_name* is used, so the request cannot
    reach files outside the output directory. Responds with 404 when no
    regular file of that name exists.
    """
    file_name = _safe_filename(file_full_name)
    user_excel = EXCEL_FILES_PATH
    file_path = os.path.join(user_excel, file_name)
    if not file_name or not os.path.isfile(file_path):
        return JSONResponse(content={"error": "文件不存在"}, status_code=404)

    # TODO: delete the generated file once the download has completed (for
    # example with a BackgroundTasks job) and consider sending the
    # spreadsheet media type
    # "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet".
    return FileResponse(
        path=file_path,
        filename="Excel.xlsx",
        media_type='application/octet-stream',
    )