# coding:utf-8
|
|
from fastapi import APIRouter, Depends, Query, HTTPException
|
from app.api import Response, get_current_user, ResponseList
|
from app.models import klgParameter, klgIcon, KlgOwner
|
from app.models.base_model import get_db
|
from app.models.user_model import UserModel
|
from app.service.knowledge import get_knowledge_list, create_knowledge_service, sync_knowledge_service, \
|
delete_knowledge_service, update_knowledge_icon_service, get_knowledge_users_service, set_knowledge_owner_service, \
|
add_knowledge_user_service
|
from typing import Optional
|
|
knowledge_router = APIRouter()
|
|
|
@knowledge_router.get("/list", response_model=Response)
|
async def knowledge_list(
|
current: int,
|
pageSize: int,
|
keyword: str = "",
|
status: str = "",
|
knowledge: str = "1",
|
location: str = "",
|
current_user: UserModel = Depends(get_current_user),
|
db=Depends(get_db)):
|
if current and not pageSize:
|
return ResponseList(code=400, msg="缺少参数")
|
getknowledgelist = await get_knowledge_list(db, current_user.id, keyword, pageSize, current, status, knowledge,
|
location)
|
|
return Response(code=200, msg="", data=getknowledgelist)
|
|
|
@knowledge_router.post("/create", response_model=Response)
|
async def create_knowledge_api(klg: klgParameter, current_user: UserModel = Depends(get_current_user),
|
db=Depends(get_db)):
|
is_create = await create_knowledge_service(db, klg.id, klg.name, klg.description, klg.icon, klg.klgType,
|
current_user.id)
|
if not is_create:
|
return Response(code=500, msg="role knowledge failure", data={})
|
return Response(code=200, msg="role knowledge success", data={})
|
|
|
@knowledge_router.get("/update", response_model=Response)
|
async def change_knowledge_api(knowledgeId: str, current_user: UserModel = Depends(get_current_user),
|
db=Depends(get_db)):
|
# is_create = await update_dialog_status_service(db, dialog.id, dialog.status)
|
# if not is_create:
|
# return Response(code=500, msg="dialog update failure", data={})
|
return Response(code=200, msg="knowledge update success", data={})
|
|
|
@knowledge_router.get("/sync", response_model=Response)
|
async def sync_knowledge_api(knowledgeId: str, current_user: UserModel = Depends(get_current_user), db=Depends(get_db)):
|
is_create = await sync_knowledge_service(db, knowledgeId)
|
if not is_create:
|
return Response(code=500, msg="knowledge update failure", data={})
|
return Response(code=200, msg="knowledge update success", data={})
|
|
|
@knowledge_router.get("/delete", response_model=Response)
|
async def sync_knowledge_api(knowledgeId: str, current_user: UserModel = Depends(get_current_user), db=Depends(get_db)):
|
is_delete = await delete_knowledge_service(db, knowledgeId)
|
if not is_delete:
|
return Response(code=500, msg="knowledge delete failure", data={})
|
return Response(code=200, msg="knowledge delete success", data={})
|
|
|
@knowledge_router.put("/update_icon", response_model=Response)
|
async def change_dialog_icon(klg: klgIcon, current_user: UserModel = Depends(get_current_user), db=Depends(get_db)):
|
is_create = await update_knowledge_icon_service(db, klg.id, klg.icon)
|
if not is_create:
|
return Response(code=500, msg="knowledge update failure", data={})
|
return Response(code=200, msg="knowledge update success", data={})
|
|
|
@knowledge_router.get("/{knowledgeId}/users", response_model=Response)
|
async def get_kb_users(knowledgeId: str,
|
current: int,
|
pageSize: int,
|
member:int = 0,
|
current_user: UserModel = Depends(get_current_user), db=Depends(get_db)):
|
kb_user = await get_knowledge_users_service(db, knowledgeId, current_user.id, current, pageSize, member)
|
|
return Response(code=200, msg="knowledge user success", data={"rows": kb_user})
|
|
|
@knowledge_router.post("/transfer/owner", response_model=Response)
|
async def transfer_kb_owner(klgOwner: KlgOwner,
|
current_user: UserModel = Depends(get_current_user), db=Depends(get_db)):
|
is_update = await set_knowledge_owner_service(db, klgOwner.knowledgeId, klgOwner.UserId, current_user.id)
|
if not is_update:
|
return Response(code=500, msg="knowledge update failure", data={})
|
return Response(code=200, msg="knowledge update success", data={})
|
|
|
@knowledge_router.get("/{knowledgeId}/join", response_model=Response)
|
async def join_kb_user(knowledgeId: str,
|
current_user: UserModel = Depends(get_current_user), db=Depends(get_db)):
|
is_add = await add_knowledge_user_service(db, knowledgeId, current_user.id)
|
if not is_add:
|
return Response(code=500, msg="knowledge join failure", data={})
|
return Response(code=200, msg="knowledge join success", data={})
|