zhaoqingang
2025-04-01 6846a4c98a793e74ae17b47f04a0ff8b210aeb24
app/api/__init__.py
@@ -1,9 +1,11 @@
import urllib
from datetime import datetime
from typing import Callable, Any
from urllib.parse import urlencode
import jwt
from cryptography.fernet import Fernet
from fastapi import FastAPI, Depends, HTTPException
# from cryptography.fernet import Fernet
from fastapi import FastAPI, Depends, HTTPException, Header, Request
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from pydantic import BaseModel
@@ -11,8 +13,9 @@
from starlette.websockets import WebSocket, WebSocketDisconnect
from Log import logger
from app.models.app_model import AppRegisterModel
from app.models.user_model import UserModel
from app.models.base_model import SessionLocal
# from app.models.app_model import AppRegisterModel
from app.models.user_model import UserModel, UserApiTokenModel
from app.service.auth import SECRET_KEY, ALGORITHM
from app.config.config import settings
@@ -35,9 +38,44 @@
    data: list[dict] = []
def verify_token(token: str) -> Any:
    """
    Look up an active, unexpired API token.

    Args:
        token: Raw token string to look up in the API-token table.

    Returns:
        The matching ``UserApiTokenModel`` row when the token is active
        (``is_active == 1``) and either has no expiry or expires in the
        future; ``None`` otherwise. Returning the row instead of a bare
        bool lets callers read ``user_id`` from the result, while plain
        truthiness checks (``if verify_token(t): ...``) keep working as
        long as the model does not override ``__bool__``/``__len__``.
    """
    db = SessionLocal()
    try:
        db_token = (
            db.query(UserApiTokenModel)
            .filter(
                UserApiTokenModel.token == token,
                UserApiTokenModel.is_active == 1,
            )
            .first()
        )
        if db_token is None:
            return None
        # A missing expiry means the token never expires; the comparison
        # uses the naive local clock, matching how the check was written.
        if db_token.expires_at is not None and db_token.expires_at <= datetime.now():
            return None
        # NOTE(review): the row is returned after the session is closed, so
        # callers should only read already-loaded column values from it.
        return db_token
    finally:
        # Always release the session, including when the query raises.
        db.close()
def token_required() -> Callable:
    """
    Build a request-level authentication callable.

    Returns:
        A function that takes the incoming ``Request``, validates the token
        carried in its ``Authorization`` header via ``verify_token`` and
        returns a ``UserModel`` with an empty username and the token
        owner's id. Presumably meant to be used as a FastAPI dependency --
        confirm at the call sites.

    Raises (from the returned function):
        HTTPException: 401 when the header is missing or empty, when it
            has fewer than two whitespace-separated fields, or when
            ``verify_token`` reports the token as invalid.
    """

    def decorated_function(request: Request) -> Any:
        header_value = request.headers.get("Authorization")
        if not header_value:
            raise HTTPException(status_code=401, detail="Authorization` can't be empty")

        # The header is split on whitespace and the second field is taken as
        # the token; the first field (the scheme) is not inspected.
        fields = header_value.split()
        token_record = verify_token(fields[1]) if len(fields) >= 2 else None
        if not token_record:
            raise HTTPException(status_code=401, detail="Invalid token")

        # NOTE(review): this reads `user_id` from whatever verify_token
        # returns, so that result must be a record-like object rather than
        # a plain bool -- confirm against verify_token.
        return UserModel(username="", id=token_record.user_id)

    return decorated_function
def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        expired_time = payload.get("lex")
        if not expired_time:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,  detail="令牌无效或已过期",
            headers={"WWW-Authenticate": "Bearer"})
        if datetime.strptime(expired_time, "%Y-%m-%d %H:%M:%S") < datetime.now():
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,  detail="系统授权已过期!",
            headers={"WWW-Authenticate": "Bearer"})
        username: str = payload.get("sub")
        if username is None:
            raise HTTPException(
@@ -116,6 +154,12 @@
                # 记录异常信息,但继续处理其他文件
                print(f"Error processing file URL: {e}")
def get_api_key(authorization: str = Header(...)):
    """
    Extract the API key from a ``Bearer`` Authorization header.

    Args:
        authorization: Value of the required ``Authorization`` request
            header.

    Returns:
        The first whitespace-delimited field that follows the ``Bearer ``
        prefix.

    Raises:
        HTTPException: 401 when the header does not start with ``Bearer ``
            or when no key follows the scheme.
    """
    prefix = "Bearer "
    if not authorization.startswith(prefix):
        raise HTTPException(status_code=401, detail="Invalid Authorization header format.")
    # split() without arguments skips repeated spaces, so "Bearer  key" yields
    # "key" and a bare "Bearer " yields no fields instead of an empty key.
    key_fields = authorization[len(prefix):].split()
    if not key_fields:
        raise HTTPException(status_code=401, detail="Invalid Authorization header format.")
    return key_fields[0]
if __name__=="__main__":