From 52ba4076f5ad55fdf3239a33a2a376eaa0e0dea5 Mon Sep 17 00:00:00 2001
From: zhaoqingang <zhaoqg0118@163.com>
Date: 星期一, 09 十二月 2024 17:53:01 +0800
Subject: [PATCH] m
---
app/service/auth.py | 102 ++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 101 insertions(+), 1 deletions(-)
diff --git a/app/service/auth.py b/app/service/auth.py
index 896b8d9..d843adb 100644
--- a/app/service/auth.py
+++ b/app/service/auth.py
@@ -1,10 +1,15 @@
+import re
from datetime import datetime, timedelta
+from typing import Type
+
from jwt import encode, decode, exceptions
from passlib.context import CryptContext
from fastapi import HTTPException, status
+from sqlalchemy.orm import Session
+from Log import logger
from app.config.config import settings
-from app.models.user_model import UserModel
+from app.models.user_model import UserModel, UserAppModel
SECRET_KEY = settings.secret_key
ALGORITHM = "HS256"
@@ -47,3 +52,98 @@
return payload
except exceptions.DecodeError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
+
+
+def is_valid_password(password: str) -> bool:
+ if len(password) <= 8:
+ return False
+ has_digit = re.search(r'[0-9]', password)
+ has_letter = re.search(r'[A-Za-z]', password)
+
+ # 濡傛灉瀵嗙爜鍖呭惈鏁板瓧鍜屽瓧姣嶏紝鍒欒繑鍥濼rue锛屽惁鍒欒繑鍥濶one
+ return has_digit is not None and has_letter is not None
+
+
+async def save_register_user(db, username, password, email, register_dict):
+ user_id = ""
+ try:
+ hashed_password = pwd_context.hash(password)
+ db_user = UserModel(username=username, hashed_password=hashed_password, email=email)
+ pwd = db_user.encrypted_password(password)
+ db_user.password = pwd
+ db.add(db_user)
+ db.add(db_user)
+ db.commit()
+ db.refresh(db_user)
+ user_id = db_user.id
+ for k, v in register_dict.items():
+ UserAppDao(db).update_and_insert_token(v.get("name"), pwd, v.get("email"), user_id, str(v.get("id")), k)
+
+ except Exception as e:
+ logger.error(e)
+ # db.roolback()
+ if user_id:
+ db.query(UserModel).filter(UserModel.id == user_id).delete()
+ return False
+ return True
+
+
+async def update_user_token(db, user_id, token_dict):
+
+ try:
+ for k, v in token_dict.items():
+ UserAppDao(db).update_user_app_data({"user_id": user_id, "app_type": k},
+ {"access_token": v, "token_at": datetime.now()})
+
+ except Exception as e:
+ logger.error(e)
+ return False
+ return True
+
+
+class UserAppDao:
+ def __init__(self, db: Session):
+ self.db = db
+
+ def get_data_by_id(self, user_id: int, app_type: int) -> Type[UserAppModel] | None:
+ session = self.db.query(UserAppModel).filter_by(user_id=user_id, app_type=app_type).first()
+ return session
+
+ def update_user_app_data(self, query: int, update_data: str):
+
+ logger.error("鏇存柊鏁版嵁df update_app_data---------------------------")
+ try:
+ self.db.query(UserAppModel).filter_by(**query).update(update_data)
+ self.db.commit()
+ except Exception as e:
+ logger.error(e)
+ self.db.rollback()
+ raise Exception("鏇存柊澶辫触锛�")
+
+ def insert_user_app_data(self, username: str, password: str, email: str, user_id: int, app_id: str, app_type: int):
+ logger.error("鏂板鏁版嵁df insert_user_app_data---------------------------")
+ new_session = UserAppModel(
+ username=username,
+ password=password,
+ email=email,
+ user_id=user_id,
+ app_id=app_id,
+ app_type=app_type,
+ )
+ self.db.add(new_session)
+ self.db.commit()
+ self.db.refresh(new_session)
+ return new_session
+
+ def update_and_insert_token(self, username: str, password: str, email: str, user_id: int, app_id: str,
+ app_type: int):
+
+ logger.error("鏇存柊鎴栬�呮坊鍔犳暟鎹� update_and_insert_token---------------------------")
+ token_boj = self.get_data_by_id(user_id, app_type)
+ if token_boj:
+ self.update_user_app_data({"id": token_boj.id}, {"username": username,
+ "password": password, "email": email, "username": username,
+ "updated_at": datetime.now(),
+ })
+ else:
+ self.insert_user_app_data(username, password, email, user_id, app_id, app_type)
--
Gitblit v1.8.0