zhaoqingang
2025-03-18 282a631b9ceee9a634ee1d93751a5254ed37ccef
app/task/fetch_agent.py
@@ -1,17 +1,20 @@
import json
import os
from pickle import PROTO
from typing import Dict, List, Tuple
from sqlalchemy import create_engine, Column, String, Integer, Text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import sessionmaker, Session
from app.config.config import settings
from app.config.const import RAGFLOW, BISHENG, DIFY
from app.models import KnowledgeModel
from app.config.const import RAGFLOW, BISHENG, DIFY, ENV_CONF_PATH, Dialog_STATSU_DELETE, Dialog_STATSU_ON
from app.models import KnowledgeModel, ComplexChatDao
from app.models.dialog_model import DialogModel
from app.models.user_model import UserAppModel
from app.models.agent_model import AgentModel
from app.models.base_model import SessionLocal, Base
from app.models.resource_model import ResourceModel, ResourceTypeModel
from app.service.v2.app_register import AppRegisterDao
# 创建数据库引擎和会话工厂
@@ -40,6 +43,7 @@
    status = Column(String(1), nullable=False)
    description = Column(String(255), nullable=False)
    tenant_id = Column(String(36), nullable=False)
    kb_ids = Column(String(128), nullable=False)
class DfApps(Base):
@@ -49,6 +53,7 @@
    status = Column(String(16), nullable=False)
    description = Column(Text, nullable=False)
    tenant_id = Column(String(36), nullable=False)
    mode = Column(String(36), nullable=False)
class RgKnowledge(Base):
@@ -60,6 +65,16 @@
    description = Column(Text)  # 说明
    status = Column(String(1))  # 状态
    doc_num = Column(Integer)  # 文档
    embd_id = Column(String(128))  # 文档
class RgUserTenant(Base):
    """ORM mapping for the ``user_tenant`` table.

    In this module the table is queried through ``SessionRagflow`` sessions
    (see ``update_ragflow_user_tenant``), which rewrites ``role`` values.
    """
    __tablename__ = 'user_tenant'
    id = Column(String(36), primary_key=True)  # primary key of the row
    tenant_id = Column(String(32))  # tenant id (presumably the tenant the user belongs to; confirm against the schema)
    user_id = Column(String(32))  # user id
    role = Column(String(32))  # role string; "invite" and "normal" are the values this module reads and writes
# 解析名字
def parse_names(names_str: str) -> List[str]:
@@ -150,7 +165,7 @@
            ('basic_excel_talk', 6, '智能数据', 'BASIC', 'excelTalk'),
            ('basic_question_talk', 7, '出题组卷', 'BASIC', 'questionTalk'),
            ('9d75142a-66eb-4e23-b7d4-03efe4584915', 8, '小数绘图', 'DIFY', 'imageTalk'),
            ('basic_paper_talk', 9, '文档出卷', 'BASIC', 'paperTalk'),
            ('2f6ddf93-7ba6-4b2d-b991-d96421404600', 9, '文档出卷', 'DIFY', 'paperTalk'),
            ('basic_report_clean', 10, '文档报告', 'DIFY', 'reportWorkflow')
        ]
@@ -197,9 +212,9 @@
    finally:
        db.close()
def get_rag_user_id(db, tenant_id, app_type):
    user = db.query(UserAppModel).filter(UserAppModel.app_type==app_type, UserAppModel.app_id==tenant_id).first()
def get_rag_user_id(db, tenant_id, app_type):
    """Resolve the local user id bound to an external app account.

    Looks up the first ``UserAppModel`` row whose ``app_type`` equals
    *app_type* and whose ``app_id`` equals *tenant_id*.

    Args:
        db: SQLAlchemy session used for the lookup.
        tenant_id: Identifier of the account on the external app side.
        app_type: Which external app the identifier belongs to.

    Returns:
        The ``user_id`` of the matching row, or *tenant_id* unchanged when
        no row matches.
    """
    binding = (
        db.query(UserAppModel)
        .filter(UserAppModel.app_type == app_type, UserAppModel.app_id == tenant_id)
        .first()
    )
    return binding.user_id if binding else tenant_id
