from pickle import PROTO
|
from typing import Dict, List, Tuple
|
|
from sqlalchemy import create_engine, Column, String, Integer, Text
|
from sqlalchemy.exc import IntegrityError
|
from sqlalchemy.orm import sessionmaker
|
|
from app.config.config import settings
|
from app.config.const import RAGFLOW, BISHENG, DIFY
|
from app.models.dialog_model import DialogModel
|
from app.models.user_model import UserAppModel
|
from app.models.agent_model import AgentModel
|
from app.models.base_model import SessionLocal, Base
|
from app.service.v2.app_register import AppRegisterDao
|
|
# Create one database engine and one session factory per upstream database.
# Each engine is built from its own URL in `settings`; every factory uses
# explicit commits and no autoflush.
engine_bisheng = create_engine(settings.sgb_db_url)
SessionBisheng = sessionmaker(bind=engine_bisheng, autoflush=False, autocommit=False)

engine_ragflow = create_engine(settings.fwr_db_url)
SessionRagflow = sessionmaker(bind=engine_ragflow, autoflush=False, autocommit=False)

engine_dify = create_engine(settings.dify_database_url)
SessionDify = sessionmaker(bind=engine_dify, autoflush=False, autocommit=False)
|
|
|
class Flow(Base):
    """ORM mapping for the ``flow`` table, read through ``SessionBisheng``.

    NOTE(review): the class is registered on the shared ``Base`` imported from
    ``app.models.base_model`` although it is queried through a separate
    engine -- confirm this has no effect on metadata-wide operations.
    """

    __tablename__ = "flow"

    # Primary key; the fetchers in this module pass it through format_uuid().
    id = Column(String(255), primary_key=True)
    name = Column(String(255), nullable=False)
    # Integer status; the fetchers in this module select rows where it is 2.
    status = Column(Integer, nullable=False)
    description = Column(String(255), nullable=False)
    user_id = Column(Integer, nullable=False)
|
|
|
class Dialog(Base):
    """ORM mapping for the ``dialog`` table, read through ``SessionRagflow``.

    NOTE(review): the class is registered on the shared ``Base`` imported from
    ``app.models.base_model`` although it is queried through a separate
    engine -- confirm this has no effect on metadata-wide operations.
    """

    __tablename__ = "dialog"

    # Primary key; the v2 fetchers pass it through format_uuid().
    id = Column(String(255), primary_key=True)
    name = Column(String(255), nullable=False)
    # Stored as a one-character string, unlike Flow.status which is an integer.
    status = Column(String(1), nullable=False)
    description = Column(String(255), nullable=False)
    tenant_id = Column(String(36), nullable=False)
|
|
|
class DfApps(Base):
    """ORM mapping for the ``apps`` table, read through ``SessionDify``.

    NOTE(review): the class is registered on the shared ``Base`` imported from
    ``app.models.base_model`` although it is queried through a separate
    engine -- confirm this has no effect on metadata-wide operations.
    """

    __tablename__ = "apps"

    id = Column(String(36), primary_key=True)
    name = Column(String(255), nullable=False)
    # Selected by get_data_from_dify_v2() but not used in its output.
    status = Column(String(16), nullable=False)
    description = Column(Text, nullable=False)
    tenant_id = Column(String(36), nullable=False)
|
|
|
# Parse a comma-separated list of names.
def parse_names(names_str: str) -> List[str]:
    """Split a comma-separated string into trimmed, non-empty names.

    Args:
        names_str: Comma-separated names, e.g. ``"a, b ,c"``. An empty string
            or ``None`` is treated as "no names configured".

    Returns:
        The names in their original order with surrounding whitespace removed.
        Empty entries (blank input, doubled or trailing commas) are dropped,
        so an unset value yields ``[]`` rather than ``['']``.
    """
    if not names_str:
        return []
    trimmed = (part.strip() for part in names_str.split(","))
    # Callers test the list for truthiness to decide whether to filter by
    # name at all, so an empty name must not make the list non-empty.
    return [name for name in trimmed if name]
