package etcd
|
|
import (
|
"aps_crm/conf"
|
"context"
|
"fmt"
|
"go.etcd.io/etcd/client/pkg/v3/transport"
|
"log"
|
"net"
|
"time"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
)
|
|
// ServiceRegister 创建租约注册服务
|
type ServiceRegister struct {
|
cli *clientv3.Client // etcd v3 client
|
leaseID clientv3.LeaseID // 租约ID
|
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse // 租约keepalieve相应chan
|
key string // key
|
val string // value
|
}
|
|
// NewServiceRegister 新建注册服务
|
func NewServiceRegister(endpoints []string, key, val string, lease int64, dailTimeout int) error {
|
|
// todo 证书
|
tlsInfo := transport.TLSInfo{
|
TrustedCAFile: conf.Conf.Etcd.Tls.CaFile,
|
CertFile: conf.Conf.Etcd.Tls.CertFile,
|
KeyFile: conf.Conf.Etcd.Tls.KeyFile,
|
}
|
|
tlsConfig, err := tlsInfo.ClientConfig()
|
|
if err != nil {
|
fmt.Printf("tlsconfig failed, err:%v\n", err)
|
}
|
|
cli, err := clientv3.New(clientv3.Config{
|
Endpoints: endpoints,
|
DialTimeout: time.Duration(dailTimeout) * time.Second,
|
TLS: tlsConfig,
|
})
|
|
defer func(cli *clientv3.Client) {
|
err := cli.Close()
|
if err != nil {
|
|
}
|
}(cli)
|
log.Printf("44444444444444444 %v", endpoints)
|
|
if err != nil {
|
return err
|
}
|
|
// put
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
_, err = cli.Put(ctx, key, val)
|
cancel()
|
if err != nil {
|
fmt.Printf("put to etcd failed, err:%v\n", err)
|
return err
|
}
|
|
//ser := &ServiceRegister{
|
// cli: cli,
|
// key: key,
|
// val: val,
|
//}
|
//
|
//log.Printf("55555555555555555555555 %v", ser)
|
//申请租约设置时间keepalive
|
//if err := ser.putKeyWithLease(lease); err != nil {
|
// return nil, err
|
//}
|
|
return err
|
}
|
|
// 设置租约
|
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
|
// 创建一个新的租约,并设置ttl时间
|
resp, err := s.cli.Grant(context.Background(), lease)
|
if err != nil {
|
return err
|
}
|
log.Printf("666666666666666 %v", resp)
|
|
// 注册服务并绑定租约
|
_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
|
if err != nil {
|
return err
|
}
|
|
// 设置续租 定期发送需求请求
|
// KeepAlive使给定的租约永远有效。 如果发布到通道的keepalive响应没有立即被使用,
|
// 则租约客户端将至少每秒钟继续向etcd服务器发送保持活动请求,直到获取最新的响应为止。
|
// etcd client会自动发送ttl到etcd server,从而保证该租约一直有效
|
leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
|
if err != nil {
|
return err
|
}
|
log.Printf("777777777777777777 %v", leaseRespChan)
|
|
s.leaseID = resp.ID
|
s.keepAliveChan = leaseRespChan
|
|
return nil
|
}
|
|
// ListenLeaseRespChan 监听 续租情况
|
func (s *ServiceRegister) ListenLeaseRespChan() {
|
for _ = range s.keepAliveChan {
|
}
|
|
//for leaseKeepResp := range s.keepAliveChan {
|
// fmt.Println("续约成功", leaseKeepResp)
|
//}
|
|
//fmt.Println("关闭续租")
|
}
|
|
// Close 注销服务
|
func (s *ServiceRegister) Close() error {
|
// 撤销租约
|
if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
|
return err
|
}
|
|
//fmt.Println("撤销租约")
|
return s.cli.Close()
|
}
|
|
// GetServeAddr 获取本机地址
|
func GetServeAddr(addr string) (string, error) {
|
conn, err := net.Dial("udp", addr)
|
if err != nil {
|
return "", err
|
}
|
|
conn.Close()
|
|
localAddr := conn.LocalAddr().(*net.UDPAddr)
|
//ip := strings.Split(localAddr.String(), ":")[0]
|
|
fmt.Println("++++++++ ip: ", localAddr.IP.String())
|
return localAddr.IP.String(), nil
|
}
|
|
func SetEtcdData(key string, value string) {
|
cli, err := clientv3.New(clientv3.Config{
|
Endpoints: []string{"192.168.20.119:2379"},
|
DialTimeout: 5 * time.Second,
|
})
|
if err != nil {
|
// handle error!
|
fmt.Printf("connect to etcd failed, err:%v\n", err)
|
return
|
}
|
fmt.Println("connect to etcd success")
|
defer cli.Close()
|
// put
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
_, err = cli.Put(ctx, key, value)
|
cancel()
|
if err != nil {
|
fmt.Printf("put to etcd failed, err:%v\n", err)
|
return
|
}
|
}
|
|
//func GetETCDLeaderEndpoint() (string, error) {
|
// // 获取kubeconfig配置文件路径
|
// clientset, err := util.GetClient()
|
// if err != nil {
|
// return "", err
|
// }
|
//
|
// // 获取ETCD主节点的地址
|
// pods, err := clientset.CoreV1().Pods("kube-system").List(context.TODO(), metav1.ListOptions{
|
// LabelSelector: "component=etcd",
|
// })
|
// if err != nil {
|
// return "", err
|
// }
|
//
|
// if len(pods.Items) == 0 {
|
// return "", fmt.Errorf("no ETCD pods found")
|
// }
|
//
|
// leaderEndpoint := pods.Items[0].SalesReturnStatus.PodIP + ":2379"
|
// return leaderEndpoint, nil
|
//}
|