zhangxiao
2024-10-16 f192a7a302373ae931104d6cb44076fe694da470
报表合并
2个文件已修改
3个文件已添加
172 ■■■■■ 已修改文件
.gitignore 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/api/excel.py 105 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/utils/excelmerge/conformity.py 64 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/utils/excelmerge/国网上海电力整合模版.xlsx 补丁 | 查看 | 原始文档 | blame | 历史
main.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -1,5 +1,6 @@
venv
dist
data
build
main
__pycache__
app/api/excel.py
New file
@@ -0,0 +1,105 @@
from fastapi import APIRouter, File, UploadFile
from fastapi.responses import JSONResponse, FileResponse
from fastapi.exceptions import HTTPException
from werkzeug.utils import secure_filename
from app.utils.excelmerge.conformity import run_conformity
from pathlib import Path
import subprocess
import shutil
import os
router = APIRouter()
ALLOWED_EXTENSIONS = {'xlsx'}
EXCEL_FILES_PATH = 'data/output'
SOURCE_FILES_PATH = 'data/source'
output_path_value = None
def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def create_dir_if_not_exists(path):
    if not os.path.exists(path):
        os.makedirs(path)
@router.post('/excel/upload')
async def upload_file(files: list[UploadFile] = File(...)):
    if not any(file.filename for file in files):
        return JSONResponse(content={"error": "没有文件部分"}, status_code=400)
    create_dir_if_not_exists(SOURCE_FILES_PATH)
    # æ¸…空SOURCE_FILES_PATH目录
    for filename in os.listdir(SOURCE_FILES_PATH):
        file_path = os.path.join(SOURCE_FILES_PATH, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            return JSONResponse(content={"error": "文件处理出错"}, status_code=500)
    save_path_list = []
    for file in files:
        if file.filename == '':
            return JSONResponse(content={"error": "没有选择文件"}, status_code=400)
        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            save_path = os.path.join(SOURCE_FILES_PATH, filename)
            with open(save_path, 'wb') as buffer:
                shutil.copyfileobj(file.file, buffer)
            save_path_list.append(save_path)
        else:
            return JSONResponse(content={"error": "不允许的文件类型"}, status_code=400)
    return JSONResponse(content={"message": "文件上传成功", "paths": save_path_list}, status_code=201)
@router.post('/excel/conformity')
async def run_conformity_api():
    global output_path_value  # å£°æ˜Žå…¨å±€å˜é‡
    try:
        create_dir_if_not_exists(EXCEL_FILES_PATH)
        # æ¸…空EXCEL_FILES_PATH目录
        for filename in os.listdir(EXCEL_FILES_PATH):
            file_path = os.path.join(EXCEL_FILES_PATH, filename)
            try:
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.unlink(file_path)
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)
            except Exception as e:
                return JSONResponse(content={"error": "文件处理出错"}, status_code=500)
        # è¿è¡Œæ–¹æ³•
        output_path = run_conformity()
        output_path_value = output_path
        return JSONResponse(content={"message": "conformity.py è¿è¡ŒæˆåŠŸ", "output_path": str(output_path)}, status_code=200)
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)
@router.get('/excel/file/status')
async def get_file_status():
    try:
        return JSONResponse(content={"output_path": str(output_path_value)}, status_code=200)
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)
@router.get('/excel/download_excel')
async def download_excel():
    try:
        files = os.listdir(EXCEL_FILES_PATH)
        first_file = files[0]
        return FileResponse(os.path.join(EXCEL_FILES_PATH, first_file), filename=first_file,
                            media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="文件不存在")
    except Exception as e:
        raise HTTPException(status_code=500, detail="服务器错误")
app/utils/excelmerge/conformity.py
New file
@@ -0,0 +1,64 @@
import xlwings as xw
from datetime import datetime
import os
import shutil
def run_conformity():
    source_folder = os.path.join('data', 'source')
    template_path = os.path.join('app', 'utils', 'excelmerge', '国网上海电力整合模版.xlsx')
    EXCEL_FILES_PATH = os.path.join('data', 'output')
    app = xw.App(visible=False)
    templateExcel = xw.Book(template_path)
    sheet1 = templateExcel.sheets['技术监督工作统计表']
    sheet2 = templateExcel.sheets['技术监督告(预)警单统计表']
    sheet3 = templateExcel.sheets['投产前技术监督报告统计表']
    sheet4 = templateExcel.sheets['技术监督细则完善建议']
    sheet5 = templateExcel.sheets['典型经验交流']
    source_files = [f for f in os.listdir(source_folder) if f.endswith('.xlsx') and not f.startswith('~$')]
    for file in source_files:
        sourceExcel = xw.Book(os.path.join(source_folder, file))
        ssheet1 = sourceExcel.sheets['技术监督工作统计表']
        ssheet2 = sourceExcel.sheets['技术监督告(预)警单统计表']
        ssheet3 = sourceExcel.sheets['投产前技术监督报告统计表']
        ssheet4 = sourceExcel.sheets['技术监督细则完善建议']
        ssheet5 = sourceExcel.sheets['典型经验交流']
        for ssheet in [ssheet1, ssheet2, ssheet3, ssheet4, ssheet5]:
            last_row_value = ssheet.range(f'A{ssheet.used_range.rows.count}').value
            if isinstance(last_row_value, str):
                ssheet.range(f'A{ssheet.used_range.rows.count}:AA{ssheet.used_range.rows.count}').delete()
        for (ssheet, tsheet, start_point) in [
            (ssheet1, sheet1, 4),
            (ssheet2, sheet2, 3),
            (ssheet3, sheet3, 3),
            (ssheet4, sheet4, 4),
            (ssheet5, sheet5, 3),
        ]:
            for row in range(start_point, ssheet.used_range.rows.count + 1):
                a_cell_value = ssheet.range(f'A{row}').value
                if not isinstance(a_cell_value, (int, float)):
                    continue
                if not all(cell.value is None or cell.value == '' for cell in ssheet.range(f'B{row}:AA{row}')):
                    tsheet.range(f'B{tsheet.used_range.rows.count + 1}').value = ssheet.range(f'B{row}:AA{row}').value
        sourceExcel.close()
    for tsheet, start_point in [(sheet1, 4), (sheet2, 3), (sheet3, 3), (sheet4, 4), (sheet5, 3)]:
        last_row = tsheet.used_range.rows.count
        number = 1
        for i in range(start_point, last_row + 1):
            tsheet.range(f'A{i}').value = number
            number += 1
    timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    output_path = os.path.join(EXCEL_FILES_PATH, f'{timestamp}.xlsx')
    templateExcel.save(output_path)
    templateExcel.close()
    app.quit()
    return timestamp
app/utils/excelmerge/¹úÍøÉϺ£µçÁ¦ÕûºÏÄ£°æ.xlsx
Binary files differ
main.py
@@ -2,6 +2,7 @@
from app.api.auth import router as auth_router
from app.api.chat import router as chat_router
from app.api.agent import router as agent_router
from app.api.excel import router as excel_router
from app.models.base_model import init_db
init_db()
@@ -14,6 +15,7 @@
app.include_router(auth_router, prefix='/auth', tags=["auth"])
app.include_router(chat_router, prefix='/chat', tags=["chat"])
app.include_router(agent_router, prefix='/agent', tags=["agent"])
app.include_router(excel_router, prefix='/document', tags=["document"])
if __name__ == "__main__":
    import uvicorn