zhaoqingang
2025-01-07 b134897836ca6f26cba71ef22f2474447cf39d15
app/task/fetch_agent.py
@@ -1,15 +1,20 @@
import json
import os
from pickle import PROTO
from typing import Dict, List, Tuple
from sqlalchemy import create_engine, Column, String, Integer, Text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import sessionmaker, Session
from app.config.config import settings
from app.config.const import RAGFLOW, BISHENG, DIFY
from app.models import DialogModel
from app.config.const import RAGFLOW, BISHENG, DIFY, ENV_CONF_PATH
from app.models import KnowledgeModel
from app.models.dialog_model import DialogModel
from app.models.user_model import UserAppModel
from app.models.agent_model import AgentModel
from app.models.base_model import SessionLocal, Base
from app.models.resource_model import ResourceModel, ResourceTypeModel
from app.service.v2.app_register import AppRegisterDao
# 创建数据库引擎和会话工厂
@@ -47,6 +52,26 @@
    status = Column(String(16), nullable=False)
    description = Column(Text, nullable=False)
    tenant_id = Column(String(36), nullable=False)
    mode = Column(String(36), nullable=False)
class RgKnowledge(Base):
    """ORM mapping for the ``knowledgebase`` table.

    Rows of this table are read through ``SessionRagflow`` sessions elsewhere
    in this module.
    """
    __tablename__ = 'knowledgebase'
    id = Column(String(36), primary_key=True)  # primary key
    name = Column(String(128))  # name
    permission = Column(String(32), default="me")  # defaults to "me"; NOTE(review): other allowed values are not visible here
    tenant_id = Column(String(32))  # creator id (per the original comment)
    description = Column(Text)  # description
    status = Column(String(1))  # status flag (single character)
    doc_num = Column(Integer)  # documents; presumably a document count - TODO confirm
class RgUserTenant(Base):
    """ORM mapping for the ``user_tenant`` table (user/tenant link rows with a role)."""
    __tablename__ = 'user_tenant'
    id = Column(String(36), primary_key=True)  # primary key
    tenant_id = Column(String(32))  # NOTE(review): the original comment said "name", which looks copy-pasted; presumably the tenant's id
    user_id = Column(String(32))
    role = Column(String(32))  # NOTE(review): the original comment said "creator id"; this module uses the values "invite" and "normal"
# 解析名字
@@ -130,15 +155,15 @@
            result = db.query(AgentModel).delete()
            db.commit()  # 提交事务
        initial_agents = [
            ('80ee430a-e396-48c4-a12c-7c7cdf5eda51', 1, '报告生成', 'BISHENG', 'report'),
            # ('80ee430a-e396-48c4-a12c-7c7cdf5eda51', 1, '报告生成', 'DIFY', 'report'),
            ('basic_excel_merge', 2, '报表合并', 'BASIC', 'excelMerge'),
            ('bfd090d589d811efb3630242ac190006', 4, '文档智能', 'BISHENG', 'report'),
            ('7638f00638a24c21a68ec6c49b304a35', 4, '文档智能', 'DIFY', 'documentIa'),
            ('da3451da89d911efb9490242ac190006', 3, '知识问答', 'RAGFLOW', 'knowledgeQA'),
            ('e96eb7a589db11ef87d20242ac190006', 5, '智能问答', 'RAGFLOW', 'chat'),
            ('basic_excel_talk', 6, '智能数据', 'BASIC', 'excelTalk'),
            ('basic_question_talk', 7, '出题组卷', 'BASIC', 'questionTalk'),
            ('9d75142a-66eb-4e23-b7d4-03efe4584915', 8, '小数绘图', 'DIFY', 'imageTalk'),
            ('basic_paper_talk', 8, '文档出卷', 'BASIC', 'paperTalk'),
            ('2f6ddf93-7ba6-4b2d-b991-d96421404600', 9, '文档出卷', 'DIFY', 'paperTalk'),
            ('basic_report_clean', 10, '文档报告', 'DIFY', 'reportWorkflow')
        ]
