from Log import logger
|
from app.models import current_time
|
from app.models.public_api_model import DfTokenModel
|
from sqlalchemy.orm import Session
|
from typing import Type
|
|
|
class DfTokenDao:
    """Data-access object for the DF API token rows stored as ``DfTokenModel``.

    Every method works on the SQLAlchemy session supplied at construction
    time and commits its own changes.
    """

    def __init__(self, db: Session):
        """Keep a reference to the session used for all queries.

        Args:
            db: SQLAlchemy session the DAO reads from and writes to.
        """
        self.db = db

    def _get_row(self, api_id: str) -> DfTokenModel | None:
        """Return the ``DfTokenModel`` row whose ``id`` equals *api_id*, or ``None``."""
        return self.db.query(DfTokenModel).filter_by(id=api_id).first()

    def get_token_by_id(self, api_id: str) -> str | None:
        """Return the ``token`` attribute of the row identified by *api_id*.

        Args:
            api_id: Value matched against ``DfTokenModel.id``.

        Returns:
            The row's ``token`` value (presumably a string -- verify against
            the ``DfTokenModel`` column definition), or ``None`` when no row
            with that id exists.
        """
        row = self._get_row(api_id)
        if row is None:
            return None
        return row.token

    def update_token(self, api_id: str, token: str):
        """Set ``token`` and ``updated_at`` on the row identified by *api_id*.

        The update is issued as a bulk ``UPDATE`` and committed immediately.
        If no row matches *api_id* nothing is changed and no error is raised.

        Args:
            api_id: Value matched against ``DfTokenModel.id``.
            token: New token value to store.

        Raises:
            Exception: If the update or commit fails. The session is rolled
                back first and the original error is attached as
                ``__cause__``.
        """
        # NOTE(review): informational message logged at error level; kept
        # as-is because the project's log-level configuration is not visible.
        logger.error("更新数据df api token---------------------------")
        try:
            self.db.query(DfTokenModel).filter(DfTokenModel.id == api_id).update(
                {"token": token, "updated_at": current_time()}
            )
            self.db.commit()
        except Exception as e:
            logger.error(e)
            # Leave the session usable for the caller after a failed write.
            self.db.rollback()
            raise Exception("更新失败!") from e

    def insert_token(self, api_id: str, token: str):
        """Insert a new row with the given id and token and commit it.

        Args:
            api_id: Value stored in the ``id`` field of the new row.
            token: Token value stored in the new row.

        Returns:
            The newly added ``DfTokenModel`` instance, refreshed from the
            database after the commit.

        Raises:
            Exception: Whatever the add/commit raised; the session is rolled
                back before the original exception is re-raised unchanged.
        """
        # NOTE(review): informational message logged at error level; kept
        # as-is because the project's log-level configuration is not visible.
        logger.error("新增数据df api token---------------------------")
        new_row = DfTokenModel(
            id=api_id,
            token=token,
        )
        try:
            self.db.add(new_row)
            self.db.commit()
        except Exception as e:
            logger.error(e)
            # Mirror update_token: never leave the session in a failed
            # transaction state. Re-raise the original error so callers
            # that already catch specific exception types keep working.
            self.db.rollback()
            raise
        self.db.refresh(new_row)
        return new_row

    def update_and_insert_token(self, api_id: str, token: str):
        """Store *token* for *api_id*, updating the row if present, else inserting.

        Args:
            api_id: Value matched against / stored in ``DfTokenModel.id``.
            token: Token value to store.

        Raises:
            Exception: Propagated from ``update_token`` or ``insert_token``.
        """
        # NOTE(review): informational message logged at error level; kept
        # as-is because the project's log-level configuration is not visible.
        logger.error("更新或者添加数据df api token---------------------------")
        # Decide on row existence, not on the truthiness of the stored token:
        # a row whose token is empty or NULL must be updated, otherwise the
        # insert below would attempt to create a second row with the same id.
        # NOTE(review): check-then-write is not atomic across concurrent
        # sessions; confirm whether callers can race on the same api_id.
        if self._get_row(api_id) is not None:
            self.update_token(api_id, token)
        else:
            self.insert_token(api_id, token)
|