@@ -210,75 +225,99 @@
    try:
        if names:
            query = db.query(Flow.id, Flow.name, Flow.description, Flow.status, Flow.user_id) \
                .filter(Flow.name.in_(names))
                .filter(Flow.name.in_(names), Flow.status == "1")
        else:
            query = db.query(Flow.id, Flow.name, Flow.description, Flow.status, Flow.user_id)
            query = db.query(Flow.id, Flow.name, Flow.description, Flow.status, Flow.user_id).filter(Flow.status == "1")
        results = query.all()
        # print(f"Executing query: {query}")
        # 格式化id为UUID
        formatted_results = [{"id":format_uuid(row[0]), "name": row[1], "description": row[2], "status": "1" if row[3] ==2 else "0", "user_id": str(row[4])} for row in results]
        formatted_results = [
            {"id": row[0], "name": row[1], "description": row[2], "status": row[3], "user_id": str(row[4]),
             "mode": "agent-dialog"} for row in results]
        return formatted_results
    finally:
        db.close()
def get_data_from_ragflow_v2(names: List[str]) -> List[Dict]:
def get_data_from_ragflow_v2(base_db, names: List[str], tenant_id) -> List[Dict]:
    db = SessionRagflow()
    para = {
        "user_input_form": [],
        "retriever_resource": {
            "enabled": True
        },
        "file_upload": {
            "enabled": False
        }
    }
    try:
        chat_ids = ComplexChatDao(base_db).get_complex_chat_ids()
        # print(chat_ids)
        if names:
            query = db.query(Dialog.id, Dialog.name, Dialog.description, Dialog.status, Dialog.tenant_id) \
                .filter( Dialog.name.in_(names))
                .filter(Dialog.name.in_(names), Dialog.status == "1")
        else:
            query = db.query(Dialog.id, Dialog.name, Dialog.description, Dialog.status, Dialog.tenant_id)
            query = db.query(Dialog.id, Dialog.name, Dialog.description, Dialog.status, Dialog.tenant_id, Dialog.kb_ids).filter(
                Dialog.status == "1", Dialog.tenant_id == tenant_id)
        results = query.all()
        formatted_results = [
            {"id": format_uuid(row[0]), "name": row[1], "description": row[2], "status": str(row[3]) if row[3]  ==1 else "2",
             "user_id": str(row[4])} for row in results]
            {"id": row[0], "name": row[1], "description": row[2], "status": "1" if row[3] == "1" else "2",
             "user_id": str(row[4]), "mode": "agent-dialog", "parameters": para, "kb_ids": row[5]} for row in results if row[0] not in chat_ids]
        return formatted_results
    finally:
        db.close()
def get_data_from_dify_v2(names: List[str]) -> List[Dict]:
def get_data_from_dy_v2(base_db, names: List[str]) -> List[Dict]:
    db = SessionDify()
    try:
        chat_ids = ComplexChatDao(base_db).get_complex_chat_ids()
        # print(chat_ids)
        if names:
            query = db.query(DfApps.id, DfApps.name, DfApps.description, DfApps.status, DfApps.tenant_id) \
                .filter( DfApps.name.in_(names))
            query = db.query(DfApps.id, DfApps.name, DfApps.description, DfApps.status, DfApps.tenant_id, DfApps.mode) \
                .filter(DfApps.name.in_(names))
        else:
            query = db.query(DfApps.id, DfApps.name, DfApps.description, DfApps.status, DfApps.tenant_id)
            query = db.query(DfApps.id, DfApps.name, DfApps.description, DfApps.status, DfApps.tenant_id, DfApps.mode)
        results = query.all()
        formatted_results = [
            {"id": str(row[0]), "name": row[1], "description": row[2], "status": "1",
             "user_id": str(row[4])} for row in results]
             "user_id": str(row[4]), "mode": row[5], "parameters": {}} for row in results if str(row[0]) not in chat_ids]
        return formatted_results
    finally:
        db.close()
