| | |
| | | package repository |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "gat1400Exchange/client" |
| | | "gat1400Exchange/models" |
| | | "gat1400Exchange/pkg/snowflake" |
| | | "gat1400Exchange/service" |
| | | "gat1400Exchange/vo" |
| | | "time" |
| | | ) |
| | | |
| | | type SubscribeRepository struct { |
| | |
| | | return SubscribeRepository{} |
| | | } |
| | | |
| | | func (s *SubscribeRepository) CreateSubscribe(fromId string, subscribe *vo.Subscribe) error { |
| | | func (s *SubscribeRepository) CreateSubscribe(sid string, req *vo.Subscribe) error { |
| | | triggerTime := time.Now().Format("20060102150405") |
| | | req.SubscribeID = triggerTime + snowflake.GenerateIdStr() |
| | | |
| | | var subscribeMsg vo.RequestSubscribe |
| | | subscribeMsg.SubscribeListObject.SubscribeObject = []vo.Subscribe{*req} |
| | | |
| | | // 查找下级 |
| | | var platform models.SubPlatform |
| | | err := platform.FindById(sid) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | uri := fmt.Sprintf("http://%s:%d/VIID/Subscribes", platform.RemoteIP, platform.RemotePort) |
| | | body, _ := json.Marshal(subscribeMsg) |
| | | if client.Subscribe(uri, body) != vo.StatusSuccess { |
| | | return errors.New("发送订阅消息失败") |
| | | } |
| | | |
| | | var sub = models.Subscribe{ |
| | | Id: req.SubscribeID, |
| | | Status: req.SubscribeStatus, |
| | | FromId: sid, |
| | | Ext: *req, |
| | | } |
| | | |
| | | err = sub.Save() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | return err |
| | | } |
| | | |
| | | func (s *SubscribeRepository) CancelSubscribe(sid string, req *vo.Subscribe) error { |
| | | var subscribeMsg vo.RequestCancelSubscribe |
| | | subscribeMsg.SubscribeObject = *req |
| | | |
| | | if req.SubscribeID == "" { |
| | | return errors.New("订阅消息主题为空") |
| | | } |
| | | |
| | | // 查找下级 |
| | | var platform models.SubPlatform |
| | | err := platform.FindById(sid) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | uri := fmt.Sprintf("http://%s:%d/VIID/Subscribes/%s", platform.RemoteIP, platform.RemotePort, req.SubscribeID) |
| | | body, _ := json.Marshal(subscribeMsg) |
| | | if client.UpdateSubscribe(uri, body) != vo.StatusSuccess { |
| | | return errors.New("发送订阅消息失败") |
| | | } |
| | | |
| | | var sub = models.Subscribe{ |
| | | Id: req.SubscribeID, |
| | | Status: req.SubscribeStatus, |
| | | FromId: sid, |
| | | Ext: *req, |
| | | } |
| | | |
| | | err = sub.Save() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | return err |
| | | } |
| | | |
| | | func (s *SubscribeRepository) UpdateSubscribe(sid string, req *vo.Subscribe) error { |
| | | var subscribeMsg vo.RequestSubscribe |
| | | subscribeMsg.SubscribeListObject.SubscribeObject = []vo.Subscribe{*req} |
| | | |
| | | // 查找下级 |
| | | var platform models.SubPlatform |
| | | err := platform.FindById(sid) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | uri := fmt.Sprintf("http://%s:%d/VIID/Subscribes", platform.RemoteIP, platform.RemotePort) |
| | | body, _ := json.Marshal(subscribeMsg) |
| | | if client.UpdateSubscribe(uri, body) != vo.StatusSuccess { |
| | | return errors.New("发送订阅消息失败") |
| | | } |
| | | |
| | | var sub = models.Subscribe{ |
| | | Id: req.SubscribeID, |
| | | Status: req.SubscribeStatus, |
| | | FromId: sid, |
| | | Ext: *req, |
| | | } |
| | | |
| | | err = sub.Save() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | return err |
| | | } |
| | | |
| | | func (s *SubscribeRepository) RemoteList(sid string) ([]byte, error) { |
| | | // 查找下级 |
| | | var platform models.SubPlatform |
| | | err := platform.FindById(sid) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | uri := fmt.Sprintf("http://%s:%d/VIID/Subscribes", platform.RemoteIP, platform.RemotePort) |
| | | |
| | | return client.GetSubscribes(uri) |
| | | } |
| | | |
| | | func (s *SubscribeRepository) SaveReceiveSubscribe(fromId string, subscribe *vo.Subscribe) error { |
| | | var sub = models.Subscribe{ |
| | | Id: subscribe.SubscribeID, |
| | | Status: subscribe.SubscribeStatus, |
| | |
| | | return err |
| | | } |
| | | |
| | | service.UpdateTaskProcs(sub.Id, vo.Msg_Type_Create_Subscribe, &sub) |
| | | service.UpdateNotificationTask(sub.Id, vo.Msg_Type_Create_Subscribe, &sub) |
| | | |
| | | return err |
| | | } |
| | | |
| | | func (s *SubscribeRepository) UpdateSubscribe(subscribe *vo.Subscribe) error { |
| | | func (s *SubscribeRepository) UpdateReceiveSubscribe(subscribe *vo.Subscribe) error { |
| | | var sub = models.Subscribe{} |
| | | err := sub.FindById(subscribe.SubscribeID) |
| | | if err != nil { |
| | |
| | | sub.Status = subscribe.SubscribeStatus |
| | | sub.Ext = *subscribe |
| | | |
| | | service.UpdateTaskProcs(subscribe.SubscribeID, vo.Msg_Type_Update_Subscribe, nil) |
| | | service.UpdateNotificationTask(subscribe.SubscribeID, vo.Msg_Type_Update_Subscribe, &sub) |
| | | |
| | | return sub.Save() |
| | | } |
| | | |
| | | func (s *SubscribeRepository) DeleteSubscribe(id string) error { |
| | | func (s *SubscribeRepository) DeleteReceiveSubscribe(id string) error { |
| | | var sub = models.Subscribe{} |
| | | err := sub.DeleteById(id) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | service.UpdateTaskProcs(id, vo.Msg_Type_Delete_Subscribe, nil) |
| | | service.UpdateNotificationTask(id, vo.Msg_Type_Delete_Subscribe, nil) |
| | | |
| | | return err |
| | | } |
| | | |
| | | func (s *SubscribeRepository) List() ([]models.Subscribe, error) { |
| | | func (s *SubscribeRepository) ListByFromId(id string) ([]models.Subscribe, error) { |
| | | var sub models.Subscribe |
| | | |
| | | return sub.FindAll() |
| | | return sub.FindByFromId(id) |
| | | } |