import os
|
import shutil
|
import uuid
|
import yaml
|
|
from datetime import datetime
|
from fastapi import UploadFile
|
from Log import logger
|
from app.config.const import SYSTEM_ID, ENV_CONF_PATH, APP_STATIC_PATH, APP_SERVICE_PATH, SYSTEM_STATUS_ON
|
from app.models.system import SystemDataModel
|
from cryptography.hazmat.primitives import serialization
|
from cryptography.hazmat.primitives.asymmetric import padding
|
from cryptography.hazmat.primitives import hashes
|
import base64
|
import json
|
|
from app.utils.common import get_machine_id
|
|
|
async def services_get_system_data(db):
|
system = db.query(SystemDataModel).filter_by(id=SYSTEM_ID).first()
|
if not system:
|
with open(os.path.join(ENV_CONF_PATH, "system.yaml"), 'r', encoding='utf-8') as file:
|
# 加载JSON数据
|
config = yaml.safe_load(file)
|
system = SystemDataModel(id=SYSTEM_ID, title=config["smart_system"]["title"], desc=config["smart_system"]["desc"], version=config["smart_system"]["version"])
|
db.add(system)
|
db.commit()
|
db.refresh(system)
|
return system.to_dict()
|
|
|
async def services_update_system_data(db, title, desc, logo):
|
try:
|
if os.path.exists(os.path.join(APP_STATIC_PATH, logo)):
|
shutil.move(os.path.join(APP_STATIC_PATH, logo), os.path.join(APP_STATIC_PATH, "logo.png"))
|
system = db.query(SystemDataModel).filter_by(id=SYSTEM_ID).first()
|
system.title = title
|
system.desc = desc
|
system.updated_at = datetime.now()
|
db.commit()
|
db.refresh(system)
|
return system.to_dict()
|
except Exception as e:
|
logger.error(e)
|
return {}
|
|
|
async def service_upload_logo_image(file: UploadFile):
|
file_name = str(uuid.uuid4())
|
try:
|
save_path = os.path.join(APP_STATIC_PATH, file_name)
|
# 将上传的文件保存到本地
|
with open(save_path, "wb") as buffer:
|
shutil.copyfileobj(file.file, buffer)
|
file.file.close()
|
return file_name
|
except Exception as e:
|
logger.error(f"保存失败: {str(e)}")
|
return ""
|
|
|
async def services_update_system_license(db, license_code):
|
try:
|
with open(os.path.join(APP_SERVICE_PATH, "pom/public_key.pem"), "rb") as f:
|
public_key = serialization.load_pem_public_key(f.read())
|
license_data, signature = base64.b64decode(license_code).split(b"-----", 1)
|
# print(license_data)
|
public_key.verify(
|
signature,
|
license_data,
|
padding.PSS(
|
mgf=padding.MGF1(hashes.SHA256()),
|
salt_length=padding.PSS.MAX_LENGTH
|
),
|
hashes.SHA256()
|
)
|
license_dict = json.loads(license_data.decode('utf-8'))
|
# print(license_dict)
|
expiration_date = datetime.fromisoformat(license_dict['expiration_date'])
|
if expiration_date < datetime.now():
|
return "授权码已过期"
|
system = db.query(SystemDataModel).filter_by(id=SYSTEM_ID).first()
|
if license_dict['machine_id'] != get_machine_id() or system.machine_id != license_dict['machine_id']:
|
return "授权码无效"
|
system.license_code = license_code
|
system.expired_at = expiration_date
|
system.status = SYSTEM_STATUS_ON
|
system.updated_at = datetime.now()
|
# db.query(SystemDataModel).filter_by(id=SYSTEM_ID).update({"license_code": license_code, "expired_at": expiration_date, "status": 1, "updated_at": datetime.now()})
|
db.commit()
|
return ""
|
except Exception as e:
|
return f"验证失败: {str(e)}"
|