zhaoqingang
2025-04-01 b2e47e75a231baf5a7beca476256ab3d94e76c46
app/service/auth.py
@@ -1,4 +1,7 @@
import os.path
import re
import uuid
import base64
from datetime import datetime, timedelta
from typing import Type
@@ -9,16 +12,17 @@
from Log import logger
from app.config.config import settings
from app.config.const import RAGFLOW, BISHENG, DIFY
from app.config.const import USER_STATSU_DELETE, APP_SERVICE_PATH
from app.models import RoleModel, GroupModel, TokenModel
from app.models.user_model import UserModel, UserAppModel
# from app.service.ragflow import RagflowService
# from app.service.service_token import get_new_token
from app.service.v2.app_register import AppRegisterDao
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
# Secret read from the application settings.
# NOTE(review): presumably the signing key for access tokens -- confirm at the call site.
SECRET_KEY = settings.secret_key
# Algorithm identifier paired with SECRET_KEY.
ALGORITHM = "HS256"
# Access-token lifetime in minutes: 24 hours. An earlier assignment of 3000
# was immediately overwritten by this value, so the dead store was removed.
ACCESS_TOKEN_EXPIRE_MINUTES = 24 * 60
# Password hashing context configured for bcrypt; `pwd_context.hash` is used
# when registering users.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@@ -32,7 +36,7 @@
def authenticate_user(db, username: str, password: str):
    user = db.query(UserModel).filter(UserModel.username == username).first()
    user = db.query(UserModel).filter(UserModel.username == username, UserModel.status != USER_STATSU_DELETE).first()
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
@@ -60,7 +64,7 @@
def is_valid_password(password: str) -> bool:
    if len(password) <= 8:
    if len(password) < 8:
        return False
    has_digit = re.search(r'[0-9]', password)
    has_letter = re.search(r'[A-Za-z]', password)
@@ -69,30 +73,29 @@
    return has_digit is not None and has_letter is not None
async def save_register_user(db, username, password, email, app_password, register_dict):
    """Create a user record with the default role and group.

    Args:
        db: Database session; this function uses its ``query``, ``add``,
            ``commit``, ``refresh`` and ``rollback`` methods.
        username: Login name stored on the new user.
        password: Plain-text password; only its hash is stored.
        email: E-mail address stored on the new user.
        app_password: Currently unused; kept so existing callers keep working.
        register_dict: Currently unused; kept so existing callers keep working.

    Returns:
        The generated ``sync_flag`` (a UUID4 string) on success, or ``False``
        if creating the user failed.
    """
    # Unique marker stored on the user row and handed back to the caller.
    sync_flag = str(uuid.uuid4())
    try:
        hashed_password = pwd_context.hash(password)
        db_user = UserModel(
            username=username,
            hashed_password=hashed_password,
            email=email,
            sync_flag=sync_flag,
        )
        # Attach the role and group whose type is 2.
        # NOTE(review): presumably the defaults for self-registered users;
        # `.first()` yields None if no such row exists -- confirm seeding.
        db_user.roles = [db.query(RoleModel).filter(RoleModel.role_type == 2).first()]
        db_user.groups = [db.query(GroupModel).filter(GroupModel.group_type == 2).first()]
        db.add(db_user)
        db.commit()
        db.refresh(db_user)
    except Exception as e:
        logger.error(e)
        # Undo the failed transaction so the session stays usable.
        db.rollback()
        return False
    return sync_flag
async def update_user_token(db, user_id, token_dict):
@@ -106,7 +109,7 @@
        return False
    return True
"""
async def update_user_info(db, user_id):
    app_register = AppRegisterDao(db).get_apps()
    register_dict = {}
@@ -139,7 +142,9 @@
    # db.commit()
    # db.refresh(db_user)
    is_sava = await save_register_user(db, user.username, user.password, user.email, register_dict)
    # is_sava = await save_register_user(db, user.username, user.password, user.email, register_dict)
"""
class UserAppDao:
    def __init__(self, db: Session):
@@ -193,6 +198,25 @@
        return self.db.query(UserAppModel).filter_by(user_id=user_id).all()
async def password_rsa(password):
    """Decrypt a Base64-encoded, RSA-encrypted password.

    The private key is read from ``pom/private_key.pem`` under
    ``APP_SERVICE_PATH`` and the ciphertext is decrypted with PKCS#1 v1.5
    padding.

    Args:
        password: Base64 text (or bytes) of the RSA ciphertext.

    Returns:
        The decrypted plaintext as ``str``, or ``""`` if Base64 decoding,
        decryption, or text decoding fails.

    Raises:
        OSError: If the private key file cannot be opened or read.
        Any error raised by ``load_pem_private_key`` for an invalid key;
        key loading is intentionally outside the ``try`` below.
    """
    key_path = os.path.join(APP_SERVICE_PATH, "pom/private_key.pem")
    # Read the key material and release the file handle before decrypting.
    with open(key_path, "rb") as key_file:
        pem_data = key_file.read()

    private_key = serialization.load_pem_private_key(
        pem_data,
        password=None,  # supply the passphrase here if the private key is encrypted
        backend=default_backend(),
    )

    try:
        # Base64-decode the incoming value to get the raw ciphertext.
        ciphertext = base64.b64decode(password)
        # Decrypt with PKCS#1 v1.5 padding.
        # NOTE(review): presumably this must match the padding used by the
        # encrypting client -- confirm before switching to OAEP.
        plaintext = private_key.decrypt(ciphertext, padding.PKCS1v15())
        return plaintext.decode()
    except Exception as e:
        # Report through the module logger, like the other handlers in this
        # file, instead of printing to stdout.
        logger.error(e)
        return ""