zhaoqingang
2025-04-10 abb91124a4372b0efe5ab1b7aa25859c635d30eb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import os
import shutil
import uuid
import yaml
 
from datetime import datetime
from fastapi import UploadFile
from Log import logger
from app.config.const import SYSTEM_ID, ENV_CONF_PATH, APP_STATIC_PATH, APP_SERVICE_PATH, SYSTEM_STATUS_ON
from app.models.system import SystemDataModel
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
import base64
import json
 
from app.utils.common import get_machine_id
 
 
async def services_get_system_data(db):
    """Return the system record as a dict, seeding it from ``system.yaml`` if absent.

    The ``SystemDataModel`` row whose id equals ``SYSTEM_ID`` is looked up
    first. When it does not exist, the ``smart_system`` section of
    ``system.yaml`` (located under ``ENV_CONF_PATH``) provides the title,
    description and version for a new row, which is added, committed and
    refreshed before being returned.

    Args:
        db: Database session exposing ``query``/``add``/``commit``/``refresh``.

    Returns:
        The value of ``to_dict()`` on the existing or newly created record.
    """
    record = db.query(SystemDataModel).filter_by(id=SYSTEM_ID).first()
    if record:
        return record.to_dict()

    config_path = os.path.join(ENV_CONF_PATH, "system.yaml")
    with open(config_path, 'r', encoding='utf-8') as config_file:
        # Parse the YAML configuration and pick the section holding the defaults.
        defaults = yaml.safe_load(config_file)["smart_system"]
        record = SystemDataModel(
            id=SYSTEM_ID,
            title=defaults["title"],
            desc=defaults["desc"],
            version=defaults["version"],
        )

    # Persist the freshly built row and reload it from the database.
    db.add(record)
    db.commit()
    db.refresh(record)
    return record.to_dict()
 
 
async def services_update_system_data(db, title, desc, logo):
    """Update the system title/description and optionally install a new logo.

    Args:
        db: Database session exposing ``query``/``commit``/``refresh``.
        title: New system title.
        desc: New system description.
        logo: Name (relative to ``APP_STATIC_PATH``) of a file that should
            become ``logo.png``. An empty/``None`` value, or a name that is
            not an existing regular file, leaves the current logo untouched.

    Returns:
        ``to_dict()`` of the updated record, or ``{}`` when the update fails
        (the error is logged).
    """
    try:
        # Guard against an empty name: os.path.join(APP_STATIC_PATH, "") is the
        # static directory itself, which exists, and trying to move it onto
        # "logo.png" raises and aborts the whole update. Only move real files.
        # NOTE(review): ``logo`` is caller-supplied; a value containing path
        # separators could point outside APP_STATIC_PATH - confirm callers only
        # pass names returned by service_upload_logo_image.
        if logo:
            uploaded_path = os.path.join(APP_STATIC_PATH, logo)
            if os.path.isfile(uploaded_path):
                shutil.move(uploaded_path, os.path.join(APP_STATIC_PATH, "logo.png"))

        system = db.query(SystemDataModel).filter_by(id=SYSTEM_ID).first()
        if system is None:
            # Nothing to update; report the failure the same way as other errors.
            logger.error(f"system record not found: id={SYSTEM_ID}")
            return {}

        system.title = title
        system.desc = desc
        system.updated_at = datetime.now()
        db.commit()
        db.refresh(system)
        return system.to_dict()
    except Exception as e:
        logger.error(e)
        return {}
 
 
async def service_upload_logo_image(file: UploadFile):
    """Save an uploaded image into ``APP_STATIC_PATH`` under a random name.

    Args:
        file: The uploaded file; its underlying ``file`` object is copied to
            disk and closed afterwards, whether or not the copy succeeded.

    Returns:
        The generated file name (a UUID4 string, no extension) on success,
        or ``""`` when saving fails (the error is logged).
    """
    file_name = str(uuid.uuid4())
    try:
        save_path = os.path.join(APP_STATIC_PATH, file_name)
        try:
            # Save the uploaded file to local disk.
            with open(save_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
        finally:
            # Always release the upload stream, even when the copy fails.
            # Kept inside the outer try so a failing close() is still
            # logged and reported as "" rather than propagating.
            file.file.close()
        return file_name
    except Exception as e:
        logger.error(f"保存失败: {str(e)}")
        return ""
 
 
async def services_update_system_license(db, license_code):
    """Verify a signed license code and, if valid, activate it on the system record.

    ``license_code`` is base64 text whose decoded bytes are the JSON license
    data, the separator ``b"-----"``, and the signature. The signature is
    checked against ``pom/public_key.pem`` (under ``APP_SERVICE_PATH``) using
    PSS padding with SHA-256. The license must not be expired and its
    ``machine_id`` must match both ``get_machine_id()`` and the machine id
    stored on the system record.

    Args:
        db: Database session exposing ``query``/``commit``.
        license_code: Base64-encoded license string.

    Returns:
        ``""`` on success; otherwise a non-empty error message.
    """
    try:
        with open(os.path.join(APP_SERVICE_PATH, "pom/public_key.pem"), "rb") as f:
            public_key = serialization.load_pem_public_key(f.read())

        # Split on the first separator only: everything after it is the
        # signature, which is binary and may itself contain b"-----".
        license_data, signature = base64.b64decode(license_code).split(b"-----", 1)

        # verify() raises on a bad signature; that is reported by the
        # generic handler below.
        public_key.verify(
            signature,
            license_data,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )

        license_dict = json.loads(license_data.decode('utf-8'))
        # NOTE(review): this compares against a naive datetime.now(); an
        # expiration_date carrying a UTC offset would raise TypeError here and
        # be reported as a verification failure - confirm licenses use naive
        # timestamps.
        expiration_date = datetime.fromisoformat(license_dict['expiration_date'])
        if expiration_date < datetime.now():
            return "授权码已过期"

        system = db.query(SystemDataModel).filter_by(id=SYSTEM_ID).first()
        licensed_machine_id = license_dict['machine_id']
        if licensed_machine_id != get_machine_id():
            return "授权码无效"
        if system is None:
            # No system row means there is no stored machine id to match;
            # treat it as an invalid license instead of an AttributeError.
            logger.error(f"system record not found: id={SYSTEM_ID}")
            return "授权码无效"
        if system.machine_id != licensed_machine_id:
            return "授权码无效"

        system.license_code = license_code
        system.expired_at = expiration_date
        system.status = SYSTEM_STATUS_ON
        system.updated_at = datetime.now()
        db.commit()
        return ""
    except Exception as e:
        # Log like the sibling services do, so failures are not silently lost.
        logger.error(f"验证失败: {str(e)}")
        return f"验证失败: {str(e)}"