@@ -157,10 +182,10 @@
def sync_agents():
    try:
        bisheng_data = get_data_from_bisheng(BISHENG_NAMES_TO_SYNC)
        # bisheng_data = get_data_from_bisheng(BISHENG_NAMES_TO_SYNC)
        ragflow_data = get_data_from_ragflow(RAGFLOW_NAMES_TO_SYNC)
        update_ids_in_local(bisheng_data)
        # update_ids_in_local(bisheng_data)
        update_ids_in_local(ragflow_data)
        print("Agents synchronized successfully")
@@ -185,36 +210,48 @@
    finally:
        db.close()
def get_rag_user_id(db, tenant_id, app_type):
    """Translate an external app account id into the local user id.

    Looks for a ``UserAppModel`` row whose ``app_type`` and ``app_id`` match
    the arguments. When such a row exists its ``user_id`` is returned;
    otherwise ``tenant_id`` is handed back unchanged.
    """
    mapping = (
        db.query(UserAppModel)
        .filter(UserAppModel.app_type == app_type, UserAppModel.app_id == tenant_id)
        .first()
    )
    return mapping.user_id if mapping else tenant_id
def get_data_from_bisheng_v2(names: List[str]) -> List[Dict]:
    """Fetch Bisheng flows whose status equals "1" as plain dictionaries.

    Args:
        names: Flow names to restrict the query to. An empty list applies no
            name filter.

    Returns:
        One dict per matching row with the keys ``id``, ``name``,
        ``description``, ``status``, ``user_id`` (stringified) and ``mode``
        (always ``"agent-dialog"``).
    """
    db = SessionBisheng()
    try:
        query = db.query(Flow.id, Flow.name, Flow.description, Flow.status, Flow.user_id)
        if names:
            query = query.filter(Flow.name.in_(names))
        # NOTE(review): status is compared against the string "1"; confirm this
        # matches the column type declared on Flow.
        query = query.filter(Flow.status == "1")
        results = query.all()
        # The "mode" key is required by update_ids_in_local_v2, which reads row["mode"].
        formatted_results = [
            {"id": row[0], "name": row[1], "description": row[2], "status": row[3], "user_id": str(row[4]),
             "mode": "agent-dialog"} for row in results]
        return formatted_results
    finally:
        # Always release the session, even when the query raises.
        db.close()
def get_data_from_ragflow_v2(names: List[str]) -> List[Dict]:
    """Fetch Ragflow dialogs whose status equals "1" as plain dictionaries.

    Args:
        names: Dialog names to restrict the query to. An empty list applies
            no name filter.

    Returns:
        One dict per matching row with the keys ``id``, ``name``,
        ``description``, ``status`` ("1" when the stored status is "1",
        otherwise "2"), ``user_id`` (the dialog's tenant id, stringified)
        and ``mode`` (always ``"agent-dialog"``).
    """
    db = SessionRagflow()
    try:
        query = db.query(Dialog.id, Dialog.name, Dialog.description, Dialog.status, Dialog.tenant_id)
        if names:
            query = query.filter(Dialog.name.in_(names))
        query = query.filter(Dialog.status == "1")
        results = query.all()
        # The "mode" key is required by update_ids_in_local_v2, which reads row["mode"].
        formatted_results = [
            {"id": row[0], "name": row[1], "description": row[2], "status": "1" if row[3] == "1" else "2",
             "user_id": str(row[4]), "mode": "agent-dialog"} for row in results]
        return formatted_results
    finally:
        # Always release the session, even when the query raises.
        db.close()
@@ -224,27 +261,24 @@
    db = SessionDify()
    try:
        if names:
            query = db.query(DfApps.id, DfApps.name, DfApps.description, DfApps.status, DfApps.tenant_id) \
                .filter( DfApps.name.in_(names))
            query = db.query(DfApps.id, DfApps.name, DfApps.description, DfApps.status, DfApps.tenant_id, DfApps.mode) \
                .filter(DfApps.name.in_(names))
        else:
            query = db.query(DfApps.id, DfApps.name, DfApps.description, DfApps.status, DfApps.tenant_id)
            query = db.query(DfApps.id, DfApps.name, DfApps.description, DfApps.status, DfApps.tenant_id, DfApps.mode)
        results = query.all()
        formatted_results = [
            {"id": str(row[0]), "name": row[1], "description": row[2], "status": "1",
             "user_id": str(row[4])} for row in results]
             "user_id": str(row[4]), "mode": row[5]} for row in results]
        return formatted_results
    finally:
        db.close()
