zhaoqingang
2024-12-18 c941b948f1fa8eff615985a83110a401ae426ab3
app/models/user_model.py
@@ -1,7 +1,7 @@
from datetime import datetime
# from cryptography.fernet import Fernet
from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime
from cryptography.fernet import Fernet
from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime, UniqueConstraint
from sqlalchemy.orm import relationship, backref
from app.config.config import settings
@@ -18,7 +18,7 @@
# Association table pairing user ids with group ids. Both foreign keys use
# ON DELETE CASCADE, so deleting a user or a group removes its link rows.
user_group_table = Table(
    'user_group',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE')),
    Column('group_id', Integer, ForeignKey('group.id', ondelete='CASCADE')),
)
# Module-wide Fernet instance built from settings.PASSWORD_KEY (UTF-8 encoded).
# The encrypted_password / decrypted_password helpers in this module use it.
cipher_suite = Fernet(settings.PASSWORD_KEY.encode("utf-8"))
class UserModel(Base):
@@ -27,20 +27,22 @@
    username = Column(String(255), unique=True, index=True)
    hashed_password = Column(String(255))
    password = Column(String(255))
    compellation = Column(String(255), nullable=False, default="")
    phone = Column(String(255), nullable=False, default="")
    email = Column(String(255), nullable=False, default="")
    description = Column(String(255), nullable=False, default="")
    compellation = Column(String(255), default="")
    phone = Column(String(255),  default="")
    email = Column(String(255),  default="")
    description = Column(String(255), default="")
    ragflow_id = Column(String(32))
    bisheng_id = Column(Integer)
    login_name = Column(String(100))
    status = Column(String(10), nullable=False, default="1")
    status = Column(String(10),  default="1")
    creator = Column(String(36))
    sex = Column(String(1))
    permission = Column(String(16), nullable=False, default="general")
    permission = Column(String(16), default="general")
    age = Column(Integer)
    sync_flag = Column(String(36))
    created_at = Column(DateTime, default=datetime.now())
    updated_at = Column(DateTime, default=datetime.now(), onupdate=datetime.now())
    organizations = relationship('OrganizationModel',
@@ -81,6 +83,9 @@
            'userName': self.username,
            'loginName': self.login_name if self.login_name else "",
            'status': self.status,
            'email': self.email,
            'phone': self.phone,
            'permission':self.permission
        }
    def to_json(self):
@@ -98,22 +103,36 @@
            # 'phoneNumber': self.phone_number
        }
        if len(self.organizations) > 0:
            json['dept'] = [organization.to_json() for organization in self.organizations]
        # json['dept'] = [organization.to_json() for organization in self.organizations]
        json['groups'] = [group.to_dict() for group in self.groups]
        roles = []
        # if len(self.roles) > 0:
        roles = [role.to_json() for role in self.roles]
        organization_roles = [role.to_json() for organization in self.organizations for role in
                              organization.roles]
        for role in organization_roles:
            if role not in roles:
                roles.append(role)
        json['roles'] = roles
        roles = {role.id: role.to_dict() for role in self.roles}
        ogt_set = set()
        for ogt in self.organizations:
            if ogt.id in ogt_set:
                continue
            print(ogt.id)
            ogt_set.add(ogt.id)
            for role in ogt.roles:
                roles[role.id] = role.to_dict()
            parent_ogt = ogt.parent
            while parent_ogt:
                if parent_ogt.id not in ogt_set:
                    ogt_set.add(ogt.id)
                    for role in parent_ogt.roles:
                        roles[role.id] = role.to_dict()
                    parent_ogt = parent_ogt.parent
                else:
                    break
        json['roles'] = list(roles.values())
        return json
    def to_login_json(self):
        json = {
@@ -172,4 +191,44 @@
        return cipher_suite.encrypt(password.encode("utf-8")).decode("utf-8")
    def decrypted_password(self):
        return cipher_suite.decrypt(self.password).decode("utf-8")
        return cipher_suite.decrypt(self.password).decode("utf-8")
class UserAppModel(Base):
    """Row of the ``user_app`` table: one user's account data for one app type.

    The ``user_app_id_ix`` unique constraint allows at most one row per
    ``(user_id, app_type)`` pair.
    """

    __tablename__ = "user_app"
    __table_args__ = (UniqueConstraint('user_id', 'app_type', name='user_app_id_ix'),)

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(255))
    password = Column(String(255))
    email = Column(String(255), default="")
    user_id = Column(Integer)
    app_id = Column(String(36))
    app_type = Column(String(16))
    status = Column(String(10), default="1")
    access_token = Column(String(1000))
    refresh_token = Column(String(1000))
    # Pass the callable itself, not its result: ``datetime.now()`` would be
    # evaluated once at import time and stamp every row with the process
    # start time instead of the time of the INSERT/UPDATE.
    token_at = Column(DateTime, default=datetime.now)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    def to_json(self):
        """Return a plain ``dict`` describing this row.

        ``createTime`` and ``updateTime`` are formatted as
        ``YYYY-MM-DD HH:MM:SS``; each falls back to ``""`` when its own
        timestamp is not set.
        """
        time_format = '%Y-%m-%d %H:%M:%S'
        return {
            'id': self.id,
            'userName': self.username,
            'createTime': self.created_at.strftime(time_format) if self.created_at else "",
            # Guard on updated_at itself (previously guarded on created_at,
            # which raised AttributeError when only updated_at was None).
            'updateTime': self.updated_at.strftime(time_format) if self.updated_at else "",
            # NOTE(review): the stored password value is returned as-is —
            # confirm callers really need it in this payload.
            'password': self.password,
            'email': self.email,
            'user_id': self.user_id,
            'app_id': self.app_id,
            "app_type": self.app_type,
            'status': self.status,
        }

    @staticmethod
    def encrypted_password(password):
        """Encrypt the ``str`` *password* with ``cipher_suite``; return the token as ``str``."""
        return cipher_suite.encrypt(password.encode("utf-8")).decode("utf-8")

    @staticmethod
    def decrypted_password(password):
        """Decrypt the token *password* with ``cipher_suite``; return the plain text as ``str``."""
        return cipher_suite.decrypt(password).decode("utf-8")