from abc import ABC, abstractmethod
import base64

from Cryptodome.Cipher import PKCS1_v1_5
from Cryptodome.PublicKey import RSA
import rsa


# Abstract base class shared by the password-encryption helpers.
class RSACrypto(ABC):
    """Interface for RSA-based password encryption and decryption."""

    @abstractmethod
    def encrypt(self, password: str) -> str:
        """Return the encrypted form of *password* as text."""

    @abstractmethod
    def decrypt(self, encrypted_password: str) -> str:
        """Return the plain-text password recovered from *encrypted_password*."""


# RagflowCrypto implementation.
class RagflowCrypto(RSACrypto):
    """RSA PKCS#1 v1.5 password cipher built on PyCryptodome.

    The plain-text password is base64-encoded *before* encryption, and the
    resulting ciphertext is base64-encoded again for transport; ``decrypt``
    reverses both layers.
    """

    def __init__(self, public_key: str, private_key: str):
        """Store the key material; it is parsed on every encrypt/decrypt call.

        Args:
            public_key: Public key in a format accepted by ``RSA.importKey``.
            private_key: Private key in a format accepted by ``RSA.importKey``.
        """
        self.public_key = public_key
        self.private_key = private_key

    def encrypt(self, password: str) -> str:
        """Encrypt *password* with the public key.

        Args:
            password: Plain-text password.

        Returns:
            Base64 text of the PKCS#1 v1.5 ciphertext of the base64-encoded
            UTF-8 password.
        """
        rsa_key = RSA.importKey(self.public_key)
        cipher = PKCS1_v1_5.new(rsa_key)
        # The inner base64 layer is part of the wire format; decrypt() undoes it.
        inner_payload = base64.b64encode(password.encode('utf-8'))
        encrypted_password = cipher.encrypt(inner_payload)
        return base64.b64encode(encrypted_password).decode('utf-8')

    def decrypt(self, encrypted_password: str) -> str:
        """Decrypt a value produced by :meth:`encrypt` with the private key.

        Args:
            encrypted_password: Base64 text of the RSA ciphertext.

        Returns:
            The original plain-text password.

        Raises:
            ValueError: If the ciphertext cannot be decrypted with the
                private key, or if a base64 layer is malformed
                (``binascii.Error`` is a ``ValueError`` subclass).
        """
        rsa_key = RSA.importKey(self.private_key)
        cipher = PKCS1_v1_5.new(rsa_key)
        encrypted_password_bytes = base64.b64decode(encrypted_password)

        # PKCS1_v1_5.decrypt() does not raise on a padding failure; it returns
        # the sentinel instead. Use a unique object so the failure can be
        # detected explicitly rather than passing an error-message string on
        # to base64.b64decode(), which used to fail with an unrelated
        # "invalid base64" error.
        failure = object()
        decoded_password = cipher.decrypt(encrypted_password_bytes, failure)
        if decoded_password is failure:
            raise ValueError("Fail to decrypt password!")

        return base64.b64decode(decoded_password).decode('utf-8')


# Disabled PyCryptodome-based BishengCrypto variant; the active class of the
# same name further down uses the `rsa` package instead.
# class BishengCrypto(RSACrypto):
#
#     def __init__(self, public_key, private_key: str):
#         self.public_key = public_key
#         self.private_key = private_key
#
#     def encrypt(self, password: str) -> str:
#         rsa_key = RSA.importKey(self.public_key)
#         cipher = PKCS1_v1_5.new(rsa_key)
#         encrypted_password = cipher.encrypt(password.encode('utf-8'))
#         return base64.b64encode(encrypted_password).decode('utf-8')
#
#     def decrypt(self, encrypted_password: str) -> str:
#         rsa_key = RSA.importKey(self.private_key)
#         cipher = PKCS1_v1_5.new(rsa_key)
#         encrypted_password_bytes = base64.b64decode(encrypted_password)
#         decoded_password = cipher.decrypt(encrypted_password_bytes, "Fail to decrypt password!")
#         return decoded_password.decode('utf-8')


class BishengCrypto:
    """RSA password cipher built on the ``rsa`` package.

    Unlike ``RagflowCrypto`` there is no inner base64 layer: the UTF-8
    password bytes are encrypted directly and only the ciphertext is
    base64-encoded.
    """

    def __init__(self, public_key: str, private_key: str):
        """Load the public key used by :meth:`encrypt`.

        Args:
            public_key: Public key text passed to ``rsa.PublicKey.load_pkcs1``.
            private_key: Accepted but not used or stored by this class;
                :meth:`decrypt` takes the private key as its own argument.
        """
        self.public_key = rsa.PublicKey.load_pkcs1(public_key.encode('utf-8'))

    def encrypt(self, password: str) -> str:
        """Encrypt *password* with the loaded public key.

        Args:
            password: Plain-text password.

        Returns:
            Base64 text of the RSA ciphertext of the UTF-8 password bytes.
        """
        encrypted_password = rsa.encrypt(password.encode('utf-8'), self.public_key)
        return base64.b64encode(encrypted_password).decode('utf-8')

    @classmethod
    def decrypt(cls, password: str, private_key: str) -> str:
        """Decrypt a value produced by :meth:`encrypt`.

        Args:
            password: Base64 text of the RSA ciphertext.
            private_key: Private key text passed to
                ``rsa.PrivateKey.load_pkcs1``.

        Returns:
            The plain-text password.
        """
        # Use a distinct local name so the str parameter is not rebound to a
        # key object of a different type.
        loaded_key = rsa.PrivateKey.load_pkcs1(private_key.encode('utf-8'))
        encrypted_password_bytes = base64.b64decode(password)
        return rsa.decrypt(encrypted_password_bytes, loaded_key).decode('utf-8')