|  |  | 
 |  |  | from fastapi import APIRouter, File, UploadFile | 
 |  |  | from fastapi import APIRouter, File, UploadFile, Form | 
 |  |  | from fastapi.responses import JSONResponse, FileResponse | 
 |  |  | from fastapi.exceptions import HTTPException | 
 |  |  | from starlette.websockets import WebSocket | 
 |  |  | from app.utils.excelmerge.conformity import run_conformity | 
 |  |  | import shutil | 
 |  |  | 
 |  |  | SOURCE_FILES_PATH = 'data/source' | 
 |  |  |  | 
 |  |  |  | 
 |  |  | def allowed_file(filename): | 
 |  |  | def allowed_file(filename: str) -> bool: | 
 |  |  |     return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | 
 |  |  |  | 
 |  |  |  | 
 |  |  | def create_dir_if_not_exists(path): | 
 |  |  | def create_dir_if_not_exists(path: str): | 
 |  |  |     if not os.path.exists(path): | 
 |  |  |         os.makedirs(path) | 
 |  |  |  | 
 |  |  |  | 
 |  |  | # 清理函数 | 
 |  |  | def clear_directory(path): | 
 |  |  | def clear_directory(path: str) -> dict: | 
 |  |  |     for filename in os.listdir(path): | 
 |  |  |         file_path = os.path.join(path, filename) | 
 |  |  |         try: | 
 |  |  | 
 |  |  |     return {"message": "目录已清空"} | 
 |  |  |  | 
 |  |  |  | 
 |  |  | def user_file_path(userid: str, path: str) -> str: | 
 |  |  |     return os.path.join(path, userid) | 
 |  |  |  | 
 |  |  |  | 
 |  |  | @router.post('/excel/upload') | 
 |  |  | async def upload_file(files: list[UploadFile] = File(...)): | 
 |  |  | async def upload_file(files: list[UploadFile] = File(...), user_id: str = Form(...)): | 
 |  |  |     if not any(file.filename for file in files): | 
 |  |  |         return JSONResponse(content={"error": "没有文件部分"}, status_code=400) | 
 |  |  |     if not user_id: | 
 |  |  |         return JSONResponse(content={"error": "缺少参数user_id"}, status_code=400) | 
 |  |  |     user_source = user_file_path(user_id, SOURCE_FILES_PATH) | 
 |  |  |     user_excel = user_file_path(user_id, EXCEL_FILES_PATH) | 
 |  |  |  | 
 |  |  |     create_dir_if_not_exists(SOURCE_FILES_PATH) | 
 |  |  |     create_dir_if_not_exists(EXCEL_FILES_PATH) | 
 |  |  |     clear_directory(SOURCE_FILES_PATH) | 
 |  |  |     clear_directory(EXCEL_FILES_PATH) | 
 |  |  |     create_dir_if_not_exists(user_source) | 
 |  |  |     create_dir_if_not_exists(user_excel) | 
 |  |  |     clear_directory(user_source) | 
 |  |  |     clear_directory(user_excel) | 
 |  |  |  | 
 |  |  |     save_path_list = [] | 
 |  |  |     for file in files: | 
 |  |  |         if file.filename == '': | 
 |  |  |             return JSONResponse(content={"error": "没有选择文件"}, status_code=400) | 
 |  |  |         if file and allowed_file(file.filename): | 
 |  |  |             save_path = os.path.join(SOURCE_FILES_PATH, file.filename) | 
 |  |  |             save_path = os.path.join(user_source, file.filename) | 
 |  |  |             with open(save_path, 'wb') as buffer: | 
 |  |  |                 shutil.copyfileobj(file.file, buffer) | 
 |  |  |             save_path_list.append(save_path) | 
 |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  | # ws://localhost:9201/api/document/ws/excel | 
 |  |  | @router.websocket("/ws/excel") | 
 |  |  | async def ws_excel(websocket: WebSocket): | 
 |  |  | @router.websocket("/ws/excel/{user_id}") | 
 |  |  | async def ws_excel(websocket: WebSocket, user_id: str): | 
 |  |  |     await websocket.accept() | 
 |  |  |  | 
 |  |  |     create_dir_if_not_exists(SOURCE_FILES_PATH) | 
 |  |  |     create_dir_if_not_exists(EXCEL_FILES_PATH) | 
 |  |  |     user_source = user_file_path(user_id, SOURCE_FILES_PATH) | 
 |  |  |     user_excel = user_file_path(user_id, EXCEL_FILES_PATH) | 
 |  |  |     create_dir_if_not_exists(user_source) | 
 |  |  |     create_dir_if_not_exists(user_excel) | 
 |  |  |  | 
 |  |  |     while True: | 
 |  |  |         data = await websocket.receive_text() | 
 |  |  |         try: | 
 |  |  |             if data == "\"合并Excel\"": | 
 |  |  |                 run_excel = run_conformity() | 
 |  |  |                 files = os.listdir(EXCEL_FILES_PATH) | 
 |  |  |                 run_excel = run_conformity(user_source, user_excel) | 
 |  |  |                 files = os.listdir(user_excel) | 
 |  |  |                 if run_excel: | 
 |  |  |                     first_file = files[0] | 
 |  |  |                     file_name = os.path.basename(first_file) | 
 |  |  | 
 |  |  |                 else: | 
 |  |  |                     await websocket.send_json({"error": "合并失败", "type": "stream", "files": []}) | 
 |  |  |             elif data == "\"查询合并进度\"": | 
 |  |  |                 files = os.listdir(EXCEL_FILES_PATH) | 
 |  |  |                 files = os.listdir(user_excel) | 
 |  |  |                 if not files: | 
 |  |  |                     await websocket.send_json({"step_message": "正在合并中", "type": "stream", "files": []}) | 
 |  |  |                 else: | 
 |  |  |                     await websocket.send_json({"step_message": "文档合并成功!", "type": "stream", "files": []}) | 
 |  |  |             elif data == "\"获取文件\"": | 
 |  |  |                 files = os.listdir(EXCEL_FILES_PATH) | 
 |  |  |                 files = os.listdir(user_excel) | 
 |  |  |                 if not files: | 
 |  |  |                     await websocket.send_json({"error": "目录下没有生成的文件", "type": "stream", "files": []}) | 
 |  |  |                 else: | 
 |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  | @router.get("/download/{filename}") | 
 |  |  | async def download_file(filename: str): | 
 |  |  |     try: | 
 |  |  |         return FileResponse(os.path.join(EXCEL_FILES_PATH, filename), filename=filename, | 
 |  |  |                             media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') | 
 |  |  |     except FileNotFoundError: | 
 |  |  |         raise HTTPException(status_code=404, detail="文件不存在") | 
 |  |  |     except Exception as e: | 
 |  |  |         raise HTTPException(status_code=500, detail="服务器错误") | 
 |  |  | async def download_file(filename: str, user_id: str): | 
 |  |  |     user_excel = user_file_path(user_id, EXCEL_FILES_PATH) | 
 |  |  |     file_path = os.path.join(user_excel, filename) | 
 |  |  |  | 
 |  |  |     if not os.path.exists(file_path): | 
 |  |  |         return JSONResponse(status_code=404, content={"error": "文件不存在"}) | 
 |  |  |  | 
 |  |  |     return FileResponse(file_path, filename=filename, | 
 |  |  |                         media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') |