zhaoqingang
2025-04-01 6846a4c98a793e74ae17b47f04a0ff8b210aeb24
app/api/__init__.py
@@ -1,9 +1,11 @@
import urllib
from datetime import datetime
from typing import Callable, Any
from urllib.parse import urlencode
import jwt
# from cryptography.fernet import Fernet
from fastapi import FastAPI, Depends, HTTPException, Header
from fastapi import FastAPI, Depends, HTTPException, Header, Request
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from pydantic import BaseModel
@@ -11,8 +13,9 @@
from starlette.websockets import WebSocket, WebSocketDisconnect
from Log import logger
from app.models.base_model import SessionLocal
# from app.models.app_model import AppRegisterModel
from app.models.user_model import UserModel
from app.models.user_model import UserModel, UserApiTokenModel
from app.service.auth import SECRET_KEY, ALGORITHM
from app.config.config import settings
@@ -35,9 +38,44 @@
    data: list[dict] = []
def verify_token(token: str) -> Any:
    """
    验证 Token 是否有效
    """
    db = SessionLocal()
    try:
        db_token = db.query(UserApiTokenModel).filter(UserApiTokenModel.token == token, UserApiTokenModel.is_active == 1).first()
        return db_token is not None and (db_token.expires_at is None or db_token.expires_at > datetime.now())
    finally:
        db.close()
def token_required()-> Callable:
    def decorated_function(request: Request)-> Any:
        authorization_str = request.headers.get("Authorization")
        if not authorization_str:
            raise HTTPException(status_code=401, detail="Authorization` can't be empty")
        authorization_list = authorization_str.split()
        if len(authorization_list) < 2:
            raise HTTPException(status_code=401, detail="Invalid token")
        token = authorization_list[1]
        objs = verify_token(token)
        if not objs:
            raise HTTPException(status_code=401, detail="Invalid token")
        user = UserModel(username="", id=objs.user_id)
        return user
    return decorated_function
def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        expired_time = payload.get("lex")
        if not expired_time:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,  detail="令牌无效或已过期",
            headers={"WWW-Authenticate": "Bearer"})
        if datetime.strptime(expired_time, "%Y-%m-%d %H:%M:%S") < datetime.now():
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,  detail="系统授权已过期!",
            headers={"WWW-Authenticate": "Bearer"})
        username: str = payload.get("sub")
        if username is None:
            raise HTTPException(