zhaoqingang
2024-12-11 a791022ff1311e1fb76930c398d6ff91036d0456
app/service/auth.py
@@ -1,14 +1,22 @@
import re
from datetime import datetime, timedelta
from typing import Type
from jwt import encode, decode, exceptions
from passlib.context import CryptContext
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from Log import logger
from app.config.config import settings
from app.models.user_model import UserModel
from app.config.const import RAGFLOW, BISHENG, DIFY
from app.models import RoleModel, GroupModel
from app.models.user_model import UserModel, UserAppModel
from app.service.v2.app_register import AppRegisterDao
# Key used to sign JWTs (passed to ``encode`` together with ALGORITHM).
SECRET_KEY = settings.secret_key
# JWT signing algorithm.
ALGORITHM = "HS256"
# Fallback access-token lifetime in minutes, used when the caller does not
# supply an explicit expiry delta. The earlier value of 30 was immediately
# overwritten, so only the effective value is kept.
ACCESS_TOKEN_EXPIRE_MINUTES = 3000
# Password hashing context (bcrypt).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@@ -35,7 +43,7 @@
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    encoded_jwt = encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
@@ -47,3 +55,137 @@
        return payload
    except exceptions.DecodeError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
def is_valid_password(password: str) -> bool:
    """Check a password against the minimum strength rule.

    A password is accepted only when it is longer than 8 characters and
    contains at least one ASCII digit and at least one ASCII letter.

    Args:
        password: Candidate password.

    Returns:
        True when the password satisfies every rule, False otherwise.
    """
    if len(password) > 8:
        contains_digit = re.search(r'[0-9]', password) is not None
        contains_letter = re.search(r'[A-Za-z]', password) is not None
        return contains_digit and contains_letter
    return False
async def save_register_user(db, username, password, email, register_dict):
    """Create a local user and mirror the account into each registered app.

    Args:
        db: Database session used for every read and write.
        username: Login name of the new user.
        password: Password value; it is hashed with ``pwd_context`` for the
            user row and passed through ``UserModel.encrypted_password`` for
            the per-app records.
        email: E-mail address stored on the user row.
        register_dict: Mapping of app type -> dict with ``id``, ``name`` and
            ``email`` keys describing the account in that app.

    Returns:
        True when the user and all app records were stored, False otherwise.
    """
    user_id = ""
    try:
        hashed_password = pwd_context.hash(password)
        db_user = UserModel(username=username, hashed_password=hashed_password, email=email)
        pwd = db_user.encrypted_password(password)
        db_user.password = pwd
        db_user.roles = [db.query(RoleModel).filter(RoleModel.role_type == 2).first()]
        db_user.groups = [db.query(GroupModel).filter(GroupModel.group_type == 2).first()]
        db.add(db_user)
        db.commit()
        db.refresh(db_user)
        user_id = db_user.id
        for k, v in register_dict.items():
            await UserAppDao(db).update_and_insert_data(v.get("name"), pwd, v.get("email"), user_id, str(v.get("id")), k)
    except Exception as e:
        logger.error(e)
        # Discard whatever is pending so the session is usable again.
        db.rollback()
        if user_id:
            # The user row was already committed before the failure; remove it
            # so a half-registered account is not left behind.
            # NOTE(review): app records committed for earlier apps are not
            # removed here — confirm whether they are cleaned up elsewhere.
            try:
                db.query(UserModel).filter(UserModel.id == user_id).delete()
                db.commit()
            except Exception as cleanup_error:
                logger.error(cleanup_error)
                db.rollback()
        return False
    return True
async def update_user_token(db, user_id, token_dict):
    """Store new access tokens on a user's per-app records.

    Args:
        db: Database session handed to ``UserAppDao``.
        user_id: Id of the user whose records are updated.
        token_dict: Mapping of app type -> access token.

    Returns:
        True when every update succeeded, False as soon as one fails.
    """
    try:
        for app_type, access_token in token_dict.items():
            selector = {"user_id": user_id, "app_type": app_type}
            changes = {"access_token": access_token, "token_at": datetime.now()}
            await UserAppDao(db).update_user_app_data(selector, changes)
    except Exception as err:
        logger.error(err)
        return False
    else:
        return True