|
|
|
# Name filters parsed once, at import time, from the comma-separated settings.
BISHENG_NAMES_TO_SYNC = parse_names(settings.fetch_sgb_agent)
RAGFLOW_NAMES_TO_SYNC = parse_names(settings.fetch_fwr_agent)
|
|
|
def get_data_from_bisheng(names: List[str]) -> List[Tuple]:
    """Fetch ``(id, name)`` pairs for ``Flow`` rows whose status is 2.

    Args:
        names: When non-empty, only flows whose name is in this list are
            returned; when empty, no name filter is applied.

    Returns:
        A list of ``(formatted_id, name)`` tuples, where the id has been
        passed through ``format_uuid``.

    Raises:
        ValueError: Propagated from ``format_uuid`` when an id is not
            32 characters long.
    """
    session = SessionBisheng()
    try:
        # The status criterion always applies; the name criterion is optional.
        query = session.query(Flow.id, Flow.name).filter(Flow.status == 2)
        if names:
            query = query.filter(Flow.name.in_(names))

        rows = query.all()
        print(f"Executing query: {query}")
        # Convert each id to the hyphenated UUID form.
        return [(format_uuid(flow_id), flow_name) for flow_id, flow_name in rows]
    finally:
        session.close()
|
|
|
def format_uuid(uuid_str: str) -> str:
    """Insert hyphens into a 32-character string in the 8-4-4-4-12 layout.

    Args:
        uuid_str: The un-hyphenated identifier; must be exactly 32 long.

    Returns:
        The five groups joined with ``-``.

    Raises:
        ValueError: If the input is not 32 characters long.
    """
    # Only the length is validated; the characters themselves are not checked.
    if len(uuid_str) != 32:
        raise ValueError("Input string must be 32 characters long")

    # Boundaries of the five groups: 8, 4, 4, 4 and 12 characters.
    cuts = (0, 8, 12, 16, 20, 32)
    groups = [uuid_str[start:stop] for start, stop in zip(cuts, cuts[1:])]
    return "{}-{}-{}-{}-{}".format(*groups)
|
|
|
def get_data_from_ragflow(names: List[str]) -> List[Tuple]:
    """Fetch ``(id, name)`` rows for ``Dialog`` records whose status equals 1.

    Args:
        names: When non-empty, only dialogs whose name is in this list are
            returned; when empty, no name filter is applied.

    Returns:
        The rows exactly as returned by the query (ids are not reformatted).
    """
    session = SessionRagflow()
    try:
        # NOTE(review): Dialog.status is declared String(1) but is compared to
        # the integer 1 here -- confirm the target database coerces this as
        # intended.
        query = session.query(Dialog.id, Dialog.name).filter(Dialog.status == 1)
        if names:
            query = query.filter(Dialog.name.in_(names))

        rows = query.all()
        print(f"Executing query: {query}")
        return rows
    finally:
        session.close()
|
|
|
def update_ids_in_local(data: List[Tuple]):
    """Overwrite the id of local agents, matching incoming rows by name.

    NOTE(review): this module defines ``update_ids_in_local`` a second time
    further down; at import time the later definition replaces this one.

    Args:
        data: Rows indexed as ``row[0]`` = new id and ``row[1]`` = agent name.

    Raises:
        IntegrityError: Re-raised after the session has been rolled back.
    """
    session = SessionLocal()
    try:
        for row in data:
            agent_name, replacement_id = row[1], row[0]
            agent = session.query(AgentModel).filter_by(name=agent_name).first()
            if not agent:
                # Names with no local agent are skipped, not created.
                continue
            agent.id = replacement_id
            session.add(agent)
        session.commit()
    except IntegrityError:
        session.rollback()
        raise
    finally:
        session.close()
