"""Authentication helpers and per-application user account storage.

Contains password hashing/verification, JWT creation/decoding, user
registration helpers, the ``UserAppDao`` data-access class for
``UserAppModel`` rows, and RSA decryption of client-supplied passwords.
"""
import os.path
import re
import uuid
import base64
from datetime import datetime, timedelta, timezone
from typing import Type
from uuid import uuid4

from jwt import encode, decode, exceptions
from passlib.context import CryptContext
from fastapi import HTTPException, status
from sqlalchemy.orm import Session

from Log import logger
from app.config.config import settings
from app.config.const import RAGFLOW, BISHENG, DIFY, USER_STATSU_DELETE, APP_SERVICE_PATH
from app.models import RoleModel, GroupModel, TokenModel
from app.models.user_model import UserModel, UserAppModel
# from app.service.ragflow import RagflowService
# from app.service.service_token import get_new_token
from app.service.v2.app_register import AppRegisterDao
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding

SECRET_KEY = settings.secret_key
ALGORITHM = "HS256"
# Default lifetime of an access token when the caller gives no expiry.
ACCESS_TOKEN_EXPIRE_MINUTES = 3000

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password, hashed_password):
    """Return True when ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    """Return the hash of ``password`` produced by the module's CryptContext."""
    return pwd_context.hash(password)


