xuyonghao
2024-12-26 b40851d4f1d321782c17af6ed8f062113bf0ed31
合并excel上传,合并,下载 修改
2个文件已修改
110 ■■■■■ 已修改文件
app/api/excel.py 82 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/utils/excelmerge/conformity.py 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/api/excel.py
@@ -49,12 +49,11 @@
    if not user_id:
        return JSONResponse(content={"error": "缺少参数user_id"}, status_code=400)
    user_source = user_file_path(user_id, SOURCE_FILES_PATH)
    user_excel = user_file_path(user_id, EXCEL_FILES_PATH)
    user_excel = EXCEL_FILES_PATH
    create_dir_if_not_exists(user_source)
    create_dir_if_not_exists(user_excel)
    clear_directory(user_source)
    clear_directory(user_excel)
    save_path_list = []
    for file in files:
@@ -77,7 +76,7 @@
    user_id = str(current_user.id)
    user_source = user_file_path(user_id, SOURCE_FILES_PATH)
    user_excel = user_file_path(user_id, EXCEL_FILES_PATH)
    user_excel = EXCEL_FILES_PATH
    create_dir_if_not_exists(user_source)
    create_dir_if_not_exists(user_excel)
@@ -85,19 +84,14 @@
        data = await websocket.receive_text()
        try:
            if data == "\"合并Excel\"":
                run_excel = run_conformity(user_source, user_excel)
                files = os.listdir(user_excel)
                if run_excel:
                    first_file = files[0]
                    file_name = os.path.basename(first_file)
                    download_url = f"./api/document/download/{first_file}"
                merge_file = run_conformity(user_source, user_excel)
                if merge_file is not None:
                    await websocket.send_json({
                        "message": "文档合并成功!",
                        "type": "stream",
                        "files": [{
                            "file_name": file_name,
                            "file_url": download_url
                        }]
                        "file_name": f"{merge_file}.xlsx",
                        "download_url": f"./api/document/download/{merge_file}.xlsx"
                    })
                    await websocket.send_json({
                        "message": "文档合并成功!",
@@ -105,28 +99,6 @@
                    })
                else:
                    await websocket.send_json({"error": "合并失败", "type": "stream", "files": []})
            elif data == "\"查询合并进度\"":
                files = os.listdir(user_excel)
                if not files:
                    await websocket.send_json({"step_message": "正在合并中", "type": "stream", "files": []})
                else:
                    await websocket.send_json({"step_message": "文档合并成功!", "type": "stream", "files": []})
            elif data == "\"获取文件\"":
                files = os.listdir(user_excel)
                if not files:
                    await websocket.send_json({"error": "目录下没有生成的文件", "type": "stream", "files": []})
                else:
                    first_file = files[0]
                    file_name = os.path.basename(first_file)
                    file_url = f"./api/document/download/{first_file}"
                    await websocket.send_json({
                        "step_message": "文档合并成功!",
                        "type": "stream",
                        "files": [{
                            "file_name": file_name,
                            "file_url": file_url
                        }]
                    })
            else:
                print(f"Received data: {data}")
                await websocket.send_json({"error": "未知指令", "data": str(data)})
@@ -135,38 +107,22 @@
            await websocket.close()
@router.get("/download/excel")
async def download_file(background_tasks: BackgroundTasks, current_user: UserModel = Depends(get_current_user)):
    user_id = str(current_user.id)
    user_excel = user_file_path(user_id, EXCEL_FILES_PATH)
    user_source = user_file_path(user_id, SOURCE_FILES_PATH)
@router.get("/download/{file_full_name}")
async def download_file(background_tasks: BackgroundTasks, file_full_name: str):
    file_name = os.path.basename(file_full_name)
    user_excel = EXCEL_FILES_PATH
    file_path = os.path.join(user_excel, file_full_name)
    if not os.path.exists(user_excel):
        return JSONResponse(status_code=404, content={"error": "用户目录不存在"})
    excel_files = [f for f in os.listdir(user_excel) if os.path.isfile(os.path.join(user_excel, f))]
    excel_files.sort(key=lambda x: os.path.getmtime(os.path.join(user_excel, x)), reverse=True)
    if not excel_files:
        return JSONResponse(status_code=404, content={"error": "用户目录内没有文件"})
    filename = excel_files[0]
    file_path = os.path.join(user_excel, filename)
    def delete_files_in_directory(directory):
        for root, dirs, files in os.walk(directory, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                os.rmdir(os.path.join(root, name))
    if not os.path.exists(file_path):
        return JSONResponse(content={"error": "文件不存在"}, status_code=404)
    def delete_file():
        try:
            delete_files_in_directory(user_excel)
            delete_files_in_directory(user_source)
            os.unlink(file_path)
        except OSError as e:
            print(f"Error deleting file {file_path}: {e}")
            print(f"Deleting file error")
    # 待下载完成后删除生成的文件
    background_tasks.add_task(delete_file)
    return FileResponse(file_path, filename=filename,
                        media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
    return FileResponse(path=file_path, filename=file_name,
                        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
app/utils/excelmerge/conformity.py
@@ -1,6 +1,10 @@
from openpyxl import load_workbook
from datetime import datetime
import os
import random
import shutil
import string
from datetime import datetime
from openpyxl import load_workbook
def clear_blank_rows(sheet):
@@ -55,11 +59,25 @@
                    template_sheets[name].cell(row=i, column=1).value = i - start_row + 1
        timestamp = datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
        output_path = os.path.join(print_path, f'{timestamp}.xlsx')
        random_string = ''.join(random.choice(string.ascii_letters + string.digits) for i in range(5))
        file_name = f'{random_string}_{timestamp}'
        output_path = os.path.join(print_path, f'{file_name}.xlsx')
        template_excel.save(output_path)
        template_excel.close()
        return True
        # 合并完成后删除无用文件
        for filename in os.listdir(file_path):
            file_path_full = os.path.join(file_path, filename)
            try:
                if os.path.isfile(file_path_full) or os.path.islink(file_path_full):
                    os.unlink(file_path_full)
                elif os.path.isdir(file_path_full):
                    shutil.rmtree(file_path_full)
                os.rmdir(file_path)
            except Exception as e:
                print(f"删除文件时发生错误: {e}")
        return file_name
    except Exception as e:
        print(f"读取数据发生错误: {e}")
        return False
        return None