package etcd import ( "apsClient/conf" "context" "fmt" "go.etcd.io/etcd/client/pkg/v3/transport" "log" "net" "time" clientv3 "go.etcd.io/etcd/client/v3" ) // ServiceRegister 创建租约注册服务 type ServiceRegister struct { cli *clientv3.Client // etcd v3 client leaseID clientv3.LeaseID // 租约ID keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse // 租约keepalieve相应chan key string // key val string // value } // NewServiceRegister 新建注册服务 func NewServiceRegister(endpoints []string, key, val string, lease int64, dailTimeout int) error { // todo 证书 tlsInfo := transport.TLSInfo{ TrustedCAFile: conf.Conf.Etcd.Tls.CaFile, CertFile: conf.Conf.Etcd.Tls.CertFile, KeyFile: conf.Conf.Etcd.Tls.KeyFile, } tlsConfig, err := tlsInfo.ClientConfig() if err != nil { fmt.Printf("tlsconfig failed, err:%v\n", err) } cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: time.Duration(dailTimeout) * time.Second, TLS: tlsConfig, }) defer func(cli *clientv3.Client) { err := cli.Close() if err != nil { } }(cli) log.Printf("44444444444444444 %v", endpoints) if err != nil { return err } // put ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) _, err = cli.Put(ctx, key, val) cancel() if err != nil { fmt.Printf("put to etcd failed, err:%v\n", err) return err } //ser := &ServiceRegister{ // cli: cli, // key: key, // val: val, //} // //log.Printf("55555555555555555555555 %v", ser) //申请租约设置时间keepalive //if err := ser.putKeyWithLease(lease); err != nil { // return nil, err //} return err } // 设置租约 func (s *ServiceRegister) putKeyWithLease(lease int64) error { // 创建一个新的租约,并设置ttl时间 resp, err := s.cli.Grant(context.Background(), lease) if err != nil { return err } log.Printf("666666666666666 %v", resp) // 注册服务并绑定租约 _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID)) if err != nil { return err } // 设置续租 定期发送需求请求 // KeepAlive使给定的租约永远有效。 如果发布到通道的keepalive响应没有立即被使用, // 则租约客户端将至少每秒钟继续向etcd服务器发送保持活动请求,直到获取最新的响应为止。 // etcd client会自动发送ttl到etcd server,从而保证该租约一直有效 leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID) if err != nil { return err } log.Printf("777777777777777777 %v", leaseRespChan) s.leaseID = resp.ID s.keepAliveChan = leaseRespChan return nil } // ListenLeaseRespChan 监听 续租情况 func (s *ServiceRegister) ListenLeaseRespChan() { for _ = range s.keepAliveChan { } //for leaseKeepResp := range s.keepAliveChan { // fmt.Println("续约成功", leaseKeepResp) //} //fmt.Println("关闭续租") } // Close 注销服务 func (s *ServiceRegister) Close() error { // 撤销租约 if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil { return err } //fmt.Println("撤销租约") return s.cli.Close() } // GetServeAddr 获取本机地址 func GetServeAddr(addr string) (string, error) { conn, err := net.Dial("udp", addr) if err != nil { return "", err } conn.Close() localAddr := conn.LocalAddr().(*net.UDPAddr) //ip := strings.Split(localAddr.String(), ":")[0] fmt.Println("++++++++ ip: ", localAddr.IP.String()) return localAddr.IP.String(), nil } func SetEtcdData(key string, value string) { cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"192.168.20.119:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { // handle error! fmt.Printf("connect to etcd failed, err:%v\n", err) return } fmt.Println("connect to etcd success") defer cli.Close() // put ctx, cancel := context.WithTimeout(context.Background(), time.Second) _, err = cli.Put(ctx, key, value) cancel() if err != nil { fmt.Printf("put to etcd failed, err:%v\n", err) return } } //func GetETCDLeaderEndpoint() (string, error) { // // 获取kubeconfig配置文件路径 // clientset, err := util.GetClient() // if err != nil { // return "", err // } // // // 获取ETCD主节点的地址 // pods, err := clientset.CoreV1().Pods("kube-system").List(context.TODO(), metav1.ListOptions{ // LabelSelector: "component=etcd", // }) // if err != nil { // return "", err // } // // if len(pods.Items) == 0 { // return "", fmt.Errorf("no ETCD pods found") // } // // leaderEndpoint := pods.Items[0].Status.PodIP + ":2379" // return leaderEndpoint, nil //}