from datetime import datetime
|
|
from cryptography.fernet import Fernet
|
from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime, UniqueConstraint
|
from sqlalchemy.orm import relationship, backref
|
|
from app.config.config import settings
|
from app.models.base_model import Base
|
|
# Association table linking users to organizations; it is used as the
# ``secondary`` table of ``UserModel.organizations``. Both foreign keys are
# declared with ON DELETE CASCADE.
user_organization_table = Table(
    'user_organization',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE')),
    Column(
        'organization_id',
        String(36),
        ForeignKey('organization.id', ondelete='CASCADE'),
    ),
)
|
|
# Association table linking users to roles; it is used as the ``secondary``
# table of ``UserModel.roles``. Both foreign keys are declared with
# ON DELETE CASCADE.
user_role_table = Table(
    'user_role',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE')),
    Column('role_id', String(36), ForeignKey('role.id', ondelete='CASCADE')),
)
|
|
# Association table linking users to groups; it is used as the ``secondary``
# table of ``UserModel.groups``. Both foreign keys are declared with
# ON DELETE CASCADE.
user_group_table = Table(
    'user_group',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('user.id', ondelete='CASCADE')),
    Column('group_id', Integer, ForeignKey('group.id', ondelete='CASCADE')),
)
|
# Module-wide Fernet instance used by the models below to encrypt and decrypt
# stored passwords. The key is read from ``settings.PASSWORD_KEY`` and encoded
# as UTF-8 bytes before being handed to Fernet.
cipher_suite = Fernet(
    settings.PASSWORD_KEY.encode("utf-8"),
)
|
|
|
class UserModel(Base):
    """ORM model for the ``user`` table.

    Besides the column definitions, the class exposes many-to-many
    relationships to organizations, roles and groups, several serializers
    (``to_dict``, ``to_json``, ``to_login_json``) and helpers for encrypting
    and decrypting the stored password with the module-level ``cipher_suite``.
    """

    __tablename__ = "user"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(255), unique=True, index=True)
    hashed_password = Column(String(255))
    # Read by decrypted_password() as a Fernet token.
    password = Column(String(255))
    compellation = Column(String(255), default="")
    phone = Column(String(255), default="")
    email = Column(String(255), default="")
    description = Column(String(255), default="")
    ragflow_id = Column(String(32))
    bisheng_id = Column(Integer)
    login_name = Column(String(100))
    status = Column(String(10), default="1")
    creator = Column(String(36))
    sex = Column(String(1))
    permission = Column(String(16), default="general")
    age = Column(Integer)
    sync_flag = Column(String(36))
    # Pass the callable (not its result) so the timestamp is computed for each
    # INSERT/UPDATE instead of once when the module is imported.
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    organizations = relationship(
        'OrganizationModel',
        secondary=user_organization_table,
        backref=backref('users', lazy='dynamic'),
    )

    roles = relationship(
        'RoleModel',
        secondary=user_role_table,
        backref=backref('users', lazy='dynamic'),
    )

    groups = relationship(
        'GroupModel',
        secondary=user_group_table,
        backref=backref('users', lazy='dynamic'),
    )

    def have_permission(self, url):
        """Return True if the user, or one of their organizations, may access *url*.

        The user's own resources are matched on ``URL``; the resources of the
        user's organizations are matched on ``NAME``.

        NOTE(review): ``self.resources`` is not declared in this file and is
        presumably provided by a relationship/backref defined elsewhere; the
        upper-case ``URL``/``NAME`` attributes and the URL-vs-NAME asymmetry
        are kept from the original code -- confirm against the resource model.
        """
        # any() is required here: a bare filter() object is always truthy in
        # Python 3, which made the previous check grant access unconditionally.
        if any(resource.URL == url for resource in self.resources):
            return True

        return any(
            resource.NAME == url
            for organization in self.organizations
            for resource in organization.resources
        )

    def __repr__(self):
        """Return a short debug representation based on the username."""
        return '<User %r>\n' % (self.username,)

    @staticmethod
    def _format_time(value):
        """Format a datetime as ``YYYY-MM-DD HH:MM:SS``; return "" when unset."""
        return value.strftime('%Y-%m-%d %H:%M:%S') if value else ""

    def _profile_json(self):
        """Build the profile fields shared by ``to_json`` and ``to_login_json``."""
        return {
            'userId': self.id,
            'createTime': self._format_time(self.created_at),
            # Guarded on updated_at itself (previously guarded on created_at).
            'updateTime': self._format_time(self.updated_at),
            'userName': self.username,
            'loginName': self.login_name,
            'sex': self.sex,
            'age': self.age,
            "status": self.status,
            'phone': self.phone,
            'email': self.email,
        }

    def to_dict(self):
        """Return a compact dict of the user's identity and contact fields."""
        return {
            'userId': self.id,
            'userName': self.username,
            'loginName': self.login_name if self.login_name else "",
            'status': self.status,
            'email': self.email,
            'phone': self.phone,
            'permission': self.permission,
        }

    def to_json(self):
        """Return the user's profile together with groups, roles and departments.

        Only roles assigned directly to the user are included; roles attached
        to the user's organizations are not merged in here.
        """
        payload = self._profile_json()
        payload['groups'] = [group.to_dict() for group in self.groups]

        # Keyed by role id so that a role is emitted at most once.
        roles = {role.id: role.to_dict() for role in self.roles}
        payload['roles'] = list(roles.values())
        payload['depts'] = [i.to_base_json() for i in self.organizations]
        return payload

    def to_login_json(self):
        """Return the user's profile plus the menu resources granted by roles.

        Resources are gathered from the user's own roles and from the roles of
        the user's organizations. Entries whose ``menuType`` is not ``"1"`` are
        treated as parents (de-duplicated by ``menuId``); entries with
        ``menuType == "1"`` are attached to their parent via ``parentId``
        under the ``children`` key.

        NOTE(review): assumes ``role.to_json()`` returns a dict with an
        optional ``"resources"`` list of dicts -- confirm against RoleModel.
        """
        payload = self._profile_json()

        # Serialize every role exactly once. The previous code serialized the
        # organization roles and then called .to_json() again on the resulting
        # dicts, which raised AttributeError.
        role_payloads = [role.to_json() for role in self.roles]
        role_payloads.extend(
            role.to_json()
            for organization in self.organizations
            for role in organization.roles
        )

        parent_dict = {}
        children_dict = {}
        for role_payload in role_payloads:
            for r in role_payload.get("resources", []):
                if r["menuType"] != "1":
                    parent_dict[r["menuId"]] = r
                else:
                    children_dict.setdefault(r["parentId"], []).append(r)

        resources = []
        for resource in parent_dict.values():
            resource["children"] = children_dict.get(resource["menuId"], [])
            resources.append(resource)
        payload['resources'] = resources
        return payload

    def get_children(self):
        """Recursively collect the ids of the users created by this user.

        Returns the ids of the direct children first, followed by the ids of
        each child's own descendants.

        NOTE(review): relies on ``UserModel.query`` being provided by ``Base``
        (not visible in this file) -- confirm.
        """
        # ``creator`` is a String column while ``id`` is an Integer, so the id
        # is compared in its string form.
        users = UserModel.query.filter_by(creator=str(self.id)).all()
        user_id_list = [user.id for user in users]
        for user in users:
            user_id_list.extend(user.get_children())
        return user_id_list

    def encrypted_password(self, password):
        """Encrypt *password* with the module cipher and return the token as text."""
        return cipher_suite.encrypt(str(password).encode("utf-8")).decode("utf-8")

    def decrypted_password(self):
        """Decrypt the stored ``password`` column and return the plain text."""
        return cipher_suite.decrypt(self.password).decode("utf-8")
