zhaoqingang
2025-03-13 30ff0afd5d76a3a5aa48058210ae411253574ada
app/models/user_model.py
@@ -1,7 +1,7 @@
from datetime import datetime
# from cryptography.fernet import Fernet
from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime
from cryptography.fernet import Fernet
from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime, UniqueConstraint
from sqlalchemy.orm import relationship, backref
from app.config.config import settings
@@ -18,7 +18,7 @@
user_group_table = Table('user_group', Base.metadata
                           , Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE'))
                           , Column('group_id', Integer, ForeignKey('group.id', ondelete='CASCADE')))
# cipher_suite = Fernet(settings.PASSWORD_KEY.encode("utf-8"))
cipher_suite = Fernet(settings.PASSWORD_KEY.encode("utf-8"))
class UserModel(Base):
@@ -27,20 +27,22 @@
    username = Column(String(255), unique=True, index=True)
    hashed_password = Column(String(255))
    password = Column(String(255))
    compellation = Column(String(255), nullable=False, default="")
    phone = Column(String(255), nullable=False, default="")
    email = Column(String(255), nullable=False, default="")
    description = Column(String(255), nullable=False, default="")
    compellation = Column(String(255), default="")
    phone = Column(String(255),  default="")
    email = Column(String(255),  default="")
    description = Column(String(255), default="")
    ragflow_id = Column(String(32))
    bisheng_id = Column(Integer)
    login_name = Column(String(100))
    status = Column(String(10), nullable=False, default="1")
    status = Column(String(10),  default="1")
    creator = Column(String(36))
    sex = Column(String(1))
    permission = Column(String(16), nullable=False, default="general")
    permission = Column(String(16), default="general")
    age = Column(Integer)
    sync_flag = Column(String(36))
    created_at = Column(DateTime, default=datetime.now())
    updated_at = Column(DateTime, default=datetime.now(), onupdate=datetime.now())
    organizations = relationship('OrganizationModel',
@@ -81,6 +83,9 @@
            'userName': self.username,
            'loginName': self.login_name if self.login_name else "",
            'status': self.status,
            'email': self.email,
            'phone': self.phone,
            'permission':self.permission
        }
    def to_json(self):
@@ -98,22 +103,37 @@
            # 'phoneNumber': self.phone_number
        }
        if len(self.organizations) > 0:
            json['dept'] = [organization.to_json() for organization in self.organizations]
        # json['dept'] = [organization.to_json() for organization in self.organizations]
        json['groups'] = [group.to_dict() for group in self.groups]
        roles = []
        # if len(self.roles) > 0:
        roles = [role.to_json() for role in self.roles]
        organization_roles = [role.to_json() for organization in self.organizations for role in
                              organization.roles]
        for role in organization_roles:
            if role not in roles:
                roles.append(role)
        json['roles'] = roles
        roles = {role.id: role.to_dict() for role in self.roles}
        # ogt_set = set()
        # for ogt in self.organizations:
        #     if ogt.id in ogt_set:
        #         continue
        #     print(ogt.id)
        #     ogt_set.add(ogt.id)
        #     for role in ogt.roles:
        #         roles[role.id] = role.to_dict()
        #     parent_ogt = ogt.parent
        #     while parent_ogt:
        #         if parent_ogt.id not in ogt_set:
        #             ogt_set.add(ogt.id)
        #             for role in parent_ogt.roles:
        #                 roles[role.id] = role.to_dict()
        #             parent_ogt = parent_ogt.parent
        #         else:
        #             break
        json['roles'] = list(roles.values())
        json['depts'] = [i.to_base_json() for i in self.organizations]
        return json
    def to_login_json(self):
        json = {
@@ -169,7 +189,92 @@
    def encrypted_password(self, password):
        return cipher_suite.encrypt(password.encode("utf-8")).decode("utf-8")
        return cipher_suite.encrypt(str(password).encode("utf-8")).decode("utf-8")
    def decrypted_password(self):
        return cipher_suite.decrypt(self.password).decode("utf-8")
        return cipher_suite.decrypt(self.password).decode("utf-8")
class UserAppModel(Base):
    __tablename__ = "user_app"
    __table_args__ = (UniqueConstraint('user_id', 'app_type', name='user_app_id_ix'),)
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(255))
    password = Column(String(255))
    email = Column(String(255),  default="")
    user_id = Column(Integer)
    app_id = Column(String(36))
    app_type = Column(String(16))
    status = Column(String(10),  default="1")
    access_token = Column(String(1000))
    refresh_token = Column(String(1000))
    token_at = Column(DateTime, default=datetime.now())
    created_at = Column(DateTime, default=datetime.now())
    updated_at = Column(DateTime, default=datetime.now(), onupdate=datetime.now())
    def to_json(self):
        return {
            'id': self.id,
            'userName': self.username,
            'createTime': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "",
            'updateTime': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "",
            'password': self.password,
            'email': self.email,
            'user_id': self.user_id,
            'app_id': self.app_id,
            "app_type": self.app_type,
            'status': self.status,
        }
    @staticmethod
    def encrypted_password(password):
        return cipher_suite.encrypt(password.encode("utf-8")).decode("utf-8")
    @staticmethod
    def decrypted_password(password):
        return cipher_suite.decrypt(password).decode("utf-8")
class UserTokenModel(Base):
    __tablename__ = "user_token"
    id = Column(String(16), primary_key=True)
    account = Column(String(255))
    password = Column(String(255))
    access_token = Column(String(1000))
    refresh_token = Column(String(1000))
    created_at = Column(DateTime, default=datetime.now())
    updated_at = Column(DateTime, default=datetime.now())
    def to_json(self):
        return {
            'id': self.id,
            'account': self.username,
            'createTime': self.created_at,
            'updateTime': self.updated_at,
            'password': self.password,
            'access_token': self.access_token,
            'refresh_token': self.refresh_token,
        }
class UserApiTokenModel(Base):
    __tablename__ = "user_api_token"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    token = Column(String(40), index=True)
    created_at = Column(DateTime, default=datetime.now())
    updated_at = Column(DateTime, default=datetime.now())
    expires_at = Column(DateTime)
    is_active = Column(Integer, default=1)
    def to_json(self):
        return {
            'id': self.id,
            'account': self.username,
            'createTime': self.created_at,
            'updateTime': self.updated_at,
            'password': self.password,
            'access_token': self.access_token,
            'refresh_token': self.refresh_token,
        }