From 6846a4c98a793e74ae17b47f04a0ff8b210aeb24 Mon Sep 17 00:00:00 2001 From: zhaoqingang <zhaoqg0118@163.com> Date: 星期二, 01 四月 2025 16:52:51 +0800 Subject: [PATCH] 授权license --- app/api/__init__.py | 50 +++++++++++++++++++++++++++++++++++++++++++++++--- 1 files changed, 47 insertions(+), 3 deletions(-) diff --git a/app/api/__init__.py b/app/api/__init__.py index 33175b2..de2282f 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -1,9 +1,11 @@ import urllib +from datetime import datetime +from typing import Callable, Any from urllib.parse import urlencode import jwt # from cryptography.fernet import Fernet -from fastapi import FastAPI, Depends, HTTPException +from fastapi import FastAPI, Depends, HTTPException, Header, Request from fastapi.security import OAuth2PasswordBearer from passlib.context import CryptContext from pydantic import BaseModel @@ -11,8 +13,9 @@ from starlette.websockets import WebSocket, WebSocketDisconnect from Log import logger -from app.models.app_model import AppRegisterModel -from app.models.user_model import UserModel +from app.models.base_model import SessionLocal +# from app.models.app_model import AppRegisterModel +from app.models.user_model import UserModel, UserApiTokenModel from app.service.auth import SECRET_KEY, ALGORITHM from app.config.config import settings @@ -35,9 +38,44 @@ data: list[dict] = [] +def verify_token(token: str) -> Any: + """ + 楠岃瘉 Token 鏄惁鏈夋晥 + """ + db = SessionLocal() + try: + db_token = db.query(UserApiTokenModel).filter(UserApiTokenModel.token == token, UserApiTokenModel.is_active == 1).first() + return db_token is not None and (db_token.expires_at is None or db_token.expires_at > datetime.now()) + finally: + db.close() + +def token_required()-> Callable: + def decorated_function(request: Request)-> Any: + authorization_str = request.headers.get("Authorization") + if not authorization_str: + raise HTTPException(status_code=401, detail="Authorization` can't be empty") + authorization_list = authorization_str.split() + if len(authorization_list) < 2: + raise HTTPException(status_code=401, detail="Invalid token") + token = authorization_list[1] + objs = verify_token(token) + if not objs: + raise HTTPException(status_code=401, detail="Invalid token") + user = UserModel(username="", id=objs.user_id) + return user + return decorated_function + def get_current_user(token: str = Depends(oauth2_scheme)): try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + expired_time = payload.get("lex") + if not expired_time: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="浠ょ墝鏃犳晥鎴栧凡杩囨湡", + headers={"WWW-Authenticate": "Bearer"}) + if datetime.strptime(expired_time, "%Y-%m-%d %H:%M:%S") < datetime.now(): + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="绯荤粺鎺堟潈宸茶繃鏈燂紒", + headers={"WWW-Authenticate": "Bearer"}) + username: str = payload.get("sub") if username is None: raise HTTPException( @@ -116,6 +154,12 @@ # 璁板綍寮傚父淇℃伅锛屼絾缁х画澶勭悊鍏朵粬鏂囦欢 print(f"Error processing file URL: {e}") +def get_api_key(authorization: str = Header(...)): + if not authorization.startswith("Bearer "): + raise HTTPException(status_code=401, detail="Invalid Authorization header format.") + return authorization.split(" ")[1] + + if __name__=="__main__": -- Gitblit v1.8.0