zhaoqingang
2025-04-01 b2e47e75a231baf5a7beca476256ab3d94e76c46
app/service/auth.py
@@ -1,8 +1,9 @@
import os.path
import re
import uuid
import base64
from datetime import datetime, timedelta
from typing import Type
from uuid import uuid4
from jwt import encode, decode, exceptions
from passlib.context import CryptContext
@@ -11,16 +12,17 @@
from Log import logger
from app.config.config import settings
from app.config.const import RAGFLOW, BISHENG, DIFY
from app.config.const import USER_STATSU_DELETE, APP_SERVICE_PATH
from app.models import RoleModel, GroupModel, TokenModel
from app.models.user_model import UserModel, UserAppModel
# from app.service.ragflow import RagflowService
# from app.service.service_token import get_new_token
from app.service.v2.app_register import AppRegisterDao
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
SECRET_KEY = settings.secret_key
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 3000
ACCESS_TOKEN_EXPIRE_MINUTES = 24*60
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@@ -34,7 +36,7 @@
def authenticate_user(db, username: str, password: str):
    user = db.query(UserModel).filter(UserModel.username == username).first()
    user = db.query(UserModel).filter(UserModel.username == username, UserModel.status != USER_STATSU_DELETE).first()
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
@@ -77,23 +79,21 @@
    try:
        hashed_password = pwd_context.hash(password)
        db_user = UserModel(username=username, hashed_password=hashed_password, email=email, sync_flag=sync_flag)
        pwd = db_user.encrypted_password(app_password)
        db_user.password = pwd
        # pwd = db_user.encrypted_password(app_password)
        # db_user.password = pwd
        db_user.roles = [db.query(RoleModel).filter(RoleModel.role_type == 2).first()]
        db_user.groups = [db.query(GroupModel).filter(GroupModel.group_type == 2).first()]
        db.add(db_user)
        db.commit()
        db.refresh(db_user)
        '''
        user_id = db_user.id
        for k, v in register_dict.items():
            await UserAppDao(db).update_and_insert_data(v.get("name"), pwd, v.get("email"), user_id, str(v.get("id")), k)
        '''
    except Exception as e:
        logger.error(e)
        # db.roolback()
        if user_id:
            db.query(UserModel).filter(UserModel.id == user_id).delete()
            db.commit
        db.rollback()
        return False
    return sync_flag
@@ -109,7 +109,7 @@
        return False
    return True
"""
async def update_user_info(db, user_id):
    app_register = AppRegisterDao(db).get_apps()
    register_dict = {}
@@ -143,6 +143,8 @@
    # db.refresh(db_user)
    # is_sava = await save_register_user(db, user.username, user.password, user.email, register_dict)
"""
class UserAppDao:
    def __init__(self, db: Session):
@@ -196,6 +198,25 @@
        return self.db.query(UserAppModel).filter_by(user_id=user_id).all()
async def password_rsa(password):
    """Decrypt a Base64-encoded, RSA-encrypted password.

    The private key is read from ``pom/private_key.pem`` under
    ``APP_SERVICE_PATH`` on every call and is expected to be an
    unencrypted PEM key.

    Args:
        password: Base64 text (``str`` or ``bytes``) of the RSA ciphertext,
            encrypted with PKCS#1 v1.5 padding.

    Returns:
        The decrypted password as ``str``, or ``""`` when Base64 decoding,
        RSA decryption, or text decoding of the plaintext fails.

    Raises:
        OSError: If the private key file cannot be opened or read.
        Exception: Whatever ``load_pem_private_key`` raises for an invalid
            key; key-loading problems are configuration errors and are
            deliberately not swallowed.
    """
    key_path = os.path.join(APP_SERVICE_PATH, "pom/private_key.pem")
    # Keep the file handle open only for the read; parsing and decryption
    # do not need it.
    with open(key_path, "rb") as key_file:
        pem_bytes = key_file.read()

    private_key = serialization.load_pem_private_key(
        pem_bytes,
        password=None,  # supply the passphrase here if the private key is encrypted
        backend=default_backend(),
    )

    try:
        # Base64-decode the transport encoding to get the raw ciphertext.
        ciphertext = base64.b64decode(password)
        # NOTE(review): PKCS#1 v1.5 decryption is susceptible to padding-oracle
        # attacks; kept because the encrypting side presumably uses the same
        # padding. Consider migrating both sides to OAEP.
        plaintext = private_key.decrypt(ciphertext, padding.PKCS1v15())
        return plaintext.decode()
    except Exception as e:
        # Best-effort contract: callers get "" for any malformed input, but
        # the failure is recorded in the service log instead of stdout.
        logger.error(f"password_rsa: failed to decrypt password: {e}")
        return ""