m
zhaoqingang
2024-12-09 52ba4076f5ad55fdf3239a33a2a376eaa0e0dea5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import re
from datetime import datetime, timedelta
from typing import Type
 
from jwt import encode, decode, exceptions
from passlib.context import CryptContext
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
 
from Log import logger
from app.config.config import settings
from app.models.user_model import UserModel, UserAppModel
 
SECRET_KEY = settings.secret_key
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 3000
 
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
 
 
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)
 
 
def get_password_hash(password):
    return pwd_context.hash(password)
 
 
def authenticate_user(db, username: str, password: str):
    user = db.query(UserModel).filter(UserModel.username == username).first()
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
 
 
def create_access_token(data: dict, expires_delta: timedelta = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    encoded_jwt = encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
 
 
def decode_access_token(token: str):
    try:
        payload = decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except exceptions.DecodeError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
 
 
def is_valid_password(password: str) -> bool:
    if len(password) <= 8:
        return False
    has_digit = re.search(r'[0-9]', password)
    has_letter = re.search(r'[A-Za-z]', password)
 
    # 如果密码包含数字和字母,则返回True,否则返回None
    return has_digit is not None and has_letter is not None
 
 
async def save_register_user(db, username, password, email, register_dict):
    user_id = ""
    try:
        hashed_password = pwd_context.hash(password)
        db_user = UserModel(username=username, hashed_password=hashed_password, email=email)
        pwd = db_user.encrypted_password(password)
        db_user.password = pwd
        db.add(db_user)
        db.add(db_user)
        db.commit()
        db.refresh(db_user)
        user_id = db_user.id
        for k, v in register_dict.items():
            UserAppDao(db).update_and_insert_token(v.get("name"), pwd, v.get("email"), user_id, str(v.get("id")), k)
 
    except Exception as e:
        logger.error(e)
        # db.roolback()
        if user_id:
            db.query(UserModel).filter(UserModel.id == user_id).delete()
        return False
    return True
 
 
async def update_user_token(db, user_id, token_dict):
 
    try:
        for k, v in token_dict.items():
            UserAppDao(db).update_user_app_data({"user_id": user_id, "app_type": k},
                                                {"access_token": v, "token_at": datetime.now()})
 
    except Exception as e:
        logger.error(e)
        return False
    return True
 
 
class UserAppDao:
    def __init__(self, db: Session):
        self.db = db
 
    def get_data_by_id(self, user_id: int, app_type: int) -> Type[UserAppModel] | None:
        session = self.db.query(UserAppModel).filter_by(user_id=user_id, app_type=app_type).first()
        return session
 
    def update_user_app_data(self, query: int, update_data: str):
 
        logger.error("更新数据df update_app_data---------------------------")
        try:
            self.db.query(UserAppModel).filter_by(**query).update(update_data)
            self.db.commit()
        except Exception as e:
            logger.error(e)
            self.db.rollback()
            raise Exception("更新失败!")
 
    def insert_user_app_data(self, username: str, password: str, email: str, user_id: int, app_id: str, app_type: int):
        logger.error("新增数据df insert_user_app_data---------------------------")
        new_session = UserAppModel(
            username=username,
            password=password,
            email=email,
            user_id=user_id,
            app_id=app_id,
            app_type=app_type,
        )
        self.db.add(new_session)
        self.db.commit()
        self.db.refresh(new_session)
        return new_session
 
    def update_and_insert_token(self, username: str, password: str, email: str, user_id: int, app_id: str,
                                app_type: int):
 
        logger.error("更新或者添加数据 update_and_insert_token---------------------------")
        token_boj = self.get_data_by_id(user_id, app_type)
        if token_boj:
            self.update_user_app_data({"id": token_boj.id}, {"username": username,
                                                             "password": password, "email": email, "username": username,
                                                             "updated_at": datetime.now(),
                                                             })
        else:
            self.insert_user_app_data(username, password, email, user_id, app_id, app_type)