zhaoqingang
2025-03-13 30ff0afd5d76a3a5aa48058210ae411253574ada
app/api/__init__.py
@@ -1,9 +1,11 @@
import urllib
from datetime import datetime
from typing import Callable, Any
from urllib.parse import urlencode
import jwt
# from cryptography.fernet import Fernet
from fastapi import FastAPI, Depends, HTTPException
from fastapi import FastAPI, Depends, HTTPException, Header, Request
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from pydantic import BaseModel
@@ -11,8 +13,9 @@
from starlette.websockets import WebSocket, WebSocketDisconnect
from Log import logger
from app.models.base_model import SessionLocal
# from app.models.app_model import AppRegisterModel
from app.models.user_model import UserModel
from app.models.user_model import UserModel, UserApiTokenModel
from app.service.auth import SECRET_KEY, ALGORITHM
from app.config.config import settings
@@ -34,6 +37,33 @@
    msg: str = ""
    data: list[dict] = []
def verify_token(token: str) -> Any:
    """
    验证 Token 是否有效
    """
    db = SessionLocal()
    try:
        db_token = db.query(UserApiTokenModel).filter(UserApiTokenModel.token == token, UserApiTokenModel.is_active == 1).first()
        return db_token is not None and (db_token.expires_at is None or db_token.expires_at > datetime.now())
    finally:
        db.close()
def token_required()-> Callable:
    def decorated_function(request: Request)-> Any:
        authorization_str = request.headers.get("Authorization")
        if not authorization_str:
            raise HTTPException(status_code=401, detail="Authorization` can't be empty")
        authorization_list = authorization_str.split()
        if len(authorization_list) < 2:
            raise HTTPException(status_code=401, detail="Invalid token")
        token = authorization_list[1]
        objs = verify_token(token)
        if not objs:
            raise HTTPException(status_code=401, detail="Invalid token")
        user = UserModel(username="", id=objs.user_id)
        return user
    return decorated_function
def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
@@ -116,6 +146,12 @@
                # 记录异常信息,但继续处理其他文件
                print(f"Error processing file URL: {e}")
def get_api_key(authorization: str = Header(...)):
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid Authorization header format.")
    return authorization.split(" ")[1]
if __name__=="__main__":