| | |
| | | from fastapi import APIRouter, Depends |
| | | from app.api import Response, pwd_context, get_current_user |
| | | from app.models.base_model import get_db |
| | | from app.models.user import PageParameter |
| | | from app.models.user import PageParameter, UserStatus, UserInfo, LoginData |
| | | from app.models.user_model import UserModel |
| | | from app.service.user import get_user_list |
| | | from app.service.user import get_user_list, edit_user_status, delete_user_data, create_user, edit_user_data, \ |
| | | edit_user_pwd |
| | | |
| | | user_router = APIRouter() |
| | | |
| | |
| | | return Response(code=200, msg="", data=await get_user_list(db, paras.page_size, paras.page_index, paras.keyword)) |
| | | |
| | | |
@user_router.post("/add_user", response_model=Response)
async def add_user(
    user: UserInfo,
    current_user: UserModel = Depends(get_current_user),
    db=Depends(get_db),
):
    """Create a user from the submitted ``UserInfo`` payload.

    Args:
        user: Request body. ``userName`` must be non-empty; ``pwd`` is optional.
        current_user: Resolved through ``get_current_user``; not otherwise
            used inside this handler.
        db: Object supplied by ``get_db``; used for the ``UserModel`` lookup
            and passed on to ``create_user``.

    Returns:
        A ``Response`` with code 400 when ``userName`` is empty, code 200 and
        the message "user already created" when a ``UserModel`` row with the
        same username exists, code 500 when ``create_user`` reports failure,
        and code 200 on success.
    """
    if not user.userName:
        return Response(code=400, msg="The userName cannot be empty!")

    # Refuse to create a second account with the same username.
    existing = db.query(UserModel).filter(UserModel.username == user.userName).first()
    if existing:
        return Response(code=200, msg="user already created")

    # A missing or empty password falls back to the fixed default.
    password = user.pwd or "000000"
    created = await create_user(
        db,
        user.userName,
        user.email,
        user.phone,
        user.loginName,
        password,
        user.roles,
        user.groups,
    )
    if created:
        return Response(code=200, msg="user create successfully", data={})
    return Response(code=500, msg="user create failure", data={})
| | | |
| | | |
@user_router.put("/edit_user", response_model=Response)
async def edit_user(
    user: UserInfo,
    current_user: UserModel = Depends(get_current_user),
    db=Depends(get_db),
):
    """Update the details of the user identified by ``user.userId``.

    Args:
        user: Request body. ``userId`` selects the row; ``email``, ``phone``,
            ``loginName``, ``roles`` and ``groups`` are forwarded to
            ``edit_user_data``.
        current_user: Resolved through ``get_current_user``; not otherwise
            used inside this handler.
        db: Object supplied by ``get_db``; used for the existence check and
            passed on to ``edit_user_data``.

    Returns:
        A ``Response`` with code 200 and the message "user does not exist"
        when no ``UserModel`` row matches, code 500 when ``edit_user_data``
        reports failure, and code 200 on success.
    """
    # NOTE: ``userName`` is neither validated nor forwarded by this endpoint;
    # no empty-name or duplicate-name check is performed here.
    target = db.query(UserModel).filter(UserModel.id == user.userId).first()
    if not target:
        return Response(code=200, msg="user does not exist")

    updated = await edit_user_data(
        db,
        user.userId,
        user.email,
        user.phone,
        user.loginName,
        user.roles,
        user.groups,
    )
    if updated:
        return Response(code=200, msg="user edit successfully", data={})
    return Response(code=500, msg="user edit failure", data={})
| | | |
| | | |
@user_router.put("/change_status", response_model=Response)
async def change_user_status(
    user: UserStatus,
    current_user: UserModel = Depends(get_current_user),
    db=Depends(get_db),
):
    """Set the status of the user identified by ``user.userId``.

    Args:
        user: Request body carrying ``userId`` and ``status``; only the
            strings "0" and "1" are accepted as status values.
        current_user: Resolved through ``get_current_user``; not otherwise
            used inside this handler.
        db: Object supplied by ``get_db``; used for the existence check and
            passed on to ``edit_user_status``.

    Returns:
        A ``Response`` with code 400 for an unsupported status, code 200 and
        the message "user does not exist" when no ``UserModel`` row matches,
        code 500 when ``edit_user_status`` reports failure, and code 200 on
        success.
    """
    allowed_statuses = ("0", "1")
    if user.status not in allowed_statuses:
        rejection = "The status cannot be {}!".format(user.status)
        return Response(code=400, msg=rejection)

    target = db.query(UserModel).filter(UserModel.id == user.userId).first()
    if not target:
        return Response(code=200, msg="user does not exist")

    # edit_user_status takes the new status before the user id.
    changed = await edit_user_status(db, user.status, user.userId)
    if changed:
        return Response(code=200, msg="user status edit successfully", data={})
    return Response(code=500, msg="user status edit failure", data={})
| | | |
| | | |
@user_router.delete("/delete_user/{user_id}", response_model=Response)
async def delete_user(
    user_id,
    current_user: UserModel = Depends(get_current_user),
    db=Depends(get_db),
):
    """Delete the user whose id is given in the URL path.

    Args:
        user_id: Value of the ``{user_id}`` path segment (unannotated, so it
            is compared against ``UserModel.id`` as received).
        current_user: Resolved through ``get_current_user``; not otherwise
            used inside this handler.
        db: Object supplied by ``get_db``; used for the existence check and
            passed on to ``delete_user_data``.

    Returns:
        A ``Response`` with code 200 and the message "user does not exist"
        when no ``UserModel`` row matches, code 500 when ``delete_user_data``
        reports failure, and code 200 on success.
    """
    # Only the existence of the row matters here; the row itself is not used.
    if not db.query(UserModel).filter(UserModel.id == user_id).first():
        return Response(code=200, msg="user does not exist")

    deleted = await delete_user_data(db, user_id)
    return (
        Response(code=200, msg="user delete successfully", data={})
        if deleted
        else Response(code=500, msg="user delete failure", data={})
    )
| | | |
| | | |
@user_router.put("/reset_pwd", response_model=Response)
async def reset_user_pwd(
    user: UserStatus,
    current_user: UserModel = Depends(get_current_user),
    db=Depends(get_db),
):
    """Reset the password of the user identified by ``user.userId``.

    Args:
        user: Request body; only ``userId`` is read by this handler.
        current_user: Resolved through ``get_current_user``; not otherwise
            used inside this handler.
        db: Object supplied by ``get_db``; used for the existence check and
            passed on to ``edit_user_pwd``.

    Returns:
        A ``Response`` with code 200 and the message "user does not exist"
        when no ``UserModel`` row matches, code 500 when ``edit_user_pwd``
        reports failure, and code 200 on success.
    """
    target = db.query(UserModel).filter(UserModel.id == user.userId).first()
    if not target:
        return Response(code=200, msg="user does not exist")

    # The replacement password is decided inside edit_user_pwd, not here.
    was_reset = await edit_user_pwd(db, user.userId)
    if was_reset:
        return Response(code=200, msg="user pwd reset successfully", data={})
    return Response(code=500, msg="user pwd reset failure", data={})