zhangqian
2023-09-01 f4c6c982a275fcdead46a7bdb5704fc39b4f1bb0
接收工艺模型新增消息,plc读取时间可配置
1个文件已添加
8个文件已修改
706 ■■■■ 已修改文件
conf/apsClient.json 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
conf/config.go 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constvar/const.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
crontask/cron_task.go 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/process_model.go 264 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/consumer.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/msg_handler.go 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/nsq.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pkg/etcd/client.go 369 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
conf/apsClient.json
@@ -27,16 +27,10 @@
    "nsqdAddr": "121.31.232.83:4150",
    "nsqlookupdAddr":""
  },
  "PLCAddresses":[
    {
      "fieldName": "示例字段1",
      "address": "0x0001"
    },
    {
      "fieldName": "示例字段2",
      "address": "0x0002"
    }
  ]
  "plc": {
    "finishNumberTimeInterval": 100,
    "totalNumberTimeInterval": 1000
  }
}
conf/config.go
@@ -66,9 +66,9 @@
        NsqlookupdAddr string
    }
    PLCAddressItem struct {
        FieldName string
        Address   int
    plc struct {
        FinishNumberTimeInterval int
        TotalNumberTimeInterval  int
    }
    config struct {
@@ -87,8 +87,8 @@
        //NsqConf
        NsqConf nsqConf
        //PLC write address
        PLCAddresses []PLCAddressItem
        //PLC
        PLC plc
    }
)
@@ -127,6 +127,6 @@
    log.Println("......................................................")
    log.Printf("   System:                %+v", Conf.System)
    log.Printf("   Log:                   %+v", Conf.Log)
    log.Printf("   plc address:                  %+v", Conf.PLCAddresses)
    log.Printf("   plc :                  %+v", Conf.PLC)
    log.Println("......................................................")
}
constvar/const.go
@@ -6,6 +6,7 @@
    NsqTopicSendPlcAddress        = "aps.%v.sendPlcAddress"
    NsqTopicProcessParamsRequest  = "aps.%v.processParams.request"
    NsqTopicProcessParamsResponse = "aps.%v.processParams.response"
    NsqTopicApsProcessParams      = "aps.%v.aps.processParams" //有了新的工艺模型
)
type PlcStartAddressType int
crontask/cron_task.go
@@ -1,6 +1,7 @@
package crontask
import (
    "apsClient/conf"
    "apsClient/constvar"
    "apsClient/pkg/ecode"
    "apsClient/pkg/logx"
@@ -10,8 +11,17 @@
)
func InitTask() error {
    finishNumberTimeInterval := conf.Conf.PLC.FinishNumberTimeInterval
    totalNumberTimeInterval := conf.Conf.PLC.TotalNumberTimeInterval
    if finishNumberTimeInterval == 0 {
        finishNumberTimeInterval = 6
    }
    if totalNumberTimeInterval == 0 {
        totalNumberTimeInterval = 60
    }
    s := gocron.NewScheduler(time.UTC)
    _, err := s.Every(9).Seconds().StartImmediately().Do(func() {
    _, err := s.Every(finishNumberTimeInterval).Seconds().StartImmediately().Do(func() {
        plcConfig, code := service.NewDevicePlcService().GetDevicePlc()
        if code != ecode.OK {
            return
@@ -27,7 +37,7 @@
        return err
    }
    s.Every(60).Seconds().StartImmediately().Do(func() {
    s.Every(totalNumberTimeInterval).Seconds().StartImmediately().Do(func() {
        plcConfig, code := service.NewDevicePlcService().GetDevicePlc()
        if code != ecode.OK {
            return
model/process_model.go
New file
@@ -0,0 +1,264 @@
package model
import (
    "apsClient/pkg/sqlitex"
    "fmt"
    "gorm.io/gorm"
)
type (
    // ProcessModel 工艺流程参数
    ProcessModel struct {
        gorm.Model `json:"-"`
        Number     string                 `gorm:"index;column:number;type:varchar(255);not null;default '';comment:工艺模型编号" json:"number"` //工艺模型编号
        OrderId    string                 `gorm:"column:order_id;type:varchar(255);not null;default '';comment:订单id" json:"orderId"`      //订单id
        Product    string                 `gorm:"column:product;type:varchar(255);not null;default '';comment:产品名称" json:"product"`       //产品名称
        Procedure  string                 `gorm:"column:procedure;type:varchar(255);not null;default '';comment:工序" json:"procedure"`     //工序
        WorkOrder  string                 `gorm:"column:work_order;type:varchar(255);not null;default '';comment:工单" json:"workOrder"`    //工单
        Device     string                 `gorm:"column:device;type:varchar(255);not null;default '';comment:设备" json:"device"`           //设备
        Params     string                 `json:"-" gorm:"type:text;comment:工艺参数键值对json串"`
        ParamsMap  map[string]interface{} `json:"paramsMap" gorm:"-"`
    }
    ProcessModelSearch struct {
        ProcessModel
        Order    string
        PageNum  int
        PageSize int
        Orm      *gorm.DB
    }
)
func (slf *ProcessModel) TableName() string {
    return "process_model"
}
func NewProcessModelSearch() *ProcessModelSearch {
    return &ProcessModelSearch{Orm: sqlitex.GetDB()}
}
func (slf *ProcessModelSearch) SetOrm(tx *gorm.DB) *ProcessModelSearch {
    slf.Orm = tx
    return slf
}
func (slf *ProcessModelSearch) SetPage(page, size int) *ProcessModelSearch {
    slf.PageNum, slf.PageSize = page, size
    return slf
}
func (slf *ProcessModelSearch) SetOrder(order string) *ProcessModelSearch {
    slf.Order = order
    return slf
}
func (slf *ProcessModelSearch) SetID(id uint) *ProcessModelSearch {
    slf.ID = id
    return slf
}
func (slf *ProcessModelSearch) SetNumber(number string) *ProcessModelSearch {
    slf.Number = number
    return slf
}
func (slf *ProcessModelSearch) SetWorkOrder(workOrder string) *ProcessModelSearch {
    slf.WorkOrder = workOrder
    return slf
}
func (slf *ProcessModelSearch) SetOrderId(orderId string) *ProcessModelSearch {
    slf.OrderId = orderId
    return slf
}
func (slf *ProcessModelSearch) SetProduct(product string) *ProcessModelSearch {
    slf.Product = product
    return slf
}
func (slf *ProcessModelSearch) SetProcedure(procedure string) *ProcessModelSearch {
    slf.Procedure = procedure
    return slf
}
func (slf *ProcessModelSearch) SetDevice(device string) *ProcessModelSearch {
    slf.Device = device
    return slf
}
func (slf *ProcessModelSearch) build() *gorm.DB {
    var db = slf.Orm.Table(slf.TableName())
    if slf.ID != 0 {
        db = db.Where("id = ?", slf.ID)
    }
    if len(slf.WorkOrder) != 0 {
        db = db.Where("work_order = ?", slf.WorkOrder)
    }
    if len(slf.OrderId) != 0 {
        db = db.Where("order_id = ?", slf.OrderId)
    }
    if len(slf.Product) != 0 {
        db = db.Where("product = ?", slf.Product)
    }
    if len(slf.Procedure) != 0 {
        db = db.Where("`procedure` = ?", slf.Procedure)
    }
    if len(slf.Device) != 0 {
        db = db.Where("device = ?", slf.Device)
    }
    if len(slf.Number) != 0 {
        db = db.Where("number = ?", slf.Number)
    }
    if slf.Order != "" {
        db = db.Order(slf.Order)
    }
    return db
}
// Create 单条插入
func (slf *ProcessModelSearch) Create(record *ProcessModel) error {
    var db = slf.build()
    if err := db.Create(record).Error; err != nil {
        return fmt.Errorf("create err: %v, record: %+v", err, record)
    }
    return nil
}
func (slf *ProcessModelSearch) Save(record *ProcessModel) error {
    var db = slf.build()
    if err := db.Omit("CreatedAt").Save(record).Error; err != nil {
        return fmt.Errorf("save err: %v, record: %+v", err, record)
    }
    return nil
}
func (slf *ProcessModelSearch) UpdateByMap(upMap map[string]interface{}) error {
    var (
        db = slf.build()
    )
    if err := db.Updates(upMap).Error; err != nil {
        return fmt.Errorf("update by map err: %v, upMap: %+v", err, upMap)
    }
    return nil
}
func (slf *ProcessModelSearch) UpdateByQuery(query string, args []interface{}, upMap map[string]interface{}) error {
    var (
        db = slf.Orm.Table(slf.TableName()).Where(query, args...)
    )
    if err := db.Updates(upMap).Error; err != nil {
        return fmt.Errorf("update by query err: %v, query: %s, args: %+v, upMap: %+v", err, query, args, upMap)
    }
    return nil
}
func (slf *ProcessModelSearch) Delete() error {
    var db = slf.build()
    if err := db.Unscoped().Delete(&ProcessModel{}).Error; err != nil {
        return err
    }
    return nil
}
func (slf *ProcessModelSearch) First() (*ProcessModel, error) {
    var (
        record = new(ProcessModel)
        db     = slf.build()
    )
    if err := db.First(record).Error; err != nil {
        return record, err
    }
    return record, nil
}
func (slf *ProcessModelSearch) Find() ([]*ProcessModel, int64, error) {
    var (
        records = make([]*ProcessModel, 0)
        total   int64
        db      = slf.build()
    )
    if err := db.Count(&total).Error; err != nil {
        return records, total, fmt.Errorf("find count err: %v", err)
    }
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, total, fmt.Errorf("find records err: %v", err)
    }
    return records, total, nil
}
func (slf *ProcessModelSearch) FindNotTotal() ([]*ProcessModel, error) {
    var (
        records = make([]*ProcessModel, 0)
        db      = slf.build()
    )
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, fmt.Errorf("find records err: %v", err)
    }
    return records, nil
}
// FindByQuery 指定条件查询.
func (slf *ProcessModelSearch) FindByQuery(query string, args []interface{}) ([]*ProcessModel, int64, error) {
    var (
        records = make([]*ProcessModel, 0)
        total   int64
        db      = slf.Orm.Table(slf.TableName()).Where(query, args...)
    )
    if err := db.Count(&total).Error; err != nil {
        return records, total, fmt.Errorf("find by query count err: %v", err)
    }
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, total, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args)
    }
    return records, total, nil
}
// FindByQueryNotTotal 指定条件查询&不查询总条数.
func (slf *ProcessModelSearch) FindByQueryNotTotal(query string, args []interface{}) ([]*ProcessModel, error) {
    var (
        records = make([]*ProcessModel, 0)
        db      = slf.Orm.Table(slf.TableName()).Where(query, args...)
    )
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args)
    }
    return records, nil
}
nsq/consumer.go
@@ -24,6 +24,8 @@
        handler = &PlcAddress{Topic: topic}
    case fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId):
        handler = &ProcessParams{Topic: topic}
    case fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId):
        handler = &ProcessParamsSync{Topic: topic}
    }
    c.AddHandler(handler.HandleMessage)
nsq/msg_handler.go
@@ -134,12 +134,6 @@
func (slf *ProcessParams) HandleMessage(data []byte) (err error) {
    logx.Infof("get an process params message :%s", data)
    var resp = new(common.ResponseProcessParams)
    err = json.Unmarshal(data, &resp)
    if err != nil {
        logx.Errorf("ScheduleTask HandleMessage Unmarshal json err: %v", err.Error())
        return err
    }
    //通知回复收到
    ReceivedMessageChan <- &ReceivedMessage{
        Topic:   slf.Topic,
@@ -147,3 +141,23 @@
    }
    return nil
}
type ProcessParamsSync struct {
    Topic string
}
func (slf *ProcessParamsSync) HandleMessage(data []byte) (err error) {
    logx.Infof("get an process params sync message :%s", data)
    var processModel model.ProcessModel
    err = json.Unmarshal(data, &processModel)
    if err != nil {
        logx.Infof("unmarshal process params sync err :%s", err)
        return err
    }
    err = model.NewProcessModelSearch().Create(&processModel)
    if err != nil {
        logx.Infof("save process params sync err :%s", err)
        return err
    }
    return nil
}
nsq/nsq.go
@@ -41,5 +41,9 @@
        _ = Consume(fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
    })
    safe.Go(func() {
        _ = Consume(fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
    })
    return nil
}
pkg/etcd/client.go
@@ -1,195 +1,196 @@
package etcd
import (
    "apsClient/conf"
    "context"
    "fmt"
    "go.etcd.io/etcd/client/pkg/v3/transport"
    "log"
    "net"
    "time"
    clientv3 "go.etcd.io/etcd/client/v3"
)
// ServiceRegister 创建租约注册服务
type ServiceRegister struct {
    cli           *clientv3.Client                        // etcd v3 client
    leaseID       clientv3.LeaseID                        // 租约ID
    keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse // 租约keepalieve相应chan
    key           string                                  // key
    val           string                                  // value
}
// NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, key, val string, lease int64, dailTimeout int) error {
    // todo 证书
    tlsInfo := transport.TLSInfo{
        TrustedCAFile: conf.Conf.Etcd.Tls.CaFile,
        CertFile:      conf.Conf.Etcd.Tls.CertFile,
        KeyFile:       conf.Conf.Etcd.Tls.KeyFile,
    }
    tlsConfig, err := tlsInfo.ClientConfig()
    if err != nil {
        fmt.Printf("tlsconfig failed, err:%v\n", err)
    }
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: time.Duration(dailTimeout) * time.Second,
        TLS:         tlsConfig,
    })
    defer func(cli *clientv3.Client) {
        err := cli.Close()
        if err != nil {
        }
    }(cli)
    log.Printf("44444444444444444  %v", endpoints)
    if err != nil {
        return err
    }
    // put
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    _, err = cli.Put(ctx, key, val)
    cancel()
    if err != nil {
        fmt.Printf("put to etcd failed, err:%v\n", err)
        return err
    }
    //ser := &ServiceRegister{
    //    cli: cli,
    //    key: key,
    //    val: val,
    //}
    //
    //log.Printf("55555555555555555555555  %v", ser)
    //申请租约设置时间keepalive
    //if err := ser.putKeyWithLease(lease); err != nil {
    //    return nil, err
    //}
    return err
}
// 设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
    // 创建一个新的租约,并设置ttl时间
    resp, err := s.cli.Grant(context.Background(), lease)
    if err != nil {
        return err
    }
    log.Printf("666666666666666  %v", resp)
    // 注册服务并绑定租约
    _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
    if err != nil {
        return err
    }
    // 设置续租 定期发送需求请求
    // KeepAlive使给定的租约永远有效。 如果发布到通道的keepalive响应没有立即被使用,
    // 则租约客户端将至少每秒钟继续向etcd服务器发送保持活动请求,直到获取最新的响应为止。
    // etcd client会自动发送ttl到etcd server,从而保证该租约一直有效
    leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
    if err != nil {
        return err
    }
    log.Printf("777777777777777777  %v", leaseRespChan)
    s.leaseID = resp.ID
    s.keepAliveChan = leaseRespChan
    return nil
}
// ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
    for _ = range s.keepAliveChan {
    }
    //for leaseKeepResp := range s.keepAliveChan {
    //    fmt.Println("续约成功", leaseKeepResp)
    //}
    //fmt.Println("关闭续租")
}
// Close 注销服务
func (s *ServiceRegister) Close() error {
    // 撤销租约
    if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
        return err
    }
    //fmt.Println("撤销租约")
    return s.cli.Close()
}
// GetServeAddr 获取本机地址
func GetServeAddr(addr string) (string, error) {
    conn, err := net.Dial("udp", addr)
    if err != nil {
        return "", err
    }
    conn.Close()
    localAddr := conn.LocalAddr().(*net.UDPAddr)
    //ip := strings.Split(localAddr.String(), ":")[0]
    fmt.Println("++++++++ ip: ", localAddr.IP.String())
    return localAddr.IP.String(), nil
}
func SetEtcdData(key string, value string) {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"192.168.20.119:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        // handle error!
        fmt.Printf("connect to etcd failed, err:%v\n", err)
        return
    }
    fmt.Println("connect to etcd success")
    defer cli.Close()
    // put
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    _, err = cli.Put(ctx, key, value)
    cancel()
    if err != nil {
        fmt.Printf("put to etcd failed, err:%v\n", err)
        return
    }
}
//func GetETCDLeaderEndpoint() (string, error) {
//    // 获取kubeconfig配置文件路径
//    clientset, err := util.GetClient()
//
//import (
//    "apsClient/conf"
//    "context"
//    "fmt"
//    "go.etcd.io/etcd/client/pkg/v3/transport"
//    "log"
//    "net"
//    "time"
//
//    clientv3 "go.etcd.io/etcd/client/v3"
//)
//
//// ServiceRegister 创建租约注册服务
//type ServiceRegister struct {
//    cli           *clientv3.Client                        // etcd v3 client
//    leaseID       clientv3.LeaseID                        // 租约ID
//    keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse // 租约keepalieve相应chan
//    key           string                                  // key
//    val           string                                  // value
//}
//
//// NewServiceRegister 新建注册服务
//func NewServiceRegister(endpoints []string, key, val string, lease int64, dailTimeout int) error {
//
//    // todo 证书
//    tlsInfo := transport.TLSInfo{
//        TrustedCAFile: conf.Conf.Etcd.Tls.CaFile,
//        CertFile:      conf.Conf.Etcd.Tls.CertFile,
//        KeyFile:       conf.Conf.Etcd.Tls.KeyFile,
//    }
//
//    tlsConfig, err := tlsInfo.ClientConfig()
//
//    if err != nil {
//        fmt.Printf("tlsconfig failed, err:%v\n", err)
//    }
//
//    cli, err := clientv3.New(clientv3.Config{
//        Endpoints:   endpoints,
//        DialTimeout: time.Duration(dailTimeout) * time.Second,
//        TLS:         tlsConfig,
//    })
//
//    defer func(cli *clientv3.Client) {
//        err := cli.Close()
//        if err != nil {
//
//        }
//    }(cli)
//    log.Printf("44444444444444444  %v", endpoints)
//
//    if err != nil {
//        return err
//    }
//
//    // put
//    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
//    _, err = cli.Put(ctx, key, val)
//    cancel()
//    if err != nil {
//        fmt.Printf("put to etcd failed, err:%v\n", err)
//        return err
//    }
//
//    //ser := &ServiceRegister{
//    //    cli: cli,
//    //    key: key,
//    //    val: val,
//    //}
//    //
//    //log.Printf("55555555555555555555555  %v", ser)
//    //申请租约设置时间keepalive
//    //if err := ser.putKeyWithLease(lease); err != nil {
//    //    return nil, err
//    //}
//
//    return err
//}
//
//// 设置租约
//func (s *ServiceRegister) putKeyWithLease(lease int64) error {
//    // 创建一个新的租约,并设置ttl时间
//    resp, err := s.cli.Grant(context.Background(), lease)
//    if err != nil {
//        return err
//    }
//    log.Printf("666666666666666  %v", resp)
//
//    // 注册服务并绑定租约
//    _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
//    if err != nil {
//        return err
//    }
//
//    // 设置续租 定期发送需求请求
//    // KeepAlive使给定的租约永远有效。 如果发布到通道的keepalive响应没有立即被使用,
//    // 则租约客户端将至少每秒钟继续向etcd服务器发送保持活动请求,直到获取最新的响应为止。
//    // etcd client会自动发送ttl到etcd server,从而保证该租约一直有效
//    leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
//    if err != nil {
//        return err
//    }
//    log.Printf("777777777777777777  %v", leaseRespChan)
//
//    s.leaseID = resp.ID
//    s.keepAliveChan = leaseRespChan
//
//    return nil
//}
//
//// ListenLeaseRespChan 监听 续租情况
//func (s *ServiceRegister) ListenLeaseRespChan() {
//    for _ = range s.keepAliveChan {
//    }
//
//    //for leaseKeepResp := range s.keepAliveChan {
//    //    fmt.Println("续约成功", leaseKeepResp)
//    //}
//
//    //fmt.Println("关闭续租")
//}
//
//// Close 注销服务
//func (s *ServiceRegister) Close() error {
//    // 撤销租约
//    if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
//        return err
//    }
//
//    //fmt.Println("撤销租约")
//    return s.cli.Close()
//}
//
//// GetServeAddr 获取本机地址
//func GetServeAddr(addr string) (string, error) {
//    conn, err := net.Dial("udp", addr)
//    if err != nil {
//        return "", err
//    }
//
//    // 获取ETCD主节点的地址
//    pods, err := clientset.CoreV1().Pods("kube-system").List(context.TODO(), metav1.ListOptions{
//        LabelSelector: "component=etcd",
//    conn.Close()
//
//    localAddr := conn.LocalAddr().(*net.UDPAddr)
//    //ip := strings.Split(localAddr.String(), ":")[0]
//
//    fmt.Println("++++++++ ip: ", localAddr.IP.String())
//    return localAddr.IP.String(), nil
//}
//
//func SetEtcdData(key string, value string) {
//    cli, err := clientv3.New(clientv3.Config{
//        Endpoints:   []string{"192.168.20.119:2379"},
//        DialTimeout: 5 * time.Second,
//    })
//    if err != nil {
//        return "", err
//        // handle error!
//        fmt.Printf("connect to etcd failed, err:%v\n", err)
//        return
//    }
//
//    if len(pods.Items) == 0 {
//        return "", fmt.Errorf("no ETCD pods found")
//    fmt.Println("connect to etcd success")
//    defer cli.Close()
//    // put
//    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
//    _, err = cli.Put(ctx, key, value)
//    cancel()
//    if err != nil {
//        fmt.Printf("put to etcd failed, err:%v\n", err)
//        return
//    }
//
//    leaderEndpoint := pods.Items[0].Status.PodIP + ":2379"
//    return leaderEndpoint, nil
//}
//
////func GetETCDLeaderEndpoint() (string, error) {
////    // 获取kubeconfig配置文件路径
////    clientset, err := util.GetClient()
////    if err != nil {
////        return "", err
////    }
////
////    // 获取ETCD主节点的地址
////    pods, err := clientset.CoreV1().Pods("kube-system").List(context.TODO(), metav1.ListOptions{
////        LabelSelector: "component=etcd",
////    })
////    if err != nil {
////        return "", err
////    }
////
////    if len(pods.Items) == 0 {
////        return "", fmt.Errorf("no ETCD pods found")
////    }
////
////    leaderEndpoint := pods.Items[0].Status.PodIP + ":2379"
////    return leaderEndpoint, nil
////}