zhaoqingang
2024-12-12 057f034d4cd728c1bd0284e7c6b4a47739d5220d
app/models/user_model.py
@@ -1,6 +1,24 @@
from sqlalchemy import Column, Integer, String
from datetime import datetime
from cryptography.fernet import Fernet
from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime, UniqueConstraint
from sqlalchemy.orm import relationship, backref
from app.config.config import settings
from app.models.base_model import Base
user_organization_table = Table('user_organization',Base.metadata
                                   , Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE'))
                                   , Column('organization_id', String(36), ForeignKey('organization.id', ondelete='CASCADE')))
user_role_table = Table('user_role', Base.metadata
                           , Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE'))
                           , Column('role_id', String(36), ForeignKey('role.id', ondelete='CASCADE')))
user_group_table = Table('user_group', Base.metadata
                           , Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE'))
                           , Column('group_id', Integer, ForeignKey('group.id', ondelete='CASCADE')))
cipher_suite = Fernet(settings.PASSWORD_KEY.encode("utf-8"))
class UserModel(Base):
@@ -8,9 +26,203 @@
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(255), unique=True, index=True)
    hashed_password = Column(String(255))
    compellation = Column(String(255), nullable=False, default="")
    phone = Column(String(255), nullable=False, default="")
    email = Column(String(255), nullable=False, default="")
    description = Column(String(255), nullable=False, default="")
    ragflow_id = Column(String(32), unique=True, index=True)
    bisheng_id = Column(Integer, unique=True, index=True)
    password = Column(String(255))
    compellation = Column(String(255), default="")
    phone = Column(String(255),  default="")
    email = Column(String(255),  default="")
    description = Column(String(255), default="")
    ragflow_id = Column(String(32))
    bisheng_id = Column(Integer)
    login_name = Column(String(100))
    status = Column(String(10),  default="1")
    creator = Column(String(36))
    sex = Column(String(1))
    permission = Column(String(16), default="general")
    age = Column(Integer)
    created_at = Column(DateTime, default=datetime.now())
    updated_at = Column(DateTime, default=datetime.now(), onupdate=datetime.now())
    organizations = relationship('OrganizationModel',
                                    secondary=user_organization_table,
                                    backref=backref('users', lazy='dynamic'))
    roles = relationship('RoleModel',
                            secondary=user_role_table,
                            backref=backref('users', lazy='dynamic'))
    groups = relationship('GroupModel',
                         secondary=user_group_table,
                         backref=backref('users', lazy='dynamic'))
    def have_permission(self, url):
        permissions = []
        for resource in self.resources:
            permissions.extend(resource)
        if filter(lambda x: x.URL == url, permissions):
            return True
        permissions = []
        for organization in self.organizations:
            permissions.extend([resource for resource in organization.resources])
        return filter(lambda x: x.NAME == url, permissions)
    def __repr__(self):
        return '<User %r>\n' % (self.NAME)
    def to_dict(self):
        return {
            'userId': self.id,
            'userName': self.username,
            'loginName': self.login_name if self.login_name else "",
            'status': self.status,
            'email': self.email,
            'phone': self.phone,
            'permission':self.permission
        }
    def to_json(self):
        json = {
            'userId': self.id,
            'createTime': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "",
            'updateTime': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "",
            'userName': self.username,
            'loginName': self.login_name,
            'sex': self.sex,
            'age': self.age,
            "status": self.status,
            'phone': self.phone,
            'email': self.email,
            # 'phoneNumber': self.phone_number
        }
        # json['dept'] = [organization.to_json() for organization in self.organizations]
        json['groups'] = [group.to_dict() for group in self.groups]
        # if len(self.roles) > 0:
        roles = {role.id: role.to_dict() for role in self.roles}
        ogt_set = set()
        for ogt in self.organizations:
            if ogt.id in ogt_set:
                continue
            ogt_set.add(ogt.id)
            for role in ogt.roles:
                roles[role.id] = role.to_dict()
            parent_ogt = ogt.parent
            while parent_ogt:
                if parent_ogt.id not in ogt_set:
                    ogt_set.add(ogt.id)
                    for role in ogt.roles:
                        roles[role.id] = role.to_dict()
                    parent_ogt = ogt.parent
        json['roles'] = list(roles.values())
        return json
    def to_login_json(self):
        json = {
            'userId': self.id,
            'createTime': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "",
            'updateTime': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "",
            'userName': self.username,
            'loginName': self.login_name,
            'sex': self.sex,
            'age': self.age,
            "status": self.status,
            'phone': self.phone,
            'email': self.email,
            # 'phoneNumber': self.phone_number
        }
        parent_dict = {}
        children_dict = {}
        for role in self.roles:
            for r in  role.to_json().get("resources", []):
                if r["menuType"] != "1":
                    parent_dict[r["menuId"]] = r
                else:
                    if r["parentId"] in children_dict:
                        children_dict[r["parentId"]].append(r)
                    else:
                        children_dict[r["parentId"]] = [r]
        organization_roles = [role.to_json() for organization in self.organizations for role in
                              organization.roles]
        for role in organization_roles:
            for r in role.to_json().get("resources", []):
                if r["menuType"] != "1":
                    parent_dict[r["menuId"]] = r
                else:
                    if r["parentId"] in children_dict:
                        children_dict[r["parentId"]].append(r)
                    else:
                        children_dict[r["parentId"]] = [r]
        resources = []
        for resource in parent_dict.values():
            resource["children"] = children_dict.get(resource["menuId"], [])
            resources.append(resource)
        json['resources'] = resources
        return json
    def get_children(self):
        """递归获取指定用户的子用户ID列表"""
        users = UserModel.query.filter_by(CREATOR=self.ID).all()
        user_id_list = [user.ID for user in users]
        for user in users:
            user_id_list.extend(user.get_children())
        return user_id_list
    def encrypted_password(self, password):
        return cipher_suite.encrypt(password.encode("utf-8")).decode("utf-8")
    def decrypted_password(self):
        return cipher_suite.decrypt(self.password).decode("utf-8")
class UserAppModel(Base):
    __tablename__ = "user_app"
    __table_args__ = (UniqueConstraint('user_id', 'app_type', name='user_app_id_ix'),)
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(255))
    password = Column(String(255))
    email = Column(String(255),  default="")
    user_id = Column(Integer)
    app_id = Column(String(36))
    app_type = Column(String(16))
    status = Column(String(10),  default="1")
    access_token = Column(String(1000))
    refresh_token = Column(String(1000))
    token_at = Column(DateTime, default=datetime.now())
    created_at = Column(DateTime, default=datetime.now())
    updated_at = Column(DateTime, default=datetime.now(), onupdate=datetime.now())
    def to_json(self):
        return {
            'id': self.id,
            'userName': self.username,
            'createTime': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "",
            'updateTime': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "",
            'password': self.password,
            'email': self.email,
            'user_id': self.user_id,
            'app_id': self.app_id,
            "app_type": self.app_type,
            'status': self.status,
        }
    def encrypted_password(self, password):
        return cipher_suite.encrypt(password.encode("utf-8")).decode("utf-8")
    def decrypted_password(self):
        return cipher_suite.decrypt(self.password).decode("utf-8")