|
|
|
def initialize_agents():
    """Reset the local ``AgentModel`` table to the built-in default agents.

    All existing agent rows are deleted and the defaults below are inserted.
    Both steps run in a single transaction, so a failed insert rolls the
    delete back instead of leaving the table empty.

    Raises:
        IntegrityError: Re-raised after the session has been rolled back.
    """
    # Each entry is (id, sort, name, agent_type, type).
    initial_agents = [
        # Disabled entry:
        # ('80ee430a-e396-48c4-a12c-7c7cdf5eda51', 1, '报告生成', 'DIFY', 'report'),
        ('basic_excel_merge', 2, '报表合并', 'BASIC', 'excelMerge'),
        ('7638f00638a24c21a68ec6c49b304a35', 4, '文档智能', 'DIFY', 'documentIa'),
        ('da3451da89d911efb9490242ac190006', 3, '知识问答', 'RAGFLOW', 'knowledgeQA'),
        ('e96eb7a589db11ef87d20242ac190006', 5, '智能问答', 'RAGFLOW', 'chat'),
        ('basic_excel_talk', 6, '智能数据', 'BASIC', 'excelTalk'),
        ('basic_question_talk', 7, '出题组卷', 'BASIC', 'questionTalk'),
        ('9d75142a-66eb-4e23-b7d4-03efe4584915', 8, '小数绘图', 'DIFY', 'imageTalk'),
        ('basic_paper_talk', 9, '文档出卷', 'BASIC', 'paperTalk'),
        ('basic_report_clean', 10, '文档报告', 'DIFY', 'reportWorkflow'),
    ]

    db = SessionLocal()
    try:
        # Deleting zero rows is harmless, so no separate count query is needed.
        db.query(AgentModel).delete()

        for raw_id, sort, name, agent_type, type_ in initial_agents:
            # Only bare 32-character ids are converted to hyphenated form;
            # symbolic ids and already-hyphenated ids are stored unchanged.
            agent_id = format_uuid(raw_id) if len(raw_id) == 32 else raw_id
            db.add(AgentModel(id=agent_id, sort=sort, name=name,
                              agent_type=agent_type, type=type_))

        # Single commit: the delete and the inserts succeed or fail together.
        db.commit()
        print("Initial agents inserted successfully")
    except IntegrityError:
        db.rollback()
        raise
    finally:
        db.close()
|
|
|
def sync_agents():
    """Refresh local agent ids from the RAGFlow dialogs named in settings.

    Any exception is caught and reported on stdout; nothing is re-raised.
    The BiSheng half of this sync is disabled, so only RAGFlow data is
    fetched and applied.
    """
    try:
        update_ids_in_local(get_data_from_ragflow(RAGFLOW_NAMES_TO_SYNC))
        print("Agents synchronized successfully")
    except Exception as exc:
        print(f"Failed to sync agents: {exc!s}")
|
|
|
def update_ids_in_local(data: List[Tuple]):
    """Replace the id of each local agent whose name appears in ``data``.

    NOTE(review): an identical function with the same name is defined earlier
    in this module; this later definition is the one in effect after import.

    Args:
        data: Rows indexed as ``row[0]`` = new id and ``row[1]`` = agent name.

    Raises:
        IntegrityError: Re-raised after the session has been rolled back.
    """
    local_session = SessionLocal()
    try:
        for entry in data:
            target_name = entry[1]
            incoming_id = entry[0]
            matched = local_session.query(AgentModel).filter_by(name=target_name).first()
            # Rows whose name has no local agent are ignored.
            if matched:
                matched.id = incoming_id
                local_session.add(matched)
        local_session.commit()
    except IntegrityError:
        local_session.rollback()
        raise
    finally:
        local_session.close()
|
|
def get_rag_user_id(db, tenant_id, app_type):
    """Resolve an upstream tenant/user id to a local user id.

    Looks up the first ``UserAppModel`` row whose ``app_type`` and ``app_id``
    match the arguments.

    Args:
        db: Session used to run the lookup.
        tenant_id: Upstream identifier, matched against ``UserAppModel.app_id``.
        app_type: Application type, matched against ``UserAppModel.app_type``.

    Returns:
        The matched row's ``user_id``, or ``tenant_id`` unchanged when no row
        matches.
    """
    mapping = (
        db.query(UserAppModel)
        .filter(UserAppModel.app_type == app_type, UserAppModel.app_id == tenant_id)
        .first()
    )
    return mapping.user_id if mapping else tenant_id
|
|
|
def get_data_from_bisheng_v2(names: List[str]) -> List[Dict]:
    """Fetch ``Flow`` rows with status 2 as dictionaries.

    Args:
        names: When non-empty, only flows whose name is in this list are
            returned; when empty, no name filter is applied.

    Returns:
        One dict per row with keys ``id`` (hyphenated via ``format_uuid``),
        ``name``, ``description``, ``status`` (the stored status minus one,
        as a string) and ``user_id`` (as a string).

    Raises:
        ValueError: Propagated from ``format_uuid`` when an id is not
            32 characters long.
    """
    session = SessionBisheng()
    try:
        criteria = [Flow.status == 2]
        if names:
            # Keep the name criterion ahead of the status criterion.
            criteria.insert(0, Flow.name.in_(names))

        query = session.query(
            Flow.id, Flow.name, Flow.description, Flow.status, Flow.user_id
        ).filter(*criteria)

        return [
            {
                "id": format_uuid(flow_id),
                "name": name,
                "description": description,
                "status": str(status - 1),
                "user_id": str(user_id),
            }
            for flow_id, name, description, status, user_id in query.all()
        ]
    finally:
        session.close()
