zhaoqingang
2025-02-13 6ab3410581d664dd602f6af68f5c8f78bfdb4806
app/service/auth.py
@@ -1,6 +1,10 @@
import os.path
import re
import uuid
import base64
from datetime import datetime, timedelta
from typing import Type
from uuid import uuid4
from jwt import encode, decode, exceptions
from passlib.context import CryptContext
@@ -9,7 +13,16 @@
from Log import logger
from app.config.config import settings
from app.config.const import RAGFLOW, BISHENG, DIFY, USER_STATSU_DELETE, APP_SERVICE_PATH
from app.models import RoleModel, GroupModel, TokenModel
from app.models.user_model import UserModel, UserAppModel
# from app.service.ragflow import RagflowService
# from app.service.service_token import get_new_token
from app.service.v2.app_register import AppRegisterDao
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
SECRET_KEY = settings.secret_key
ALGORITHM = "HS256"
@@ -27,7 +40,7 @@
def authenticate_user(db, username: str, password: str):
    user = db.query(UserModel).filter(UserModel.username == username).first()
    user = db.query(UserModel).filter(UserModel.username == username, UserModel.status != USER_STATSU_DELETE).first()
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
@@ -55,7 +68,7 @@
def is_valid_password(password: str) -> bool:
    if len(password) <= 8:
    if len(password) < 8:
        return False
    has_digit = re.search(r'[0-9]', password)
    has_letter = re.search(r'[A-Za-z]', password)
@@ -64,36 +77,38 @@
    return has_digit is not None and has_letter is not None
async def save_register_user(db, username, password, email, register_dict):
async def save_register_user(db, username, password, email, app_password, register_dict):
    user_id = ""
    sync_flag = str(uuid.uuid4())
    try:
        hashed_password = pwd_context.hash(password)
        db_user = UserModel(username=username, hashed_password=hashed_password, email=email)
        pwd = db_user.encrypted_password(password)
        db_user = UserModel(username=username, hashed_password=hashed_password, email=email, sync_flag=sync_flag)
        pwd = db_user.encrypted_password(app_password)
        db_user.password = pwd
        db.add(db_user)
        db_user.roles = [db.query(RoleModel).filter(RoleModel.role_type == 2).first()]
        db_user.groups = [db.query(GroupModel).filter(GroupModel.group_type == 2).first()]
        db.add(db_user)
        db.commit()
        db.refresh(db_user)
        user_id = db_user.id
        for k, v in register_dict.items():
            UserAppDao(db).update_and_insert_token(v.get("name"), pwd, v.get("email"), user_id, str(v.get("id")), k)
            await UserAppDao(db).update_and_insert_data(v.get("name"), pwd, v.get("email"), user_id, str(v.get("id")), k)
    except Exception as e:
        logger.error(e)
        # db.roolback()
        if user_id:
            db.query(UserModel).filter(UserModel.id == user_id).delete()
            db.commit
        return False
    return True
    return sync_flag
async def update_user_token(db, user_id, token_dict):
    try:
        for k, v in token_dict.items():
            UserAppDao(db).update_user_app_data({"user_id": user_id, "app_type": k},
                                                {"access_token": v, "token_at": datetime.now()})
            await UserAppDao(db).update_user_app_data({"user_id": user_id, "app_type": k},
                                                      {"access_token": v, "token_at": datetime.now()})
    except Exception as e:
        logger.error(e)
@@ -101,15 +116,49 @@
    return True
async def update_user_info(db, user_id):
    app_register = AppRegisterDao(db).get_apps()
    register_dict = {}
    user = db.query(UserModel).filter(UserModel.id==user_id).first()
    for app in app_register:
        if app["id"] == RAGFLOW:
            register_dict[app['id']] = {"id": user.ragflow_id, "name": user.username, "email": f"{user.username}@example.com"}
        elif app["id"] == BISHENG:
            register_dict[app['id']] = {"id": user.bisheng_id, "name": user.username, "email": ""}
        elif app["id"] == DIFY:
            register_dict[app['id']] = {"id": "", "name": user.username, "email": ""}
        else:
            logger.error("未知注册应用---")
            continue
        try:
            for k, v in register_dict.items():
                await UserAppDao(db).update_and_insert_data(v.get("name"), user.password, v.get("email"), user_id,
                                                            str(v.get("id")), k)
        except Exception as e:
            logger.error(e)
    # 存储用户信息
    # hashed_password = pwd_context.hash(user.password)
    # db_user = UserModel(username=user.username, hashed_password=hashed_password, email=user.email)
    # db_user.password = db_user.encrypted_password(user.password)
    # for k, v in register_dict.items():
    #     setattr(db_user, k.replace("app", "id"), v)
    # db.add(db_user)
    # db.commit()
    # db.refresh(db_user)
    # is_sava = await save_register_user(db, user.username, user.password, user.email, register_dict)
class UserAppDao:
    def __init__(self, db: Session):
        self.db = db
    def get_data_by_id(self, user_id: int, app_type: int) -> Type[UserAppModel] | None:
    async def get_data_by_id(self, user_id: int, app_type: int) -> Type[UserAppModel] | None:
        session = self.db.query(UserAppModel).filter_by(user_id=user_id, app_type=app_type).first()
        return session
    def update_user_app_data(self, query: int, update_data: str):
    async def update_user_app_data(self, query: dict, update_data: dict):
        logger.error("更新数据df update_app_data---------------------------")
        try:
@@ -120,7 +169,8 @@
            self.db.rollback()
            raise Exception("更新失败!")
    def insert_user_app_data(self, username: str, password: str, email: str, user_id: int, app_id: str, app_type: int):
    async def insert_user_app_data(self, username: str, password: str, email: str, user_id: int, app_id: str,
                                   app_type: int):
        logger.error("新增数据df insert_user_app_data---------------------------")
        new_session = UserAppModel(
            username=username,
@@ -135,15 +185,42 @@
        self.db.refresh(new_session)
        return new_session
    def update_and_insert_token(self, username: str, password: str, email: str, user_id: int, app_id: str,
                                app_type: int):
    async def update_and_insert_data(self, username: str, password: str, email: str, user_id: int, app_id: str,
                                     app_type: int):
        logger.error("更新或者添加数据 update_and_insert_token---------------------------")
        token_boj = self.get_data_by_id(user_id, app_type)
        token_boj = await self.get_data_by_id(user_id, app_type)
        if token_boj:
            self.update_user_app_data({"id": token_boj.id}, {"username": username,
                                                             "password": password, "email": email, "username": username,
                                                             "updated_at": datetime.now(),
                                                             })
            await self.update_user_app_data({"id": token_boj.id}, {"username": username,
                                                                   "password": password, "email": email,
                                                                   "updated_at": datetime.now(),
                                                                   })
        else:
            self.insert_user_app_data(username, password, email, user_id, app_id, app_type)
            await self.insert_user_app_data(username, password, email, user_id, app_id, app_type)
    async def get_user_datas(self, user_id: int):
        return self.db.query(UserAppModel).filter_by(user_id=user_id).all()
async def password_rsa(password):
    with open(os.path.join(APP_SERVICE_PATH, "pom/private_key.pem"), "rb") as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=None,  # 如果私钥加密,请提供密码
            backend=default_backend()
        )
        # Base64 解码
        try:
            # 解密消息
            ciphertext = base64.b64decode(password)
            # 使用 PKCS#1 v1.5 填充解密
            plaintext = private_key.decrypt(
                ciphertext,
                padding.PKCS1v15()  # 改为 PKCS#1 v1.5 填充
            )
            return  plaintext.decode()
        except Exception as e:
            print(e)
            return ""