"""ORM models for users, per-application user credentials and user tokens."""
from datetime import datetime

from cryptography.fernet import Fernet
from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime, UniqueConstraint
from sqlalchemy.orm import relationship, backref

from app.config.config import settings
from app.models.base_model import Base

# Many-to-many association tables linking users to organizations, roles and groups.
user_organization_table = Table(
    'user_organization',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE')),
    Column('organization_id', String(36), ForeignKey('organization.id', ondelete='CASCADE')),
)

user_role_table = Table(
    'user_role',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE')),
    Column('role_id', String(36), ForeignKey('role.id', ondelete='CASCADE')),
)

user_group_table = Table(
    'user_group',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE')),
    Column('group_id', Integer, ForeignKey('group.id', ondelete='CASCADE')),
)

# Symmetric cipher used for the reversible ``password`` columns below.
cipher_suite = Fernet(settings.PASSWORD_KEY.encode("utf-8"))

_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def _format_time(value):
    """Format a datetime with ``_TIME_FORMAT``; return "" when it is unset."""
    return value.strftime(_TIME_FORMAT) if value else ""


def _encrypt_text(plain):
    """Encrypt a text value with ``cipher_suite`` and return the token as text."""
    return cipher_suite.encrypt(plain.encode("utf-8")).decode("utf-8")


def _decrypt_text(token):
    """Decrypt a Fernet token (``str`` or ``bytes``) and return the plain text.

    ``str`` tokens are encoded first so the call also works with
    ``cryptography`` releases whose ``Fernet.decrypt`` only accepts bytes.
    """
    if isinstance(token, str):
        token = token.encode("utf-8")
    return cipher_suite.decrypt(token).decode("utf-8")


def _collect_menu_resources(roles, parents, children):
    """Sort the ``resources`` of each role's ``to_json()`` into two mappings.

    Entries whose ``menuType`` is not ``"1"`` are stored in ``parents`` keyed
    by ``menuId``; entries whose ``menuType`` is ``"1"`` are appended to
    ``children`` under their ``parentId``. Both mappings are mutated in place.
    """
    for role in roles:
        for entry in role.to_json().get("resources", []):
            if entry["menuType"] != "1":
                parents[entry["menuId"]] = entry
            else:
                # NOTE(review): the same child may be appended more than once
                # when several roles expose it; the original did the same.
                children.setdefault(entry["parentId"], []).append(entry)