|
|
def get_data_from_ragflow_v2(names: List[str]) -> List[Dict]:
    """Fetch ``Dialog`` rows as dictionaries, without any status filter.

    Args:
        names: When non-empty, only dialogs whose name is in this list are
            returned; when empty, every dialog is returned.

    Returns:
        One dict per row with keys ``id`` (hyphenated via ``format_uuid``),
        ``name``, ``description``, ``status`` (as a string) and ``user_id``
        (the row's ``tenant_id`` as a string).

    Raises:
        ValueError: Propagated from ``format_uuid`` when an id is not
            32 characters long.
    """
    session = SessionRagflow()
    try:
        query = session.query(
            Dialog.id, Dialog.name, Dialog.description, Dialog.status, Dialog.tenant_id
        )
        if names:
            query = query.filter(Dialog.name.in_(names))

        records = []
        for dialog_id, name, description, status, tenant_id in query.all():
            records.append({
                "id": format_uuid(dialog_id),
                "name": name,
                "description": description,
                "status": str(status),
                "user_id": str(tenant_id),
            })
        return records
    finally:
        session.close()
|
|
|
def get_data_from_dify_v2(names: List[str]) -> List[Dict]:
    """Fetch ``DfApps`` rows as dictionaries, without any status filter.

    Args:
        names: When non-empty, only apps whose name is in this list are
            returned; when empty, every app is returned.

    Returns:
        One dict per row with keys ``id`` (as a string, not reformatted),
        ``name``, ``description``, ``status`` and ``user_id`` (the row's
        ``tenant_id`` as a string). ``status`` is always the literal ``"1"``;
        the stored status column is selected but not used.
    """
    session = SessionDify()
    try:
        query = session.query(
            DfApps.id, DfApps.name, DfApps.description, DfApps.status, DfApps.tenant_id
        )
        if names:
            query = query.filter(DfApps.name.in_(names))

        return [
            {
                "id": str(app_id),
                "name": name,
                "description": description,
                "status": "1",
                "user_id": str(tenant_id),
            }
            for app_id, name, description, _stored_status, tenant_id in query.all()
        ]
    finally:
        session.close()
|
|
|
|
def update_ids_in_local_v2(data: List[Dict], dialog_type: str):
    """Mirror upstream records into the local ``DialogModel`` table.

    Runs in two committed phases:

    1. Upsert: for every row, update the name, description and tenant of the
       local record with the same id, or insert a new record tagged with
       ``dialog_type`` when none exists.
    2. Prune: delete every local record of this ``dialog_type`` whose id was
       not present in ``data``. With empty ``data`` this removes all records
       of that type.

    Args:
        data: Dicts with at least the keys ``id``, ``name``, ``description``
            and ``user_id``.
        dialog_type: ``"1"``, ``"2"`` or ``"4"``; mapped to the RAGFLOW,
            BISHENG or DIFY constant when resolving the tenant.

    Raises:
        KeyError: If ``data`` is non-empty and ``dialog_type`` is not one of
            the mapped values.
        IntegrityError: Re-raised after the session has been rolled back.
    """
    db = SessionLocal()
    type_dict = {"1": RAGFLOW, "2": BISHENG, "4": DIFY}
    # A set keeps the membership test in the prune phase O(1) per record.
    seen_ids = set()
    try:
        for row in data:
            seen_ids.add(row["id"])
            existing_agent = db.query(DialogModel).filter_by(id=row["id"]).first()
            # Translate the upstream owner into a local user id where a
            # mapping exists; otherwise the upstream id is stored as-is.
            tenant_id = get_rag_user_id(db, row["user_id"], type_dict[dialog_type])
            if existing_agent:
                # The dialog_type of an existing record is left untouched.
                existing_agent.name = row["name"]
                existing_agent.description = row["description"]
                existing_agent.tenant_id = tenant_id
            else:
                db.add(DialogModel(id=row["id"], name=row["name"],
                                   description=row["description"],
                                   tenant_id=tenant_id, dialog_type=dialog_type))
        db.commit()

        for dialog in db.query(DialogModel).filter_by(dialog_type=dialog_type).all():
            if dialog.id not in seen_ids:
                db.query(DialogModel).filter_by(id=dialog.id).delete()
        db.commit()
    except IntegrityError:
        db.rollback()
        raise
    finally:
        db.close()
