| | |
| | | import os |
| | | import shutil |
| | | import uuid |
| | | from datetime import datetime |
| | | |
| | | import yaml |
| | | from fastapi import UploadFile |
| | | |
| | | from datetime import datetime |
| | | from fastapi import UploadFile |
| | | from Log import logger |
| | | from app.api import pwd_context |
| | | from app.config.const import SYSTEM_ID, ENV_CONF_PATH, APP_STATIC_PATH |
| | | from app.config.const import SYSTEM_ID, ENV_CONF_PATH, APP_STATIC_PATH, APP_SERVICE_PATH, SYSTEM_STATUS_ON |
| | | from app.models.system import SystemDataModel |
| | | from cryptography.hazmat.primitives import serialization |
| | | from cryptography.hazmat.primitives.asymmetric import padding |
| | | from cryptography.hazmat.primitives import hashes |
| | | import base64 |
| | | import json |
| | | |
| | | from app.utils.common import get_machine_id |
| | | |
| | | |
| | | async def services_get_system_data(db): |
| | |
| | | with open(os.path.join(ENV_CONF_PATH, "system.yaml"), 'r', encoding='utf-8') as file: |
| | | # 加载JSON数据 |
| | | config = yaml.safe_load(file) |
| | | system = SystemDataModel(id=SYSTEM_ID, title=config["smart_system"]["title"], desc=config["smart_system"]["desc"]) |
| | | system = SystemDataModel(id=SYSTEM_ID, title=config["smart_system"]["title"], desc=config["smart_system"]["desc"], version=config["smart_system"]["version"]) |
| | | db.add(system) |
| | | db.commit() |
| | | db.refresh(system) |
| | |
| | | except Exception as e: |
| | | logger.error(f"保存失败: {str(e)}") |
| | | return "" |
| | | |
| | | |
| | | async def services_update_system_license(db, license_code): |
| | | try: |
| | | with open(os.path.join(APP_SERVICE_PATH, "pom/public_key.pem"), "rb") as f: |
| | | public_key = serialization.load_pem_public_key(f.read()) |
| | | license_data, signature = base64.b64decode(license_code).split(b"-----", 1) |
| | | # print(license_data) |
| | | public_key.verify( |
| | | signature, |
| | | license_data, |
| | | padding.PSS( |
| | | mgf=padding.MGF1(hashes.SHA256()), |
| | | salt_length=padding.PSS.MAX_LENGTH |
| | | ), |
| | | hashes.SHA256() |
| | | ) |
| | | license_dict = json.loads(license_data.decode('utf-8')) |
| | | # print(license_dict) |
| | | expiration_date = datetime.fromisoformat(license_dict['expiration_date']) |
| | | if expiration_date < datetime.now(): |
| | | return "授权码已过期" |
| | | system = db.query(SystemDataModel).filter_by(id=SYSTEM_ID).first() |
| | | if license_dict['machine_id'] != get_machine_id() or system.machine_id != license_dict['machine_id']: |
| | | return "授权码无效" |
| | | system.license_code = license_code |
| | | system.expired_at = expiration_date |
| | | system.status = SYSTEM_STATUS_ON |
| | | system.updated_at = datetime.now() |
| | | # db.query(SystemDataModel).filter_by(id=SYSTEM_ID).update({"license_code": license_code, "expired_at": expiration_date, "status": 1, "updated_at": datetime.now()}) |
| | | db.commit() |
| | | return "" |
| | | except Exception as e: |
| | | return f"验证失败: {str(e)}" |