def update_ids_in_local_v2(data: List[Dict], dialog_type:str):
def update_ids_in_local_v2(data: List[Dict], dialog_type: str):
    db = SessionLocal()
    agent_id_list = []
    type_dict = {"1": RAGFLOW,"2": BISHENG,"4": DIFY}
    type_dict = {"1": RAGFLOW, "2": BISHENG, "4": DIFY}
    try:
        for row in data:
            agent_id_list.append(row["id"])
            existing_agent = db.query(DialogModel).filter_by(id=row["id"]).first()
            if existing_agent:
                existing_agent.name = row["name"]
                existing_agent.status = row["status"]
                existing_agent.description = row["description"]
                # existing_agent.tenant_id = get_rag_user_id(db, row["user_id"], type_dict[dialog_type])
                existing_agent.mode = row["mode"]
                existing_agent.kb_ids = row.get("kb_ids", "")
                if existing_agent.status == Dialog_STATSU_DELETE:
                    existing_agent.status = Dialog_STATSU_ON
                if row["parameters"]:
                    existing_agent.parameters = json.dumps(row["parameters"])
            else:
                existing = DialogModel(id=row["id"], status=row["status"], name=row["name"], description=row["description"], tenant_id=get_rag_user_id(db, row["user_id"], type_dict[dialog_type]), dialog_type=dialog_type)
                existing = DialogModel(id=row["id"], status=row["status"], name=row["name"],
                                       description=row["description"], kb_ids=row.get("kb_ids", ""),
                                       tenant_id=get_rag_user_id(db, row["user_id"], type_dict[dialog_type]),
                                       dialog_type=dialog_type, mode=row["mode"], parameters=json.dumps(row["parameters"]))
                db.add(existing)
        db.commit()
        for dialog in db.query(DialogModel).filter_by(dialog_type=dialog_type).all():
            if dialog.id not in agent_id_list:
                db.query(DialogModel).filter_by(id=dialog.id).update({"status": "2"})
                # print(dialog.id)
                db.query(DialogModel).filter_by(id=dialog.id).update({"status": Dialog_STATSU_DELETE})
                db.commit()
    except IntegrityError:
        db.rollback()
@@ -287,12 +326,12 @@
        db.close()
def get_data_from_ragflow_knowledge():
def get_data_from_ragflow_knowledge(tenant_id):
    db = SessionRagflow()
    try:
        results = db.query(RgKnowledge.id, RgKnowledge.name, RgKnowledge.description, RgKnowledge.status, RgKnowledge.tenant_id, RgKnowledge.doc_num, RgKnowledge.permission).all()
        results = db.query(RgKnowledge.id, RgKnowledge.name, RgKnowledge.description, RgKnowledge.status,
                           RgKnowledge.tenant_id, RgKnowledge.doc_num, RgKnowledge.permission).filter(RgKnowledge.tenant_id==tenant_id).all()
        formatted_results = [
            {"id": row[0], "name": row[1], "description": row[2], "status": str(row[3]),
             "user_id": str(row[4]), "doc_num": row[5], "permission": row[6]} for row in results]
@@ -300,27 +339,30 @@
    finally:
        db.close()
def sync_agents_v2():
    db = SessionLocal()
    try:
        app_register = AppRegisterDao(db).get_apps()
        for app in app_register:
            if app["id"] == RAGFLOW:
                ragflow_data = get_data_from_ragflow_v2([])
                if ragflow_data:
                    update_ids_in_local_v2(ragflow_data, "1")
            elif app["id"] == BISHENG:
                bisheng_data = get_data_from_bisheng_v2([])
                if bisheng_data:
                    update_ids_in_local_v2(bisheng_data, "2")
            elif app["id"] == DIFY:
                dify_data = get_data_from_dify_v2([])
                if dify_data:
                    update_ids_in_local_v2(dify_data, "4")
        print("Agents synchronized successfully")
            try:
                if app["id"] == RAGFLOW:
                    ragflow_data = get_data_from_ragflow_v2(db, [], app["name"])
                    if ragflow_data:
                        update_ids_in_local_v2(ragflow_data, "1")
                elif app["id"] == DIFY:
                    dify_data = get_data_from_dy_v2(db, [])
                    if dify_data:
                        update_ids_in_local_v2(dify_data, "4")
            except Exception as e:
                print(f"Failed to sync agents: {str(e)}")
        print("v2 Agents synchronized successfully")
    except Exception as e:
        print(f"Failed to sync agents: {str(e)}")
        print(f"v2 Failed to sync agents: {str(e)}")
    finally:
        db.close()
def update_ids_in_local_knowledge(data, klg_type):
    type_dict = {"1": RAGFLOW, "2": BISHENG, "4": DIFY}