|
|
|
|
class UserAppModel(Base):
    """ORM model for the ``user_app`` table.

    Each row stores a username, password and access/refresh tokens for one
    ``(user_id, app_type)`` pair; that pair is enforced unique by the
    ``user_app_id_ix`` constraint.
    """

    __tablename__ = "user_app"
    __table_args__ = (UniqueConstraint('user_id', 'app_type', name='user_app_id_ix'),)

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(255))
    password = Column(String(255))
    email = Column(String(255), default="")
    user_id = Column(Integer)
    app_id = Column(String(36))
    app_type = Column(String(16))
    status = Column(String(10), default="1")
    access_token = Column(String(1000))
    refresh_token = Column(String(1000))
    # Pass the callable (not its result) so each row gets its own timestamp
    # rather than the moment the module was imported.
    token_at = Column(DateTime, default=datetime.now)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    def to_json(self):
        """Return the row as a dict, with timestamps formatted as text.

        Unset timestamps are rendered as an empty string. Note that the stored
        ``password`` value is included in the output as-is.
        """
        time_format = '%Y-%m-%d %H:%M:%S'
        return {
            'id': self.id,
            'userName': self.username,
            'createTime': self.created_at.strftime(time_format) if self.created_at else "",
            # Guarded on updated_at itself (previously guarded on created_at,
            # which crashed when only updated_at was unset).
            'updateTime': self.updated_at.strftime(time_format) if self.updated_at else "",
            'password': self.password,
            'email': self.email,
            'user_id': self.user_id,
            'app_id': self.app_id,
            "app_type": self.app_type,
            'status': self.status,
        }

    @staticmethod
    def encrypted_password(password):
        """Encrypt *password* with the module cipher and return the token as text."""
        return cipher_suite.encrypt(password.encode("utf-8")).decode("utf-8")

    @staticmethod
    def decrypted_password(password):
        """Decrypt the Fernet token *password* and return the plain text."""
        return cipher_suite.decrypt(password).decode("utf-8")