class UserModel(Base):
    """A user account with its organization, role and group memberships."""

    __tablename__ = "user"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(255), unique=True, index=True)
    hashed_password = Column(String(255))
    password = Column(String(255))  # Fernet token, see encrypted_password()
    compellation = Column(String(255), default="")
    phone = Column(String(255), default="")
    email = Column(String(255), default="")
    description = Column(String(255), default="")
    ragflow_id = Column(String(32))
    bisheng_id = Column(Integer)
    login_name = Column(String(100))
    status = Column(String(10), default="1")
    creator = Column(String(36))
    sex = Column(String(1))
    permission = Column(String(16), default="general")
    age = Column(Integer)
    sync_flag = Column(String(36))
    # Pass the callable, not its result: ``datetime.now()`` would be evaluated
    # once at import time and stamp every row with the process start time.
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    organizations = relationship('OrganizationModel',
                                 secondary=user_organization_table,
                                 backref=backref('users', lazy='dynamic'))
    roles = relationship('RoleModel',
                         secondary=user_role_table,
                         backref=backref('users', lazy='dynamic'))
    groups = relationship('GroupModel',
                          secondary=user_group_table,
                          backref=backref('users', lazy='dynamic'))

    def have_permission(self, url):
        """Return True when ``url`` matches one of the user's resources.

        The user's own resources are checked first (by ``URL``), then the
        resources of every organization the user belongs to (by ``NAME``).
        """
        # NOTE(review): ``resources`` is not declared in this module; it is
        # presumably supplied by a backref defined elsewhere -- confirm.
        own = []
        for resource in self.resources:
            own.extend(resource)
        # ``filter(...)`` objects are always truthy, so test with ``any``.
        if any(item.URL == url for item in own):
            return True

        inherited = [resource
                     for organization in self.organizations
                     for resource in organization.resources]
        # NOTE(review): this branch compares ``NAME`` while the one above
        # compares ``URL``; kept as in the original -- confirm which is meant.
        return any(item.NAME == url for item in inherited)

    def __repr__(self):
        return f"<UserModel id={self.id!r} username={self.username!r}>"

    def to_dict(self):
        """Return a compact dict describing the user."""
        return {
            'userId': self.id,
            'userName': self.username,
            'loginName': self.login_name if self.login_name else "",
            'status': self.status,
            'email': self.email,
            'phone': self.phone,
            'permission': self.permission,
        }

    def _profile_json(self):
        """Return the profile fields shared by to_json() and to_login_json()."""
        return {
            'userId': self.id,
            'createTime': _format_time(self.created_at),
            'updateTime': _format_time(self.updated_at),
            'userName': self.username,
            'loginName': self.login_name,
            'sex': self.sex,
            'age': self.age,
            "status": self.status,
            'phone': self.phone,
            'email': self.email,
        }

    def to_json(self):
        """Return the profile plus the user's groups, roles and departments."""
        payload = self._profile_json()
        payload['groups'] = [group.to_dict() for group in self.groups]
        # Only roles assigned directly to the user are listed; keying by id
        # removes duplicates.
        roles_by_id = {role.id: role.to_dict() for role in self.roles}
        payload['roles'] = list(roles_by_id.values())
        payload['depts'] = [organization.to_base_json() for organization in self.organizations]
        return payload

    def to_login_json(self):
        """Return the profile plus a two-level resource tree.

        Resources come from the user's own roles and from the roles of the
        user's organizations. Each top-level entry gets a ``children`` list
        holding the entries whose ``parentId`` equals its ``menuId``.
        """
        payload = self._profile_json()

        parents = {}
        children = {}
        _collect_menu_resources(self.roles, parents, children)
        for organization in self.organizations:
            # Iterate the role objects themselves; the original converted them
            # to dicts first and then called ``to_json()`` on those dicts.
            _collect_menu_resources(organization.roles, parents, children)

        resources = []
        for resource in parents.values():
            resource["children"] = children.get(resource["menuId"], [])
            resources.append(resource)
        payload['resources'] = resources
        return payload

    def get_children(self):
        """Recursively get the list of child-user ids of this user.

        A child is a user whose ``creator`` refers to this user. Direct
        children come first, followed by each child's own descendants.
        """
        return self._collect_children({self.id})

    def _collect_children(self, seen):
        """Return descendant ids not yet in ``seen``; ``seen`` is updated in place."""
        # NOTE(review): ``creator`` is a String column while ``id`` is an
        # Integer, so the id is compared as text -- confirm the stored format.
        # ``query`` is assumed to be provided by ``Base``.
        users = [user
                 for user in UserModel.query.filter_by(creator=str(self.id)).all()
                 if user.id not in seen]
        # Remember visited ids so a creator cycle cannot recurse forever.
        seen.update(user.id for user in users)
        user_id_list = [user.id for user in users]
        for user in users:
            user_id_list.extend(user._collect_children(seen))
        return user_id_list

    def encrypted_password(self, password):
        """Return ``password`` encrypted with the module cipher, as text."""
        return _encrypt_text(password)

    def decrypted_password(self):
        """Return the plain text of this user's stored ``password`` token."""
        return _decrypt_text(self.password)


class UserAppModel(Base):
    """Credentials and tokens of a user for one application type."""

    __tablename__ = "user_app"
    __table_args__ = (UniqueConstraint('user_id', 'app_type', name='user_app_id_ix'),)

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(255))
    password = Column(String(255))
    email = Column(String(255), default="")
    user_id = Column(Integer)
    app_id = Column(String(36))
    app_type = Column(String(16))
    status = Column(String(10), default="1")
    access_token = Column(String(1000))
    refresh_token = Column(String(1000))
    # Callables so each row gets its own timestamp (see UserModel).
    token_at = Column(DateTime, default=datetime.now)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    def to_json(self):
        """Return the record as a dict with formatted timestamps."""
        return {
            'id': self.id,
            'userName': self.username,
            'createTime': _format_time(self.created_at),
            'updateTime': _format_time(self.updated_at),
            'password': self.password,
            'email': self.email,
            'user_id': self.user_id,
            'app_id': self.app_id,
            "app_type": self.app_type,
            'status': self.status,
        }

    @staticmethod
    def encrypted_password(password):
        """Return ``password`` encrypted with the module cipher, as text."""
        return _encrypt_text(password)

    @staticmethod
    def decrypted_password(password):
        """Return the plain text of the given encrypted ``password`` token."""
        return _decrypt_text(password)


class UserTokenModel(Base):
    """Stored account credentials together with access and refresh tokens."""

    __tablename__ = "user_token"

    id = Column(String(16), primary_key=True)
    account = Column(String(255))
    password = Column(String(255))
    access_token = Column(String(1000))
    refresh_token = Column(String(1000))
    # Callables so each row gets its own timestamp (see UserModel).
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now)

    def to_json(self):
        """Return the record as a dict; timestamps are left as datetime objects."""
        return {
            'id': self.id,
            # This model has no ``username`` column; the value lives in ``account``.
            'account': self.account,
            'createTime': self.created_at,
            'updateTime': self.updated_at,
            'password': self.password,
            'access_token': self.access_token,
            'refresh_token': self.refresh_token,
        }

    @staticmethod
    def encrypted_password(password):
        """Return ``password`` encrypted with the module cipher, as text."""
        return _encrypt_text(password)

    @staticmethod
    def decrypted_password(password):
        """Return the plain text of the given encrypted ``password`` token."""
        return _decrypt_text(password)