zhangqian
2023-09-01 f4c6c982a275fcdead46a7bdb5704fc39b4f1bb0
pkg/etcd/client.go
@@ -1,195 +1,196 @@
package etcd
import (
   "apsClient/conf"
   "context"
   "fmt"
   "go.etcd.io/etcd/client/pkg/v3/transport"
   "log"
   "net"
   "time"
   clientv3 "go.etcd.io/etcd/client/v3"
)
// ServiceRegister 创建租约注册服务
type ServiceRegister struct {
   cli           *clientv3.Client                        // etcd v3 client
   leaseID       clientv3.LeaseID                        // 租约ID
   keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse // 租约keepalieve相应chan
   key           string                                  // key
   val           string                                  // value
}
// NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, key, val string, lease int64, dailTimeout int) error {
   // todo 证书
   tlsInfo := transport.TLSInfo{
      TrustedCAFile: conf.Conf.Etcd.Tls.CaFile,
      CertFile:      conf.Conf.Etcd.Tls.CertFile,
      KeyFile:       conf.Conf.Etcd.Tls.KeyFile,
   }
   tlsConfig, err := tlsInfo.ClientConfig()
   if err != nil {
      fmt.Printf("tlsconfig failed, err:%v\n", err)
   }
   cli, err := clientv3.New(clientv3.Config{
      Endpoints:   endpoints,
      DialTimeout: time.Duration(dailTimeout) * time.Second,
      TLS:         tlsConfig,
   })
   defer func(cli *clientv3.Client) {
      err := cli.Close()
      if err != nil {
      }
   }(cli)
   log.Printf("44444444444444444  %v", endpoints)
   if err != nil {
      return err
   }
   // put
   ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
   _, err = cli.Put(ctx, key, val)
   cancel()
   if err != nil {
      fmt.Printf("put to etcd failed, err:%v\n", err)
      return err
   }
   //ser := &ServiceRegister{
   //   cli: cli,
   //   key: key,
   //   val: val,
   //}
   //
   //log.Printf("55555555555555555555555  %v", ser)
   //申请租约设置时间keepalive
   //if err := ser.putKeyWithLease(lease); err != nil {
   //   return nil, err
   //}
   return err
}
// 设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
   // 创建一个新的租约,并设置ttl时间
   resp, err := s.cli.Grant(context.Background(), lease)
   if err != nil {
      return err
   }
   log.Printf("666666666666666  %v", resp)
   // 注册服务并绑定租约
   _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
   if err != nil {
      return err
   }
   // 设置续租 定期发送需求请求
   // KeepAlive使给定的租约永远有效。 如果发布到通道的keepalive响应没有立即被使用,
   // 则租约客户端将至少每秒钟继续向etcd服务器发送保持活动请求,直到获取最新的响应为止。
   // etcd client会自动发送ttl到etcd server,从而保证该租约一直有效
   leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
   if err != nil {
      return err
   }
   log.Printf("777777777777777777  %v", leaseRespChan)
   s.leaseID = resp.ID
   s.keepAliveChan = leaseRespChan
   return nil
}
// ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
   for _ = range s.keepAliveChan {
   }
   //for leaseKeepResp := range s.keepAliveChan {
   //   fmt.Println("续约成功", leaseKeepResp)
   //}
   //fmt.Println("关闭续租")
}
// Close 注销服务
func (s *ServiceRegister) Close() error {
   // 撤销租约
   if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
      return err
   }
   //fmt.Println("撤销租约")
   return s.cli.Close()
}
// GetServeAddr 获取本机地址
func GetServeAddr(addr string) (string, error) {
   conn, err := net.Dial("udp", addr)
   if err != nil {
      return "", err
   }
   conn.Close()
   localAddr := conn.LocalAddr().(*net.UDPAddr)
   //ip := strings.Split(localAddr.String(), ":")[0]
   fmt.Println("++++++++ ip: ", localAddr.IP.String())
   return localAddr.IP.String(), nil
}
func SetEtcdData(key string, value string) {
   cli, err := clientv3.New(clientv3.Config{
      Endpoints:   []string{"192.168.20.119:2379"},
      DialTimeout: 5 * time.Second,
   })
   if err != nil {
      // handle error!
      fmt.Printf("connect to etcd failed, err:%v\n", err)
      return
   }
   fmt.Println("connect to etcd success")
   defer cli.Close()
   // put
   ctx, cancel := context.WithTimeout(context.Background(), time.Second)
   _, err = cli.Put(ctx, key, value)
   cancel()
   if err != nil {
      fmt.Printf("put to etcd failed, err:%v\n", err)
      return
   }
}
//func GetETCDLeaderEndpoint() (string, error) {
//   // 获取kubeconfig配置文件路径
//   clientset, err := util.GetClient()
//
//import (
//   "apsClient/conf"
//   "context"
//   "fmt"
//   "go.etcd.io/etcd/client/pkg/v3/transport"
//   "log"
//   "net"
//   "time"
//
//   clientv3 "go.etcd.io/etcd/client/v3"
//)
//
//// ServiceRegister 创建租约注册服务
//type ServiceRegister struct {
//   cli           *clientv3.Client                        // etcd v3 client
//   leaseID       clientv3.LeaseID                        // 租约ID
//   keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse // 租约keepalieve相应chan
//   key           string                                  // key
//   val           string                                  // value
//}
//
//// NewServiceRegister 新建注册服务
//func NewServiceRegister(endpoints []string, key, val string, lease int64, dailTimeout int) error {
//
//   // todo 证书
//   tlsInfo := transport.TLSInfo{
//      TrustedCAFile: conf.Conf.Etcd.Tls.CaFile,
//      CertFile:      conf.Conf.Etcd.Tls.CertFile,
//      KeyFile:       conf.Conf.Etcd.Tls.KeyFile,
//   }
//
//   tlsConfig, err := tlsInfo.ClientConfig()
//
//   if err != nil {
//      fmt.Printf("tlsconfig failed, err:%v\n", err)
//   }
//
//   cli, err := clientv3.New(clientv3.Config{
//      Endpoints:   endpoints,
//      DialTimeout: time.Duration(dailTimeout) * time.Second,
//      TLS:         tlsConfig,
//   })
//
//   defer func(cli *clientv3.Client) {
//      err := cli.Close()
//      if err != nil {
//
//      }
//   }(cli)
//   log.Printf("44444444444444444  %v", endpoints)
//
//   if err != nil {
//      return err
//   }
//
//   // put
//   ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
//   _, err = cli.Put(ctx, key, val)
//   cancel()
//   if err != nil {
//      fmt.Printf("put to etcd failed, err:%v\n", err)
//      return err
//   }
//
//   //ser := &ServiceRegister{
//   //   cli: cli,
//   //   key: key,
//   //   val: val,
//   //}
//   //
//   //log.Printf("55555555555555555555555  %v", ser)
//   //申请租约设置时间keepalive
//   //if err := ser.putKeyWithLease(lease); err != nil {
//   //   return nil, err
//   //}
//
//   return err
//}
//
//// 设置租约
//func (s *ServiceRegister) putKeyWithLease(lease int64) error {
//   // 创建一个新的租约,并设置ttl时间
//   resp, err := s.cli.Grant(context.Background(), lease)
//   if err != nil {
//      return err
//   }
//   log.Printf("666666666666666  %v", resp)
//
//   // 注册服务并绑定租约
//   _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
//   if err != nil {
//      return err
//   }
//
//   // 设置续租 定期发送需求请求
//   // KeepAlive使给定的租约永远有效。 如果发布到通道的keepalive响应没有立即被使用,
//   // 则租约客户端将至少每秒钟继续向etcd服务器发送保持活动请求,直到获取最新的响应为止。
//   // etcd client会自动发送ttl到etcd server,从而保证该租约一直有效
//   leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
//   if err != nil {
//      return err
//   }
//   log.Printf("777777777777777777  %v", leaseRespChan)
//
//   s.leaseID = resp.ID
//   s.keepAliveChan = leaseRespChan
//
//   return nil
//}
//
//// ListenLeaseRespChan 监听 续租情况
//func (s *ServiceRegister) ListenLeaseRespChan() {
//   for _ = range s.keepAliveChan {
//   }
//
//   //for leaseKeepResp := range s.keepAliveChan {
//   //   fmt.Println("续约成功", leaseKeepResp)
//   //}
//
//   //fmt.Println("关闭续租")
//}
//
//// Close 注销服务
//func (s *ServiceRegister) Close() error {
//   // 撤销租约
//   if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
//      return err
//   }
//
//   //fmt.Println("撤销租约")
//   return s.cli.Close()
//}
//
//// GetServeAddr 获取本机地址
//func GetServeAddr(addr string) (string, error) {
//   conn, err := net.Dial("udp", addr)
//   if err != nil {
//      return "", err
//   }
//
//   // 获取ETCD主节点的地址
//   pods, err := clientset.CoreV1().Pods("kube-system").List(context.TODO(), metav1.ListOptions{
//      LabelSelector: "component=etcd",
//   conn.Close()
//
//   localAddr := conn.LocalAddr().(*net.UDPAddr)
//   //ip := strings.Split(localAddr.String(), ":")[0]
//
//   fmt.Println("++++++++ ip: ", localAddr.IP.String())
//   return localAddr.IP.String(), nil
//}
//
//func SetEtcdData(key string, value string) {
//   cli, err := clientv3.New(clientv3.Config{
//      Endpoints:   []string{"192.168.20.119:2379"},
//      DialTimeout: 5 * time.Second,
//   })
//   if err != nil {
//      return "", err
//      // handle error!
//      fmt.Printf("connect to etcd failed, err:%v\n", err)
//      return
//   }
//
//   if len(pods.Items) == 0 {
//      return "", fmt.Errorf("no ETCD pods found")
//   fmt.Println("connect to etcd success")
//   defer cli.Close()
//   // put
//   ctx, cancel := context.WithTimeout(context.Background(), time.Second)
//   _, err = cli.Put(ctx, key, value)
//   cancel()
//   if err != nil {
//      fmt.Printf("put to etcd failed, err:%v\n", err)
//      return
//   }
//
//   leaderEndpoint := pods.Items[0].Status.PodIP + ":2379"
//   return leaderEndpoint, nil
//}
//
////func GetETCDLeaderEndpoint() (string, error) {
////   // 获取kubeconfig配置文件路径
////   clientset, err := util.GetClient()
////   if err != nil {
////      return "", err
////   }
////
////   // 获取ETCD主节点的地址
////   pods, err := clientset.CoreV1().Pods("kube-system").List(context.TODO(), metav1.ListOptions{
////      LabelSelector: "component=etcd",
////   })
////   if err != nil {
////      return "", err
////   }
////
////   if len(pods.Items) == 0 {
////      return "", fmt.Errorf("no ETCD pods found")
////   }
////
////   leaderEndpoint := pods.Items[0].Status.PodIP + ":2379"
////   return leaderEndpoint, nil
////}