def authenticate_user(db, username: str, password: str):
    """Look up a non-deleted user by name and check the password.

    Args:
        db: SQLAlchemy session used for the lookup.
        username: Login name to search for.
        password: Plain-text password to verify.

    Returns:
        The matching ``UserModel`` row on success, otherwise ``False``
        (unknown user, deleted user, or wrong password).
    """
    user = (
        db.query(UserModel)
        .filter(UserModel.username == username, UserModel.status != USER_STATSU_DELETE)
        .first()
    )
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta = None):
    """Encode ``data`` as a signed JWT carrying an ``exp`` claim.

    Args:
        data: Claims to embed; the dict is copied, not mutated.
        expires_delta: Token lifetime. A falsy value (``None`` or a zero
            delta) selects ``ACCESS_TOKEN_EXPIRE_MINUTES``.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    to_encode = data.copy()
    lifetime = expires_delta if expires_delta else timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12.
    to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})
    return encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def decode_access_token(token: str):
    """Decode and validate a JWT created by ``create_access_token``.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded payload.

    Raises:
        HTTPException: 401 when the token is malformed, has a bad signature,
            is expired, or fails any other validation.
    """
    try:
        return decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except exceptions.InvalidTokenError as err:
        # InvalidTokenError is the base of DecodeError, ExpiredSignatureError
        # and the other claim errors; catching only DecodeError let expired
        # tokens escape as unhandled exceptions instead of a 401.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
        ) from err


def is_valid_password(password: str) -> bool:
    """Check the password policy: at least 8 characters, one digit, one ASCII letter.

    Returns:
        True when the password satisfies the policy, otherwise False.
    """
    if len(password) < 8:
        return False
    has_digit = re.search(r'[0-9]', password)
    has_letter = re.search(r'[A-Za-z]', password)
    # Both a digit and a letter must be present.
    return has_digit is not None and has_letter is not None


async def save_register_user(db, username, password, email, app_password, register_dict):
    """Create a new user row with the default role and group.

    Args:
        db: SQLAlchemy session.
        username: Login name of the new user.
        password: Plain-text password; only its hash is stored.
        email: E-mail address of the new user.
        app_password: Currently unused; kept for interface compatibility.
        register_dict: Currently unused; kept for interface compatibility.

    Returns:
        The generated ``sync_flag`` string on success, or ``False`` when the
        insert fails (the error is logged and the session rolled back).
    """
    sync_flag = str(uuid.uuid4())
    try:
        hashed_password = pwd_context.hash(password)
        db_user = UserModel(
            username=username,
            hashed_password=hashed_password,
            email=email,
            sync_flag=sync_flag,
        )
        # Attach the first role / group whose type equals 2.
        db_user.roles = [db.query(RoleModel).filter(RoleModel.role_type == 2).first()]
        db_user.groups = [db.query(GroupModel).filter(GroupModel.group_type == 2).first()]
        db.add(db_user)
        db.commit()
        db.refresh(db_user)
    except Exception as e:
        logger.error(e)
        db.rollback()
        return False
    return sync_flag


async def update_user_token(db, user_id, token_dict):
    """Store per-application access tokens for a user.

    Args:
        db: SQLAlchemy session.
        user_id: Id of the user whose rows are updated.
        token_dict: Mapping of app type -> access token.

    Returns:
        True when every update succeeded, False as soon as one fails
        (the error is logged).
    """
    try:
        for app_type, access_token in token_dict.items():
            await UserAppDao(db).update_user_app_data(
                {"user_id": user_id, "app_type": app_type},
                {"access_token": access_token, "token_at": datetime.now()},
            )
    except Exception as e:
        logger.error(e)
        return False
    return True


async def update_user_info(db, user_id):
    """Upsert one ``UserAppModel`` row per registered application for a user.

    Reads the registered apps from ``AppRegisterDao``, builds the account
    data for the known app ids (RAGFLOW, BISHENG, DIFY) and writes it via
    ``UserAppDao.update_and_insert_data``. Unknown app ids are logged and
    skipped. Failures while writing are logged, not raised.

    Args:
        db: SQLAlchemy session.
        user_id: Id of the user to synchronise.
    """
    app_register = AppRegisterDao(db).get_apps()
    user = db.query(UserModel).filter(UserModel.id == user_id).first()
    if user is None:
        # Without this guard the attribute accesses below raise AttributeError.
        logger.error(f"update_user_info: user {user_id} not found")
        return

    register_dict = {}
    for app in app_register:
        app_id = app["id"]
        if app_id == RAGFLOW:
            register_dict[app_id] = {
                "id": user.ragflow_id,
                "name": user.username,
                "email": f"{user.username}@example.com",
            }
        elif app_id == BISHENG:
            register_dict[app_id] = {"id": user.bisheng_id, "name": user.username, "email": ""}
        elif app_id == DIFY:
            register_dict[app_id] = {"id": "", "name": user.username, "email": ""}
        else:
            logger.error("未知注册应用---")

    try:
        for app_type, info in register_dict.items():
            await UserAppDao(db).update_and_insert_data(
                info.get("name"),
                user.password,
                info.get("email"),
                user_id,
                str(info.get("id")),
                app_type,
            )
    except Exception as e:
        logger.error(e)


class UserAppDao:
    """Data-access helper for ``UserAppModel`` rows bound to one session."""

    def __init__(self, db: Session):
        """Keep the SQLAlchemy session used by every method."""
        self.db = db

    async def get_data_by_id(self, user_id: int, app_type: int) -> Type[UserAppModel] | None:
        """Return the first row for ``user_id`` and ``app_type``, or None."""
        return self.db.query(UserAppModel).filter_by(user_id=user_id, app_type=app_type).first()

    async def update_user_app_data(self, query: dict, update_data: dict):
        """Bulk-update the rows matching ``query`` with ``update_data``.

        Args:
            query: Column/value pairs passed to ``filter_by``.
            update_data: Column/value pairs to write.

        Raises:
            Exception: When the update or commit fails; the session is
                rolled back first and the original error is chained.
        """
        # NOTE(review): informational message logged at error level.
        logger.error("更新数据df update_app_data---------------------------")
        try:
            self.db.query(UserAppModel).filter_by(**query).update(update_data)
            self.db.commit()
        except Exception as e:
            logger.error(e)
            self.db.rollback()
            raise Exception("更新失败!") from e

    async def insert_user_app_data(self, username: str, password: str, email: str, user_id: int, app_id: str,
                                   app_type: int):
        """Insert a new ``UserAppModel`` row and return it refreshed.

        Raises:
            Exception: Whatever the session raises on add/commit/refresh;
                the session is rolled back before the error propagates.
        """
        # NOTE(review): informational message logged at error level.
        logger.error("新增数据df insert_user_app_data---------------------------")
        new_row = UserAppModel(
            username=username,
            password=password,
            email=email,
            user_id=user_id,
            app_id=app_id,
            app_type=app_type,
        )
        try:
            self.db.add(new_row)
            self.db.commit()
            self.db.refresh(new_row)
        except Exception:
            # Leave the session usable for the caller instead of stuck in a
            # failed transaction.
            self.db.rollback()
            raise
        return new_row

    async def update_and_insert_data(self, username: str, password: str, email: str, user_id: int, app_id: str,
                                     app_type: int):
        """Update the row for ``user_id``/``app_type`` if present, else insert one.

        On update only username, password, email and ``updated_at`` are
        written; ``app_id`` is used for inserts only.
        """
        # NOTE(review): informational message logged at error level.
        logger.error("更新或者添加数据 update_and_insert_token---------------------------")
        existing = await self.get_data_by_id(user_id, app_type)
        if existing:
            await self.update_user_app_data(
                {"id": existing.id},
                {
                    "username": username,
                    "password": password,
                    "email": email,
                    "updated_at": datetime.now(),
                },
            )
        else:
            await self.insert_user_app_data(username, password, email, user_id, app_id, app_type)

    async def get_user_datas(self, user_id: int):
        """Return every ``UserAppModel`` row belonging to ``user_id``."""
        return self.db.query(UserAppModel).filter_by(user_id=user_id).all()


async def password_rsa(password):
    """Decrypt a Base64-encoded, RSA-encrypted password.

    The private key is read from ``pom/private_key.pem`` under
    ``APP_SERVICE_PATH`` and used with PKCS#1 v1.5 padding.

    Args:
        password: Base64 text of the ciphertext.

    Returns:
        The decrypted plain text, or ``""`` when decoding or decryption
        fails (the error is logged).
    """
    with open(os.path.join(APP_SERVICE_PATH, "pom/private_key.pem"), "rb") as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=None,  # supply the passphrase here if the key is encrypted
            backend=default_backend(),
        )
    try:
        ciphertext = base64.b64decode(password)
        plaintext = private_key.decrypt(ciphertext, padding.PKCS1v15())
        return plaintext.decode()
    except Exception as e:
        # Use the module logger, like the rest of the file, instead of print().
        logger.error(e)
        return ""