"""Excel merge endpoints.

Exposes three routes on ``router``:

* ``POST /excel/upload``  - store the caller's ``.xlsx`` source files.
* ``WS   /ws/excel``      - run the merge and report on its output.
* ``GET  /download/excel`` - download the newest merged workbook.

Every user works in their own sub-directory of ``SOURCE_FILES_PATH`` and
``EXCEL_FILES_PATH``, named after the user id.
"""
import logging
import os
import shutil
from typing import Optional

from fastapi import APIRouter, File, UploadFile, Form, BackgroundTasks, Depends  # noqa: F401 (Form kept from the original import list)
from fastapi.responses import JSONResponse, FileResponse
from starlette.websockets import WebSocket, WebSocketDisconnect

from app.api import get_current_user, get_current_user_websocket
from app.models import UserModel
from app.utils.excelmerge.conformity import run_conformity

logger = logging.getLogger(__name__)

router = APIRouter()

ALLOWED_EXTENSIONS = {'xlsx'}
EXCEL_FILES_PATH = 'data/output'
SOURCE_FILES_PATH = 'data/source'


def allowed_file(filename: str) -> bool:
    """Return True when *filename* has an extension in ``ALLOWED_EXTENSIONS``.

    The extension is the text after the last dot, compared case-insensitively.
    """
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def create_dir_if_not_exists(path: str) -> None:
    """Create *path* (and missing parents) unless it already exists."""
    # exist_ok avoids the check-then-create race between concurrent requests.
    os.makedirs(path, exist_ok=True)


def clear_directory(path: str) -> dict:
    """Delete every entry directly inside *path*, keeping *path* itself.

    Files and symlinks are unlinked; sub-directories are removed recursively.

    Returns:
        ``{"message": ...}`` on success, or ``{"error": ...}`` as soon as one
        entry cannot be removed (remaining entries are left untouched).

    Raises:
        OSError: if *path* itself cannot be listed (e.g. it does not exist).
    """
    for entry in os.listdir(path):
        entry_path = os.path.join(path, entry)
        try:
            if os.path.isfile(entry_path) or os.path.islink(entry_path):
                os.unlink(entry_path)
            elif os.path.isdir(entry_path):
                shutil.rmtree(entry_path)
        except OSError:
            # Keep the dict-based contract, but leave a trace of the cause.
            logger.exception("Failed to remove %s while clearing %s", entry_path, path)
            return {"error": "清空出错"}
    return {"message": "目录已清空"}


def user_file_path(userid: str, path: str) -> str:
    """Return the per-user sub-directory of *path* for *userid*."""
    return os.path.join(path, userid)


def _safe_filename(filename: str) -> str:
    """Strip any directory components from a client-supplied file name.

    Both ``/`` and ``\\`` are treated as separators so that names such as
    ``../../x.xlsx`` or ``..\\x.xlsx`` cannot escape the target directory.
    """
    return os.path.basename(filename.replace('\\', '/'))


def _latest_file(directory: str) -> Optional[str]:
    """Return the name of the most recently modified regular file in *directory*.

    Returns ``None`` when the directory contains no regular files.
    """
    names = [
        name for name in os.listdir(directory)
        if os.path.isfile(os.path.join(directory, name))
    ]
    if not names:
        return None
    return max(names, key=lambda name: os.path.getmtime(os.path.join(directory, name)))


def _file_entry(name: str) -> dict:
    """Build the ``files`` list item sent to websocket clients for *name*."""
    return {
        "file_name": name,
        "file_url": f"./api/document/download/{name}",
    }


def _remove_directory_contents(directory: str) -> None:
    """Remove all files and sub-directories below *directory* (bottom-up)."""
    for root, dirs, files in os.walk(directory, topdown=False):
        for name in files:
            os.remove(os.path.join(root, name))
        for name in dirs:
            os.rmdir(os.path.join(root, name))


def _purge_directories(*directories: str) -> None:
    """Best-effort cleanup of each directory's contents.

    Each directory is handled independently so that a failure in one does not
    prevent the others from being cleaned.
    """
    for directory in directories:
        try:
            _remove_directory_contents(directory)
        except OSError:
            logger.exception("Error deleting contents of %s", directory)


@router.post('/excel/upload')
async def upload_file(files: list[UploadFile] = File(...),
                      current_user: UserModel = Depends(get_current_user)):
    """Replace the current user's source files with the uploaded ``.xlsx`` files.

    All uploads are validated before anything on disk is touched, so a
    rejected request leaves the user's existing files intact. On success the
    user's source and output directories are emptied and the uploads are
    written into the source directory.

    Returns:
        A 400 ``JSONResponse`` with an ``error`` message when no file is
        supplied, a file has no name, or a file is not an allowed type;
        otherwise a 200 ``JSONResponse`` with ``code``/``msg``/``data``.
    """
    user_id = str(current_user.id)

    if not any(file.filename for file in files):
        return JSONResponse(content={"error": "没有文件部分"}, status_code=400)
    if not user_id:
        return JSONResponse(content={"error": "缺少参数user_id"}, status_code=400)

    # Validate everything up front; pair each upload with its sanitised name.
    uploads = []
    for file in files:
        if not file.filename:
            return JSONResponse(content={"error": "没有选择文件"}, status_code=400)
        safe_name = _safe_filename(file.filename)
        if not allowed_file(safe_name):
            return JSONResponse(content={"error": "不允许的文件类型"}, status_code=400)
        uploads.append((safe_name, file))

    user_source = user_file_path(user_id, SOURCE_FILES_PATH)
    user_excel = user_file_path(user_id, EXCEL_FILES_PATH)
    create_dir_if_not_exists(user_source)
    create_dir_if_not_exists(user_excel)
    # Clearing is best-effort: failures are logged by clear_directory.
    clear_directory(user_source)
    clear_directory(user_excel)

    for safe_name, file in uploads:
        with open(os.path.join(user_source, safe_name), 'wb') as buffer:
            shutil.copyfileobj(file.file, buffer)

    return JSONResponse(content={"code": 200, "msg": "", "data": {}}, status_code=200)


