zhangzengfei
2023-09-05 63645d248c765244488cd34dbc1bb6528ca6b7c7
system-service/serf/rpc.go
@@ -1,79 +1,79 @@
package serf
import (
   "basic.com/syncdb.git"
   "basic.com/valib/logger.git"
   "basic.com/valib/serf.git/client"
   "strconv"
   "time"
)
type RpcParam struct {
   Name             string
   Timeout          time.Duration
   FilterNodes       []string
   FilterTags          []string
   Data             []byte
}
// RpcParamTopic,放在RpcParam的Data中
type RpcParamTopic struct {
   Topic             string    `json:"topic"`
   Data              []byte   `json:"data"`
}
// RpcQuery
func RpcQuery(rpcIp string, req *RpcParam) ([]client.NodeResponse, error) {
   rpcAddr := rpcIp +":"+strconv.Itoa(syncdb.DefaultRPCPort)
   c, err := client.NewRPCClient(rpcAddr)
   if err != nil {
      return nil, err
   }
   defer c.Close()
   ackCh := make(chan string)
   respCh := make(chan client.NodeResponse)
   param := client.QueryParam {
      Name: req.Name,
      FilterNodes: req.FilterNodes,
      Payload: req.Data,
      Timeout: req.Timeout,
      AckCh: ackCh,
      RespCh: respCh,
   }
   if err = c.Query(&param); err != nil {
      return nil, err
   }
   members, err := c.Members()
   logger.Debug("len(members):", len(members), " err:", err)
   //不确定返回的响应数量,以及响应时间
   respArr := make([]client.NodeResponse, 0)
loop:
   for {
      select {
      case <-time.After(req.Timeout):
         break loop
      case r := <-respCh:
         respArr = append(respArr, r)
         logger.Debug("response :", string(r.Payload), " from:", r.From)
      default:
         time.Sleep(100 * time.Millisecond)
      }
   }
   return respArr, nil
}
// RpcUserEvent
func RpcUserEvent(rpcIp string, rpcPort int, name string, payload []byte, coalesce bool) error {
   rpcAddr := rpcIp +":"+strconv.Itoa(rpcPort)
   c, err := client.NewRPCClient(rpcAddr)
   if err != nil {
      return err
   }
   defer c.Close()
   err = c.UserEvent(name, payload, coalesce)
   return err
package serf
import (
   "basic.com/syncdb.git"
   "basic.com/valib/logger.git"
   "basic.com/valib/serf.git/client"
   "strconv"
   "time"
)
type RpcParam struct {
   Name             string
   Timeout          time.Duration
   FilterNodes       []string
   FilterTags          []string
   Data             []byte
}
// RpcParamTopic,放在RpcParam的Data中
type RpcParamTopic struct {
   Topic             string    `json:"topic"`
   Data              []byte   `json:"data"`
}
// RpcQuery
func RpcQuery(rpcIp string, req *RpcParam) ([]client.NodeResponse, error) {
   rpcAddr := rpcIp +":"+strconv.Itoa(syncdb.DefaultRPCPort)
   c, err := client.NewRPCClient(rpcAddr)
   if err != nil {
      return nil, err
   }
   defer c.Close()
   ackCh := make(chan string)
   respCh := make(chan client.NodeResponse)
   param := client.QueryParam {
      Name: req.Name,
      FilterNodes: req.FilterNodes,
      Payload: req.Data,
      Timeout: req.Timeout,
      AckCh: ackCh,
      RespCh: respCh,
   }
   if err = c.Query(&param); err != nil {
      return nil, err
   }
   members, err := c.Members()
   logger.Debug("len(members):", len(members), " err:", err)
   //不确定返回的响应数量,以及响应时间
   respArr := make([]client.NodeResponse, 0)
loop:
   for {
      select {
      case <-time.After(req.Timeout):
         break loop
      case r := <-respCh:
         respArr = append(respArr, r)
         logger.Debug("response :", string(r.Payload), " from:", r.From)
      default:
         time.Sleep(100 * time.Millisecond)
      }
   }
   return respArr, nil
}
// RpcUserEvent
func RpcUserEvent(rpcIp string, rpcPort int, name string, payload []byte, coalesce bool) error {
   rpcAddr := rpcIp +":"+strconv.Itoa(rpcPort)
   c, err := client.NewRPCClient(rpcAddr)
   if err != nil {
      return err
   }
   defer c.Close()
   err = c.UserEvent(name, payload, coalesce)
   return err
}