def update_ids_in_local_v2(data: List[Dict], dialog_type:str):
def update_ids_in_local_v2(data: List[Dict], dialog_type: str):
    db = SessionLocal()
    agent_id_list = []
    print("----------------------------------------")
    print(data)
    print("*********************************************")
    type_dict = {"1": RAGFLOW, "2": BISHENG, "4": DIFY}
    try:
        for row in data:
            agent_id_list.append(row["id"])
@@ -252,13 +286,20 @@
            if existing_agent:
                existing_agent.name = row["name"]
                existing_agent.description = row["description"]
                # existing_agent.status = row["status"]
                existing_agent.mode = row["mode"]
                # existing_agent.tenant_id = get_rag_user_id(db, row["user_id"], type_dict[dialog_type])
            else:
                existing = DialogModel(id=row["id"], name=row["name"], description=row["description"], tenant_id=row["user_id"], dialog_type=dialog_type)
                existing = DialogModel(id=row["id"], status=row["status"], name=row["name"],
                                       description=row["description"],
                                       tenant_id=get_rag_user_id(db, row["user_id"], type_dict[dialog_type]),
                                       dialog_type=dialog_type, mode=row["mode"])
                db.add(existing)
        db.commit()
        for dialog in db.query(DialogModel).filter_by(dialog_type=dialog_type).all():
            if dialog.id not in agent_id_list:
                db.query(DialogModel).filter_by(id=dialog.id).delete()
                # print(dialog.id)
                db.query(DialogModel).filter_by(id=dialog.id).update({"status": "2"})
                db.commit()
    except IntegrityError:
        db.rollback()
@@ -267,9 +308,19 @@
        db.close()
def get_data_from_ragflow_knowledge():
    """Return every row of the Ragflow ``knowledgebase`` table as a dict.

    Each dict carries the keys ``id``, ``name``, ``description``, ``status``
    (stringified), ``user_id`` (the row's tenant id, stringified),
    ``doc_num`` and ``permission``.
    """
    session = SessionRagflow()
    try:
        rows = session.query(
            RgKnowledge.id,
            RgKnowledge.name,
            RgKnowledge.description,
            RgKnowledge.status,
            RgKnowledge.tenant_id,
            RgKnowledge.doc_num,
            RgKnowledge.permission,
        ).all()
        knowledge_list = []
        for kb_id, kb_name, kb_description, kb_status, kb_tenant, kb_docs, kb_permission in rows:
            knowledge_list.append({
                "id": kb_id,
                "name": kb_name,
                "description": kb_description,
                "status": str(kb_status),
                "user_id": str(kb_tenant),
                "doc_num": kb_docs,
                "permission": kb_permission,
            })
        return knowledge_list
    finally:
        session.close()
def sync_agents_v2():
    db = SessionLocal()
@@ -279,17 +330,67 @@
        for app in app_register:
            if app["id"] == RAGFLOW:
                ragflow_data = get_data_from_ragflow_v2([])
                update_ids_in_local_v2(ragflow_data, "1")
                if ragflow_data:
                    update_ids_in_local_v2(ragflow_data, "1")
            elif app["id"] == BISHENG:
                bisheng_data = get_data_from_bisheng_v2([])
                update_ids_in_local_v2(bisheng_data, "2")
                if bisheng_data:
                    update_ids_in_local_v2(bisheng_data, "2")
            elif app["id"] == DIFY:
                dify_data = get_data_from_dify_v2([])
                update_ids_in_local_v2(dify_data, "4")
        print("Agents synchronized successfully")
                if dify_data:
                    update_ids_in_local_v2(dify_data, "4")
        print("v2 Agents synchronized successfully")
    except Exception as e:
        print(f"Failed to sync agents: {str(e)}")
        print(f"v2 Failed to sync agents: {str(e)}")
    finally:
        db.close()
