import json from datetime import datetime from enum import IntEnum import pytz from sqlalchemy import Column, String, Enum as SQLAlchemyEnum, Integer, DateTime, JSON, TEXT from app.models.agent_model import AgentType # from app.models import current_time from app.models.base_model import Base def current_time(): tz = pytz.timezone('Asia/Shanghai') return datetime.now(tz) class SessionModel(Base): __tablename__ = "sessions" id = Column(String(255), primary_key=True) name = Column(String(255)) agent_id = Column(String(255)) agent_type = Column(SQLAlchemyEnum(AgentType), nullable=False) # 目前只存basic的,ragflow和bisheng的调接口获取 create_date = Column(DateTime, default=current_time) # 创建时间,默认值为当前时区时间 update_date = Column(DateTime, default=current_time, onupdate=current_time, index=True) # 更新时间,默认值为当前时区时间,更新时自动更新 tenant_id = Column(Integer) # 创建人 message = Column(TEXT) # 说明 conversation_id = Column(String(64)) workflow = Column(Integer, default=0) # to_dict 方法 def to_dict(self): return { 'id': self.id, 'name': self.name, 'agent_type': self.agent_type, 'agent_id': self.agent_id, 'workflow': self.workflow if self.workflow else 0, 'create_date': self.create_date.strftime("%Y-%m-%d %H:%M:%S"), 'update_date': self.update_date.strftime("%Y-%m-%d %H:%M:%S"), } def log_to_json(self): return { 'id': self.id, 'name': self.name, 'agent_type': self.agent_type, 'agent_id': self.agent_id, 'create_date': self.create_date.strftime("%Y-%m-%d %H:%M:%S"), 'update_date': self.update_date.strftime("%Y-%m-%d %H:%M:%S"), 'message': json.loads(self.message) } def add_message(self, message: dict): if self.message is None: self.message = '[]' try: msg = json.loads(self.message) msg.append(message) except Exception as e: return self.message = json.dumps(msg)