From 08c8e8c9a4d65677de6a493446a605d70efee631 Mon Sep 17 00:00:00 2001
From: zhaoqingang <zhaoqg0118@163.com>
Date: 星期二, 10 十二月 2024 16:32:07 +0800
Subject: [PATCH] 12.10 16

---
 app/service/auth.py |  109 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 106 insertions(+), 3 deletions(-)

diff --git a/app/service/auth.py b/app/service/auth.py
index 3ebccb1..d0436f8 100644
--- a/app/service/auth.py
+++ b/app/service/auth.py
@@ -1,14 +1,19 @@
+import re
 from datetime import datetime, timedelta
+from typing import Type
+
 from jwt import encode, decode, exceptions
 from passlib.context import CryptContext
 from fastapi import HTTPException, status
+from sqlalchemy.orm import Session
 
+from Log import logger
 from app.config.config import settings
-from app.models.user_model import UserModel
+from app.models.user_model import UserModel, UserAppModel
 
 SECRET_KEY = settings.secret_key
 ALGORITHM = "HS256"
-ACCESS_TOKEN_EXPIRE_MINUTES = 30
+ACCESS_TOKEN_EXPIRE_MINUTES = 3000
 
 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
 
@@ -35,7 +40,7 @@
     if expires_delta:
         expire = datetime.utcnow() + expires_delta
     else:
-        expire = datetime.utcnow() + timedelta(minutes=15)
+        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
     to_encode.update({"exp": expire})
     encoded_jwt = encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
     return encoded_jwt
@@ -47,3 +52,101 @@
         return payload
     except exceptions.DecodeError:
         raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
+
+
+def is_valid_password(password: str) -> bool:
+    if len(password) <= 8:
+        return False
+    has_digit = re.search(r'[0-9]', password)
+    has_letter = re.search(r'[A-Za-z]', password)
+
+    # 濡傛灉瀵嗙爜鍖呭惈鏁板瓧鍜屽瓧姣嶏紝鍒欒繑鍥濼rue锛屽惁鍒欒繑鍥濶one
+    return has_digit is not None and has_letter is not None
+
+
+async def save_register_user(db, username, password, email, register_dict):
+    user_id = ""
+    try:
+        hashed_password = pwd_context.hash(password)
+        db_user = UserModel(username=username, hashed_password=hashed_password, email=email)
+        pwd = db_user.encrypted_password(password)
+        db_user.password = pwd
+        db.add(db_user)
+        db.add(db_user)
+        db.commit()
+        db.refresh(db_user)
+        user_id = db_user.id
+        for k, v in register_dict.items():
+            await UserAppDao(db).update_and_insert_data(v.get("name"), pwd, v.get("email"), user_id, str(v.get("id")), k)
+
+    except Exception as e:
+        logger.error(e)
+        # db.roolback()
+        if user_id:
+            db.query(UserModel).filter(UserModel.id == user_id).delete()
+        return False
+    return True
+
+
+async def update_user_token(db, user_id, token_dict):
+    try:
+        for k, v in token_dict.items():
+            await UserAppDao(db).update_user_app_data({"user_id": user_id, "app_type": k},
+                                                      {"access_token": v, "token_at": datetime.now()})
+
+    except Exception as e:
+        logger.error(e)
+        return False
+    return True
+
+
+class UserAppDao:
+    def __init__(self, db: Session):
+        self.db = db
+
+    async def get_data_by_id(self, user_id: int, app_type: int) -> Type[UserAppModel] | None:
+        session = self.db.query(UserAppModel).filter_by(user_id=user_id, app_type=app_type).first()
+        return session
+
+    async def update_user_app_data(self, query: dict, update_data: dict):
+
+        logger.error("鏇存柊鏁版嵁df update_app_data---------------------------")
+        try:
+            self.db.query(UserAppModel).filter_by(**query).update(update_data)
+            self.db.commit()
+        except Exception as e:
+            logger.error(e)
+            self.db.rollback()
+            raise Exception("鏇存柊澶辫触锛�")
+
+    async def insert_user_app_data(self, username: str, password: str, email: str, user_id: int, app_id: str,
+                                   app_type: int):
+        logger.error("鏂板鏁版嵁df insert_user_app_data---------------------------")
+        new_session = UserAppModel(
+            username=username,
+            password=password,
+            email=email,
+            user_id=user_id,
+            app_id=app_id,
+            app_type=app_type,
+        )
+        self.db.add(new_session)
+        self.db.commit()
+        self.db.refresh(new_session)
+        return new_session
+
+    async def update_and_insert_data(self, username: str, password: str, email: str, user_id: int, app_id: str,
+                                     app_type: int):
+
+        logger.error("鏇存柊鎴栬�呮坊鍔犳暟鎹� update_and_insert_token---------------------------")
+        token_boj = await self.get_data_by_id(user_id, app_type)
+        if token_boj:
+            await self.update_user_app_data({"id": token_boj.id}, {"username": username,
+                                                                   "password": password, "email": email,
+                                                                   "updated_at": datetime.now(),
+                                                                   })
+        else:
+            await self.insert_user_app_data(username, password, email, user_id, app_id, app_type)
+
+    async def get_user_datas(self, user_id: int):
+        return self.db.query(UserAppModel).filter_by(user_id=user_id).all()

--
Gitblit v1.8.0