From b2e47e75a231baf5a7beca476256ab3d94e76c46 Mon Sep 17 00:00:00 2001 From: zhaoqingang <zhaoqg0118@163.com> Date: 星期二, 01 四月 2025 10:08:15 +0800 Subject: [PATCH] code加密 --- app/service/auth.py | 181 ++++++++++++++++++++++++++++++++++++++++++++- 1 files changed, 177 insertions(+), 4 deletions(-) diff --git a/app/service/auth.py b/app/service/auth.py index 09a4917..3c726fa 100644 --- a/app/service/auth.py +++ b/app/service/auth.py @@ -1,14 +1,28 @@ +import os.path +import re +import uuid +import base64 from datetime import datetime, timedelta +from typing import Type + from jwt import encode, decode, exceptions from passlib.context import CryptContext from fastapi import HTTPException, status +from sqlalchemy.orm import Session +from Log import logger from app.config.config import settings -from app.models.user_model import UserModel +from app.config.const import USER_STATSU_DELETE, APP_SERVICE_PATH +from app.models import RoleModel, GroupModel, TokenModel +from app.models.user_model import UserModel, UserAppModel + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import padding SECRET_KEY = settings.secret_key ALGORITHM = "HS256" -ACCESS_TOKEN_EXPIRE_MINUTES = 3000 +ACCESS_TOKEN_EXPIRE_MINUTES = 24*60 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") @@ -22,7 +36,7 @@ def authenticate_user(db, username: str, password: str): - user = db.query(UserModel).filter(UserModel.username == username).first() + user = db.query(UserModel).filter(UserModel.username == username, UserModel.status != USER_STATSU_DELETE).first() if not user: return False if not verify_password(password, user.hashed_password): @@ -35,7 +49,7 @@ if expires_delta: expire = datetime.utcnow() + expires_delta else: - expire = datetime.utcnow() + timedelta(minutes=15) + expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt @@ -47,3 +61,162 @@ return payload except exceptions.DecodeError: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials") + + +def is_valid_password(password: str) -> bool: + if len(password) < 8: + return False + has_digit = re.search(r'[0-9]', password) + has_letter = re.search(r'[A-Za-z]', password) + + # 濡傛灉瀵嗙爜鍖呭惈鏁板瓧鍜屽瓧姣嶏紝鍒欒繑鍥濼rue锛屽惁鍒欒繑鍥濶one + return has_digit is not None and has_letter is not None + + +async def save_register_user(db, username, password, email, app_password, register_dict): + user_id = "" + sync_flag = str(uuid.uuid4()) + try: + hashed_password = pwd_context.hash(password) + db_user = UserModel(username=username, hashed_password=hashed_password, email=email, sync_flag=sync_flag) + # pwd = db_user.encrypted_password(app_password) + # db_user.password = pwd + db_user.roles = [db.query(RoleModel).filter(RoleModel.role_type == 2).first()] + db_user.groups = [db.query(GroupModel).filter(GroupModel.group_type == 2).first()] + db.add(db_user) + db.commit() + db.refresh(db_user) + ''' + user_id = db_user.id + for k, v in register_dict.items(): + await UserAppDao(db).update_and_insert_data(v.get("name"), pwd, v.get("email"), user_id, str(v.get("id")), k) + ''' + except Exception as e: + logger.error(e) + db.rollback() + return False + return sync_flag + + +async def update_user_token(db, user_id, token_dict): + try: + for k, v in token_dict.items(): + await UserAppDao(db).update_user_app_data({"user_id": user_id, "app_type": k}, + {"access_token": v, "token_at": datetime.now()}) + + except Exception as e: + logger.error(e) + return False + return True + +""" +async def update_user_info(db, user_id): + app_register = AppRegisterDao(db).get_apps() + register_dict = {} + user = db.query(UserModel).filter(UserModel.id==user_id).first() + for app in app_register: + if app["id"] == RAGFLOW: + register_dict[app['id']] = {"id": user.ragflow_id, "name": user.username, "email": f"{user.username}@example.com"} + elif app["id"] == BISHENG: + register_dict[app['id']] = {"id": user.bisheng_id, "name": user.username, "email": ""} + elif app["id"] == DIFY: + register_dict[app['id']] = {"id": "", "name": user.username, "email": ""} + else: + logger.error("鏈煡娉ㄥ唽搴旂敤---") + continue + + try: + for k, v in register_dict.items(): + await UserAppDao(db).update_and_insert_data(v.get("name"), user.password, v.get("email"), user_id, + str(v.get("id")), k) + except Exception as e: + logger.error(e) + + # 瀛樺偍鐢ㄦ埛淇℃伅 + # hashed_password = pwd_context.hash(user.password) + # db_user = UserModel(username=user.username, hashed_password=hashed_password, email=user.email) + # db_user.password = db_user.encrypted_password(user.password) + # for k, v in register_dict.items(): + # setattr(db_user, k.replace("app", "id"), v) + # db.add(db_user) + # db.commit() + # db.refresh(db_user) + + # is_sava = await save_register_user(db, user.username, user.password, user.email, register_dict) +""" + + +class UserAppDao: + def __init__(self, db: Session): + self.db = db + + async def get_data_by_id(self, user_id: int, app_type: int) -> Type[UserAppModel] | None: + session = self.db.query(UserAppModel).filter_by(user_id=user_id, app_type=app_type).first() + return session + + async def update_user_app_data(self, query: dict, update_data: dict): + + logger.error("鏇存柊鏁版嵁df update_app_data---------------------------") + try: + self.db.query(UserAppModel).filter_by(**query).update(update_data) + self.db.commit() + except Exception as e: + logger.error(e) + self.db.rollback() + raise Exception("鏇存柊澶辫触锛�") + + async def insert_user_app_data(self, username: str, password: str, email: str, user_id: int, app_id: str, + app_type: int): + logger.error("鏂板鏁版嵁df insert_user_app_data---------------------------") + new_session = UserAppModel( + username=username, + password=password, + email=email, + user_id=user_id, + app_id=app_id, + app_type=app_type, + ) + self.db.add(new_session) + self.db.commit() + self.db.refresh(new_session) + return new_session + + async def update_and_insert_data(self, username: str, password: str, email: str, user_id: int, app_id: str, + app_type: int): + + logger.error("鏇存柊鎴栬�呮坊鍔犳暟鎹� update_and_insert_token---------------------------") + token_boj = await self.get_data_by_id(user_id, app_type) + if token_boj: + await self.update_user_app_data({"id": token_boj.id}, {"username": username, + "password": password, "email": email, + "updated_at": datetime.now(), + }) + else: + await self.insert_user_app_data(username, password, email, user_id, app_id, app_type) + + async def get_user_datas(self, user_id: int): + return self.db.query(UserAppModel).filter_by(user_id=user_id).all() + + +async def password_rsa(password): + with open(os.path.join(APP_SERVICE_PATH, "pom/private_key.pem"), "rb") as key_file: + private_key = serialization.load_pem_private_key( + key_file.read(), + password=None, # 濡傛灉绉侀挜鍔犲瘑锛岃鎻愪緵瀵嗙爜 + backend=default_backend() + ) + # Base64 瑙g爜 + try: + # 瑙e瘑娑堟伅 + ciphertext = base64.b64decode(password) + # 浣跨敤 PKCS#1 v1.5 濉厖瑙e瘑 + plaintext = private_key.decrypt( + ciphertext, + padding.PKCS1v15() # 鏀逛负 PKCS#1 v1.5 濉厖 + ) + return plaintext.decode() + except Exception as e: + print(e) + return "" + + -- Gitblit v1.8.0