package repository import ( "encoding/json" "errors" "fmt" "gat1400Exchange/client" "gat1400Exchange/models" "gat1400Exchange/pkg/snowflake" "gat1400Exchange/service" "gat1400Exchange/vo" "time" ) type SubscribeRepository struct { } func NewSubscribeRepository() SubscribeRepository { return SubscribeRepository{} } func (s *SubscribeRepository) CreateSubscribe(sid string, req *vo.Subscribe) error { triggerTime := time.Now().Format("20060102150405") req.SubscribeID = triggerTime + snowflake.GenerateIdStr() var subscribeMsg vo.RequestSubscribe subscribeMsg.SubscribeListObject.SubscribeObject = []vo.Subscribe{*req} // 查找下级 var platform models.SubPlatform err := platform.FindById(sid) if err != nil { return err } uri := fmt.Sprintf("http://%s:%d/VIID/Subscribes", platform.RemoteIP, platform.RemotePort) body, _ := json.Marshal(subscribeMsg) if client.Subscribe(uri, body) != vo.StatusSuccess { return errors.New("发送订阅消息失败") } var sub = models.Subscribe{ Id: req.SubscribeID, Status: req.SubscribeStatus, FromId: sid, Ext: *req, } err = sub.Save() if err != nil { return err } return err } func (s *SubscribeRepository) CancelSubscribe(sid string, req *vo.Subscribe) error { var subscribeMsg vo.RequestCancelSubscribe subscribeMsg.SubscribeObject = *req if req.SubscribeID == "" { return errors.New("订阅消息主题为空") } // 查找下级 var platform models.SubPlatform err := platform.FindById(sid) if err != nil { return err } uri := fmt.Sprintf("http://%s:%d/VIID/Subscribes/%s", platform.RemoteIP, platform.RemotePort, req.SubscribeID) body, _ := json.Marshal(subscribeMsg) if client.UpdateSubscribe(uri, body) != vo.StatusSuccess { return errors.New("发送订阅消息失败") } var sub = models.Subscribe{ Id: req.SubscribeID, Status: req.SubscribeStatus, FromId: sid, Ext: *req, } err = sub.Save() if err != nil { return err } return err } func (s *SubscribeRepository) UpdateSubscribe(sid string, req *vo.Subscribe) error { var subscribeMsg vo.RequestSubscribe subscribeMsg.SubscribeListObject.SubscribeObject = []vo.Subscribe{*req} // 查找下级 var platform models.SubPlatform err := platform.FindById(sid) if err != nil { return err } uri := fmt.Sprintf("http://%s:%d/VIID/Subscribes", platform.RemoteIP, platform.RemotePort) body, _ := json.Marshal(subscribeMsg) if client.UpdateSubscribe(uri, body) != vo.StatusSuccess { return errors.New("发送订阅消息失败") } var sub = models.Subscribe{ Id: req.SubscribeID, Status: req.SubscribeStatus, FromId: sid, Ext: *req, } err = sub.Save() if err != nil { return err } return err } func (s *SubscribeRepository) RemoteList(sid string) ([]byte, error) { // 查找下级 var platform models.SubPlatform err := platform.FindById(sid) if err != nil { return nil, err } uri := fmt.Sprintf("http://%s:%d/VIID/Subscribes", platform.RemoteIP, platform.RemotePort) return client.GetSubscribes(uri) } func (s *SubscribeRepository) SaveReceiveSubscribe(fromId string, subscribe *vo.Subscribe) error { var sub = models.Subscribe{ Id: subscribe.SubscribeID, Status: subscribe.SubscribeStatus, FromId: fromId, Ext: *subscribe, } err := sub.Save() if err != nil { return err } service.UpdateNotificationTask(sub.Id, vo.Msg_Type_Create_Subscribe, &sub) return err } func (s *SubscribeRepository) UpdateReceiveSubscribe(subscribe *vo.Subscribe) error { var sub = models.Subscribe{} err := sub.FindById(subscribe.SubscribeID) if err != nil { return err } sub.Status = subscribe.SubscribeStatus sub.Ext = *subscribe service.UpdateNotificationTask(subscribe.SubscribeID, vo.Msg_Type_Update_Subscribe, &sub) return sub.Save() } func (s *SubscribeRepository) DeleteReceiveSubscribe(id string) error { var sub = models.Subscribe{} err := sub.DeleteById(id) if err != nil { return err } service.UpdateNotificationTask(id, vo.Msg_Type_Delete_Subscribe, nil) return err } func (s *SubscribeRepository) ListByFromId(id string) ([]models.Subscribe, error) { var sub models.Subscribe return sub.FindByFromId(id) }