async def _handle_excel_command(websocket: WebSocket, data: str,
                                user_source: str, user_excel: str) -> None:
    """Execute one text command received on the Excel websocket.

    Recognised commands are the JSON-quoted strings for "merge Excel",
    "query merge progress" and "get file"; anything else is answered with an
    "unknown command" error.
    """
    if data == "\"合并Excel\"":
        # NOTE(review): run_conformity is called synchronously here and will
        # block the event loop for its duration - confirm whether it is safe
        # to move to a worker thread.
        run_excel = run_conformity(user_source, user_excel)
        if not run_excel:
            await websocket.send_json({"error": "合并失败", "type": "stream", "files": []})
            return
        merged = _latest_file(user_excel)
        if merged is None:
            # Merge reported success but produced nothing to download.
            await websocket.send_json({"error": "目录下没有生成的文件", "type": "stream", "files": []})
            return
        await websocket.send_json({
            "message": "文档合并成功!",
            "type": "stream",
            "files": [_file_entry(merged)]
        })
        await websocket.send_json({
            "message": "文档合并成功!",
            "type": "close",
        })
    elif data == "\"查询合并进度\"":
        # An empty output directory is reported as "still merging".
        if not os.listdir(user_excel):
            await websocket.send_json({"step_message": "正在合并中", "type": "stream", "files": []})
        else:
            await websocket.send_json({"step_message": "文档合并成功!", "type": "stream", "files": []})
    elif data == "\"获取文件\"":
        merged = _latest_file(user_excel)
        if merged is None:
            await websocket.send_json({"error": "目录下没有生成的文件", "type": "stream", "files": []})
        else:
            await websocket.send_json({
                "step_message": "文档合并成功!",
                "type": "stream",
                "files": [_file_entry(merged)]
            })
    else:
        logger.info("Received data: %s", data)
        await websocket.send_json({"error": "未知指令", "data": str(data)})


# ws://localhost:9201/api/document/ws/excel
@router.websocket("/ws/excel")
async def ws_excel(websocket: WebSocket,
                   current_user: UserModel = Depends(get_current_user_websocket)):
    """Serve Excel merge commands for the current user over a websocket.

    The connection is accepted, the user's source/output directories are
    created if missing, and text commands are processed one at a time until
    the client disconnects. If a command fails, the error text is sent to the
    client, the socket is closed and the handler returns.
    """
    await websocket.accept()
    user_id = str(current_user.id)
    user_source = user_file_path(user_id, SOURCE_FILES_PATH)
    user_excel = user_file_path(user_id, EXCEL_FILES_PATH)
    create_dir_if_not_exists(user_source)
    create_dir_if_not_exists(user_excel)

    try:
        while True:
            data = await websocket.receive_text()
            try:
                await _handle_excel_command(websocket, data, user_source, user_excel)
            except WebSocketDisconnect:
                raise
            except Exception as e:
                logger.exception("Excel websocket command failed for user %s", user_id)
                await websocket.send_json({"error": str(e)})
                await websocket.close()
                # The socket is closed; reading from it again would only raise.
                return
    except WebSocketDisconnect:
        logger.info("Excel websocket disconnected for user %s", user_id)


@router.get("/download/excel")
async def download_file(background_tasks: BackgroundTasks,
                        current_user: UserModel = Depends(get_current_user)):
    """Send the current user's newest merged workbook as a file download.

    A background task is scheduled to empty the user's output and source
    directories.

    Returns:
        A 404 ``JSONResponse`` when the user's output directory is missing or
        holds no regular file; otherwise a ``FileResponse`` for the most
        recently modified file.
    """
    user_id = str(current_user.id)
    user_excel = user_file_path(user_id, EXCEL_FILES_PATH)
    user_source = user_file_path(user_id, SOURCE_FILES_PATH)

    if not os.path.exists(user_excel):
        return JSONResponse(status_code=404, content={"error": "用户目录不存在"})

    filename = _latest_file(user_excel)
    if filename is None:
        return JSONResponse(status_code=404, content={"error": "用户目录内没有文件"})

    file_path = os.path.join(user_excel, filename)
    background_tasks.add_task(_purge_directories, user_excel, user_source)

    return FileResponse(
        file_path,
        filename=filename,
        media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    )