| | |
| | | package etcd |
| | | |
| | | import ( |
| | | "apsClient/conf" |
| | | "context" |
| | | "fmt" |
| | | "go.etcd.io/etcd/client/pkg/v3/transport" |
| | | "log" |
| | | "net" |
| | | "time" |
| | | |
| | | clientv3 "go.etcd.io/etcd/client/v3" |
| | | ) |
| | | |
| | | // ServiceRegister 创建租约注册服务 |
| | | type ServiceRegister struct { |
| | | cli *clientv3.Client // etcd v3 client |
| | | leaseID clientv3.LeaseID // 租约ID |
| | | keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse // 租约keepalieve相应chan |
| | | key string // key |
| | | val string // value |
| | | } |
| | | |
| | | // NewServiceRegister 新建注册服务 |
| | | func NewServiceRegister(endpoints []string, key, val string, lease int64, dailTimeout int) error { |
| | | |
| | | // todo 证书 |
| | | tlsInfo := transport.TLSInfo{ |
| | | TrustedCAFile: conf.Conf.Etcd.Tls.CaFile, |
| | | CertFile: conf.Conf.Etcd.Tls.CertFile, |
| | | KeyFile: conf.Conf.Etcd.Tls.KeyFile, |
| | | } |
| | | |
| | | tlsConfig, err := tlsInfo.ClientConfig() |
| | | |
| | | if err != nil { |
| | | fmt.Printf("tlsconfig failed, err:%v\n", err) |
| | | } |
| | | |
| | | cli, err := clientv3.New(clientv3.Config{ |
| | | Endpoints: endpoints, |
| | | DialTimeout: time.Duration(dailTimeout) * time.Second, |
| | | TLS: tlsConfig, |
| | | }) |
| | | |
| | | defer func(cli *clientv3.Client) { |
| | | err := cli.Close() |
| | | if err != nil { |
| | | |
| | | } |
| | | }(cli) |
| | | log.Printf("44444444444444444 %v", endpoints) |
| | | |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | // put |
| | | ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) |
| | | _, err = cli.Put(ctx, key, val) |
| | | cancel() |
| | | if err != nil { |
| | | fmt.Printf("put to etcd failed, err:%v\n", err) |
| | | return err |
| | | } |
| | | |
| | | //ser := &ServiceRegister{ |
| | | // cli: cli, |
| | | // key: key, |
| | | // val: val, |
| | | //} |
| | | // |
| | | //log.Printf("55555555555555555555555 %v", ser) |
| | | //申请租约设置时间keepalive |
| | | //if err := ser.putKeyWithLease(lease); err != nil { |
| | | // return nil, err |
| | | //} |
| | | |
| | | return err |
| | | } |
| | | |
| | | // 设置租约 |
| | | func (s *ServiceRegister) putKeyWithLease(lease int64) error { |
| | | // 创建一个新的租约,并设置ttl时间 |
| | | resp, err := s.cli.Grant(context.Background(), lease) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | log.Printf("666666666666666 %v", resp) |
| | | |
| | | // 注册服务并绑定租约 |
| | | _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID)) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | // 设置续租 定期发送需求请求 |
| | | // KeepAlive使给定的租约永远有效。 如果发布到通道的keepalive响应没有立即被使用, |
| | | // 则租约客户端将至少每秒钟继续向etcd服务器发送保持活动请求,直到获取最新的响应为止。 |
| | | // etcd client会自动发送ttl到etcd server,从而保证该租约一直有效 |
| | | leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | log.Printf("777777777777777777 %v", leaseRespChan) |
| | | |
| | | s.leaseID = resp.ID |
| | | s.keepAliveChan = leaseRespChan |
| | | |
| | | return nil |
| | | } |
| | | |
| | | // ListenLeaseRespChan 监听 续租情况 |
| | | func (s *ServiceRegister) ListenLeaseRespChan() { |
| | | for _ = range s.keepAliveChan { |
| | | } |
| | | |
| | | //for leaseKeepResp := range s.keepAliveChan { |
| | | // fmt.Println("续约成功", leaseKeepResp) |
| | | //} |
| | | |
| | | //fmt.Println("关闭续租") |
| | | } |
| | | |
| | | // Close 注销服务 |
| | | func (s *ServiceRegister) Close() error { |
| | | // 撤销租约 |
| | | if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil { |
| | | return err |
| | | } |
| | | |
| | | //fmt.Println("撤销租约") |
| | | return s.cli.Close() |
| | | } |
| | | |
// GetServeAddr returns the local IP address the host uses for a UDP
// "connection" to addr (host:port form).
//
// It returns an error when addr cannot be dialed or when the local address
// of the connection is not a *net.UDPAddr.
func GetServeAddr(addr string) (string, error) {
	conn, err := net.Dial("udp", addr)
	if err != nil {
		return "", err
	}
	// Keep the connection open until the local address has been read.
	defer conn.Close()

	// Checked assertion: never panic on an unexpected address type.
	localAddr, ok := conn.LocalAddr().(*net.UDPAddr)
	if !ok {
		return "", fmt.Errorf("unexpected local address type %T", conn.LocalAddr())
	}

	ip := localAddr.IP.String()
	fmt.Println("++++++++ ip: ", ip)
	return ip, nil
}
| | | |
| | | func SetEtcdData(key string, value string) { |
| | | cli, err := clientv3.New(clientv3.Config{ |
| | | Endpoints: []string{"192.168.20.119:2379"}, |
| | | DialTimeout: 5 * time.Second, |
| | | }) |
| | | if err != nil { |
| | | // handle error! |
| | | fmt.Printf("connect to etcd failed, err:%v\n", err) |
| | | return |
| | | } |
| | | fmt.Println("connect to etcd success") |
| | | defer cli.Close() |
| | | // put |
| | | ctx, cancel := context.WithTimeout(context.Background(), time.Second) |
| | | _, err = cli.Put(ctx, key, value) |
| | | cancel() |
| | | if err != nil { |
| | | fmt.Printf("put to etcd failed, err:%v\n", err) |
| | | return |
| | | } |
| | | } |
| | | |
| | | //func GetETCDLeaderEndpoint() (string, error) { |
| | | // // 获取kubeconfig配置文件路径 |
| | | // clientset, err := util.GetClient() |
| | | // |
| | | //import ( |
| | | // "apsClient/conf" |
| | | // "context" |
| | | // "fmt" |
| | | // "go.etcd.io/etcd/client/pkg/v3/transport" |
| | | // "log" |
| | | // "net" |
| | | // "time" |
| | | // |
| | | // clientv3 "go.etcd.io/etcd/client/v3" |
| | | //) |
| | | // |
| | | //// ServiceRegister 创建租约注册服务 |
| | | //type ServiceRegister struct { |
| | | // cli *clientv3.Client // etcd v3 client |
| | | // leaseID clientv3.LeaseID // 租约ID |
| | | // keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse // 租约keepalieve相应chan |
| | | // key string // key |
| | | // val string // value |
| | | //} |
| | | // |
| | | //// NewServiceRegister 新建注册服务 |
| | | //func NewServiceRegister(endpoints []string, key, val string, lease int64, dailTimeout int) error { |
| | | // |
| | | // // todo 证书 |
| | | // tlsInfo := transport.TLSInfo{ |
| | | // TrustedCAFile: conf.Conf.Etcd.Tls.CaFile, |
| | | // CertFile: conf.Conf.Etcd.Tls.CertFile, |
| | | // KeyFile: conf.Conf.Etcd.Tls.KeyFile, |
| | | // } |
| | | // |
| | | // tlsConfig, err := tlsInfo.ClientConfig() |
| | | // |
| | | // if err != nil { |
| | | // fmt.Printf("tlsconfig failed, err:%v\n", err) |
| | | // } |
| | | // |
| | | // cli, err := clientv3.New(clientv3.Config{ |
| | | // Endpoints: endpoints, |
| | | // DialTimeout: time.Duration(dailTimeout) * time.Second, |
| | | // TLS: tlsConfig, |
| | | // }) |
| | | // |
| | | // defer func(cli *clientv3.Client) { |
| | | // err := cli.Close() |
| | | // if err != nil { |
| | | // |
| | | // } |
| | | // }(cli) |
| | | // log.Printf("44444444444444444 %v", endpoints) |
| | | // |
| | | // if err != nil { |
| | | // return err |
| | | // } |
| | | // |
| | | // // put |
| | | // ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) |
| | | // _, err = cli.Put(ctx, key, val) |
| | | // cancel() |
| | | // if err != nil { |
| | | // fmt.Printf("put to etcd failed, err:%v\n", err) |
| | | // return err |
| | | // } |
| | | // |
| | | // //ser := &ServiceRegister{ |
| | | // // cli: cli, |
| | | // // key: key, |
| | | // // val: val, |
| | | // //} |
| | | // // |
| | | // //log.Printf("55555555555555555555555 %v", ser) |
| | | // //申请租约设置时间keepalive |
| | | // //if err := ser.putKeyWithLease(lease); err != nil { |
| | | // // return nil, err |
| | | // //} |
| | | // |
| | | // return err |
| | | //} |
| | | // |
| | | //// 设置租约 |
| | | //func (s *ServiceRegister) putKeyWithLease(lease int64) error { |
| | | // // 创建一个新的租约,并设置ttl时间 |
| | | // resp, err := s.cli.Grant(context.Background(), lease) |
| | | // if err != nil { |
| | | // return err |
| | | // } |
| | | // log.Printf("666666666666666 %v", resp) |
| | | // |
| | | // // 注册服务并绑定租约 |
| | | // _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID)) |
| | | // if err != nil { |
| | | // return err |
| | | // } |
| | | // |
| | | // // 设置续租 定期发送需求请求 |
| | | // // KeepAlive使给定的租约永远有效。 如果发布到通道的keepalive响应没有立即被使用, |
| | | // // 则租约客户端将至少每秒钟继续向etcd服务器发送保持活动请求,直到获取最新的响应为止。 |
| | | // // etcd client会自动发送ttl到etcd server,从而保证该租约一直有效 |
| | | // leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID) |
| | | // if err != nil { |
| | | // return err |
| | | // } |
| | | // log.Printf("777777777777777777 %v", leaseRespChan) |
| | | // |
| | | // s.leaseID = resp.ID |
| | | // s.keepAliveChan = leaseRespChan |
| | | // |
| | | // return nil |
| | | //} |
| | | // |
| | | //// ListenLeaseRespChan 监听 续租情况 |
| | | //func (s *ServiceRegister) ListenLeaseRespChan() { |
| | | // for _ = range s.keepAliveChan { |
| | | // } |
| | | // |
| | | // //for leaseKeepResp := range s.keepAliveChan { |
| | | // // fmt.Println("续约成功", leaseKeepResp) |
| | | // //} |
| | | // |
| | | // //fmt.Println("关闭续租") |
| | | //} |
| | | // |
| | | //// Close 注销服务 |
| | | //func (s *ServiceRegister) Close() error { |
| | | // // 撤销租约 |
| | | // if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil { |
| | | // return err |
| | | // } |
| | | // |
| | | // //fmt.Println("撤销租约") |
| | | // return s.cli.Close() |
| | | //} |
| | | // |
| | | //// GetServeAddr 获取本机地址 |
| | | //func GetServeAddr(addr string) (string, error) { |
| | | // conn, err := net.Dial("udp", addr) |
| | | // if err != nil { |
| | | // return "", err |
| | | // } |
| | | // |
| | | // // 获取ETCD主节点的地址 |
| | | // pods, err := clientset.CoreV1().Pods("kube-system").List(context.TODO(), metav1.ListOptions{ |
| | | // LabelSelector: "component=etcd", |
| | | // conn.Close() |
| | | // |
| | | // localAddr := conn.LocalAddr().(*net.UDPAddr) |
| | | // //ip := strings.Split(localAddr.String(), ":")[0] |
| | | // |
| | | // fmt.Println("++++++++ ip: ", localAddr.IP.String()) |
| | | // return localAddr.IP.String(), nil |
| | | //} |
| | | // |
| | | //func SetEtcdData(key string, value string) { |
| | | // cli, err := clientv3.New(clientv3.Config{ |
| | | // Endpoints: []string{"192.168.20.119:2379"}, |
| | | // DialTimeout: 5 * time.Second, |
| | | // }) |
| | | // if err != nil { |
| | | // return "", err |
| | | // // handle error! |
| | | // fmt.Printf("connect to etcd failed, err:%v\n", err) |
| | | // return |
| | | // } |
| | | // |
| | | // if len(pods.Items) == 0 { |
| | | // return "", fmt.Errorf("no ETCD pods found") |
| | | // fmt.Println("connect to etcd success") |
| | | // defer cli.Close() |
| | | // // put |
| | | // ctx, cancel := context.WithTimeout(context.Background(), time.Second) |
| | | // _, err = cli.Put(ctx, key, value) |
| | | // cancel() |
| | | // if err != nil { |
| | | // fmt.Printf("put to etcd failed, err:%v\n", err) |
| | | // return |
| | | // } |
| | | // |
| | | // leaderEndpoint := pods.Items[0].Status.PodIP + ":2379" |
| | | // return leaderEndpoint, nil |
| | | //} |
| | | // |
| | | ////func GetETCDLeaderEndpoint() (string, error) { |
| | | //// // 获取kubeconfig配置文件路径 |
| | | //// clientset, err := util.GetClient() |
| | | //// if err != nil { |
| | | //// return "", err |
| | | //// } |
| | | //// |
| | | //// // 获取ETCD主节点的地址 |
| | | //// pods, err := clientset.CoreV1().Pods("kube-system").List(context.TODO(), metav1.ListOptions{ |
| | | //// LabelSelector: "component=etcd", |
| | | //// }) |
| | | //// if err != nil { |
| | | //// return "", err |
| | | //// } |
| | | //// |
| | | //// if len(pods.Items) == 0 { |
| | | //// return "", fmt.Errorf("no ETCD pods found") |
| | | //// } |
| | | //// |
| | | //// leaderEndpoint := pods.Items[0].Status.PodIP + ":2379" |
| | | //// return leaderEndpoint, nil |
| | | ////} |