m
zhaoqingang
2024-12-09 52ba4076f5ad55fdf3239a33a2a376eaa0e0dea5
app/service/auth.py
@@ -1,10 +1,15 @@
import re
from datetime import datetime, timedelta
from typing import Type
from jwt import encode, decode, exceptions
from passlib.context import CryptContext
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from Log import logger
from app.config.config import settings
from app.models.user_model import UserModel
from app.models.user_model import UserModel, UserAppModel
# Secret read from the application settings object.
# NOTE(review): presumably the key used to sign/verify JWTs — usage is outside this view.
SECRET_KEY = settings.secret_key
# Algorithm identifier string; presumably passed to jwt encode/decode — TODO confirm.
ALGORITHM = "HS256"
@@ -47,3 +52,98 @@
        return payload
    except exceptions.DecodeError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
def is_valid_password(password: str) -> bool:
    """Check a password against the minimum complexity rule.

    A password is accepted only when it is longer than 8 characters and
    contains at least one ASCII digit and at least one ASCII letter.

    Args:
        password: The candidate password.

    Returns:
        True when the password satisfies the rule, False otherwise.
    """
    if len(password) > 8:
        digit_match = re.search(r'[0-9]', password)
        letter_match = re.search(r'[A-Za-z]', password)
        # Both character classes must be present.
        return not (digit_match is None or letter_match is None)
    return False
async def save_register_user(db, username, password, email, register_dict):
    """Create a user and the matching per-application account rows.

    The user row is committed first so that its generated id can be used
    for the application rows. If anything fails afterwards, the rows
    written by this call are removed again so that no half-registered
    user is left behind.

    Args:
        db: Database session used for all reads and writes.
        username: Name stored on the new ``UserModel`` row.
        password: Plain-text password; it is hashed with ``pwd_context`` and
            also run through ``UserModel.encrypted_password``.
        email: E-mail stored on the new ``UserModel`` row.
        register_dict: Mapping whose keys are passed on as the app type and
            whose values are mappings providing ``"name"``, ``"email"`` and
            ``"id"`` for that application.

    Returns:
        True when the user and every application row were saved, False
        when any step failed (the error is logged).
    """
    user_id = None
    try:
        hashed_password = pwd_context.hash(password)
        db_user = UserModel(username=username, hashed_password=hashed_password, email=email)
        pwd = db_user.encrypted_password(password)
        db_user.password = pwd
        db.add(db_user)
        db.commit()
        db.refresh(db_user)
        user_id = db_user.id
        for app_type, app_info in register_dict.items():
            UserAppDao(db).update_and_insert_token(
                app_info.get("name"), pwd, app_info.get("email"), user_id, str(app_info.get("id")), app_type
            )
    except Exception as e:
        logger.error(e)
        try:
            # A failed flush/commit leaves the session unusable until it is
            # rolled back, so do that before issuing the cleanup statements.
            db.rollback()
            if user_id:
                # Remove the application rows first, then the user itself,
                # and commit so the cleanup actually reaches the database.
                db.query(UserAppModel).filter(UserAppModel.user_id == user_id).delete()
                db.query(UserModel).filter(UserModel.id == user_id).delete()
                db.commit()
        except Exception as cleanup_error:
            # Cleanup is best-effort: report it, leave the session clean.
            logger.error(cleanup_error)
            db.rollback()
        return False
    return True
async def update_user_token(db, user_id, token_dict):
    """Store a new access token for each application of one user.

    Args:
        db: Database session handed to ``UserAppDao``.
        user_id: Id that, together with the app type, selects the row.
        token_dict: Mapping of app type to the access token to store.

    Returns:
        True when every update call completed; False as soon as one of
        them raised (the error is logged and remaining entries are skipped).
    """
    try:
        for app_type, access_token in token_dict.items():
            row_filter = {"user_id": user_id, "app_type": app_type}
            new_values = {"access_token": access_token, "token_at": datetime.now()}
            UserAppDao(db).update_user_app_data(row_filter, new_values)
    except Exception as e:
        logger.error(e)
        return False
    else:
        return True
class UserAppDao:
    """Data-access helper for ``UserAppModel`` rows, bound to one session."""

    def __init__(self, db: Session):
        """Remember the session used by every method of this object."""
        self.db = db

    def get_data_by_id(self, user_id: int, app_type: int) -> UserAppModel | None:
        """Return the first row matching ``user_id`` and ``app_type``.

        Returns:
            The matching ``UserAppModel`` instance, or None when no row matches.
        """
        return self.db.query(UserAppModel).filter_by(user_id=user_id, app_type=app_type).first()

    def update_user_app_data(self, query: dict, update_data: dict):
        """Update every row selected by ``query`` and commit.

        Args:
            query: Column/value pairs expanded into ``filter_by``.
            update_data: Column/value pairs written to the selected rows.

        Raises:
            Exception: When the update or the commit fails; the session is
                rolled back first and the original error is attached as
                the cause.
        """
        logger.info("更新数据df update_app_data---------------------------")
        try:
            self.db.query(UserAppModel).filter_by(**query).update(update_data)
            self.db.commit()
        except Exception as e:
            logger.error(e)
            self.db.rollback()
            raise Exception("更新失败!") from e

    def insert_user_app_data(self, username: str, password: str, email: str, user_id: int, app_id: str, app_type: int):
        """Insert one ``UserAppModel`` row, commit, and return the refreshed row.

        Raises:
            Exception: Whatever the session raised while adding, committing
                or refreshing; the session is rolled back before re-raising.
        """
        logger.info("新增数据df insert_user_app_data---------------------------")
        new_row = UserAppModel(
            username=username,
            password=password,
            email=email,
            user_id=user_id,
            app_id=app_id,
            app_type=app_type,
        )
        try:
            self.db.add(new_row)
            self.db.commit()
            self.db.refresh(new_row)
        except Exception as e:
            # Keep the session usable for the caller, as the update path does.
            logger.error(e)
            self.db.rollback()
            raise
        return new_row

    def update_and_insert_token(self, username: str, password: str, email: str, user_id: int, app_id: str,
                                app_type: int):
        """Update the row for ``(user_id, app_type)`` or insert it when missing.

        On update only ``username``, ``password``, ``email`` and
        ``updated_at`` are written; ``app_id`` is used only when a new row
        is inserted.
        """
        logger.info("更新或者添加数据 update_and_insert_token---------------------------")
        existing = self.get_data_by_id(user_id, app_type)
        if existing:
            self.update_user_app_data(
                {"id": existing.id},
                {
                    "username": username,
                    "password": password,
                    "email": email,
                    "updated_at": datetime.now(),
                },
            )
        else:
            self.insert_user_app_data(username, password, email, user_id, app_id, app_type)