from abc import ABC, abstractmethod
|
from Cryptodome.PublicKey import RSA
|
from Cryptodome.Cipher import PKCS1_v1_5
|
import base64
|
import rsa
|
|
|
# 定义抽象基类
|
class RSACrypto(ABC):
    """Abstract interface for password encryption helpers.

    Subclasses must override both ``encrypt`` and ``decrypt``; until they
    do, the class cannot be instantiated.
    """

    @abstractmethod
    def encrypt(self, password: str) -> str:
        """Return the encrypted form of ``password`` as a string."""
        ...

    @abstractmethod
    def decrypt(self, encrypted_password: str) -> str:
        """Return the plaintext recovered from ``encrypted_password``."""
        ...
|
|
|
# 实现 RagflowCrypto 类
|
class RagflowCrypto(RSACrypto):
    """RSA password cipher using the PKCS1_v1_5 cipher from Cryptodome.

    The password is base64-encoded before it is encrypted, and the
    ciphertext is base64-encoded again so it can be carried as text.
    ``decrypt`` reverses both steps.
    """

    def __init__(self, public_key: str, private_key: str):
        """Keep the key material as given; it is imported on every call.

        Args:
            public_key: Key text handed to ``RSA.importKey`` by ``encrypt``.
            private_key: Key text handed to ``RSA.importKey`` by ``decrypt``.
        """
        self.public_key = public_key
        self.private_key = private_key

    def encrypt(self, password: str) -> str:
        """Encrypt ``password`` with the public key.

        Args:
            password: Plaintext password.

        Returns:
            Base64 text of the RSA ciphertext.
        """
        rsa_key = RSA.importKey(self.public_key)
        cipher = PKCS1_v1_5.new(rsa_key)
        # The plaintext is base64-wrapped before encryption; decrypt()
        # unwraps it again after decryption.
        wrapped = base64.b64encode(password.encode('utf-8'))
        ciphertext = cipher.encrypt(wrapped)
        return base64.b64encode(ciphertext).decode('utf-8')

    def decrypt(self, encrypted_password: str) -> str:
        """Decrypt a value produced by ``encrypt`` with the private key.

        Args:
            encrypted_password: Base64 text of the RSA ciphertext.

        Returns:
            The original plaintext password.

        Raises:
            binascii.Error: If the cipher reports a decryption failure
                (message "Fail to decrypt password!"), or if a base64
                decoding step rejects its input.
        """
        # Local import keeps this block self-contained.
        import binascii

        rsa_key = RSA.importKey(self.private_key)
        cipher = PKCS1_v1_5.new(rsa_key)
        ciphertext = base64.b64decode(encrypted_password)

        # PKCS1_v1_5.decrypt returns the sentinel instead of raising when
        # decryption fails. A unique object lets the failure be detected
        # here, rather than handing an error string on to base64 decoding
        # where it would surface as an unrelated padding error.
        failed = object()
        wrapped = cipher.decrypt(ciphertext, failed)
        if wrapped is failed:
            # binascii.Error (a ValueError subclass) is the type that
            # base64-decoding the message string produced on this path,
            # so existing exception handlers keep matching.
            raise binascii.Error("Fail to decrypt password!")
        return base64.b64decode(wrapped).decode('utf-8')
|
|
|
# 实现 BishengCrypto 类
|
# class BishengCrypto(RSACrypto):
|
#
|
# def __init__(self, public_key, private_key: str):
|
# self.public_key = public_key
|
# self.private_key = private_key
|
#
|
# def encrypt(self, password: str) -> str:
|
# rsa_key = RSA.importKey(self.public_key)
|
# cipher = PKCS1_v1_5.new(rsa_key)
|
# encrypted_password = cipher.encrypt(password.encode('utf-8'))
|
# return base64.b64encode(encrypted_password).decode('utf-8')
|
#
|
# def decrypt(self, encrypted_password: str) -> str:
|
# rsa_key = RSA.importKey(self.private_key)
|
# cipher = PKCS1_v1_5.new(rsa_key)
|
# encrypted_password_bytes = base64.b64decode(encrypted_password)
|
# decoded_password = cipher.decrypt(encrypted_password_bytes, "Fail to decrypt password!")
|
# return decoded_password.decode('utf-8')
|
|
|
class BishengCrypto:
    """RSA password helper built on the ``rsa`` package.

    The public key is parsed once in the constructor and used by
    ``encrypt``. ``decrypt`` is a classmethod that receives the private key
    text explicitly on every call.
    """

    def __init__(self, public_key: str, private_key: str):
        """Parse and keep the public key.

        Args:
            public_key: Key text passed to ``rsa.PublicKey.load_pkcs1``.
            private_key: Accepted for signature compatibility; this
                constructor does not read or store it.
        """
        key_bytes = public_key.encode('utf-8')
        self.public_key = rsa.PublicKey.load_pkcs1(key_bytes)

    def encrypt(self, password: str) -> str:
        """Encrypt ``password`` with the stored public key.

        Returns:
            Base64 text of the ciphertext.
        """
        ciphertext = rsa.encrypt(password.encode('utf-8'), self.public_key)
        encoded = base64.b64encode(ciphertext)
        return encoded.decode('utf-8')

    @classmethod
    def decrypt(cls, password: str, private_key: str) -> str:
        """Decrypt base64 ciphertext ``password`` using ``private_key``.

        Args:
            password: Base64 text of the ciphertext.
            private_key: Key text passed to ``rsa.PrivateKey.load_pkcs1``.

        Returns:
            The decrypted bytes decoded as UTF-8.
        """
        key = rsa.PrivateKey.load_pkcs1(private_key.encode('utf-8'))
        plaintext = rsa.decrypt(base64.b64decode(password), key)
        return plaintext.decode('utf-8')
|