| | |
| | | package serf |
| | | |
| | | import ( |
| | | "basic.com/syncdb.git" |
| | | "basic.com/valib/logger.git" |
| | | "basic.com/valib/serf.git/client" |
| | | "strconv" |
| | | "time" |
| | | ) |
| | | |
| | | type RpcParam struct { |
| | | Name string |
| | | Timeout time.Duration |
| | | FilterNodes []string |
| | | FilterTags []string |
| | | Data []byte |
| | | } |
| | | |
| | | // RpcParamTopic,放在RpcParam的Data中 |
| | | type RpcParamTopic struct { |
| | | Topic string `json:"topic"` |
| | | Data []byte `json:"data"` |
| | | } |
| | | |
| | | // RpcQuery |
| | | func RpcQuery(rpcIp string, req *RpcParam) ([]client.NodeResponse, error) { |
| | | rpcAddr := rpcIp +":"+strconv.Itoa(syncdb.DefaultRPCPort) |
| | | c, err := client.NewRPCClient(rpcAddr) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | defer c.Close() |
| | | ackCh := make(chan string) |
| | | respCh := make(chan client.NodeResponse) |
| | | |
| | | param := client.QueryParam { |
| | | Name: req.Name, |
| | | FilterNodes: req.FilterNodes, |
| | | Payload: req.Data, |
| | | Timeout: req.Timeout, |
| | | AckCh: ackCh, |
| | | RespCh: respCh, |
| | | } |
| | | |
| | | if err = c.Query(¶m); err != nil { |
| | | return nil, err |
| | | } |
| | | members, err := c.Members() |
| | | logger.Debug("len(members):", len(members), " err:", err) |
| | | |
| | | //不确定返回的响应数量,以及响应时间 |
| | | respArr := make([]client.NodeResponse, 0) |
| | | loop: |
| | | for { |
| | | select { |
| | | case <-time.After(req.Timeout): |
| | | break loop |
| | | case r := <-respCh: |
| | | respArr = append(respArr, r) |
| | | logger.Debug("response :", string(r.Payload), " from:", r.From) |
| | | default: |
| | | time.Sleep(100 * time.Millisecond) |
| | | } |
| | | } |
| | | return respArr, nil |
| | | } |
| | | |
| | | // RpcUserEvent |
| | | func RpcUserEvent(rpcIp string, rpcPort int, name string, payload []byte, coalesce bool) error { |
| | | rpcAddr := rpcIp +":"+strconv.Itoa(rpcPort) |
| | | c, err := client.NewRPCClient(rpcAddr) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer c.Close() |
| | | |
| | | err = c.UserEvent(name, payload, coalesce) |
| | | return err |
| | | package serf
|
| | |
|
| | | import (
|
| | | "basic.com/syncdb.git"
|
| | | "basic.com/valib/logger.git"
|
| | | "basic.com/valib/serf.git/client"
|
| | | "strconv"
|
| | | "time"
|
| | | )
|
| | |
|
| | | type RpcParam struct {
|
| | | Name string
|
| | | Timeout time.Duration
|
| | | FilterNodes []string
|
| | | FilterTags []string
|
| | | Data []byte
|
| | | }
|
| | |
|
| | | // RpcParamTopic,放在RpcParam的Data中
|
| | | type RpcParamTopic struct {
|
| | | Topic string `json:"topic"`
|
| | | Data []byte `json:"data"`
|
| | | }
|
| | |
|
| | | // RpcQuery
|
| | | func RpcQuery(rpcIp string, req *RpcParam) ([]client.NodeResponse, error) {
|
| | | rpcAddr := rpcIp +":"+strconv.Itoa(syncdb.DefaultRPCPort)
|
| | | c, err := client.NewRPCClient(rpcAddr)
|
| | | if err != nil {
|
| | | return nil, err
|
| | | }
|
| | | defer c.Close()
|
| | | ackCh := make(chan string)
|
| | | respCh := make(chan client.NodeResponse)
|
| | |
|
| | | param := client.QueryParam {
|
| | | Name: req.Name,
|
| | | FilterNodes: req.FilterNodes,
|
| | | Payload: req.Data,
|
| | | Timeout: req.Timeout,
|
| | | AckCh: ackCh,
|
| | | RespCh: respCh,
|
| | | }
|
| | |
|
| | | if err = c.Query(¶m); err != nil {
|
| | | return nil, err
|
| | | }
|
| | | members, err := c.Members()
|
| | | logger.Debug("len(members):", len(members), " err:", err)
|
| | |
|
| | | //不确定返回的响应数量,以及响应时间
|
| | | respArr := make([]client.NodeResponse, 0)
|
| | | loop:
|
| | | for {
|
| | | select {
|
| | | case <-time.After(req.Timeout):
|
| | | break loop
|
| | | case r := <-respCh:
|
| | | respArr = append(respArr, r)
|
| | | logger.Debug("response :", string(r.Payload), " from:", r.From)
|
| | | default:
|
| | | time.Sleep(100 * time.Millisecond)
|
| | | }
|
| | | }
|
| | | return respArr, nil
|
| | | }
|
| | |
|
| | | // RpcUserEvent
|
| | | func RpcUserEvent(rpcIp string, rpcPort int, name string, payload []byte, coalesce bool) error {
|
| | | rpcAddr := rpcIp +":"+strconv.Itoa(rpcPort)
|
| | | c, err := client.NewRPCClient(rpcAddr)
|
| | | if err != nil {
|
| | | return err
|
| | | }
|
| | | defer c.Close()
|
| | |
|
| | | err = c.UserEvent(name, payload, coalesce)
|
| | | return err
|
| | | } |