import asyncio
import base64
import json
import os
from datetime import datetime

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes

from app.config.const import (
    SYSTEM_ID,
    SYSTEM_STATUS_EXPIRED,
    SYSTEM_STATUS_ON,
    APP_SERVICE_PATH,
)
from app.models import SystemDataModel
from app.models.base_model import SessionLocal
from app.utils.common import get_machine_id


def _read_license_expiration(license_code):
    """Verify a signed license blob and return its expiration date.

    The blob is base64 text that decodes to ``<json payload>-----<signature>``.
    The signature is checked against the PEM public key stored under
    ``APP_SERVICE_PATH`` using RSA-PSS with SHA-256.

    Args:
        license_code: Base64-encoded license blob.

    Returns:
        The ``expiration_date`` field of the payload parsed with
        ``datetime.fromisoformat``.

    Raises:
        Exception: Any failure (unreadable key file, malformed blob, invalid
            signature, bad JSON, missing or unparsable date) propagates to
            the caller, which treats it as an invalid license.
    """
    key_path = os.path.join(APP_SERVICE_PATH, "pom/public_key.pem")
    with open(key_path, "rb") as f:
        public_key = serialization.load_pem_public_key(f.read())

    # Split on the first separator only; everything after it is the signature.
    license_data, signature = base64.b64decode(license_code).split(b"-----", 1)
    public_key.verify(
        signature,
        license_data,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH,
        ),
        hashes.SHA256(),
    )

    license_dict = json.loads(license_data.decode("utf-8"))
    return datetime.fromisoformat(license_dict["expiration_date"])


def _refresh_license_status(system):
    """Expire ``system`` unless it carries a valid, unexpired license.

    Called only for a system that is currently ``SYSTEM_STATUS_ON`` and whose
    stored machine id matches this machine. Mutates ``system`` in place; the
    caller is responsible for committing.
    """
    expired_at = system.expired_at
    # A missing ``expired_at`` must not be compared with ``datetime.now()``:
    # that raises TypeError, which previously aborted the sync before commit
    # and left the system switched on without any check. Fall through to
    # signature verification instead, which derives the date from the license.
    already_expired = expired_at is not None and expired_at < datetime.now()
    if not system.license_code or already_expired:
        system.status = SYSTEM_STATUS_EXPIRED
        return

    try:
        expiration_date = _read_license_expiration(system.license_code)
        system.expired_at = expiration_date
        if expiration_date < datetime.now():
            system.status = SYSTEM_STATUS_EXPIRED
    except Exception as e:
        # Fail closed: any verification problem invalidates the license.
        print(e)
        system.status = SYSTEM_STATUS_EXPIRED


async def sync_system_license():
    """Re-validate the system record's machine binding and license.

    Loads the ``SystemDataModel`` row with ``id == SYSTEM_ID`` and:

    * no stored machine id: an active system is marked expired;
    * stored machine id differs from ``get_machine_id()``: the stored id is
      overwritten and an active system is marked expired;
    * machine id matches and the system is active: the license is checked
      (see ``_refresh_license_status``).

    Changes are committed in one transaction. Errors are printed and
    swallowed so a failed sync never propagates to the caller; the session is
    always closed.
    """
    print("-------------------------------------------")
    db = SessionLocal()
    try:
        system = db.query(SystemDataModel).filter_by(id=SYSTEM_ID).first()
        if not system:
            return

        machine_id = get_machine_id()
        is_active = system.status == SYSTEM_STATUS_ON

        if not system.machine_id:
            if is_active:
                system.status = SYSTEM_STATUS_EXPIRED
        elif system.machine_id != machine_id:
            system.machine_id = machine_id
            if is_active:
                system.status = SYSTEM_STATUS_EXPIRED
        elif is_active:
            _refresh_license_status(system)

        db.commit()
    except Exception as e:
        print(e)
    finally:
        db.close()


def sync_resource():
    """Synchronous entry point: run ``sync_system_license`` to completion."""
    asyncio.run(sync_system_license())