@@ -334,16 +376,17 @@
                existing_agent.name = row["name"]
                existing_agent.description = row["description"]
                # existing_agent.tenant_id = get_rag_user_id(db, row["user_id"], type_dict[klg_type])
                existing_agent.permission =  row["permission"]
                existing_agent.documents =  row["doc_num"]
                existing_agent.status =  row["status"]
                existing_agent.permission = row["permission"]
                existing_agent.documents = row["doc_num"]
                existing_agent.status = row["status"]
            else:
                existing = KnowledgeModel(id=row["id"], name=row["name"], description=row["description"],
                                       tenant_id=get_rag_user_id(db, row["user_id"], type_dict[klg_type]),status=row["status"],
                                       knowledge_type=1, permission=row["permission"], documents=row["doc_num"])
                                          tenant_id=get_rag_user_id(db, row["user_id"], type_dict[klg_type]),
                                          status=row["status"],
                                          knowledge_type=1, permission=row["permission"], documents=row["doc_num"])
                db.add(existing)
        db.commit()
        for dialog in db.query(KnowledgeModel).filter_by(knowledge_type=type_dict[klg_type]).all():
        for dialog in db.query(KnowledgeModel).filter_by(knowledge_type=klg_type).all():
            if dialog.id not in agent_id_list:
                db.query(KnowledgeModel).filter_by(id=dialog.id).delete()
                db.commit()
@@ -353,13 +396,27 @@
    finally:
        db.close()
def get_one_from_ragflow_knowledge(klg_id):
    db = SessionRagflow()
    try:
        row = db.query(RgKnowledge.id, RgKnowledge.name, RgKnowledge.description, RgKnowledge.status, RgKnowledge.tenant_id, RgKnowledge.doc_num, RgKnowledge.permission).filter(RgKnowledge.id==klg_id).first()
        row = db.query(RgKnowledge.id, RgKnowledge.name, RgKnowledge.description, RgKnowledge.status,
                       RgKnowledge.tenant_id, RgKnowledge.doc_num, RgKnowledge.permission, RgKnowledge.embd_id).filter(
            RgKnowledge.id == klg_id).first()
        return {"id": row[0], "name": row[1], "description": row[2], "status": str(row[3]),
             "user_id": str(row[4]), "doc_num": row[5], "permission": row[6]} if row else {}
                "user_id": str(row[4]), "doc_num": row[5], "permission": row[6], "embd_id": row[7]} if row else {}
    finally:
        db.close()
def get_one_from_ragflow_dialog(dialog_id):
    """Fetch a single dialog by id through a ``SessionRagflow`` session.

    Args:
        dialog_id: Value matched against ``Dialog.id``.

    Returns:
        A dict with keys ``id``, ``name``, ``description``, ``status``,
        ``user_id`` (taken from the dialog's ``tenant_id``) and ``kb_ids``.
        ``status`` and ``user_id`` are converted with ``str()``. An empty
        dict is returned when no dialog matches.
    """
    session = SessionRagflow()
    try:
        columns = (Dialog.id, Dialog.name, Dialog.description, Dialog.status, Dialog.tenant_id, Dialog.kb_ids)
        record = session.query(*columns).filter(Dialog.id == dialog_id).first()
        if not record:
            return {}
        found_id, name, description, status, tenant_id, kb_ids = record
        return {
            "id": found_id,
            "name": name,
            "description": description,
            "status": str(status),
            "user_id": str(tenant_id),
            "kb_ids": kb_ids,
        }
    finally:
        # Always release the connection, whether or not the lookup succeeded.
        session.close()
@@ -371,7 +428,7 @@
        app_register = AppRegisterDao(db).get_apps()
        for app in app_register:
            if app["id"] == RAGFLOW:
                ragflow_data = get_data_from_ragflow_knowledge()
                ragflow_data = get_data_from_ragflow_knowledge(app["name"])
                if ragflow_data:
                    update_ids_in_local_knowledge(ragflow_data, "1")
            # elif app["id"] == BISHENG:
@@ -380,11 +437,100 @@
            # elif app["id"] == DIFY:
            #     dify_data = get_data_from_dify_v2([])
            #     update_ids_in_local_v2(dify_data, "4")
        print("Agents synchronized successfully")
        print("sync knowledge successfully")
    except Exception as e:
        print(f"Failed to sync agents: {str(e)}")
        print(f"Failed to sync knowledge: {str(e)}")
    finally:
        db.close()
