from datetime import datetime

# from cryptography.fernet import Fernet
from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime
from sqlalchemy.orm import relationship, backref

from app.config.config import settings
from app.models.base_model import Base

# Association tables for the many-to-many links between users and
# organizations / roles / groups. Rows are removed with their parent row
# (ondelete='CASCADE').
user_organization_table = Table(
    'user_organization',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE')),
    Column('organization_id', String(36), ForeignKey('organization.id', ondelete='CASCADE')),
)

user_role_table = Table(
    'user_role',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE')),
    Column('role_id', String(36), ForeignKey('role.id', ondelete='CASCADE')),
)

user_group_table = Table(
    'user_group',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE')),
    Column('group_id', Integer, ForeignKey('group.id', ondelete='CASCADE')),
)

# NOTE(review): the cipher below is disabled, so `encrypted_password` and
# `decrypted_password` raise NameError until it is restored.
# cipher_suite = Fernet(settings.PASSWORD_KEY.encode("utf-8"))

_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'


def _format_timestamp(value):
    """Return *value* formatted as 'YYYY-MM-DD HH:MM:SS', or "" when it is unset."""
    return value.strftime(_TIMESTAMP_FORMAT) if value else ""


class UserModel(Base):
    """ORM model for the ``user`` table plus JSON serialization helpers."""

    __tablename__ = "user"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(255), unique=True, index=True)
    hashed_password = Column(String(255))
    # Read by `decrypted_password`; presumably holds the Fernet-encrypted
    # password -- TODO confirm.
    password = Column(String(255))
    compellation = Column(String(255), nullable=False, default="")
    phone = Column(String(255), nullable=False, default="")
    email = Column(String(255), nullable=False, default="")
    description = Column(String(255), nullable=False, default="")
    ragflow_id = Column(String(32))
    bisheng_id = Column(Integer)
    login_name = Column(String(100))
    status = Column(String(10), nullable=False, default="1")
    creator = Column(String(36))
    sex = Column(String(1))
    permission = Column(String(16), nullable=False, default="general")
    age = Column(Integer)
    # Pass the callable, not its result: `datetime.now()` would be evaluated
    # once at import time and stamp every row with the process start time.
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    organizations = relationship(
        'OrganizationModel',
        secondary=user_organization_table,
        backref=backref('users', lazy='dynamic'),
    )
    roles = relationship(
        'RoleModel',
        secondary=user_role_table,
        backref=backref('users', lazy='dynamic'),
    )
    groups = relationship(
        'GroupModel',
        secondary=user_group_table,
        backref=backref('users', lazy='dynamic'),
    )

    def have_permission(self, url):
        """Return True when *url* matches one of the user's resources.

        The user's own resources are matched on ``URL`` first; otherwise the
        resources of the user's organizations are matched on ``NAME``.

        NOTE(review): ``self.resources`` and the ``URL`` / ``NAME`` attributes
        are not defined in this module -- confirm they exist on the related
        models. The differing attribute between the two checks is kept from
        the original code and may be unintended.
        """
        # `filter()` returns a lazy iterator that is always truthy in
        # Python 3, so the match has to be evaluated with `any()`.
        if any(
            item.URL == url
            for resource in self.resources
            for item in resource
        ):
            return True
        return any(
            resource.NAME == url
            for organization in self.organizations
            for resource in organization.resources
        )

    def __repr__(self):
        """Return a short debug representation containing the username."""
        return '<User %s>' % self.username

    def to_dict(self):
        """Return a minimal dict (id, names, status) describing the user."""
        return {
            'userId': self.id,
            'userName': self.username,
            'loginName': self.login_name if self.login_name else "",
            'status': self.status,
        }

    def _base_json(self):
        """Build the fields shared by `to_json` and `to_login_json`."""
        return {
            'userId': self.id,
            'createTime': _format_timestamp(self.created_at),
            # Guard on updated_at itself; it can be unset while created_at is set.
            'updateTime': _format_timestamp(self.updated_at),
            'userName': self.username,
            'loginName': self.login_name,
            'sex': self.sex,
            'age': self.age,
            "status": self.status,
            'phone': self.phone,
            'email': self.email,
        }

    def to_json(self):
        """Return the user as a dict including organizations, groups and roles.

        ``dept`` is only present when the user belongs to at least one
        organization. ``roles`` holds the user's own roles followed by the
        roles of the user's organizations, without duplicates.
        """
        payload = self._base_json()
        if self.organizations:
            payload['dept'] = [organization.to_json() for organization in self.organizations]
        payload['groups'] = [group.to_dict() for group in self.groups]

        roles = [role.to_json() for role in self.roles]
        for organization in self.organizations:
            for role in organization.roles:
                role_json = role.to_json()
                # Serialized roles are dicts (unhashable), hence the list scan.
                if role_json not in roles:
                    roles.append(role_json)
        payload['roles'] = roles
        return payload

    def to_login_json(self):
        """Return the user as a dict including a two-level resource tree.

        Resources are gathered from the user's own roles and from the roles
        of the user's organizations. Entries whose ``menuType`` is not "1"
        become top-level items keyed by ``menuId``; entries with ``menuType``
        "1" are attached under ``children`` of the item whose ``menuId``
        equals their ``parentId``.
        """
        payload = self._base_json()

        role_payloads = [role.to_json() for role in self.roles]
        # These are already serialized dicts; they must not be passed
        # through `to_json()` a second time.
        role_payloads.extend(
            role.to_json()
            for organization in self.organizations
            for role in organization.roles
        )

        parents = {}
        children = {}
        for role_payload in role_payloads:
            for entry in role_payload.get("resources", []):
                if entry["menuType"] != "1":
                    parents[entry["menuId"]] = entry
                else:
                    children.setdefault(entry["parentId"], []).append(entry)

        resources = []
        for parent in parents.values():
            parent["children"] = children.get(parent["menuId"], [])
            resources.append(parent)
        payload['resources'] = resources
        return payload

    def get_children(self):
        """Recursively collect the IDs of users created by this user.

        Returns:
            A list with the IDs of the direct children first, followed by the
            descendants of each child.

        NOTE(review): relies on ``UserModel.query`` being provided by ``Base``
        -- confirm.
        """
        return self._collect_children({self.id})

    def _collect_children(self, seen):
        """Return descendant IDs not yet in *seen*, adding them to *seen*."""
        # `creator` is a String column while `id` is an Integer, so compare
        # as text to avoid relying on implicit casts in the database.
        users = [
            user
            for user in UserModel.query.filter_by(creator=str(self.id)).all()
            if user.id not in seen
        ]
        # Mark before recursing so a creator cycle cannot recurse forever.
        seen.update(user.id for user in users)
        user_id_list = [user.id for user in users]
        for user in users:
            user_id_list.extend(user._collect_children(seen))
        return user_id_list

    def encrypted_password(self, password):
        """Encrypt *password* with the module cipher and return it as text.

        NOTE(review): `cipher_suite` is currently commented out at module
        level, so calling this raises NameError.
        """
        return cipher_suite.encrypt(password.encode("utf-8")).decode("utf-8")

    def decrypted_password(self):
        """Decrypt ``self.password`` with the module cipher and return it as text.

        NOTE(review): `cipher_suite` is currently commented out at module
        level, so calling this raises NameError.
        """
        return cipher_suite.decrypt(self.password).decode("utf-8")