|
|
|
class UserTokenModel(Base):
    """ORM model for the ``user_token`` table.

    Stores an account, its password and the associated access/refresh tokens.
    """

    __tablename__ = "user_token"

    id = Column(String(16), primary_key=True)
    account = Column(String(255))
    password = Column(String(255))
    access_token = Column(String(1000))
    refresh_token = Column(String(1000))
    # Pass the callable (not its result) so each row gets its own timestamp
    # rather than the moment the module was imported.
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now)

    def to_json(self):
        """Return the row as a dict; timestamps are returned as datetime objects."""
        return {
            'id': self.id,
            # The column is named ``account``; the previous code read a
            # non-existent ``username`` attribute and raised AttributeError.
            'account': self.account,
            'createTime': self.created_at,
            'updateTime': self.updated_at,
            'password': self.password,
            'access_token': self.access_token,
            'refresh_token': self.refresh_token,
        }
|
|
|
|
class UserApiTokenModel(Base):
    """ORM model for the ``user_api_token`` table.

    Each row holds one API token for a user, with an optional expiry time and
    an ``is_active`` flag that defaults to 1.
    """

    __tablename__ = "user_api_token"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    token = Column(String(40), index=True)
    # Pass the callable (not its result) so each row gets its own timestamp
    # rather than the moment the module was imported.
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now)
    expires_at = Column(DateTime)
    is_active = Column(Integer, default=1)

    def to_json(self):
        """Return the row as a dict; timestamps are returned as datetime objects.

        The previous implementation referenced ``username``, ``password``,
        ``access_token`` and ``refresh_token``, none of which exist on this
        model, so it always raised AttributeError. The columns that are
        actually defined are serialized instead.
        """
        return {
            'id': self.id,
            'user_id': self.user_id,
            'token': self.token,
            'createTime': self.created_at,
            'updateTime': self.updated_at,
            'expires_at': self.expires_at,
            'is_active': self.is_active,
        }
|