def update_ragflow_user_tenant(user_id: str):
    """Switch ``user_tenant`` rows from role ``"invite"`` to ``"normal"``.

    Args:
        user_id: When non-empty, only rows whose ``user_id`` or whose
            ``tenant_id`` equals this value are updated (two separate
            UPDATE statements). When empty, every row with role
            ``"invite"`` is updated.
    """
    session = SessionRagflow()
    try:
        is_invite = RgUserTenant.role == "invite"
        if not user_id:
            # No user given: update every row that still has the "invite" role.
            session.query(RgUserTenant).filter(is_invite).update({"role": "normal"})
        else:
            # Match the id first as the user, then as the tenant.
            for column in (RgUserTenant.user_id, RgUserTenant.tenant_id):
                session.query(RgUserTenant).filter(column == user_id, is_invite).update({"role": "normal"})
        session.commit()
    finally:
        session.close()
def import_type_table(session: Session, node: dict, parent=None):
    """Insert one ``ResourceTypeModel`` row built from *node* and commit.

    Args:
        session: SQLAlchemy session the row is added to and committed on.
        node: Mapping that must contain ``id`` and ``name``; ``description``
            is optional and defaults to ``None``.
        parent: Optional parent object assigned to the new row's ``parent``
            attribute when truthy.

    Raises:
        KeyError: If *node* lacks ``id`` or ``name``.
    """
    fields = {
        "id": node['id'],
        "name": node['name'],
        "description": node.get('description'),
    }
    type_row = ResourceTypeModel(**fields)
    if parent:
        type_row.parent = parent
    session.add(type_row)
    session.commit()
def import_tree(session: Session, node: dict, parent=None):
    """Recursively insert a resource node and all of its descendants.

    A ``ResourceModel`` is built from *node*, linked to *parent* when one is
    given, added to *session*, and then every entry under the optional
    ``children`` key is imported with the new row as its parent. The session
    is committed at the end of each call, so nested calls commit as well.

    Args:
        session: SQLAlchemy session used for adding and committing rows.
        node: Mapping describing the resource. ``id``, ``name``, ``url``,
            ``perms``, ``seq``, ``resource_type_id`` and ``status`` are
            required; the remaining fields default to ``None``.
        parent: Optional parent ``ResourceModel`` for the new row.

    Raises:
        KeyError: If a required key is missing from *node*.
    """
    # Subscript access marks required keys; .get() marks optional ones.
    attrs = {
        "id": node['id'],
        "name": node['name'],
        "url": node['url'],
        "path": node.get('path'),
        "perms": node['perms'],
        "description": node.get('description'),
        "icon": node.get('icon'),
        "seq": node['seq'],
        "target": node.get('target'),
        "canbdeeleted": node.get('canbdeeleted'),
        "resource_type_id": node['resource_type_id'],
        "resource_id": node.get('resource_id'),
        "status": node['status'],
        "hidden": node.get('hidden'),
    }
    resource = ResourceModel(**attrs)
    if parent:
        resource.parent = parent
    session.add(resource)

    for child_node in node.get('children', ()):
        import_tree(session, child_node, parent=resource)

    session.commit()
def sync_resources_from_json():
    """Seed the resource-type and resource tables from JSON files when empty.

    For each table, in order (resource types first, then resources): if the
    table already has rows, only a message is printed. Otherwise the
    matching JSON file under ``ENV_CONF_PATH`` is loaded, the table is
    cleared and committed, and every top-level node is imported.

    Any exception is caught and reported with ``print``; the session is
    always closed.
    """
    # (model, JSON file, per-node importer, message after seeding, message when rows already exist)
    steps = (
        (ResourceTypeModel, "resource_type.json", import_type_table,
         "add resourceType record successfully", "sync resourcesType successfully"),
        (ResourceModel, "resource.json", import_tree,
         "add resources record successfully", "sync resources successfully"),
    )
    session = SessionLocal()
    try:
        for model, filename, importer, seeded_msg, present_msg in steps:
            if session.query(model).count() != 0:
                print(present_msg)
                continue
            with open(os.path.join(ENV_CONF_PATH, filename), 'r', encoding='utf-8') as fh:
                nodes = json.load(fh)
            session.query(model).delete()
            session.commit()
            for node in nodes:
                importer(session, node)
            print(seeded_msg)
    except Exception as e:
        print(f"Failed to sync resources or resource type: {str(e)}")
    finally:
        session.close()
if __name__ == "__main__":
    a = get_data_from_dify_v2([])
    print(a)
    # a = get_data_from_dify_v2([])
    # print(a)
    update_ragflow_user_tenant("")