import asyncio import json import time from datetime import datetime, timedelta from sqlalchemy.future import select from app.config.agent_base_url import RG_USER_LOGIN, DF_USER_LOGIN, RG_PING, DF_PING from app.config.config import settings from app.config.const import chat_server, workflow_server, http_200 from app.models import UserTokenModel from app.models.app_token_model import AppToken from app.models.base_model import SessionLocal from app.models.postgresql_base_model import get_pdb, PostgresqlSessionLocal from app.service.ragflow import RagflowService from app.service.v2.app_driver.chat_data import ChatBaseApply from app.utils.password_handle import password_decrypted async def sync_token(): """ 1.获取到user_token表中的账号 2.判断是否超过12小时,超过则获取新的token 3.未超过12小时,则测试token有效性,chat_ping接口,返回401,则重新获取token 4.token获取:login df:/console/api/workspaces rg:/v1/system/version 5.跟新本地token和kong网关token :return: """ app_data = [{"token_id": chat_server, "url": f"{settings.fwr_base_url}{RG_USER_LOGIN}", "data": {"email": "", "password": ""}, "is_crypt": True, "ping_url": f"{settings.fwr_base_url}{RG_PING}", "token": "{}"}, {"token_id": workflow_server, "url": f"{settings.dify_base_url}{DF_USER_LOGIN}", "data": {"email": "", "password": "", "remember_me": True, "language": "zh-Hans"}, "is_crypt": False , "ping_url": f"{settings.dify_base_url}{DF_PING}", "token": "Bearer {}"}] async def sync_token_chat(token_id, url, data, is_crypt, ping_url, token): db = SessionLocal() # pdb = PostgresqlSessionLocal() current_time = datetime.now() - timedelta(hours=12) try: user_token = db.query(UserTokenModel).filter(UserTokenModel.id == token_id).first() chat = ChatBaseApply() if user_token and (user_token.updated_at < current_time or not user_token.access_token or await chat.chat_ping(ping_url, {}, await chat.get_chat_headers(token.format(user_token.access_token))) != http_200): data["email"] = user_token.account if is_crypt: data["password"] = await chat.password_encrypt(await password_decrypted(user_token.password)) else: data["password"] = await password_decrypted(user_token.password) res = await chat.chat_login(url, data, {'Content-Type': 'application/json'}) if res: access_token = res["data"]["access_token"] user_token.access_token = access_token user_token.updated_at = datetime.now() db.commit() except Exception as e: print(e) finally: db.close() # await pdb.close() tasks = [] for app in app_data: tasks.append(asyncio.create_task(sync_token_chat(app["token_id"], app["url"], app["data"], app["is_crypt"], app["ping_url"], app["token"]))) done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED) def start_sync_token_task(): asyncio.run(sync_token()) if __name__ == "__main__": start_sync_token_task()