|
|
|
|
def get_data_from_ragflow_knowledge():
    """Fetch every ``Dialog`` row from the RAGFlow database as a dictionary.

    Despite the name, this reads the same ``Dialog`` table as
    ``get_data_from_ragflow_v2`` and applies no filter.

    Returns:
        One dict per row with keys ``id`` (hyphenated via ``format_uuid``),
        ``name``, ``description``, ``status`` (as a string) and ``user_id``
        (the row's ``tenant_id`` as a string).

    Raises:
        ValueError: Propagated from ``format_uuid`` when an id is not
            32 characters long.
    """
    session = SessionRagflow()
    try:
        rows = session.query(
            Dialog.id, Dialog.name, Dialog.description, Dialog.status, Dialog.tenant_id
        ).all()

        records = []
        for dialog_id, name, description, status, tenant_id in rows:
            records.append({
                "id": format_uuid(dialog_id),
                "name": name,
                "description": description,
                "status": str(status),
                "user_id": str(tenant_id),
            })
        return records
    finally:
        session.close()
|
|
def sync_agents_v2():
    """Synchronize local dialog records for every registered application.

    The registered apps are read through ``AppRegisterDao``. For each app
    whose ``id`` equals RAGFLOW, BISHENG or DIFY, the matching upstream
    records are fetched without a name filter and mirrored locally under
    dialog type ``"1"``, ``"2"`` or ``"4"`` respectively. Other apps are
    ignored.

    Any exception is caught and reported on stdout; nothing is re-raised.
    The local session is always closed before returning.
    """
    db = SessionLocal()
    try:
        app_register = AppRegisterDao(db).get_apps()
        for app in app_register:
            if app["id"] == RAGFLOW:
                ragflow_data = get_data_from_ragflow_v2([])
                update_ids_in_local_v2(ragflow_data, "1")
            elif app["id"] == BISHENG:
                bisheng_data = get_data_from_bisheng_v2([])
                update_ids_in_local_v2(bisheng_data, "2")
            elif app["id"] == DIFY:
                dify_data = get_data_from_dify_v2([])
                update_ids_in_local_v2(dify_data, "4")
        print("Agents synchronized successfully")
    except Exception as e:
        print(f"Failed to sync agents: {str(e)}")
    finally:
        # This session is only used to read the app registry; release it so
        # repeated runs do not accumulate open sessions.
        db.close()
|
|
|
|
def sync_knowledge():
    """Synchronize local dialog records from RAGFlow when it is registered.

    The registered apps are read through ``AppRegisterDao``. For each app
    whose ``id`` equals RAGFLOW, the records returned by
    ``get_data_from_ragflow_knowledge`` are mirrored locally under dialog
    type ``"1"``. Only RAGFlow is handled here; the BiSheng and Dify branches
    are disabled.

    Any exception is caught and reported on stdout; nothing is re-raised.
    The local session is always closed before returning.
    """
    db = SessionLocal()
    try:
        app_register = AppRegisterDao(db).get_apps()
        for app in app_register:
            if app["id"] == RAGFLOW:
                # get_data_from_ragflow_knowledge() takes no parameters;
                # passing a list raised TypeError on every run.
                ragflow_data = get_data_from_ragflow_knowledge()
                update_ids_in_local_v2(ragflow_data, "1")
        print("Agents synchronized successfully")
    except Exception as e:
        print(f"Failed to sync agents: {str(e)}")
    finally:
        # This session is only used to read the app registry; release it so
        # repeated runs do not accumulate open sessions.
        db.close()
|
|
|
if __name__ == "__main__":
    # Manual check: print every Dify app record when run as a script.
    dify_apps = get_data_from_dify_v2([])
    print(dify_apps)
|