zhaoqingang
2025-04-01 6846a4c98a793e74ae17b47f04a0ff8b210aeb24
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from abc import ABC, abstractmethod
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_v1_5
import base64
import rsa
 
 
# 定义抽象基类
class RSACrypto(ABC):
 
    @abstractmethod
    def encrypt(self, password: str) -> str:
        pass
 
    @abstractmethod
    def decrypt(self, encrypted_password: str) -> str:
        pass
 
 
# 实现 RagflowCrypto 类
class RagflowCrypto(RSACrypto):
 
    def __init__(self, public_key: str, private_key: str):
        self.public_key = public_key
        self.private_key = private_key
 
    def encrypt(self, password: str) -> str:
        rsa_key = RSA.importKey(self.public_key)
        cipher = PKCS1_v1_5.new(rsa_key)
        encrypted_password = cipher.encrypt(base64.b64encode(password.encode('utf-8')))
        return base64.b64encode(encrypted_password).decode('utf-8')
 
    def decrypt(self, encrypted_password: str) -> str:
        rsa_key = RSA.importKey(self.private_key)
        cipher = PKCS1_v1_5.new(rsa_key)
        encrypted_password_bytes = base64.b64decode(encrypted_password)
        decoded_password = cipher.decrypt(encrypted_password_bytes, "Fail to decrypt password!")
        return base64.b64decode(decoded_password).decode('utf-8')
 
 
# 实现 BishengCrypto 类
# class BishengCrypto(RSACrypto):
#
#     def __init__(self, public_key, private_key: str):
#         self.public_key = public_key
#         self.private_key = private_key
#
#     def encrypt(self, password: str) -> str:
#         rsa_key = RSA.importKey(self.public_key)
#         cipher = PKCS1_v1_5.new(rsa_key)
#         encrypted_password = cipher.encrypt(password.encode('utf-8'))
#         return base64.b64encode(encrypted_password).decode('utf-8')
#
#     def decrypt(self, encrypted_password: str) -> str:
#         rsa_key = RSA.importKey(self.private_key)
#         cipher = PKCS1_v1_5.new(rsa_key)
#         encrypted_password_bytes = base64.b64decode(encrypted_password)
#         decoded_password = cipher.decrypt(encrypted_password_bytes, "Fail to decrypt password!")
#         return decoded_password.decode('utf-8')
 
 
class BishengCrypto:
 
    def __init__(self, public_key: str, private_key: str):
        self.public_key = rsa.PublicKey.load_pkcs1(public_key.encode('utf-8'))
 
    def encrypt(self, password: str) -> str:
        encrypted_password = rsa.encrypt(password.encode('utf-8'), self.public_key)
        return base64.b64encode(encrypted_password).decode('utf-8')
 
    @classmethod
    def decrypt(cls, password: str, private_key: str) -> str:
        private_key = rsa.PrivateKey.load_pkcs1(private_key.encode('utf-8'))
        encrypted_password_bytes = base64.b64decode(password)
        return rsa.decrypt(encrypted_password_bytes, private_key).decode('utf-8')