From 08c8e8c9a4d65677de6a493446a605d70efee631 Mon Sep 17 00:00:00 2001 From: zhaoqingang <zhaoqg0118@163.com> Date: 星期二, 10 十二月 2024 16:32:07 +0800 Subject: [PATCH] 12.10 16 --- app/service/auth.py | 109 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 files changed, 106 insertions(+), 3 deletions(-) diff --git a/app/service/auth.py b/app/service/auth.py index 3ebccb1..d0436f8 100644 --- a/app/service/auth.py +++ b/app/service/auth.py @@ -1,14 +1,19 @@ +import re from datetime import datetime, timedelta +from typing import Type + from jwt import encode, decode, exceptions from passlib.context import CryptContext from fastapi import HTTPException, status +from sqlalchemy.orm import Session +from Log import logger from app.config.config import settings -from app.models.user_model import UserModel +from app.models.user_model import UserModel, UserAppModel SECRET_KEY = settings.secret_key ALGORITHM = "HS256" -ACCESS_TOKEN_EXPIRE_MINUTES = 30 +ACCESS_TOKEN_EXPIRE_MINUTES = 3000 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") @@ -35,7 +40,7 @@ if expires_delta: expire = datetime.utcnow() + expires_delta else: - expire = datetime.utcnow() + timedelta(minutes=15) + expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt @@ -47,3 +52,101 @@ return payload except exceptions.DecodeError: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials") + + +def is_valid_password(password: str) -> bool: + if len(password) <= 8: + return False + has_digit = re.search(r'[0-9]', password) + has_letter = re.search(r'[A-Za-z]', password) + + # 濡傛灉瀵嗙爜鍖呭惈鏁板瓧鍜屽瓧姣嶏紝鍒欒繑鍥濼rue锛屽惁鍒欒繑鍥濶one + return has_digit is not None and has_letter is not None + + +async def save_register_user(db, username, password, email, register_dict): + user_id = "" + try: + hashed_password = pwd_context.hash(password) + db_user = UserModel(username=username, hashed_password=hashed_password, email=email) + pwd = db_user.encrypted_password(password) + db_user.password = pwd + db.add(db_user) + db.add(db_user) + db.commit() + db.refresh(db_user) + user_id = db_user.id + for k, v in register_dict.items(): + await UserAppDao(db).update_and_insert_data(v.get("name"), pwd, v.get("email"), user_id, str(v.get("id")), k) + + except Exception as e: + logger.error(e) + # db.roolback() + if user_id: + db.query(UserModel).filter(UserModel.id == user_id).delete() + return False + return True + + +async def update_user_token(db, user_id, token_dict): + try: + for k, v in token_dict.items(): + await UserAppDao(db).update_user_app_data({"user_id": user_id, "app_type": k}, + {"access_token": v, "token_at": datetime.now()}) + + except Exception as e: + logger.error(e) + return False + return True + + +class UserAppDao: + def __init__(self, db: Session): + self.db = db + + async def get_data_by_id(self, user_id: int, app_type: int) -> Type[UserAppModel] | None: + session = self.db.query(UserAppModel).filter_by(user_id=user_id, app_type=app_type).first() + return session + + async def update_user_app_data(self, query: dict, update_data: dict): + + logger.error("鏇存柊鏁版嵁df update_app_data---------------------------") + try: + self.db.query(UserAppModel).filter_by(**query).update(update_data) + self.db.commit() + except Exception as e: + logger.error(e) + self.db.rollback() + raise Exception("鏇存柊澶辫触锛�") + + async def insert_user_app_data(self, username: str, password: str, email: str, user_id: int, app_id: str, + app_type: int): + logger.error("鏂板鏁版嵁df insert_user_app_data---------------------------") + new_session = UserAppModel( + username=username, + password=password, + email=email, + user_id=user_id, + app_id=app_id, + app_type=app_type, + ) + self.db.add(new_session) + self.db.commit() + self.db.refresh(new_session) + return new_session + + async def update_and_insert_data(self, username: str, password: str, email: str, user_id: int, app_id: str, + app_type: int): + + logger.error("鏇存柊鎴栬�呮坊鍔犳暟鎹� update_and_insert_token---------------------------") + token_boj = await self.get_data_by_id(user_id, app_type) + if token_boj: + await self.update_user_app_data({"id": token_boj.id}, {"username": username, + "password": password, "email": email, + "updated_at": datetime.now(), + }) + else: + await self.insert_user_app_data(username, password, email, user_id, app_id, app_type) + + async def get_user_datas(self, user_id: int): + return self.db.query(UserAppModel).filter_by(user_id=user_id).all() -- Gitblit v1.8.0