zhaoqingang
2025-03-07 af86455055918d26a0f6eebc270074c4863db0be
app/models/token_model.py
@@ -1,13 +1,7 @@
from datetime import datetime
from typing import Type
from sqlalchemy import Column, Integer, DateTime, Text
from sqlalchemy import Column, Integer, DateTime, Text, String
from sqlalchemy.orm import Session
from Log import logger
from app.config.const import RAGFLOW
from app.models.base_model import Base
from app.service.auth import UserAppDao
class TokenModel(Base):
@@ -55,41 +49,11 @@
        db.rollback()  # 回滚事务
async def update_token(db: Session, user_id: int, access_token: str, token: dict):
    # 参数验证
    if not isinstance(user_id, int) or user_id <= 0:
        return
    db_token = None
    # print(token)
    try:
        # 查询现有记录
        db_token = db.query(TokenModel).filter_by(user_id=user_id).first()
        if db_token:
            # 记录存在,进行更新
            db_token.token = access_token
            for k, v in token.items():
                setattr(db_token, k.replace("app", "token"), v)
        else:
            # 记录不存在,进行插入
            db_token = TokenModel(
                user_id=user_id,
                token=access_token,
            )
            for k, v in token.items():
                setattr(db_token, k.replace("app", "token"), v)
            db.add(db_token)
        # 提交事务
        db.commit()
        db.refresh(db_token)
    except Exception as e:
        logger.error(e)
        # 异常处理
        db.rollback()  # 回滚事务
async def get_token(db: Session, user_id: int):
    # return db.query(TokenModel).filter_by(user_id=user_id).first()
    return {i.app_type.replace("app", "token"): i.access_token for i in await UserAppDao(db).get_user_datas(user_id)}
class ApiTokenModel(Base):
    __tablename__ = "chat_api_tokens"
    id = Column(String(36), primary_key=True)
    app_id = Column(String(36), index=True)
    type = Column(String(16))
    token = Column(String(255))
    created_at = Column(DateTime, default=datetime.now())
    last_used_at = Column(DateTime, default=datetime.now())