xuyonghao
2024-12-18 962116115afd0781d8b7f2a7315c427581f18201
修改excel脚本
2个文件已修改
142 ■■■■ 已修改文件
app/api/excel.py 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/utils/excelmerge/conformity.py 118 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/api/excel.py
@@ -1,13 +1,7 @@
from fastapi import APIRouter, File, UploadFile, Depends
from fastapi import APIRouter, File, UploadFile
from fastapi.responses import JSONResponse, FileResponse
from fastapi.exceptions import HTTPException
from sqlalchemy.orm import Session
from starlette.websockets import WebSocket, WebSocketDisconnect
from werkzeug.utils import secure_filename
from app.api import get_current_user_websocket
from app.models.agent_model import AgentModel, AgentType
from app.models.base_model import get_db
from app.models.user_model import UserModel
from starlette.websockets import WebSocket
from app.utils.excelmerge.conformity import run_conformity
import shutil
import os
@@ -17,6 +11,7 @@
# Upload extensions that are accepted (lower-case, without the leading dot).
ALLOWED_EXTENSIONS = {'xlsx'}
# Directory whose contents are listed/served as merged workbooks.
EXCEL_FILES_PATH = 'data/output'
# Directory for source workbooks (presumably the upload target; its use is
# outside the visible part of this file).
SOURCE_FILES_PATH = 'data/source'


def allowed_file(filename):
    """Return True when *filename* has an extension in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared
    case-insensitively.

    Args:
        filename: Name to check. Anything that is not a ``str`` (for
            example ``None`` when no filename is available) is rejected
            instead of raising ``TypeError``.

    Returns:
        bool: True if the name contains a dot and the text after the last
        dot is an allowed extension; otherwise False.
    """
    if not isinstance(filename, str):
        return False
    # rpartition yields an empty separator when the name has no dot at all.
    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in ALLOWED_EXTENSIONS
@@ -77,19 +72,18 @@
        data = await websocket.receive_text()
        try:
            if data == "\"合并Excel\"":
                output_file_path = run_conformity()
                output_file_path = run_conformity()
                run_excel = run_conformity()
                files = os.listdir(EXCEL_FILES_PATH)
                if files:
                if run_excel:
                    first_file = files[0]
                    file_name = os.path.basename(first_file)
                    file_url = f"./api/document/download/{first_file}"
                    download_url = f"./api/document/download/{first_file}"
                    await websocket.send_json({
                        "message": "文档合并成功!",
                        "type": "stream",
                        "files": [{
                            "file_name": file_name,
                            "file_url": file_url
                            "file_url": download_url
                        }]
                    })
                    await websocket.send_json({
@@ -97,7 +91,7 @@
                        "type": "close",
                    })
                else:
                    await websocket.send_json({"error": "合并操作未生成文件", "type": "stream", "files": []})
                    await websocket.send_json({"error": "合并失败", "type": "stream", "files": []})
            elif data == "\"查询合并进度\"":
                files = os.listdir(EXCEL_FILES_PATH)
                if not files:
@@ -136,4 +130,4 @@
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="文件不存在")
    except Exception as e:
        raise HTTPException(status_code=500, detail="服务器错误")
        raise HTTPException(status_code=500, detail="服务器错误")
app/utils/excelmerge/conformity.py
@@ -1,73 +1,77 @@
from openpyxl import load_workbook
from openpyxl.utils import get_column_letter
from datetime import datetime
import os
# (sheet title, first data row) for every sheet that takes part in the merge.
# Rows above the first data row are left untouched in the template.
_MERGE_SHEETS = (
    ('技术监督工作统计表', 4),
    ('技术监督告(预)警单统计表', 3),
    ('投产前技术监督报告统计表', 3),
    ('技术监督细则完善建议', 4),
    ('典型经验交流', 3),
)


def _drop_blank_rows(sheet):
    """Delete every row below row 1 whose cells are all None or ''."""
    # Walk bottom-up so deleting a row never shifts a row still to be checked.
    for row in range(sheet.max_row, 1, -1):
        if all(sheet.cell(row=row, column=col).value in (None, '')
               for col in range(1, sheet.max_column + 1)):
            sheet.delete_rows(row)


def _copy_data_rows(source_sheet, target_sheet, start_row):
    """Append the numbered, non-empty rows of *source_sheet* to *target_sheet*."""
    for row in range(start_row, source_sheet.max_row + 1):
        values = [source_sheet.cell(row=row, column=col).value
                  for col in range(1, source_sheet.max_column + 1)]
        # A data row has a numeric first column and at least one truthy
        # value in the remaining columns.
        if isinstance(values[0], (int, float)) and any(values[1:]):
            target_sheet.append(values)


def run_conformity():
    """Merge all source workbooks into the template and save the result.

    Loads the template workbook, appends the data rows of every ``.xlsx``
    file in ``data/source`` (Excel lock files starting with ``~$`` are
    ignored) to the matching template sheets, renumbers column 1 of each
    sheet from its first data row, and saves the workbook to
    ``data/output/<timestamp>.xlsx``.

    Source files are processed in sorted name order so that repeated runs
    over the same input produce the same row order. Workbooks are closed
    even when the merge fails part-way.

    Returns:
        bool: True when the merged workbook was saved, False when any
        error occurred (the error is printed).
    """
    template_excel = None
    try:
        # Load the template and look up the target sheets.
        template_path = os.path.join('app', 'utils', 'excelmerge', '国网上海电力整合模版.xlsx')
        template_excel = load_workbook(template_path)
        targets = [(template_excel[title], start_row)
                   for title, start_row in _MERGE_SHEETS]

        # Collect the source workbooks.
        source_folder = os.path.join('data', 'source')
        source_files = sorted(
            f for f in os.listdir(source_folder)
            if f.endswith('.xlsx') and not f.startswith('~$')
        )

        for file_name in source_files:
            source_excel = load_workbook(os.path.join(source_folder, file_name))
            try:
                # Resolve every sheet first so a missing sheet aborts the
                # merge before anything from this file is copied.
                source_sheets = [source_excel[title] for title, _ in _MERGE_SHEETS]

                # Remove blank rows from the source sheets.
                for source_sheet in source_sheets:
                    _drop_blank_rows(source_sheet)

                # Copy the data rows into the template.
                for source_sheet, (target_sheet, start_row) in zip(source_sheets, targets):
                    _copy_data_rows(source_sheet, target_sheet, start_row)
            finally:
                source_excel.close()

        # Rewrite the sequence numbers in column 1 of every target sheet.
        for target_sheet, start_row in targets:
            for row in range(start_row, target_sheet.max_row + 1):
                target_sheet.cell(row=row, column=1).value = row - start_row + 1

        # Save the merged workbook under a timestamped name.
        output_dir = os.path.join('data', 'output')
        os.makedirs(output_dir, exist_ok=True)
        timestamp = datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
        template_excel.save(os.path.join(output_dir, f'{timestamp}.xlsx'))
        return True
    except Exception as e:
        # Boundary handler: the caller only needs a success flag.
        print(f"读取数据发生错误: {e}")
        return False
    finally:
        if template_excel is not None:
            template_excel.close()