zhaoqingang
2025-03-13 30ff0afd5d76a3a5aa48058210ae411253574ada
app/models/user_model.py
@@ -1,8 +1,10 @@
from datetime import datetime
from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime
from cryptography.fernet import Fernet
from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime, UniqueConstraint
from sqlalchemy.orm import relationship, backref
from app.config.config import settings
from app.models.base_model import Base
user_organization_table = Table('user_organization',Base.metadata
@@ -16,7 +18,7 @@
user_group_table = Table('user_group', Base.metadata
                           , Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE'))
                           , Column('group_id', Integer, ForeignKey('group.id', ondelete='CASCADE')))
cipher_suite = Fernet(settings.PASSWORD_KEY.encode("utf-8"))
class UserModel(Base):
@@ -24,19 +26,23 @@
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(255), unique=True, index=True)
    hashed_password = Column(String(255))
    compellation = Column(String(255), nullable=False, default="")
    phone = Column(String(255), nullable=False, default="")
    email = Column(String(255), nullable=False, default="")
    description = Column(String(255), nullable=False, default="")
    ragflow_id = Column(String(32), unique=True, index=True)
    bisheng_id = Column(Integer, unique=True, index=True)
    password = Column(String(255))
    compellation = Column(String(255), default="")
    phone = Column(String(255),  default="")
    email = Column(String(255),  default="")
    description = Column(String(255), default="")
    ragflow_id = Column(String(32))
    bisheng_id = Column(Integer)
    login_name = Column(String(100))
    status = Column(String(10), nullable=False, default="1")
    status = Column(String(10),  default="1")
    creator = Column(String(36))
    sex = Column(String(1))
    permission = Column(String(16), default="general")
    age = Column(Integer)
    sync_flag = Column(String(36))
    created_at = Column(DateTime, default=datetime.now())
    updated_at = Column(DateTime, default=datetime.now(), onupdate=datetime.now())
    organizations = relationship('OrganizationModel',
@@ -77,6 +83,9 @@
            'userName': self.username,
            'loginName': self.login_name if self.login_name else "",
            'status': self.status,
            'email': self.email,
            'phone': self.phone,
            'permission':self.permission
        }
    def to_json(self):
@@ -94,21 +103,79 @@
            # 'phoneNumber': self.phone_number
        }
        if len(self.organizations) > 0:
            json['dept'] = [organization.to_json() for organization in self.organizations]
        # json['dept'] = [organization.to_json() for organization in self.organizations]
        json['groups'] = [group.to_dict() for group in self.groups]
        roles = []
        # if len(self.roles) > 0:
        roles = [role.to_json() for role in self.roles]
        roles = {role.id: role.to_dict() for role in self.roles}
        # ogt_set = set()
        # for ogt in self.organizations:
        #     if ogt.id in ogt_set:
        #         continue
        #     print(ogt.id)
        #     ogt_set.add(ogt.id)
        #     for role in ogt.roles:
        #         roles[role.id] = role.to_dict()
        #     parent_ogt = ogt.parent
        #     while parent_ogt:
        #         if parent_ogt.id not in ogt_set:
        #             ogt_set.add(ogt.id)
        #             for role in parent_ogt.roles:
        #                 roles[role.id] = role.to_dict()
        #             parent_ogt = parent_ogt.parent
        #         else:
        #             break
        json['roles'] = list(roles.values())
        json['depts'] = [i.to_base_json() for i in self.organizations]
        return json
    def to_login_json(self):
        json = {
            'userId': self.id,
            'createTime': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "",
            'updateTime': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "",
            'userName': self.username,
            'loginName': self.login_name,
            'sex': self.sex,
            'age': self.age,
            "status": self.status,
            'phone': self.phone,
            'email': self.email,
            # 'phoneNumber': self.phone_number
        }
        parent_dict = {}
        children_dict = {}
        for role in self.roles:
            for r in  role.to_json().get("resources", []):
                if r["menuType"] != "1":
                    parent_dict[r["menuId"]] = r
                else:
                    if r["parentId"] in children_dict:
                        children_dict[r["parentId"]].append(r)
                    else:
                        children_dict[r["parentId"]] = [r]
        organization_roles = [role.to_json() for organization in self.organizations for role in
                              organization.roles]
        for role in organization_roles:
            if role not in roles:
                roles.append(role)
        json['roles'] = roles
            for r in role.to_json().get("resources", []):
                if r["menuType"] != "1":
                    parent_dict[r["menuId"]] = r
                else:
                    if r["parentId"] in children_dict:
                        children_dict[r["parentId"]].append(r)
                    else:
                        children_dict[r["parentId"]] = [r]
        resources = []
        for resource in parent_dict.values():
            resource["children"] = children_dict.get(resource["menuId"], [])
            resources.append(resource)
        json['resources'] = resources
        return json
@@ -118,4 +185,96 @@
        user_id_list = [user.ID for user in users]
        for user in users:
            user_id_list.extend(user.get_children())
        return user_id_list
        return user_id_list
    def encrypted_password(self, password):
        return cipher_suite.encrypt(str(password).encode("utf-8")).decode("utf-8")
    def decrypted_password(self):
        return cipher_suite.decrypt(self.password).decode("utf-8")
class UserAppModel(Base):
    __tablename__ = "user_app"
    __table_args__ = (UniqueConstraint('user_id', 'app_type', name='user_app_id_ix'),)
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(255))
    password = Column(String(255))
    email = Column(String(255),  default="")
    user_id = Column(Integer)
    app_id = Column(String(36))
    app_type = Column(String(16))
    status = Column(String(10),  default="1")
    access_token = Column(String(1000))
    refresh_token = Column(String(1000))
    token_at = Column(DateTime, default=datetime.now())
    created_at = Column(DateTime, default=datetime.now())
    updated_at = Column(DateTime, default=datetime.now(), onupdate=datetime.now())
    def to_json(self):
        return {
            'id': self.id,
            'userName': self.username,
            'createTime': self.created_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "",
            'updateTime': self.updated_at.strftime('%Y-%m-%d %H:%M:%S') if self.created_at else "",
            'password': self.password,
            'email': self.email,
            'user_id': self.user_id,
            'app_id': self.app_id,
            "app_type": self.app_type,
            'status': self.status,
        }
    @staticmethod
    def encrypted_password(password):
        return cipher_suite.encrypt(password.encode("utf-8")).decode("utf-8")
    @staticmethod
    def decrypted_password(password):
        return cipher_suite.decrypt(password).decode("utf-8")
class UserTokenModel(Base):
    __tablename__ = "user_token"
    id = Column(String(16), primary_key=True)
    account = Column(String(255))
    password = Column(String(255))
    access_token = Column(String(1000))
    refresh_token = Column(String(1000))
    created_at = Column(DateTime, default=datetime.now())
    updated_at = Column(DateTime, default=datetime.now())
    def to_json(self):
        return {
            'id': self.id,
            'account': self.username,
            'createTime': self.created_at,
            'updateTime': self.updated_at,
            'password': self.password,
            'access_token': self.access_token,
            'refresh_token': self.refresh_token,
        }
class UserApiTokenModel(Base):
    __tablename__ = "user_api_token"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    token = Column(String(40), index=True)
    created_at = Column(DateTime, default=datetime.now())
    updated_at = Column(DateTime, default=datetime.now())
    expires_at = Column(DateTime)
    is_active = Column(Integer, default=1)
    def to_json(self):
        return {
            'id': self.id,
            'account': self.username,
            'createTime': self.created_at,
            'updateTime': self.updated_at,
            'password': self.password,
            'access_token': self.access_token,
            'refresh_token': self.refresh_token,
        }