zhangqian
2024-01-11 4b717f9a0ed341163f4891f3007f990f658262d3
src/k8s/create.go
@@ -20,11 +20,19 @@
)
var (
   replicas      int32 = 3
   replicas      int32 = 1
   port          int32 = 9081
   rpcPort       int32 = 9091
   namespaces          = []string{"guangsheng", "geruimi", "tongsheng"}
   namespaces          = []string{"testFactory1", "testFactory2", "testFactory3"}
   usedNodePorts       = make(map[int32]bool)
   Projects            = []string{ProjectAps, ProjectCrm, ProjectSrm, ProjectWms}
)
const (
   ProjectAps = "aps"
   ProjectCrm = "crm"
   ProjectSrm = "srm"
   ProjectWms = "wms"
)
type Config struct {
@@ -40,6 +48,8 @@
   ALHost         string                // 算法Host
   Host           string                // 服务Host
   NodeID         string                // Nsq节点ID
   HttpNodePort   int32                 // HTTP nodePort端口
   RpcNodePort    int32                 // RPC nodePort端口
}
func create_test() {
@@ -60,9 +70,16 @@
   // 创建多个 Namespace 下的相同名称的 Deployment 和 Service
   for _, ns := range namespaces {
      err = CreateDeploymentAndService(Config{Client: clientset, NameSpace: ns, DeploymentName: ns, ServiceName: ns})
      err = CreateNamespace(clientset, ns)
      if err != nil {
         panic(err)
      }
      for _, proj := range Projects {
         obj := GetCreateObj(proj)
         err = obj.CreateDeploymentAndService(Config{Client: clientset, NameSpace: ns, DeploymentName: proj, ServiceName: proj}, nil)
         if err != nil {
            panic(err)
         }
      }
      nodeport, err := GetServiceNodePort(clientset, ns, ns)
@@ -71,7 +88,22 @@
      }
      log.Printf("Service NodePort: %d\n", nodeport)
   }
}
func GetCreateObj(proj string) Create {
   switch proj {
   case ProjectAps:
      return &ApsCreate{}
   case ProjectCrm:
      return &CrmCreate{}
   case ProjectSrm:
      return &SrmCreate{}
   case ProjectWms:
      return &WmsCreate{}
   }
   return nil
}
// CheckAndDeleteResources 检测并删除指定的 Namespace、Deployment 和 Service
@@ -129,36 +161,6 @@
   return nil
}
func CreateDeploymentAndService(config Config) error {
   fmt.Println("\033[1;37;40mCreating resources in Namespace:", config.NameSpace, "\033[0m")
   // 检测并删除已存在的 Deployment 和 Service
   err := CheckAndDeleteResources(config.Client, config.NameSpace, config.DeploymentName, config.ServiceName)
   if err != nil {
      return err
   }
   config.HttpPort, config.RpcPort, err = getTwoNodePort(config.Client)
   if err != nil {
      return err
   }
   // 创建 Deployment
   err = createDeployment(config)
   if err != nil {
      return err
   }
   log.Printf("Waiting for Deployment %s to be ready...\n", config.DeploymentName)
   // 创建 Service
   err = createService(config)
   if err != nil {
      return err
   }
   return nil
}
// CreateNamespace 创建指定的 Namespace
func CreateNamespace(clientset *kubernetes.Clientset, namespace string) error {
   log.Printf("Creating Namespace: %s\n", namespace)
@@ -177,167 +179,6 @@
      log.Printf("Namespace %s already exists\n", namespace)
   } else {
      log.Printf("Namespace %s created\n", namespace)
   }
   return nil
}
func createDeployment(config Config) error {
   fmt.Println("\033[1;37;40mCreating Deployment:", config.DeploymentName, "\033[0m")
   envs := []apiv1.EnvVar{
      {
         Name:  "GRPC_PORT",
         Value: fmt.Sprint(config.RpcPort),
      },
      {
         Name: "DB_HOST",
         ValueFrom: &apiv1.EnvVarSource{
            FieldRef: &apiv1.ObjectFieldSelector{
               APIVersion: "v1",
               FieldPath:  "status.hostIP",
            },
         },
      },
      {
         Name: "HOST",
         ValueFrom: &apiv1.EnvVarSource{
            FieldRef: &apiv1.ObjectFieldSelector{
               APIVersion: "v1",
               FieldPath:  "status.hostIP",
            },
         },
      },
      {
         Name:  "DB_NAME",
         Value: config.DBName,
      },
      {
         Name:  "DB_PORT",
         Value: strconv.Itoa(6446),
      },
      {
         Name:  "DB_USER",
         Value: config.NameSpace,
      },
      {
         Name:  "DB_PASSWD",
         Value: config.NameSpace + "@Basic2023",
      },
   }
   if config.ALHost != "" {
      envs = append(envs, apiv1.EnvVar{
         Name:  "AL_HOST",
         Value: config.ALHost,
      })
   }
   if config.NodeID != "" {
      envs = append(envs, apiv1.EnvVar{
         Name:  "NODE_ID",
         Value: config.NodeID,
      })
   }
   deployment := &appsv1.Deployment{
      ObjectMeta: metav1.ObjectMeta{
         Name: config.DeploymentName,
      },
      Spec: appsv1.DeploymentSpec{
         Replicas: &replicas,
         Selector: &metav1.LabelSelector{
            MatchLabels: map[string]string{
               "cid": config.NameSpace,
            },
         },
         Template: apiv1.PodTemplateSpec{
            ObjectMeta: metav1.ObjectMeta{
               Labels: map[string]string{
                  "cid": config.NameSpace,
               },
            },
            Spec: apiv1.PodSpec{
               TopologySpreadConstraints: []apiv1.TopologySpreadConstraint{
                  {
                     MaxSkew:           1,
                     TopologyKey:       "kubernetes.io/hostname",
                     WhenUnsatisfiable: apiv1.DoNotSchedule,
                     LabelSelector: &metav1.LabelSelector{
                        MatchLabels: map[string]string{
                           "cid": config.NameSpace,
                        },
                     },
                  },
               },
               Containers: []apiv1.Container{
                  {
                     Name:            config.NameSpace,
                     Image:           config.Image,
                     Env:             envs,
                     ImagePullPolicy: apiv1.PullAlways, // 设置镜像拉取策略为 Always
                  },
               },
            },
         },
      },
   }
   _, err := config.Client.AppsV1().Deployments(config.DeploymentName).Create(context.TODO(), deployment, metav1.CreateOptions{})
   if err != nil {
      if !errors.IsAlreadyExists(err) {
         return fmt.Errorf("failed to create Deployment: %v", err)
      }
      fmt.Printf("Deployment %s already exists in Namespace %s\n", config.DeploymentName, config.NameSpace)
   } else {
      fmt.Printf("Deployment %s created in Namespace %s\n", config.DeploymentName, config.NameSpace)
   }
   return nil
}
// createService 创建指定的 Service
func createService(config Config) error {
   fmt.Println("\033[1;37;40mCreating Service:", config.ServiceName, "\033[0m")
   service := &apiv1.Service{
      ObjectMeta: metav1.ObjectMeta{
         Name: config.ServiceName,
      },
      Spec: apiv1.ServiceSpec{
         Selector: map[string]string{
            "cid": config.NameSpace,
         },
         Type: apiv1.ServiceTypeNodePort,
         Ports: []apiv1.ServicePort{
            {
               Name:       "http",
               Protocol:   apiv1.ProtocolTCP,
               Port:       port,                      // 集群内部访问端口
               TargetPort: intstr.FromInt(int(port)), // 容器对外端口
               NodePort:   config.HttpPort,           // 外部访问端口
            },
            {
               Name:       "tcp",
               Protocol:   apiv1.ProtocolTCP,
               Port:       rpcPort,
               TargetPort: intstr.FromInt(int(rpcPort)),
               NodePort:   config.RpcPort,
            },
         },
         SessionAffinity: apiv1.ServiceAffinityClientIP,
      },
   }
   _, err := config.Client.CoreV1().Services(config.ServiceName).Create(context.TODO(), service, metav1.CreateOptions{})
   if err != nil {
      if !errors.IsAlreadyExists(err) {
         return fmt.Errorf("failed to create Service: %v", err)
      }
      log.Printf("Service %s already exists in Namespace %s\n", config.ServiceName, config.NameSpace)
   } else {
      log.Printf("Service %s created in Namespace %s\n", config.ServiceName, config.NameSpace)
   }
   return nil
@@ -439,3 +280,217 @@
   return 0, fmt.Errorf("no ports defined for Service %s", serviceName)
}
type Create interface {
   CreateDeploymentAndService(config Config, extendEnv map[string]string) error
   CreateEnv(config Config, pairs map[string]string) []apiv1.EnvVar
   CreateDeployment(config Config, extendEnv map[string]string) error
   CreateService(config Config) error
   GetCid(config Config) string
}
type ApsCreate struct{}
func (c *ApsCreate) CreateDeploymentAndService(config Config, extendEnv map[string]string) error {
   fmt.Println("\033[1;37;40mCreating resources in Namespace:", config.NameSpace, "\033[0m")
   // 检测并删除已存在的 Deployment 和 Service
   err := CheckAndDeleteResources(config.Client, config.NameSpace, config.DeploymentName, config.ServiceName)
   if err != nil {
      return err
   }
   config.HttpNodePort, config.RpcNodePort, err = getTwoNodePort(config.Client)
   if err != nil {
      return err
   }
   // 创建 Deployment
   err = c.CreateDeployment(config, extendEnv)
   if err != nil {
      return err
   }
   log.Printf("Waiting for Deployment %s to be ready...\n", config.DeploymentName)
   // 创建 Service
   err = c.CreateService(config)
   if err != nil {
      return err
   }
   return nil
}
func (c *ApsCreate) CreateEnv(config Config, pairs map[string]string) []apiv1.EnvVar {
   envs := []apiv1.EnvVar{
      {
         Name:  "GRPC_PORT",
         Value: fmt.Sprint(config.RpcPort),
      },
      {
         Name: "DB_HOST",
         //ValueFrom: &apiv1.EnvVarSource{
         //   FieldRef: &apiv1.ObjectFieldSelector{
         //      APIVersion: "v1",
         //      FieldPath:  "status.hostIP",
         //   },
         //},
         Value: config.DBHost,
      },
      {
         Name: "HOST",
         ValueFrom: &apiv1.EnvVarSource{
            FieldRef: &apiv1.ObjectFieldSelector{
               APIVersion: "v1",
               FieldPath:  "status.hostIP",
            },
         },
      },
      {
         Name:  "AL_HOST",
         Value: config.ALHost,
      },
      {
         Name:  "GRPC_NODE_PORT",
         Value: strconv.Itoa(int(config.RpcNodePort)),
      },
      {
         Name:  "NODE_ID",
         Value: config.NodeID,
      },
      {
         Name:  "DB_NAME",
         Value: config.NameSpace,
      },
      {
         Name:  "DB_PORT",
         Value: strconv.Itoa(3306),
      },
      {
         Name:  "DB_USER",
         Value: config.NameSpace,
      },
      {
         Name:  "DB_PASSWD",
         Value: config.NameSpace + "@Basic2023",
      },
   }
   for name, value := range pairs {
      envs = append(envs, apiv1.EnvVar{
         Name:  name,
         Value: value,
      })
   }
   return envs
}
func (c *ApsCreate) CreateDeployment(config Config, extendEnv map[string]string) error {
   fmt.Println("\033[1;37;40mCreating Deployment:", config.DeploymentName, "\033[0m")
   log.Printf("Aps CreateDeployment config:%+v\n", config)
   envs := c.CreateEnv(config, extendEnv)
   deployment := &appsv1.Deployment{
      ObjectMeta: metav1.ObjectMeta{
         Name: config.DeploymentName,
      },
      Spec: appsv1.DeploymentSpec{
         Replicas: &replicas,
         Selector: &metav1.LabelSelector{
            MatchLabels: map[string]string{
               "cid": c.GetCid(config),
            },
         },
         Template: apiv1.PodTemplateSpec{
            ObjectMeta: metav1.ObjectMeta{
               Labels: map[string]string{
                  "cid": c.GetCid(config),
               },
            },
            Spec: apiv1.PodSpec{
               TopologySpreadConstraints: []apiv1.TopologySpreadConstraint{
                  {
                     MaxSkew:           1,
                     TopologyKey:       "kubernetes.io/hostname",
                     WhenUnsatisfiable: apiv1.DoNotSchedule,
                     LabelSelector: &metav1.LabelSelector{
                        MatchLabels: map[string]string{
                           "cid": c.GetCid(config),
                        },
                     },
                  },
               },
               Containers: []apiv1.Container{
                  {
                     Name:            config.NameSpace,
                     Image:           config.Image,
                     Env:             envs,
                     ImagePullPolicy: apiv1.PullIfNotPresent, // 设置镜像拉取策略为 Always
                  },
               },
            },
         },
      },
   }
   _, err := config.Client.AppsV1().Deployments(config.NameSpace).Create(context.TODO(), deployment, metav1.CreateOptions{})
   if err != nil {
      if !errors.IsAlreadyExists(err) {
         return fmt.Errorf("failed to create Deployment: %v", err)
      }
      fmt.Printf("Deployment %s already exists in Namespace %s\n", config.DeploymentName, config.NameSpace)
   } else {
      fmt.Printf("Deployment %s created in Namespace %s\n", config.DeploymentName, config.NameSpace)
   }
   return nil
}
// createService 创建指定的 Service
func (c *ApsCreate) CreateService(config Config) error {
   fmt.Println("\033[1;37;40mCreating Service:", config.ServiceName, "\033[0m")
   service := &apiv1.Service{
      ObjectMeta: metav1.ObjectMeta{
         Name: config.ServiceName,
      },
      Spec: apiv1.ServiceSpec{
         Selector: map[string]string{
            "cid": c.GetCid(config),
         },
         Type: apiv1.ServiceTypeNodePort,
         Ports: []apiv1.ServicePort{
            {
               Name:       "http",
               Protocol:   apiv1.ProtocolTCP,
               Port:       config.HttpPort,                      // 集群内部访问端口
               TargetPort: intstr.FromInt(int(config.HttpPort)), // 容器对外端口
               NodePort:   config.HttpNodePort,                  // 外部访问端口
            },
            {
               Name:       "tcp",
               Protocol:   apiv1.ProtocolTCP,
               Port:       config.RpcPort,
               TargetPort: intstr.FromInt(int(config.RpcPort)),
               NodePort:   config.RpcNodePort,
            },
         },
         SessionAffinity: apiv1.ServiceAffinityClientIP,
      },
   }
   _, err := config.Client.CoreV1().Services(config.NameSpace).Create(context.TODO(), service, metav1.CreateOptions{})
   if err != nil {
      if !errors.IsAlreadyExists(err) {
         return fmt.Errorf("failed to create Service: %v", err)
      }
      log.Printf("Service %s already exists in Namespace %s\n", config.ServiceName, config.NameSpace)
   } else {
      log.Printf("Service %s created in Namespace %s\n", config.ServiceName, config.NameSpace)
   }
   return nil
}
func (c *ApsCreate) GetCid(config Config) string {
   return fmt.Sprintf("%v-%v", config.NameSpace, "aps")
}