From f4c6c982a275fcdead46a7bdb5704fc39b4f1bb0 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期五, 01 九月 2023 16:46:14 +0800 Subject: [PATCH] 接收工艺模型新增消息,plc读取时间可配置 --- constvar/const.go | 1 nsq/msg_handler.go | 26 + conf/config.go | 12 conf/apsClient.json | 14 model/process_model.go | 264 ++++++++++++++++++++++ crontask/cron_task.go | 14 + nsq/consumer.go | 2 nsq/nsq.go | 4 pkg/etcd/client.go | 369 +++++++++++++++--------------- 9 files changed, 498 insertions(+), 208 deletions(-) diff --git a/conf/apsClient.json b/conf/apsClient.json index 677bcec..ed4e16c 100644 --- a/conf/apsClient.json +++ b/conf/apsClient.json @@ -27,16 +27,10 @@ "nsqdAddr": "121.31.232.83:4150", "nsqlookupdAddr":"" }, - "PLCAddresses":[ - { - "fieldName": "绀轰緥瀛楁1", - "address": "0x0001" - }, - { - "fieldName": "绀轰緥瀛楁2", - "address": "0x0002" - } - ] + "plc": { + "finishNumberTimeInterval": 100, + "totalNumberTimeInterval": 1000 + } } diff --git a/conf/config.go b/conf/config.go index 0216051..65b12ca 100644 --- a/conf/config.go +++ b/conf/config.go @@ -66,9 +66,9 @@ NsqlookupdAddr string } - PLCAddressItem struct { - FieldName string - Address int + plc struct { + FinishNumberTimeInterval int + TotalNumberTimeInterval int } config struct { @@ -87,8 +87,8 @@ //NsqConf NsqConf nsqConf - //PLC write address - PLCAddresses []PLCAddressItem + //PLC + PLC plc } ) @@ -127,6 +127,6 @@ log.Println("......................................................") log.Printf(" System: %+v", Conf.System) log.Printf(" Log: %+v", Conf.Log) - log.Printf(" plc address: %+v", Conf.PLCAddresses) + log.Printf(" plc : %+v", Conf.PLC) log.Println("......................................................") } diff --git a/constvar/const.go b/constvar/const.go index 2ec69ac..3856612 100644 --- a/constvar/const.go +++ b/constvar/const.go @@ -6,6 +6,7 @@ NsqTopicSendPlcAddress = "aps.%v.sendPlcAddress" NsqTopicProcessParamsRequest = "aps.%v.processParams.request" NsqTopicProcessParamsResponse = "aps.%v.processParams.response" + NsqTopicApsProcessParams = "aps.%v.aps.processParams" //鏈変簡鏂扮殑宸ヨ壓妯″瀷 ) type PlcStartAddressType int diff --git a/crontask/cron_task.go b/crontask/cron_task.go index e27df3e..0ad2210 100644 --- a/crontask/cron_task.go +++ b/crontask/cron_task.go @@ -1,6 +1,7 @@ package crontask import ( + "apsClient/conf" "apsClient/constvar" "apsClient/pkg/ecode" "apsClient/pkg/logx" @@ -10,8 +11,17 @@ ) func InitTask() error { + + finishNumberTimeInterval := conf.Conf.PLC.FinishNumberTimeInterval + totalNumberTimeInterval := conf.Conf.PLC.TotalNumberTimeInterval + if finishNumberTimeInterval == 0 { + finishNumberTimeInterval = 6 + } + if totalNumberTimeInterval == 0 { + totalNumberTimeInterval = 60 + } s := gocron.NewScheduler(time.UTC) - _, err := s.Every(9).Seconds().StartImmediately().Do(func() { + _, err := s.Every(finishNumberTimeInterval).Seconds().StartImmediately().Do(func() { plcConfig, code := service.NewDevicePlcService().GetDevicePlc() if code != ecode.OK { return @@ -27,7 +37,7 @@ return err } - s.Every(60).Seconds().StartImmediately().Do(func() { + s.Every(totalNumberTimeInterval).Seconds().StartImmediately().Do(func() { plcConfig, code := service.NewDevicePlcService().GetDevicePlc() if code != ecode.OK { return diff --git a/model/process_model.go b/model/process_model.go new file mode 100644 index 0000000..e217d51 --- /dev/null +++ b/model/process_model.go @@ -0,0 +1,264 @@ +package model + +import ( + "apsClient/pkg/sqlitex" + "fmt" + "gorm.io/gorm" +) + +type ( + // ProcessModel 宸ヨ壓娴佺▼鍙傛暟 + ProcessModel struct { + gorm.Model `json:"-"` + Number string `gorm:"index;column:number;type:varchar(255);not null;default '';comment:宸ヨ壓妯″瀷缂栧彿" json:"number"` //宸ヨ壓妯″瀷缂栧彿 + OrderId string `gorm:"column:order_id;type:varchar(255);not null;default '';comment:璁㈠崟id" json:"orderId"` //璁㈠崟id + Product string `gorm:"column:product;type:varchar(255);not null;default '';comment:浜у搧鍚嶇О" json:"product"` //浜у搧鍚嶇О + Procedure string `gorm:"column:procedure;type:varchar(255);not null;default '';comment:宸ュ簭" json:"procedure"` //宸ュ簭 + WorkOrder string `gorm:"column:work_order;type:varchar(255);not null;default '';comment:宸ュ崟" json:"workOrder"` //宸ュ崟 + Device string `gorm:"column:device;type:varchar(255);not null;default '';comment:璁惧" json:"device"` //璁惧 + Params string `json:"-" gorm:"type:text;comment:宸ヨ壓鍙傛暟閿�煎json涓�"` + ParamsMap map[string]interface{} `json:"paramsMap" gorm:"-"` + } + + ProcessModelSearch struct { + ProcessModel + Order string + PageNum int + PageSize int + Orm *gorm.DB + } +) + +func (slf *ProcessModel) TableName() string { + return "process_model" +} + +func NewProcessModelSearch() *ProcessModelSearch { + return &ProcessModelSearch{Orm: sqlitex.GetDB()} +} + +func (slf *ProcessModelSearch) SetOrm(tx *gorm.DB) *ProcessModelSearch { + slf.Orm = tx + return slf +} + +func (slf *ProcessModelSearch) SetPage(page, size int) *ProcessModelSearch { + slf.PageNum, slf.PageSize = page, size + return slf +} + +func (slf *ProcessModelSearch) SetOrder(order string) *ProcessModelSearch { + slf.Order = order + return slf +} + +func (slf *ProcessModelSearch) SetID(id uint) *ProcessModelSearch { + slf.ID = id + return slf +} + +func (slf *ProcessModelSearch) SetNumber(number string) *ProcessModelSearch { + slf.Number = number + return slf +} + +func (slf *ProcessModelSearch) SetWorkOrder(workOrder string) *ProcessModelSearch { + slf.WorkOrder = workOrder + return slf +} +func (slf *ProcessModelSearch) SetOrderId(orderId string) *ProcessModelSearch { + slf.OrderId = orderId + return slf +} +func (slf *ProcessModelSearch) SetProduct(product string) *ProcessModelSearch { + slf.Product = product + return slf +} +func (slf *ProcessModelSearch) SetProcedure(procedure string) *ProcessModelSearch { + slf.Procedure = procedure + return slf +} +func (slf *ProcessModelSearch) SetDevice(device string) *ProcessModelSearch { + slf.Device = device + return slf +} + +func (slf *ProcessModelSearch) build() *gorm.DB { + var db = slf.Orm.Table(slf.TableName()) + + if slf.ID != 0 { + db = db.Where("id = ?", slf.ID) + } + + if len(slf.WorkOrder) != 0 { + db = db.Where("work_order = ?", slf.WorkOrder) + } + + if len(slf.OrderId) != 0 { + db = db.Where("order_id = ?", slf.OrderId) + } + + if len(slf.Product) != 0 { + db = db.Where("product = ?", slf.Product) + } + + if len(slf.Procedure) != 0 { + db = db.Where("`procedure` = ?", slf.Procedure) + } + + if len(slf.Device) != 0 { + db = db.Where("device = ?", slf.Device) + } + + if len(slf.Number) != 0 { + db = db.Where("number = ?", slf.Number) + } + + if slf.Order != "" { + db = db.Order(slf.Order) + } + + return db +} + +// Create 鍗曟潯鎻掑叆 +func (slf *ProcessModelSearch) Create(record *ProcessModel) error { + var db = slf.build() + + if err := db.Create(record).Error; err != nil { + return fmt.Errorf("create err: %v, record: %+v", err, record) + } + + return nil +} + +func (slf *ProcessModelSearch) Save(record *ProcessModel) error { + var db = slf.build() + + if err := db.Omit("CreatedAt").Save(record).Error; err != nil { + return fmt.Errorf("save err: %v, record: %+v", err, record) + } + + return nil +} + +func (slf *ProcessModelSearch) UpdateByMap(upMap map[string]interface{}) error { + var ( + db = slf.build() + ) + + if err := db.Updates(upMap).Error; err != nil { + return fmt.Errorf("update by map err: %v, upMap: %+v", err, upMap) + } + + return nil +} + +func (slf *ProcessModelSearch) UpdateByQuery(query string, args []interface{}, upMap map[string]interface{}) error { + var ( + db = slf.Orm.Table(slf.TableName()).Where(query, args...) + ) + + if err := db.Updates(upMap).Error; err != nil { + return fmt.Errorf("update by query err: %v, query: %s, args: %+v, upMap: %+v", err, query, args, upMap) + } + + return nil +} + +func (slf *ProcessModelSearch) Delete() error { + var db = slf.build() + + if err := db.Unscoped().Delete(&ProcessModel{}).Error; err != nil { + return err + } + + return nil +} + +func (slf *ProcessModelSearch) First() (*ProcessModel, error) { + var ( + record = new(ProcessModel) + db = slf.build() + ) + + if err := db.First(record).Error; err != nil { + return record, err + } + + return record, nil +} + +func (slf *ProcessModelSearch) Find() ([]*ProcessModel, int64, error) { + var ( + records = make([]*ProcessModel, 0) + total int64 + db = slf.build() + ) + + if err := db.Count(&total).Error; err != nil { + return records, total, fmt.Errorf("find count err: %v", err) + } + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, total, fmt.Errorf("find records err: %v", err) + } + + return records, total, nil +} + +func (slf *ProcessModelSearch) FindNotTotal() ([]*ProcessModel, error) { + var ( + records = make([]*ProcessModel, 0) + db = slf.build() + ) + + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, fmt.Errorf("find records err: %v", err) + } + + return records, nil +} + +// FindByQuery 鎸囧畾鏉′欢鏌ヨ. +func (slf *ProcessModelSearch) FindByQuery(query string, args []interface{}) ([]*ProcessModel, int64, error) { + var ( + records = make([]*ProcessModel, 0) + total int64 + db = slf.Orm.Table(slf.TableName()).Where(query, args...) + ) + + if err := db.Count(&total).Error; err != nil { + return records, total, fmt.Errorf("find by query count err: %v", err) + } + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, total, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args) + } + + return records, total, nil +} + +// FindByQueryNotTotal 鎸囧畾鏉′欢鏌ヨ&涓嶆煡璇㈡�绘潯鏁�. +func (slf *ProcessModelSearch) FindByQueryNotTotal(query string, args []interface{}) ([]*ProcessModel, error) { + var ( + records = make([]*ProcessModel, 0) + db = slf.Orm.Table(slf.TableName()).Where(query, args...) + ) + + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args) + } + + return records, nil +} diff --git a/nsq/consumer.go b/nsq/consumer.go index d7b55e3..f4a2b66 100644 --- a/nsq/consumer.go +++ b/nsq/consumer.go @@ -24,6 +24,8 @@ handler = &PlcAddress{Topic: topic} case fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId): handler = &ProcessParams{Topic: topic} + case fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId): + handler = &ProcessParamsSync{Topic: topic} } c.AddHandler(handler.HandleMessage) diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go index be6dea7..69ca36e 100644 --- a/nsq/msg_handler.go +++ b/nsq/msg_handler.go @@ -134,12 +134,6 @@ func (slf *ProcessParams) HandleMessage(data []byte) (err error) { logx.Infof("get an process params message :%s", data) - var resp = new(common.ResponseProcessParams) - err = json.Unmarshal(data, &resp) - if err != nil { - logx.Errorf("ScheduleTask HandleMessage Unmarshal json err: %v", err.Error()) - return err - } //閫氱煡鍥炲鏀跺埌 ReceivedMessageChan <- &ReceivedMessage{ Topic: slf.Topic, @@ -147,3 +141,23 @@ } return nil } + +type ProcessParamsSync struct { + Topic string +} + +func (slf *ProcessParamsSync) HandleMessage(data []byte) (err error) { + logx.Infof("get an process params sync message :%s", data) + var processModel model.ProcessModel + err = json.Unmarshal(data, &processModel) + if err != nil { + logx.Infof("unmarshal process params sync err :%s", err) + return err + } + err = model.NewProcessModelSearch().Create(&processModel) + if err != nil { + logx.Infof("save process params sync err :%s", err) + return err + } + return nil +} diff --git a/nsq/nsq.go b/nsq/nsq.go index f0eb882..79d792b 100644 --- a/nsq/nsq.go +++ b/nsq/nsq.go @@ -41,5 +41,9 @@ _ = Consume(fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) }) + safe.Go(func() { + _ = Consume(fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) + }) + return nil } diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 25f5dce..c999c34 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -1,195 +1,196 @@ package etcd -import ( - "apsClient/conf" - "context" - "fmt" - "go.etcd.io/etcd/client/pkg/v3/transport" - "log" - "net" - "time" - - clientv3 "go.etcd.io/etcd/client/v3" -) - -// ServiceRegister 鍒涘缓绉熺害娉ㄥ唽鏈嶅姟 -type ServiceRegister struct { - cli *clientv3.Client // etcd v3 client - leaseID clientv3.LeaseID // 绉熺害ID - keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse // 绉熺害keepalieve鐩稿簲chan - key string // key - val string // value -} - -// NewServiceRegister 鏂板缓娉ㄥ唽鏈嶅姟 -func NewServiceRegister(endpoints []string, key, val string, lease int64, dailTimeout int) error { - - // todo 璇佷功 - tlsInfo := transport.TLSInfo{ - TrustedCAFile: conf.Conf.Etcd.Tls.CaFile, - CertFile: conf.Conf.Etcd.Tls.CertFile, - KeyFile: conf.Conf.Etcd.Tls.KeyFile, - } - - tlsConfig, err := tlsInfo.ClientConfig() - - if err != nil { - fmt.Printf("tlsconfig failed, err:%v\n", err) - } - - cli, err := clientv3.New(clientv3.Config{ - Endpoints: endpoints, - DialTimeout: time.Duration(dailTimeout) * time.Second, - TLS: tlsConfig, - }) - - defer func(cli *clientv3.Client) { - err := cli.Close() - if err != nil { - - } - }(cli) - log.Printf("44444444444444444 %v", endpoints) - - if err != nil { - return err - } - - // put - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - _, err = cli.Put(ctx, key, val) - cancel() - if err != nil { - fmt.Printf("put to etcd failed, err:%v\n", err) - return err - } - - //ser := &ServiceRegister{ - // cli: cli, - // key: key, - // val: val, - //} - // - //log.Printf("55555555555555555555555 %v", ser) - //鐢宠绉熺害璁剧疆鏃堕棿keepalive - //if err := ser.putKeyWithLease(lease); err != nil { - // return nil, err - //} - - return err -} - -// 璁剧疆绉熺害 -func (s *ServiceRegister) putKeyWithLease(lease int64) error { - // 鍒涘缓涓�涓柊鐨勭绾︼紝骞惰缃畉tl鏃堕棿 - resp, err := s.cli.Grant(context.Background(), lease) - if err != nil { - return err - } - log.Printf("666666666666666 %v", resp) - - // 娉ㄥ唽鏈嶅姟骞剁粦瀹氱绾� - _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID)) - if err != nil { - return err - } - - // 璁剧疆缁 瀹氭湡鍙戦�侀渶姹傝姹� - // KeepAlive浣跨粰瀹氱殑绉熺害姘歌繙鏈夋晥銆� 濡傛灉鍙戝竷鍒伴�氶亾鐨刱eepalive鍝嶅簲娌℃湁绔嬪嵆琚娇鐢紝 - // 鍒欑绾﹀鎴风灏嗚嚦灏戞瘡绉掗挓缁х画鍚慹tcd鏈嶅姟鍣ㄥ彂閫佷繚鎸佹椿鍔ㄨ姹傦紝鐩村埌鑾峰彇鏈�鏂扮殑鍝嶅簲涓烘銆� - // etcd client浼氳嚜鍔ㄥ彂閫乼tl鍒癳tcd server锛屼粠鑰屼繚璇佽绉熺害涓�鐩存湁鏁� - leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID) - if err != nil { - return err - } - log.Printf("777777777777777777 %v", leaseRespChan) - - s.leaseID = resp.ID - s.keepAliveChan = leaseRespChan - - return nil -} - -// ListenLeaseRespChan 鐩戝惉 缁鎯呭喌 -func (s *ServiceRegister) ListenLeaseRespChan() { - for _ = range s.keepAliveChan { - } - - //for leaseKeepResp := range s.keepAliveChan { - // fmt.Println("缁害鎴愬姛", leaseKeepResp) - //} - - //fmt.Println("鍏抽棴缁") -} - -// Close 娉ㄩ攢鏈嶅姟 -func (s *ServiceRegister) Close() error { - // 鎾ら攢绉熺害 - if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil { - return err - } - - //fmt.Println("鎾ら攢绉熺害") - return s.cli.Close() -} - -// GetServeAddr 鑾峰彇鏈満鍦板潃 -func GetServeAddr(addr string) (string, error) { - conn, err := net.Dial("udp", addr) - if err != nil { - return "", err - } - - conn.Close() - - localAddr := conn.LocalAddr().(*net.UDPAddr) - //ip := strings.Split(localAddr.String(), ":")[0] - - fmt.Println("++++++++ ip: ", localAddr.IP.String()) - return localAddr.IP.String(), nil -} - -func SetEtcdData(key string, value string) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{"192.168.20.119:2379"}, - DialTimeout: 5 * time.Second, - }) - if err != nil { - // handle error! - fmt.Printf("connect to etcd failed, err:%v\n", err) - return - } - fmt.Println("connect to etcd success") - defer cli.Close() - // put - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - _, err = cli.Put(ctx, key, value) - cancel() - if err != nil { - fmt.Printf("put to etcd failed, err:%v\n", err) - return - } -} - -//func GetETCDLeaderEndpoint() (string, error) { -// // 鑾峰彇kubeconfig閰嶇疆鏂囦欢璺緞 -// clientset, err := util.GetClient() +// +//import ( +// "apsClient/conf" +// "context" +// "fmt" +// "go.etcd.io/etcd/client/pkg/v3/transport" +// "log" +// "net" +// "time" +// +// clientv3 "go.etcd.io/etcd/client/v3" +//) +// +//// ServiceRegister 鍒涘缓绉熺害娉ㄥ唽鏈嶅姟 +//type ServiceRegister struct { +// cli *clientv3.Client // etcd v3 client +// leaseID clientv3.LeaseID // 绉熺害ID +// keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse // 绉熺害keepalieve鐩稿簲chan +// key string // key +// val string // value +//} +// +//// NewServiceRegister 鏂板缓娉ㄥ唽鏈嶅姟 +//func NewServiceRegister(endpoints []string, key, val string, lease int64, dailTimeout int) error { +// +// // todo 璇佷功 +// tlsInfo := transport.TLSInfo{ +// TrustedCAFile: conf.Conf.Etcd.Tls.CaFile, +// CertFile: conf.Conf.Etcd.Tls.CertFile, +// KeyFile: conf.Conf.Etcd.Tls.KeyFile, +// } +// +// tlsConfig, err := tlsInfo.ClientConfig() +// +// if err != nil { +// fmt.Printf("tlsconfig failed, err:%v\n", err) +// } +// +// cli, err := clientv3.New(clientv3.Config{ +// Endpoints: endpoints, +// DialTimeout: time.Duration(dailTimeout) * time.Second, +// TLS: tlsConfig, +// }) +// +// defer func(cli *clientv3.Client) { +// err := cli.Close() +// if err != nil { +// +// } +// }(cli) +// log.Printf("44444444444444444 %v", endpoints) +// +// if err != nil { +// return err +// } +// +// // put +// ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) +// _, err = cli.Put(ctx, key, val) +// cancel() +// if err != nil { +// fmt.Printf("put to etcd failed, err:%v\n", err) +// return err +// } +// +// //ser := &ServiceRegister{ +// // cli: cli, +// // key: key, +// // val: val, +// //} +// // +// //log.Printf("55555555555555555555555 %v", ser) +// //鐢宠绉熺害璁剧疆鏃堕棿keepalive +// //if err := ser.putKeyWithLease(lease); err != nil { +// // return nil, err +// //} +// +// return err +//} +// +//// 璁剧疆绉熺害 +//func (s *ServiceRegister) putKeyWithLease(lease int64) error { +// // 鍒涘缓涓�涓柊鐨勭绾︼紝骞惰缃畉tl鏃堕棿 +// resp, err := s.cli.Grant(context.Background(), lease) +// if err != nil { +// return err +// } +// log.Printf("666666666666666 %v", resp) +// +// // 娉ㄥ唽鏈嶅姟骞剁粦瀹氱绾� +// _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID)) +// if err != nil { +// return err +// } +// +// // 璁剧疆缁 瀹氭湡鍙戦�侀渶姹傝姹� +// // KeepAlive浣跨粰瀹氱殑绉熺害姘歌繙鏈夋晥銆� 濡傛灉鍙戝竷鍒伴�氶亾鐨刱eepalive鍝嶅簲娌℃湁绔嬪嵆琚娇鐢紝 +// // 鍒欑绾﹀鎴风灏嗚嚦灏戞瘡绉掗挓缁х画鍚慹tcd鏈嶅姟鍣ㄥ彂閫佷繚鎸佹椿鍔ㄨ姹傦紝鐩村埌鑾峰彇鏈�鏂扮殑鍝嶅簲涓烘銆� +// // etcd client浼氳嚜鍔ㄥ彂閫乼tl鍒癳tcd server锛屼粠鑰屼繚璇佽绉熺害涓�鐩存湁鏁� +// leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID) +// if err != nil { +// return err +// } +// log.Printf("777777777777777777 %v", leaseRespChan) +// +// s.leaseID = resp.ID +// s.keepAliveChan = leaseRespChan +// +// return nil +//} +// +//// ListenLeaseRespChan 鐩戝惉 缁鎯呭喌 +//func (s *ServiceRegister) ListenLeaseRespChan() { +// for _ = range s.keepAliveChan { +// } +// +// //for leaseKeepResp := range s.keepAliveChan { +// // fmt.Println("缁害鎴愬姛", leaseKeepResp) +// //} +// +// //fmt.Println("鍏抽棴缁") +//} +// +//// Close 娉ㄩ攢鏈嶅姟 +//func (s *ServiceRegister) Close() error { +// // 鎾ら攢绉熺害 +// if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil { +// return err +// } +// +// //fmt.Println("鎾ら攢绉熺害") +// return s.cli.Close() +//} +// +//// GetServeAddr 鑾峰彇鏈満鍦板潃 +//func GetServeAddr(addr string) (string, error) { +// conn, err := net.Dial("udp", addr) // if err != nil { // return "", err // } // -// // 鑾峰彇ETCD涓昏妭鐐圭殑鍦板潃 -// pods, err := clientset.CoreV1().Pods("kube-system").List(context.TODO(), metav1.ListOptions{ -// LabelSelector: "component=etcd", +// conn.Close() +// +// localAddr := conn.LocalAddr().(*net.UDPAddr) +// //ip := strings.Split(localAddr.String(), ":")[0] +// +// fmt.Println("++++++++ ip: ", localAddr.IP.String()) +// return localAddr.IP.String(), nil +//} +// +//func SetEtcdData(key string, value string) { +// cli, err := clientv3.New(clientv3.Config{ +// Endpoints: []string{"192.168.20.119:2379"}, +// DialTimeout: 5 * time.Second, // }) // if err != nil { -// return "", err +// // handle error! +// fmt.Printf("connect to etcd failed, err:%v\n", err) +// return // } -// -// if len(pods.Items) == 0 { -// return "", fmt.Errorf("no ETCD pods found") +// fmt.Println("connect to etcd success") +// defer cli.Close() +// // put +// ctx, cancel := context.WithTimeout(context.Background(), time.Second) +// _, err = cli.Put(ctx, key, value) +// cancel() +// if err != nil { +// fmt.Printf("put to etcd failed, err:%v\n", err) +// return // } -// -// leaderEndpoint := pods.Items[0].Status.PodIP + ":2379" -// return leaderEndpoint, nil //} +// +////func GetETCDLeaderEndpoint() (string, error) { +//// // 鑾峰彇kubeconfig閰嶇疆鏂囦欢璺緞 +//// clientset, err := util.GetClient() +//// if err != nil { +//// return "", err +//// } +//// +//// // 鑾峰彇ETCD涓昏妭鐐圭殑鍦板潃 +//// pods, err := clientset.CoreV1().Pods("kube-system").List(context.TODO(), metav1.ListOptions{ +//// LabelSelector: "component=etcd", +//// }) +//// if err != nil { +//// return "", err +//// } +//// +//// if len(pods.Items) == 0 { +//// return "", fmt.Errorf("no ETCD pods found") +//// } +//// +//// leaderEndpoint := pods.Items[0].Status.PodIP + ":2379" +//// return leaderEndpoint, nil +////} -- Gitblit v1.8.0