package k8s import ( "context" "flag" "fmt" "log" "math/rand" "path/filepath" "strconv" "basic.com/aps/aps_deploy.git/src/util" appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" ) var ( replicas int32 = 1 port int32 = 9081 rpcPort int32 = 9091 namespaces = []string{"testFactory1", "testFactory2", "testFactory3"} usedNodePorts = make(map[int32]bool) Projects = []string{ProjectAps, ProjectCrm, ProjectSrm, ProjectWms} ) const ( ProjectAps = "aps" ProjectCrm = "crm" ProjectSrm = "srm" ProjectWms = "wms" ) type Config struct { Client *kubernetes.Clientset // Kubernetes 客户端 Image string // 镜像名称 DBHost string // 数据库地址 DBName string //数据库名称 HttpPort int32 // HTTP 端口 RpcPort int32 // RPC 端口 NameSpace string // Namespace DeploymentName string // Deployment 名称 ServiceName string // Service 名称 ALHost string // 算法Host Host string // 服务Host NodeID string // Nsq节点ID HttpNodePort int32 // HTTP nodePort端口 RpcNodePort int32 // RPC nodePort端口 } func create_test() { // 配置 Kubernetes 集群的 kubeconfig 路径 kubeconfig := flag.String("kubeconfig", filepath.Join(util.HomeDir(), ".kube", "config"), "kubeconfig file") flag.Parse() // 创建 Kubernetes 客户端 config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err.Error()) } clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } // 创建多个 Namespace 下的相同名称的 Deployment 和 Service for _, ns := range namespaces { err = CreateNamespace(clientset, ns) if err != nil { panic(err) } for _, proj := range Projects { obj := GetCreateObj(proj) err = obj.CreateDeploymentAndService(Config{Client: clientset, NameSpace: ns, DeploymentName: proj, ServiceName: proj}, nil) if err != nil { panic(err) } } nodeport, err := GetServiceNodePort(clientset, ns, ns) if err != nil { panic(err) } log.Printf("Service NodePort: %d\n", nodeport) } } func GetCreateObj(proj string) Create { switch proj { case ProjectAps: return &ApsCreate{} case ProjectCrm: return &CrmCreate{} case ProjectSrm: return &SrmCreate{} case ProjectWms: return &WmsCreate{} } return nil } // CheckAndDeleteResources 检测并删除指定的 Namespace、Deployment 和 Service func CheckAndDeleteResources(clientset *kubernetes.Clientset, namespace, deploymentName, serviceName string) error { fmt.Println("\033[1;37;40mChecking and deleting resources in Namespace:", namespace, "\033[0m") // 检查并删除 Deployment err := deleteDeploymentIfExists(clientset, namespace, deploymentName) if err != nil { return err } // 检查并删除 Service err = deleteServiceIfExists(clientset, namespace, serviceName) if err != nil { return err } return nil } // deleteDeploymentIfExists 检查并删除指定的 Deployment(如果存在) func deleteDeploymentIfExists(clientset *kubernetes.Clientset, namespace, deploymentName string) error { fmt.Println("\033[1;37;40mChecking and deleting Deployment:", deploymentName, "\033[0m") err := clientset.AppsV1().Deployments(namespace).Delete(context.TODO(), deploymentName, metav1.DeleteOptions{}) if err != nil { if errors.IsNotFound(err) { fmt.Printf("Deployment %s not found in Namespace %s\n", deploymentName, namespace) } else { return fmt.Errorf("failed to delete Deployment: %v", err) } } else { fmt.Printf("Deployment %s deleted from Namespace %s\n", deploymentName, namespace) } return nil } // deleteServiceIfExists 检查并删除指定的 Service(如果存在) func deleteServiceIfExists(clientset *kubernetes.Clientset, namespace, serviceName string) error { fmt.Println("\033[1;37;40mChecking and deleting Service:", serviceName, "\033[0m") err := clientset.CoreV1().Services(namespace).Delete(context.TODO(), serviceName, metav1.DeleteOptions{}) if err != nil { if errors.IsNotFound(err) { fmt.Printf("Service %s not found in Namespace %s\n", serviceName, namespace) } else { return fmt.Errorf("failed to delete Service: %v", err) } } else { fmt.Printf("Service %s deleted from Namespace %s\n", serviceName, namespace) } return nil } // CreateNamespace 创建指定的 Namespace func CreateNamespace(clientset *kubernetes.Clientset, namespace string) error { log.Printf("Creating Namespace: %s\n", namespace) ns := &apiv1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: namespace, }, } _, err := clientset.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{}) if err != nil { if !errors.IsAlreadyExists(err) { return fmt.Errorf("failed to create Namespace: %w", err) } log.Printf("Namespace %s already exists\n", namespace) } else { log.Printf("Namespace %s created\n", namespace) } return nil } func getTwoNodePort(client *kubernetes.Clientset) (nodePort1, nodePort2 int32, err error) { nodePort1, err = getRandomNodePort(client) if err != nil { return 0, 0, err } for { nodePort2, err = getRandomNodePort(client) if err != nil { return 0, 0, err } if nodePort2 != nodePort1 { break } } return nodePort1, nodePort2, nil } // getRandomNodePort 获取一个未使用的随机 NodePort func getRandomNodePort(clientset *kubernetes.Clientset) (int32, error) { // 获取一个随机的 NodePort nodePort := int32(0) for { // 生成一个随机的 NodePort nodePort = generateRandomNodePort() // 检查该 NodePort 是否已被使用 used, err := isNodePortUsed(clientset, nodePort) if err != nil { return 0, err } // 如果未被使用,则标记为已使用,并退出循环 if !used { usedNodePorts[nodePort] = true break } } return nodePort, nil } // generateRandomNodePort 生成一个随机的 NodePort func generateRandomNodePort() int32 { // 在范围 30000-32767 中生成随机数作为 NodePort return int32(rand.Intn(32767-30000+1) + 30000) } // isNodePortUsed 检查指定的 NodePort 是否已被使用 func isNodePortUsed(clientset *kubernetes.Clientset, nodePort int32) (bool, error) { // 获取所有 Service services, err := clientset.CoreV1().Services("").List(context.TODO(), metav1.ListOptions{}) if err != nil { return false, err } // 检查每个 Service 的 NodePort 是否与指定的 NodePort 相同 for _, svc := range services.Items { for _, port := range svc.Spec.Ports { if port.NodePort == nodePort { return true, nil } } } return false, nil } // GetServiceNodePort 获取指定 Service 的 NodePort func GetServiceNodePort(clientset *kubernetes.Clientset, namespace, serviceName string) (int32, error) { svc, err := clientset.CoreV1().Services(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{}) if err != nil { return 0, fmt.Errorf("failed to get Service: %v", err) } // 检查 Service 类型是否为 NodePort if svc.Spec.Type != apiv1.ServiceTypeNodePort { return 0, fmt.Errorf("Service %s is not of type NodePort", serviceName) } // 获取第一个端口的 NodePort if len(svc.Spec.Ports) > 0 { for _, p := range svc.Spec.Ports { // return tcp port if p.Name == "http" { return p.NodePort, nil } } } return 0, fmt.Errorf("no ports defined for Service %s", serviceName) } type Create interface { CreateDeploymentAndService(config Config, extendEnv map[string]string) error CreateEnv(config Config, pairs map[string]string) []apiv1.EnvVar CreateDeployment(config Config, extendEnv map[string]string) error CreateService(config Config) error GetCid(config Config) string } type ApsCreate struct{} func (c *ApsCreate) CreateDeploymentAndService(config Config, extendEnv map[string]string) error { fmt.Println("\033[1;37;40mCreating resources in Namespace:", config.NameSpace, "\033[0m") // 检测并删除已存在的 Deployment 和 Service err := CheckAndDeleteResources(config.Client, config.NameSpace, config.DeploymentName, config.ServiceName) if err != nil { return err } config.HttpNodePort, config.RpcNodePort, err = getTwoNodePort(config.Client) if err != nil { return err } // 创建 Deployment err = c.CreateDeployment(config, extendEnv) if err != nil { return err } log.Printf("Waiting for Deployment %s to be ready...\n", config.DeploymentName) // 创建 Service err = c.CreateService(config) if err != nil { return err } return nil } func (c *ApsCreate) CreateEnv(config Config, pairs map[string]string) []apiv1.EnvVar { envs := []apiv1.EnvVar{ { Name: "GRPC_PORT", Value: fmt.Sprint(config.RpcPort), }, { Name: "DB_HOST", //ValueFrom: &apiv1.EnvVarSource{ // FieldRef: &apiv1.ObjectFieldSelector{ // APIVersion: "v1", // FieldPath: "status.hostIP", // }, //}, Value: config.DBHost, }, { Name: "HOST", ValueFrom: &apiv1.EnvVarSource{ FieldRef: &apiv1.ObjectFieldSelector{ APIVersion: "v1", FieldPath: "status.hostIP", }, }, }, { Name: "AL_HOST", Value: config.ALHost, }, { Name: "GRPC_NODE_PORT", Value: strconv.Itoa(int(config.RpcNodePort)), }, { Name: "NODE_ID", Value: config.NodeID, }, { Name: "DB_NAME", Value: config.NameSpace, }, { Name: "DB_PORT", Value: strconv.Itoa(3306), }, { Name: "DB_USER", Value: config.NameSpace, }, { Name: "DB_PASSWD", Value: config.NameSpace + "@Basic2023", }, } for name, value := range pairs { envs = append(envs, apiv1.EnvVar{ Name: name, Value: value, }) } return envs } func (c *ApsCreate) CreateDeployment(config Config, extendEnv map[string]string) error { fmt.Println("\033[1;37;40mCreating Deployment:", config.DeploymentName, "\033[0m") log.Printf("Aps CreateDeployment config:%+v\n", config) envs := c.CreateEnv(config, extendEnv) deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: config.DeploymentName, }, Spec: appsv1.DeploymentSpec{ Replicas: &replicas, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "cid": c.GetCid(config), }, }, Template: apiv1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "cid": c.GetCid(config), }, }, Spec: apiv1.PodSpec{ TopologySpreadConstraints: []apiv1.TopologySpreadConstraint{ { MaxSkew: 1, TopologyKey: "kubernetes.io/hostname", WhenUnsatisfiable: apiv1.DoNotSchedule, LabelSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "cid": c.GetCid(config), }, }, }, }, Containers: []apiv1.Container{ { Name: config.NameSpace, Image: config.Image, Env: envs, ImagePullPolicy: apiv1.PullIfNotPresent, // 设置镜像拉取策略为 Always }, }, }, }, }, } _, err := config.Client.AppsV1().Deployments(config.NameSpace).Create(context.TODO(), deployment, metav1.CreateOptions{}) if err != nil { if !errors.IsAlreadyExists(err) { return fmt.Errorf("failed to create Deployment: %v", err) } fmt.Printf("Deployment %s already exists in Namespace %s\n", config.DeploymentName, config.NameSpace) } else { fmt.Printf("Deployment %s created in Namespace %s\n", config.DeploymentName, config.NameSpace) } return nil } // createService 创建指定的 Service func (c *ApsCreate) CreateService(config Config) error { fmt.Println("\033[1;37;40mCreating Service:", config.ServiceName, "\033[0m") service := &apiv1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: config.ServiceName, }, Spec: apiv1.ServiceSpec{ Selector: map[string]string{ "cid": c.GetCid(config), }, Type: apiv1.ServiceTypeNodePort, Ports: []apiv1.ServicePort{ { Name: "http", Protocol: apiv1.ProtocolTCP, Port: config.HttpPort, // 集群内部访问端口 TargetPort: intstr.FromInt(int(config.HttpPort)), // 容器对外端口 NodePort: config.HttpNodePort, // 外部访问端口 }, { Name: "tcp", Protocol: apiv1.ProtocolTCP, Port: config.RpcPort, TargetPort: intstr.FromInt(int(config.RpcPort)), NodePort: config.RpcNodePort, }, }, SessionAffinity: apiv1.ServiceAffinityClientIP, }, } _, err := config.Client.CoreV1().Services(config.NameSpace).Create(context.TODO(), service, metav1.CreateOptions{}) if err != nil { if !errors.IsAlreadyExists(err) { return fmt.Errorf("failed to create Service: %v", err) } log.Printf("Service %s already exists in Namespace %s\n", config.ServiceName, config.NameSpace) } else { log.Printf("Service %s created in Namespace %s\n", config.ServiceName, config.NameSpace) } return nil } func (c *ApsCreate) GetCid(config Config) string { return fmt.Sprintf("%v-%v", config.NameSpace, "aps") }