def update_ids_in_local_knowledge(data, klg_type):
    """Mirror remote knowledge-base rows into the local ``KnowledgeModel`` table.

    For every dict in ``data`` the local row with the same id is updated
    (name, description, permission, documents, status) or, when missing,
    inserted. Afterwards every local row with ``knowledge_type == klg_type``
    whose id was not present in ``data`` is deleted.

    Args:
        data: Iterable of dicts with the keys ``id``, ``name``,
            ``description``, ``status``, ``user_id``, ``doc_num`` and
            ``permission``.
        klg_type: One of "1", "2" or "4"; selects the app constant used to
            resolve the owner of newly inserted rows and the set of local
            rows considered for cleanup.

    Raises:
        IntegrityError: re-raised after the session has been rolled back.
    """
    type_dict = {"1": RAGFLOW, "2": BISHENG, "4": DIFY}
    db = SessionLocal()
    # A set keeps the "is this id still present remotely?" check O(1) per row.
    seen_ids = set()
    try:
        for row in data:
            seen_ids.add(row["id"])
            existing_agent = db.query(KnowledgeModel).filter_by(id=row["id"]).first()
            if existing_agent:
                existing_agent.name = row["name"]
                existing_agent.description = row["description"]
                # The owner (tenant_id) of an existing row is deliberately left untouched.
                existing_agent.permission = row["permission"]
                existing_agent.documents = row["doc_num"]
                existing_agent.status = row["status"]
            else:
                # NOTE(review): knowledge_type is hard-coded to 1 here while the
                # cleanup below filters on klg_type; confirm they are meant to agree.
                existing = KnowledgeModel(id=row["id"], name=row["name"], description=row["description"],
                                          tenant_id=get_rag_user_id(db, row["user_id"], type_dict[klg_type]),
                                          status=row["status"],
                                          knowledge_type=1, permission=row["permission"], documents=row["doc_num"])
                db.add(existing)
        db.commit()
        # Drop local rows of this type that no longer exist remotely.
        for dialog in db.query(KnowledgeModel).filter_by(knowledge_type=klg_type).all():
            if dialog.id not in seen_ids:
                db.query(KnowledgeModel).filter_by(id=dialog.id).delete()
                db.commit()
    except IntegrityError:
        db.rollback()
        raise
    finally:
        db.close()
def get_one_from_ragflow_knowledge(klg_id):
    """Look up a single Ragflow knowledge base by id.

    Returns a dict with the keys ``id``, ``name``, ``description``,
    ``status`` (stringified), ``user_id`` (the row's tenant id, stringified),
    ``doc_num`` and ``permission``, or an empty dict when no row matches.
    """
    session = SessionRagflow()
    try:
        record = (
            session.query(
                RgKnowledge.id,
                RgKnowledge.name,
                RgKnowledge.description,
                RgKnowledge.status,
                RgKnowledge.tenant_id,
                RgKnowledge.doc_num,
                RgKnowledge.permission,
            )
            .filter(RgKnowledge.id == klg_id)
            .first()
        )
        if not record:
            return {}
        return {
            "id": record[0],
            "name": record[1],
            "description": record[2],
            "status": str(record[3]),
            "user_id": str(record[4]),
            "doc_num": record[5],
            "permission": record[6],
        }
    finally:
        session.close()
def sync_knowledge():
@@ -299,19 +400,109 @@
        app_register = AppRegisterDao(db).get_apps()
        for app in app_register:
            if app["id"] == RAGFLOW:
                ragflow_data = get_data_from_ragflow_knowledge([])
                update_ids_in_local_v2(ragflow_data, "1")
                ragflow_data = get_data_from_ragflow_knowledge()
                if ragflow_data:
                    update_ids_in_local_knowledge(ragflow_data, "1")
            # elif app["id"] == BISHENG:
            #     bisheng_data = get_data_from_bisheng_v2([])
            #     update_ids_in_local_v2(bisheng_data, "2")
            # elif app["id"] == DIFY:
            #     dify_data = get_data_from_dify_v2([])
            #     update_ids_in_local_v2(dify_data, "4")
        print("Agents synchronized successfully")
        print("sync knowledge successfully")
    except Exception as e:
        print(f"Failed to sync agents: {str(e)}")
        print(f"Failed to sync knowledge: {str(e)}")
    finally:
        db.close()
