| | |
| | | import os.path |
| | | import re |
| | | import uuid |
| | | import base64 |
| | | from datetime import datetime, timedelta |
| | | from typing import Type |
| | | from uuid import uuid4 |
| | |
| | | |
| | | from Log import logger |
| | | from app.config.config import settings |
| | | from app.config.const import RAGFLOW, BISHENG, DIFY |
| | | from app.config.const import RAGFLOW, BISHENG, DIFY, USER_STATSU_DELETE, APP_SERVICE_PATH |
| | | from app.models import RoleModel, GroupModel, TokenModel |
| | | from app.models.user_model import UserModel, UserAppModel |
| | | # from app.service.ragflow import RagflowService |
| | | # from app.service.service_token import get_new_token |
| | | from app.service.v2.app_register import AppRegisterDao |
| | | |
| | | from cryptography.hazmat.backends import default_backend |
| | | from cryptography.hazmat.primitives import serialization |
| | | from cryptography.hazmat.primitives.asymmetric import padding |
| | | |
| | | SECRET_KEY = settings.secret_key |
| | | ALGORITHM = "HS256" |
| | |
| | | |
| | | |
| | | def authenticate_user(db, username: str, password: str): |
| | | user = db.query(UserModel).filter(UserModel.username == username).first() |
| | | user = db.query(UserModel).filter(UserModel.username == username, UserModel.status != USER_STATSU_DELETE).first() |
| | | if not user: |
| | | return False |
| | | if not verify_password(password, user.hashed_password): |
| | |
| | | return self.db.query(UserAppModel).filter_by(user_id=user_id).all() |
| | | |
| | | |
| | | |
| | | async def password_rsa(password): |
| | | with open(os.path.join(APP_SERVICE_PATH, "pom/private_key.pem"), "rb") as key_file: |
| | | private_key = serialization.load_pem_private_key( |
| | | key_file.read(), |
| | | password=None, # 如果私钥加密,请提供密码 |
| | | backend=default_backend() |
| | | ) |
| | | # Base64 解码 |
| | | try: |
| | | # 解密消息 |
| | | ciphertext = base64.b64decode(password) |
| | | # 使用 PKCS#1 v1.5 填充解密 |
| | | plaintext = private_key.decrypt( |
| | | ciphertext, |
| | | padding.PKCS1v15() # 改为 PKCS#1 v1.5 填充 |
| | | ) |
| | | return plaintext.decode() |
| | | except Exception as e: |
| | | print(e) |
| | | return "" |
| | | |
| | | |