import os import shutil import uuid import yaml from datetime import datetime from fastapi import UploadFile from Log import logger from app.config.const import SYSTEM_ID, ENV_CONF_PATH, APP_STATIC_PATH, APP_SERVICE_PATH, SYSTEM_STATUS_ON from app.models.system import SystemDataModel from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives import hashes import base64 import json from app.utils.common import get_machine_id async def services_get_system_data(db): system = db.query(SystemDataModel).filter_by(id=SYSTEM_ID).first() if not system: with open(os.path.join(ENV_CONF_PATH, "system.yaml"), 'r', encoding='utf-8') as file: # 加载JSON数据 config = yaml.safe_load(file) system = SystemDataModel(id=SYSTEM_ID, title=config["smart_system"]["title"], desc=config["smart_system"]["desc"], version=config["smart_system"]["version"]) db.add(system) db.commit() db.refresh(system) return system.to_dict() async def services_update_system_data(db, title, desc, logo): try: if os.path.exists(os.path.join(APP_STATIC_PATH, logo)): shutil.move(os.path.join(APP_STATIC_PATH, logo), os.path.join(APP_STATIC_PATH, "logo.png")) system = db.query(SystemDataModel).filter_by(id=SYSTEM_ID).first() system.title = title system.desc = desc system.updated_at = datetime.now() db.commit() db.refresh(system) return system.to_dict() except Exception as e: logger.error(e) return {} async def service_upload_logo_image(file: UploadFile): file_name = str(uuid.uuid4()) try: save_path = os.path.join(APP_STATIC_PATH, file_name) # 将上传的文件保存到本地 with open(save_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) file.file.close() return file_name except Exception as e: logger.error(f"保存失败: {str(e)}") return "" async def services_update_system_license(db, license_code): try: with open(os.path.join(APP_SERVICE_PATH, "pom/public_key.pem"), "rb") as f: public_key = serialization.load_pem_public_key(f.read()) license_data, signature = base64.b64decode(license_code).split(b"-----", 1) # print(license_data) public_key.verify( signature, license_data, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) license_dict = json.loads(license_data.decode('utf-8')) # print(license_dict) expiration_date = datetime.fromisoformat(license_dict['expiration_date']) if expiration_date < datetime.now(): return "授权码已过期" system = db.query(SystemDataModel).filter_by(id=SYSTEM_ID).first() if license_dict['machine_id'] != get_machine_id() or system.machine_id != license_dict['machine_id']: return "授权码无效" system.license_code = license_code system.expired_at = expiration_date system.status = SYSTEM_STATUS_ON system.updated_at = datetime.now() # db.query(SystemDataModel).filter_by(id=SYSTEM_ID).update({"license_code": license_code, "expired_at": expiration_date, "status": 1, "updated_at": datetime.now()}) db.commit() return "" except Exception as e: return f"验证失败: {str(e)}"