def update_ragflow_user_tenant(user_id: str):
    """Switch Ragflow ``user_tenant`` rows from role "invite" to "normal".

    With a non-empty ``user_id`` only rows whose ``user_id`` or ``tenant_id``
    equals it are touched (two separate UPDATE statements). With an empty
    value every row that still has role "invite" is updated.
    """
    session = SessionRagflow()
    try:
        if not user_id:
            session.query(RgUserTenant).filter(RgUserTenant.role == "invite").update({"role": "normal"})
        else:
            # Same update applied first by user_id, then by tenant_id.
            for column in (RgUserTenant.user_id, RgUserTenant.tenant_id):
                invited = session.query(RgUserTenant).filter(column == user_id, RgUserTenant.role == "invite")
                invited.update({"role": "normal"})
        session.commit()
    finally:
        session.close()
def import_type_table(session: Session, node: dict, parent=None):
    """Insert one ``ResourceTypeModel`` row built from ``node`` and commit.

    ``node`` must provide ``id`` and ``name``; ``description`` is optional.
    A truthy ``parent`` is attached to the new row before it is added.
    """
    fields = {
        'id': node['id'],
        'name': node['name'],
        'description': node.get('description'),
    }
    type_row = ResourceTypeModel(**fields)
    if parent:
        type_row.parent = parent
    session.add(type_row)
    session.commit()
def import_tree(session: Session, node: dict, parent=None):
    """Recursively insert ``node`` and its ``children`` as ``ResourceModel`` rows.

    Mandatory keys are read with ``node[...]`` (a missing one raises
    ``KeyError``); the remaining keys fall back to ``None``. A truthy
    ``parent`` is attached to the new row. The session is committed after
    the node's children have been processed.
    """
    mandatory_keys = ('id', 'name', 'url', 'perms', 'seq', 'resource_type_id', 'status')
    optional_keys = ('path', 'description', 'icon', 'target', 'canbdeeleted', 'resource_id', 'hidden')

    mandatory = {key: node[key] for key in mandatory_keys}
    optional = {key: node.get(key) for key in optional_keys}
    current = ResourceModel(**mandatory, **optional)

    if parent:
        current.parent = parent
    session.add(current)

    # Descend depth-first; each child gets the freshly created row as its parent.
    for child_node in node.get('children', ()):
        import_tree(session, child_node, parent=current)
    session.commit()
def sync_resources_from_json():
    """Seed the resource-type and resource tables from JSON files when empty.

    For each table in turn: if it already holds rows, only a message is
    printed; otherwise the matching JSON file under ``ENV_CONF_PATH`` is
    loaded and every top-level node is imported. Any exception is caught and
    reported with ``print``; the session is always closed.
    """
    session = SessionLocal()
    try:
        # (model, JSON file, importer, message after seeding, message when already populated)
        seed_plan = (
            (ResourceTypeModel, "resource_type.json", import_type_table,
             "add resourceType record successfully", "sync resourcesType successfully"),
            (ResourceModel, "resource.json", import_tree,
             "add resources record successfully", "sync resources successfully"),
        )
        for model, filename, importer, seeded_msg, populated_msg in seed_plan:
            if session.query(model).count() != 0:
                print(populated_msg)
                continue
            with open(os.path.join(ENV_CONF_PATH, filename), 'r', encoding='utf-8') as handle:
                nodes = json.load(handle)
            session.query(model).delete()
            session.commit()
            for entry in nodes:
                importer(session, entry)
            print(seeded_msg)
    except Exception as e:
        print(f"Failed to sync resources or resource type: {str(e)}")
    finally:
        session.close()
# Manual entry point for running this module as a script.
if __name__ == "__main__":
    # NOTE(review): the two active lines below duplicate the commented-out pair
    # that follows; this looks like leftover from a diff - confirm which is intended.
    a = get_data_from_dify_v2([])
    print(a)
    # a = get_data_from_dify_v2([])
    # print(a)
    # An empty user_id takes the branch of update_ragflow_user_tenant that
    # switches every "invite" row to "normal", not just one user's rows.
    update_ragflow_user_tenant("")