import asyncio from fastapi import Depends from Log import logger from app.config.config import settings from app.models import AgentType from app.models.public_api_model import AppRegisterModel from app.models.base_model import get_db from app.service.bisheng import BishengService from app.service.ragflow import RagflowService def sync_resource(): asyncio.run(sync_knowledge()) asyncio.run(sync_dialog()) asyncio.run(sync_agent()) asyncio.run(sync_llm()) async def sync_knowledge(db=Depends(get_db)): token = "" register_app = db.query(AppRegisterModel).filter(AppRegisterModel.status.__eq__(1)).all() for rapp in register_app: if rapp.app_type == AgentType.RAGFLOW: token = "" ragflow_service = RagflowService(settings.fwr_base_url) ragflow_service.get_knowledge_list() elif rapp.app_type ==AgentType.BISHENG: token = "" bisheng_service = BishengService(settings.sgb_base_url) else: logger.error("注册未知应用:{}".format(rapp.app_type)) async def sync_dialog(): ... async def sync_agent(): ... async def sync_llm(): ...