async def update_user_info(db, user_id):
    """Sync an existing user's account data into every registered app.

    For each app returned by ``AppRegisterDao.get_apps`` a record describing
    the user's account in that app is built and upserted through
    ``UserAppDao``. Unknown app ids are logged and skipped. A failure for one
    app is logged and does not stop the remaining apps.

    Args:
        db: Database session used for every read and write.
        user_id: Id of the user to sync.
    """
    app_register = AppRegisterDao(db).get_apps()
    user = db.query(UserModel).filter(UserModel.id == user_id).first()
    if user is None:
        # Nothing to sync; avoid an AttributeError on the attribute reads below.
        logger.error(f"update_user_info: user {user_id} not found")
        return

    register_dict = {}
    for app in app_register:
        app_type = app["id"]
        if app_type == RAGFLOW:
            info = {"id": user.ragflow_id, "name": user.username, "email": f"{user.username}@example.com"}
        elif app_type == BISHENG:
            info = {"id": user.bisheng_id, "name": user.username, "email": ""}
        elif app_type == DIFY:
            info = {"id": "", "name": user.username, "email": ""}
        else:
            logger.error("未知注册应用---")
            continue
        register_dict[app_type] = info
        # Upsert only the record built for this app; previously every record
        # collected so far was re-written on each iteration.
        try:
            await UserAppDao(db).update_and_insert_data(info.get("name"), user.password, info.get("email"),
                                                        user_id, str(info.get("id")), app_type)
        except Exception as e:
            logger.error(e)

    # NOTE(review): save_register_user builds and commits a *new* UserModel
    # from this user's data, which looks like a leftover from the registration
    # flow. Kept to preserve behavior — confirm whether it should be removed.
    await save_register_user(db, user.username, user.password, user.email, register_dict)
class UserAppDao:
    """Data-access helper for ``UserAppModel`` rows (a user's per-app account)."""

    def __init__(self, db: Session):
        # Session shared by every query issued through this object.
        self.db = db

    async def get_data_by_id(self, user_id: int, app_type: int) -> Type[UserAppModel] | None:
        """Return the first record for ``user_id`` and ``app_type``, or None."""
        session = self.db.query(UserAppModel).filter_by(user_id=user_id, app_type=app_type).first()
        return session

    async def update_user_app_data(self, query: dict, update_data: dict):
        """Apply ``update_data`` to every record matching ``query`` and commit.

        Args:
            query: Column -> value filters passed to ``filter_by``.
            update_data: Column -> new value mapping.

        Raises:
            Exception: When the update or commit fails; the session is rolled
                back first and the original error is attached as the cause.
        """
        # NOTE(review): trace message emitted at error level — confirm the
        # intended log level.
        logger.error("更新数据df update_app_data---------------------------")
        try:
            self.db.query(UserAppModel).filter_by(**query).update(update_data)
            self.db.commit()
        except Exception as e:
            logger.error(e)
            self.db.rollback()
            raise Exception("更新失败!") from e

    async def insert_user_app_data(self, username: str, password: str, email: str, user_id: int, app_id: str,
                                   app_type: int):
        """Insert a new record and return it after refreshing it from the DB.

        Raises:
            Exception: Whatever the add/commit raised; the session is rolled
                back before the error propagates.
        """
        logger.error("新增数据df insert_user_app_data---------------------------")
        new_session = UserAppModel(
            username=username,
            password=password,
            email=email,
            user_id=user_id,
            app_id=app_id,
            app_type=app_type,
        )
        try:
            self.db.add(new_session)
            self.db.commit()
        except Exception as e:
            # Mirror update_user_app_data: leave the session usable for the
            # caller, then let the original error propagate unchanged.
            logger.error(e)
            self.db.rollback()
            raise
        self.db.refresh(new_session)
        return new_session

    async def update_and_insert_data(self, username: str, password: str, email: str, user_id: int, app_id: str,
                                     app_type: int):
        """Upsert the record identified by ``user_id`` and ``app_type``.

        An existing record gets its username, password, email and
        ``updated_at`` refreshed (``app_id`` is left untouched); otherwise a
        new record is inserted.
        """
        logger.error("更新或者添加数据 update_and_insert_token---------------------------")
        token_boj = await self.get_data_by_id(user_id, app_type)
        if token_boj:
            await self.update_user_app_data({"id": token_boj.id}, {"username": username,
                                                                   "password": password, "email": email,
                                                                   "updated_at": datetime.now(),
                                                                   })
        else:
            await self.insert_user_app_data(username, password, email, user_id, app_id, app_type)

    async def get_user_datas(self, user_id: int):
        """Return every record that belongs to ``user_id``."""
        return self.db.query(UserAppModel).filter_by(user_id=user_id).all()