package k8s import ( "context" "flag" "fmt" "log" "math/rand" "path/filepath" "strconv" "basic.com/aps/aps_deploy.git/src/util" appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" ) var ( replicas int32 = 3 port int32 = 9081 rpcPort int32 = 9091 namespaces = []string{"guangsheng", "geruimi", "tongsheng"} usedNodePorts = make(map[int32]bool) ) func create_test() { // 配置 Kubernetes 集群的 kubeconfig 路径 kubeconfig := flag.String("kubeconfig", filepath.Join(util.HomeDir(), ".kube", "config"), "kubeconfig file") flag.Parse() // 创建 Kubernetes 客户端 config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err.Error()) } clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } // 创建多个 Namespace 下的相同名称的 Deployment 和 Service for _, ns := range namespaces { err = CreateDeploymentAndService(clientset, ns, ns, ns) if err != nil { panic(err) } nodeport, err := GetServiceNodePort(clientset, ns, ns) if err != nil { panic(err) } log.Printf("Service NodePort: %d\n", nodeport) } } // CheckAndDeleteResources 检测并删除指定的 Namespace、Deployment 和 Service func CheckAndDeleteResources(clientset *kubernetes.Clientset, namespace, deploymentName, serviceName string) error { fmt.Println("\033[1;37;40mChecking and deleting resources in Namespace:", namespace, "\033[0m") // 检查并删除 Deployment err := deleteDeploymentIfExists(clientset, namespace, deploymentName) if err != nil { return err } // 检查并删除 Service err = deleteServiceIfExists(clientset, namespace, serviceName) if err != nil { return err } return nil } // deleteDeploymentIfExists 检查并删除指定的 Deployment(如果存在) func deleteDeploymentIfExists(clientset *kubernetes.Clientset, namespace, deploymentName string) error { fmt.Println("\033[1;37;40mChecking and deleting Deployment:", deploymentName, "\033[0m") err := clientset.AppsV1().Deployments(namespace).Delete(context.TODO(), deploymentName, metav1.DeleteOptions{}) if err != nil { if errors.IsNotFound(err) { fmt.Printf("Deployment %s not found in Namespace %s\n", deploymentName, namespace) } else { return fmt.Errorf("failed to delete Deployment: %v", err) } } else { fmt.Printf("Deployment %s deleted from Namespace %s\n", deploymentName, namespace) } return nil } // deleteServiceIfExists 检查并删除指定的 Service(如果存在) func deleteServiceIfExists(clientset *kubernetes.Clientset, namespace, serviceName string) error { fmt.Println("\033[1;37;40mChecking and deleting Service:", serviceName, "\033[0m") err := clientset.CoreV1().Services(namespace).Delete(context.TODO(), serviceName, metav1.DeleteOptions{}) if err != nil { if errors.IsNotFound(err) { fmt.Printf("Service %s not found in Namespace %s\n", serviceName, namespace) } else { return fmt.Errorf("failed to delete Service: %v", err) } } else { fmt.Printf("Service %s deleted from Namespace %s\n", serviceName, namespace) } return nil } func CreateDeploymentAndService(clientset *kubernetes.Clientset, namespace, deploymentName, serviceName string) error { fmt.Println("\033[1;37;40mCreating resources in Namespace:", namespace, "\033[0m") // 检测并删除已存在的 Deployment 和 Service err := CheckAndDeleteResources(clientset, namespace, deploymentName, serviceName) if err != nil { return err } // 创建 Namespace err = createNamespace(clientset, namespace) if err != nil { return err } nodePort1, nodePort2, err := getTwoNodePort(clientset) if err != nil { return err } port1 := fmt.Sprint(nodePort1) port2 := fmt.Sprint(nodePort2) // 创建 Deployment err = createDeployment(clientset, namespace, deploymentName, port1, port2) if err != nil { return err } log.Printf("Waiting for Deployment %s to be ready...\n", deploymentName) // 创建 Service err = createService(clientset, namespace, serviceName, nodePort1, nodePort2) if err != nil { return err } return nil } // createNamespace 创建指定的 Namespace func createNamespace(clientset *kubernetes.Clientset, namespace string) error { log.Printf("Creating Namespace: %s\n", namespace) ns := &apiv1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: namespace, }, } _, err := clientset.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{}) if err != nil { if !errors.IsAlreadyExists(err) { return fmt.Errorf("failed to create Namespace: %w", err) } log.Printf("Namespace %s already exists\n", namespace) } else { log.Printf("Namespace %s created\n", namespace) } return nil } func createDeployment(clientset *kubernetes.Clientset, namespace, deploymentName, port1, port2 string) error { fmt.Println("\033[1;37;40mCreating Deployment:", deploymentName, "\033[0m") deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: deploymentName, }, Spec: appsv1.DeploymentSpec{ Replicas: &replicas, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "cid": namespace, }, }, Template: apiv1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "cid": namespace, }, }, Spec: apiv1.PodSpec{ TopologySpreadConstraints: []apiv1.TopologySpreadConstraint{ { MaxSkew: 1, TopologyKey: "kubernetes.io/hostname", WhenUnsatisfiable: apiv1.DoNotSchedule, LabelSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "cid": namespace, }, }, }, }, Containers: []apiv1.Container{ { Name: namespace, Image: "192.168.20.119/apsserver/apsserver:v0.5", Env: []apiv1.EnvVar{ { Name: "NODE_PORT1", Value: port1, }, { Name: "NODE_PORT2", Value: port2, }, { Name: "DB_HOST", Value: "localhost", }, { Name: "DB_NAME", Value: namespace, }, { Name: "DB_PORT", Value: strconv.Itoa(3306), }, { Name: "DB_USER", Value: namespace, }, { Name: "DB_PASSWD", Value: namespace + "@Basic2023", }, }, ImagePullPolicy: apiv1.PullAlways, // 设置镜像拉取策略为 Always }, }, }, }, }, } _, err := clientset.AppsV1().Deployments(namespace).Create(context.TODO(), deployment, metav1.CreateOptions{}) if err != nil { if !errors.IsAlreadyExists(err) { return fmt.Errorf("failed to create Deployment: %v", err) } fmt.Printf("Deployment %s already exists in Namespace %s\n", deploymentName, namespace) } else { fmt.Printf("Deployment %s created in Namespace %s\n", deploymentName, namespace) } return nil } // createService 创建指定的 Service func createService(clientset *kubernetes.Clientset, namespace, serviceName string, port1, port2 int32) error { fmt.Println("\033[1;37;40mCreating Service:", serviceName, "\033[0m") service := &apiv1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName, }, Spec: apiv1.ServiceSpec{ Selector: map[string]string{ "cid": namespace, }, Type: apiv1.ServiceTypeNodePort, Ports: []apiv1.ServicePort{ { Name: "http", Protocol: apiv1.ProtocolTCP, Port: port, // 集群内部访问端口 TargetPort: intstr.FromInt(int(port)), // 容器对外端口 NodePort: port1, // 外部访问端口 }, { Name: "tcp", Protocol: apiv1.ProtocolTCP, Port: rpcPort, TargetPort: intstr.FromInt(int(rpcPort)), NodePort: port2, }, }, }, } _, err := clientset.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{}) if err != nil { if !errors.IsAlreadyExists(err) { return fmt.Errorf("failed to create Service: %v", err) } log.Printf("Service %s already exists in Namespace %s\n", serviceName, namespace) } else { log.Printf("Service %s created in Namespace %s\n", serviceName, namespace) } return nil } func getTwoNodePort(client *kubernetes.Clientset) (nodePort1, nodePort2 int32, err error) { nodePort1, err = getRandomNodePort(client) if err != nil { return 0, 0, err } for { nodePort2, err = getRandomNodePort(client) if err != nil { return 0, 0, err } if nodePort2 != nodePort1 { break } } return nodePort1, nodePort2, nil } // getRandomNodePort 获取一个未使用的随机 NodePort func getRandomNodePort(clientset *kubernetes.Clientset) (int32, error) { // 获取一个随机的 NodePort nodePort := int32(0) for { // 生成一个随机的 NodePort nodePort = generateRandomNodePort() // 检查该 NodePort 是否已被使用 used, err := isNodePortUsed(clientset, nodePort) if err != nil { return 0, err } // 如果未被使用,则标记为已使用,并退出循环 if !used { usedNodePorts[nodePort] = true break } } return nodePort, nil } // generateRandomNodePort 生成一个随机的 NodePort func generateRandomNodePort() int32 { // 在范围 30000-32767 中生成随机数作为 NodePort return int32(rand.Intn(32767-30000+1) + 30000) } // isNodePortUsed 检查指定的 NodePort 是否已被使用 func isNodePortUsed(clientset *kubernetes.Clientset, nodePort int32) (bool, error) { // 获取所有 Service services, err := clientset.CoreV1().Services("").List(context.TODO(), metav1.ListOptions{}) if err != nil { return false, err } // 检查每个 Service 的 NodePort 是否与指定的 NodePort 相同 for _, svc := range services.Items { for _, port := range svc.Spec.Ports { if port.NodePort == nodePort { return true, nil } } } return false, nil } // GetServiceNodePort 获取指定 Service 的 NodePort func GetServiceNodePort(clientset *kubernetes.Clientset, namespace, serviceName string) (int32, error) { svc, err := clientset.CoreV1().Services(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{}) if err != nil { return 0, fmt.Errorf("failed to get Service: %v", err) } // 检查 Service 类型是否为 NodePort if svc.Spec.Type != apiv1.ServiceTypeNodePort { return 0, fmt.Errorf("Service %s is not of type NodePort", serviceName) } // 获取第一个端口的 NodePort if len(svc.Spec.Ports) > 0 { for _, p := range svc.Spec.Ports { // return tcp port if p.Name == "tcp" { return p.NodePort, nil } } } return 0, fmt.Errorf("no ports defined for Service %s", serviceName) }