From 8a9efac2030ee03a5145d5cfa58c9b6edbf67ce2 Mon Sep 17 00:00:00 2001 From: zhaoqingang <zhaoqg0118@163.com> Date: 星期四, 13 二月 2025 11:05:00 +0800 Subject: [PATCH] 同步token --- app/models/user_model.py | 229 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 227 insertions(+), 2 deletions(-) diff --git a/app/models/user_model.py b/app/models/user_model.py index 3a07ae2..b76cdc8 100644 --- a/app/models/user_model.py +++ b/app/models/user_model.py @@ -1,10 +1,235 @@ -from sqlalchemy import Column, Integer, String +from datetime import datetime +from cryptography.fernet import Fernet +from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime, UniqueConstraint +from sqlalchemy.orm import relationship, backref + +from app.config.config import settings from app.models.base_model import Base + +user_organization_table = Table('user_organization',Base.metadata + , Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE')) + , Column('organization_id', String(36), ForeignKey('organization.id', ondelete='CASCADE'))) + +user_role_table = Table('user_role', Base.metadata + , Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE')) + , Column('role_id', String(36), ForeignKey('role.id', ondelete='CASCADE'))) + +user_group_table = Table('user_group', Base.metadata + , Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE')) + , Column('group_id', Integer, ForeignKey('group.id', ondelete='CASCADE'))) +cipher_suite = Fernet(settings.PASSWORD_KEY.encode("utf-8")) class UserModel(Base): __tablename__ = "user" id = Column(Integer, primary_key=True, index=True) username = Column(String(255), unique=True, index=True) - hashed_password = Column(String(255)) \ No newline at end of file + hashed_password = Column(String(255)) + password = Column(String(255)) + compellation = Column(String(255), default="") + phone = Column(String(255), default="") + email = Column(String(255), default="") + description = Column(String(255), default="") + ragflow_id = Column(String(32)) + bisheng_id = Column(Integer) + login_name = Column(String(100)) + status = Column(String(10), default="1") + creator = Column(String(36)) + sex = Column(String(1)) + permission = Column(String(16), default="general") + age = Column(Integer) + sync_flag = Column(String(36)) + created_at = Column(DateTime, default=datetime.now()) + updated_at = Column(DateTime, default=datetime.now(), onupdate=datetime.now()) + + + + organizations = relationship('OrganizationModel', + secondary=user_organization_table, + backref=backref('users', lazy='dynamic')) + + roles = relationship('RoleModel', + secondary=user_role_table, + backref=backref('users', lazy='dynamic')) + + groups = relationship('GroupModel', + secondary=user_group_table, + backref=backref('users', lazy='dynamic')) + + + + def have_permission(self, url): + permissions = [] + for resource in self.resources: + permissions.extend(resource) + + if filter(lambda x: x.URL == url, permissions): + return True + + permissions = [] + for organization in self.organizations: + permissions.extend([resource for resource in organization.resources]) + + return filter(lambda x: x.NAME == url, permissions) + + def __repr__(self): + return '<User %r>\n' % (self.NAME) + + + def to_dict(self): + return { + 'userId': self.id, + 'userName': self.username, + 'loginName': self.login_name if self.login_name else "", + 'status': self.status, + 'email': self.email, + 'phone': self.phone, + 'permission':self.permission + } + + def to_json(self): + json = { + 'userId': self.id, + 'createTime': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "", + 'updateTime': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "", + 'userName': self.username, + 'loginName': self.login_name, + 'sex': self.sex, + 'age': self.age, + "status": self.status, + 'phone': self.phone, + 'email': self.email, + # 'phoneNumber': self.phone_number + } + + + # json['dept'] = [organization.to_json() for organization in self.organizations] + + + json['groups'] = [group.to_dict() for group in self.groups] + + + # if len(self.roles) > 0: + roles = {role.id: role.to_dict() for role in self.roles} + # ogt_set = set() + # for ogt in self.organizations: + # if ogt.id in ogt_set: + # continue + # print(ogt.id) + # ogt_set.add(ogt.id) + # for role in ogt.roles: + # roles[role.id] = role.to_dict() + # parent_ogt = ogt.parent + # while parent_ogt: + # if parent_ogt.id not in ogt_set: + # ogt_set.add(ogt.id) + # for role in parent_ogt.roles: + # roles[role.id] = role.to_dict() + # parent_ogt = parent_ogt.parent + # else: + # break + + json['roles'] = list(roles.values()) + json['depts'] = [i.to_base_json() for i in self.organizations] + return json + + + def to_login_json(self): + json = { + 'userId': self.id, + 'createTime': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "", + 'updateTime': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "", + 'userName': self.username, + 'loginName': self.login_name, + 'sex': self.sex, + 'age': self.age, + "status": self.status, + 'phone': self.phone, + 'email': self.email, + # 'phoneNumber': self.phone_number + } + parent_dict = {} + children_dict = {} + for role in self.roles: + for r in role.to_json().get("resources", []): + if r["menuType"] != "1": + parent_dict[r["menuId"]] = r + else: + if r["parentId"] in children_dict: + children_dict[r["parentId"]].append(r) + else: + children_dict[r["parentId"]] = [r] + organization_roles = [role.to_json() for organization in self.organizations for role in + organization.roles] + for role in organization_roles: + for r in role.to_json().get("resources", []): + if r["menuType"] != "1": + parent_dict[r["menuId"]] = r + else: + if r["parentId"] in children_dict: + children_dict[r["parentId"]].append(r) + else: + children_dict[r["parentId"]] = [r] + resources = [] + for resource in parent_dict.values(): + resource["children"] = children_dict.get(resource["menuId"], []) + resources.append(resource) + json['resources'] = resources + return json + + + def get_children(self): + """閫掑綊鑾峰彇鎸囧畾鐢ㄦ埛鐨勫瓙鐢ㄦ埛ID鍒楄〃""" + users = UserModel.query.filter_by(CREATOR=self.ID).all() + user_id_list = [user.ID for user in users] + for user in users: + user_id_list.extend(user.get_children()) + return user_id_list + + + def encrypted_password(self, password): + return cipher_suite.encrypt(password.encode("utf-8")).decode("utf-8") + + def decrypted_password(self): + return cipher_suite.decrypt(self.password).decode("utf-8") + + + +class UserAppModel(Base): + __tablename__ = "user_app" + __table_args__ = (UniqueConstraint('user_id', 'app_type', name='user_app_id_ix'),) + id = Column(Integer, primary_key=True, index=True) + username = Column(String(255)) + password = Column(String(255)) + email = Column(String(255), default="") + user_id = Column(Integer) + app_id = Column(String(36)) + app_type = Column(String(16)) + status = Column(String(10), default="1") + access_token = Column(String(1000)) + refresh_token = Column(String(1000)) + token_at = Column(DateTime, default=datetime.now()) + created_at = Column(DateTime, default=datetime.now()) + updated_at = Column(DateTime, default=datetime.now(), onupdate=datetime.now()) + + def to_json(self): + return { + 'id': self.id, + 'userName': self.username, + 'createTime': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "", + 'updateTime': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "", + 'password': self.password, + 'email': self.email, + 'user_id': self.user_id, + 'app_id': self.app_id, + "app_type": self.app_type, + 'status': self.status, + } + @staticmethod + def encrypted_password(password): + return cipher_suite.encrypt(password.encode("utf-8")).decode("utf-8") + + @staticmethod + def decrypted_password(password): + return cipher_suite.decrypt(password).decode("utf-8") \ No newline at end of file -- Gitblit v1.8.0