zhaoqingang
2025-04-01 6846a4c98a793e74ae17b47f04a0ff8b210aeb24
app/service/system.py
@@ -1,15 +1,20 @@
import os
import shutil
import uuid
from datetime import datetime
import yaml
from fastapi import UploadFile
from datetime import datetime
from fastapi import UploadFile
from Log import logger
from app.api import pwd_context
from app.config.const import SYSTEM_ID, ENV_CONF_PATH, APP_STATIC_PATH
from app.config.const import SYSTEM_ID, ENV_CONF_PATH, APP_STATIC_PATH, APP_SERVICE_PATH, SYSTEM_STATUS_ON
from app.models.system import SystemDataModel
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
import base64
import json
from app.utils.common import get_machine_id
async def services_get_system_data(db):
@@ -18,7 +23,7 @@
        with open(os.path.join(ENV_CONF_PATH, "system.yaml"), 'r', encoding='utf-8') as file:
            # 加载JSON数据
            config = yaml.safe_load(file)
            system = SystemDataModel(id=SYSTEM_ID, title=config["smart_system"]["title"], desc=config["smart_system"]["desc"])
            system = SystemDataModel(id=SYSTEM_ID, title=config["smart_system"]["title"], desc=config["smart_system"]["desc"], version=config["smart_system"]["version"])
            db.add(system)
            db.commit()
            db.refresh(system)
@@ -53,3 +58,37 @@
    except Exception as e:
        logger.error(f"保存失败: {str(e)}")
        return ""
async def services_update_system_license(db, license_code):
    try:
        with open(os.path.join(APP_SERVICE_PATH, "pom/public_key.pem"), "rb") as f:
            public_key = serialization.load_pem_public_key(f.read())
        license_data, signature = base64.b64decode(license_code).split(b"-----", 1)
        # print(license_data)
        public_key.verify(
            signature,
            license_data,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        license_dict = json.loads(license_data.decode('utf-8'))
        # print(license_dict)
        expiration_date = datetime.fromisoformat(license_dict['expiration_date'])
        if expiration_date < datetime.now():
            return "授权码已过期"
        system = db.query(SystemDataModel).filter_by(id=SYSTEM_ID).first()
        if license_dict['machine_id'] != get_machine_id() or system.machine_id != license_dict['machine_id']:
            return "授权码无效"
        system.license_code = license_code
        system.expired_at = expiration_date
        system.status = SYSTEM_STATUS_ON
        system.updated_at = datetime.now()
        # db.query(SystemDataModel).filter_by(id=SYSTEM_ID).update({"license_code": license_code, "expired_at": expiration_date, "status": 1, "updated_at": datetime.now()})
        db.commit()
        return ""
    except Exception as e:
        return f"验证失败: {str(e)}"