diff options
| author | KubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com> | 2023-10-09 10:11:54 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-10-09 10:11:54 +0800 |
| commit | 3a8f9a83b628da65feae543a6017a6dc184f4b5d (patch) | |
| tree | 107e8b7442014c5375a857736bc7e87549905dac | |
| parent | Merge pull request #4904 from ZhengXinwei-F/version (diff) | |
| parent | update resourcetype pod (diff) | |
| download | kubeedge-3a8f9a83b628da65feae543a6017a6dc184f4b5d.tar.gz | |
Merge pull request #4825 from luomengY/modify_mqtt_static
Add the function of supporting static pod on edge nodes.
| -rw-r--r-- | cloud/pkg/cloudhub/dispatcher/message_dispatcher.go | 3 | ||||
| -rw-r--r-- | cloud/pkg/edgecontroller/controller/upstream.go | 60 | ||||
| -rw-r--r-- | common/constants/default.go | 5 | ||||
| -rw-r--r-- | edge/pkg/edged/config/config.go | 1 | ||||
| -rw-r--r-- | edge/pkg/edged/edged.go | 9 | ||||
| -rw-r--r-- | edge/pkg/edged/kubeclientbridge/typed/core/v1/pod_bridge.go | 4 | ||||
| -rw-r--r-- | edge/pkg/metamanager/client/pod.go | 14 | ||||
| -rw-r--r-- | keadm/cmd/keadm/app/cmd/reset.go | 19 | ||||
| -rw-r--r-- | pkg/apis/componentconfig/cloudcore/v1alpha1/default.go | 2 | ||||
| -rw-r--r-- | pkg/apis/componentconfig/cloudcore/v1alpha1/types.go | 5 | ||||
| -rw-r--r-- | pkg/apis/componentconfig/edgecore/v1alpha2/default_kubelet_configuration.go | 2 | ||||
| -rw-r--r-- | pkg/apis/componentconfig/edgecore/v1alpha2/types.go | 5 |
12 files changed, 125 insertions, 4 deletions
diff --git a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go index cd71bacdd..dd953fa57 100644 --- a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go +++ b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go @@ -467,7 +467,8 @@ func noAckRequired(msg *beehivemodel.Message) bool { resourceType == beehivemodel.ResourceTypeLease || resourceType == beehivemodel.ResourceTypeNodePatch || resourceType == beehivemodel.ResourceTypePodPatch || - resourceType == beehivemodel.ResourceTypePodStatus { + resourceType == beehivemodel.ResourceTypePodStatus || + (resourceType == beehivemodel.ResourceTypePod && msg.GetOperation() == beehivemodel.ResponseOperation) { return true } } diff --git a/cloud/pkg/edgecontroller/controller/upstream.go b/cloud/pkg/edgecontroller/controller/upstream.go index 8e38363a8..0f982ab6d 100644 --- a/cloud/pkg/edgecontroller/controller/upstream.go +++ b/cloud/pkg/edgecontroller/controller/upstream.go @@ -116,6 +116,7 @@ type UpstreamController struct { ruleStatusChan chan model.Message createLeaseChan chan model.Message queryLeaseChan chan model.Message + createPodChan chan model.Message // lister podLister corelisters.PodLister @@ -182,6 +183,9 @@ func (uc *UpstreamController) Start() error { for i := 0; i < int(uc.config.Load.UpdateRuleStatusWorkers); i++ { go uc.updateRuleStatus() } + for i := 0; i < int(uc.config.Load.CreatePodWorks); i++ { + go uc.createPod() + } return nil } @@ -243,9 +247,12 @@ func (uc *UpstreamController) dispatchMessage() { case model.ResourceTypePodPatch: uc.patchPodChan <- msg case model.ResourceTypePod: - if msg.GetOperation() == model.DeleteOperation { + switch msg.GetOperation() { + case model.DeleteOperation: uc.podDeleteChan <- msg - } else { + case model.InsertOperation: + uc.createPodChan <- msg + default: klog.Errorf("message: %s, operation type: %s unsupported", msg.GetID(), msg.GetOperation()) } case model.ResourceTypeRuleStatus: @@ -1042,6 +1049,54 @@ func (uc *UpstreamController) patchPod() { } } +func (uc *UpstreamController) createPod() { + for { + select { + case <-beehiveContext.Done(): + klog.Warning("stop createPod") + return + case msg := <-uc.createPodChan: + klog.V(5).Infof("message: %s, operation is: %s, and resource is %s", msg.GetID(), msg.GetOperation(), msg.GetResource()) + namespace, err := messagelayer.GetNamespace(msg) + if err != nil { + klog.Warningf("message: %s process failure, get namespace failed with error: %v", msg.GetID(), err) + continue + } + name, err := messagelayer.GetResourceName(msg) + if err != nil { + klog.Warningf("message: %s process failure, get resource name failed with error: %v", msg.GetID(), err) + continue + } + + podBytes, err := msg.GetContentData() + if err != nil { + klog.Warningf("message: %s process failure, get data failed with error: %v", msg.GetID(), err) + continue + } + var pod v1.Pod + if err = json.Unmarshal(podBytes, &pod); err != nil { + klog.Errorf("unmarshal pod request failed with error: %v", err) + continue + } + + createPod, err := uc.kubeClient.CoreV1().Pods(namespace).Create(context.TODO(), &pod, metaV1.CreateOptions{}) + if err != nil { + klog.Errorf("message: %s process failure, create pod failed with error: %v, namespace: %s, name: %s", msg.GetID(), err, namespace, name) + } + + resMsg := model.NewMessage(msg.GetID()). + SetResourceVersion(createPod.ResourceVersion). + FillBody(&edgeapi.ObjectResp{Object: createPod, Err: err}). + BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, msg.GetResource(), model.ResponseOperation) + if err = uc.messageLayer.Response(*resMsg); err != nil { + klog.Errorf("Message: %s process failure, response failed with error: %v", msg.GetID(), err) + continue + } + klog.V(4).Infof("message: %s, create pod successfully, namespace: %s, name: %s", msg.GetID(), namespace, name) + } + } +} + func (uc *UpstreamController) deletePod() { for { select { @@ -1378,6 +1433,7 @@ func NewUpstreamController(config *v1alpha1.EdgeController, factory k8sinformer. uc.queryNodeChan = make(chan model.Message, config.Buffer.QueryNode) uc.updateNodeChan = make(chan model.Message, config.Buffer.UpdateNode) uc.patchPodChan = make(chan model.Message, config.Buffer.PatchPod) + uc.createPodChan = make(chan model.Message, config.Buffer.CreatePod) uc.podDeleteChan = make(chan model.Message, config.Buffer.DeletePod) uc.createLeaseChan = make(chan model.Message, config.Buffer.CreateLease) uc.queryLeaseChan = make(chan model.Message, config.Buffer.QueryLease) diff --git a/common/constants/default.go b/common/constants/default.go index cb7231078..92a69f60e 100644 --- a/common/constants/default.go +++ b/common/constants/default.go @@ -84,6 +84,7 @@ const ( DefaultUpdateRuleStatusWorkers = 4 DefaultQueryLeaseWorkers = 100 DefaultServiceAccountTokenWorkers = 100 + CreatePodWorks = 4 DefaultUpdatePodStatusBuffer = 1024 DefaultUpdateNodeStatusBuffer = 1024 @@ -98,6 +99,7 @@ const ( DefaultDeletePodBuffer = 1024 DefaultQueryLeaseBuffer = 1024 DefaultServiceAccountTokenBuffer = 1024 + DefaultCreatePodBuffer = 1024 DefaultPodEventBuffer = 1 DefaultConfigMapEventBuffer = 1 @@ -150,4 +152,7 @@ const ( DeafultMosquittoContainerName = "mqtt-kubeedge" DeployMqttContainerEnv = "DEPLOY_MQTT_CONTAINER" + + // DefaultManifestsDir edge node default static pod path + DefaultManifestsDir = "/etc/kubeedge/manifests" ) diff --git a/edge/pkg/edged/config/config.go b/edge/pkg/edged/config/config.go index 52228dbad..7d217dbae 100644 --- a/edge/pkg/edged/config/config.go +++ b/edge/pkg/edged/config/config.go @@ -33,6 +33,7 @@ func InitConfigure(e *v1alpha2.Edged) { } func ConvertEdgedKubeletConfigurationToConfigKubeletConfiguration(in *v1alpha2.TailoredKubeletConfiguration, out *kubeletconfig.KubeletConfiguration, s conversion.Scope) error { + out.StaticPodPath = in.StaticPodPath out.SyncFrequency = in.SyncFrequency out.Address = in.Address out.ReadOnlyPort = in.ReadOnlyPort diff --git a/edge/pkg/edged/edged.go b/edge/pkg/edged/edged.go index 3ca218682..66e477b72 100644 --- a/edge/pkg/edged/edged.go +++ b/edge/pkg/edged/edged.go @@ -170,6 +170,15 @@ func newEdged(enable bool, nodeName, namespace string) (*edged, error) { KubeletFlags: kubeletFlags, KubeletConfiguration: kubeletConfig, } + + if kubeletConfig.StaticPodPath != "" { + if err := os.MkdirAll(kubeletConfig.StaticPodPath, os.ModePerm); err != nil { + return nil, fmt.Errorf("create %s static pod path failed: %v", kubeletConfig.StaticPodPath, err) + } + } else { + klog.ErrorS(err, "static pod path is nil!") + } + nodestatus.KubeletVersion = fmt.Sprintf("%s-kubeedge-%s", constants.CurrentSupportK8sVersion, version.Get()) // use kubeletServer to construct the default KubeletDeps kubeletDeps, err := DefaultKubeletDeps(&kubeletServer, utilfeature.DefaultFeatureGate) diff --git a/edge/pkg/edged/kubeclientbridge/typed/core/v1/pod_bridge.go b/edge/pkg/edged/kubeclientbridge/typed/core/v1/pod_bridge.go index 62095753b..821902fd9 100644 --- a/edge/pkg/edged/kubeclientbridge/typed/core/v1/pod_bridge.go +++ b/edge/pkg/edged/kubeclientbridge/typed/core/v1/pod_bridge.go @@ -47,3 +47,7 @@ func (c *PodsBridge) Patch(ctx context.Context, name string, pt types.PatchType, func (c *PodsBridge) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { return c.MetaClient.Pods(c.ns).Delete(name, opts) } + +func (c *PodsBridge) Create(ctx context.Context, pod *corev1.Pod, opts metav1.CreateOptions) (result *corev1.Pod, err error) { + return c.MetaClient.Pods(c.ns).Create(pod) +} diff --git a/edge/pkg/metamanager/client/pod.go b/edge/pkg/metamanager/client/pod.go index d6c9218b8..ce0c851b9 100644 --- a/edge/pkg/metamanager/client/pod.go +++ b/edge/pkg/metamanager/client/pod.go @@ -52,7 +52,19 @@ func newPods(namespace string, s SendInterface) *pods { } func (c *pods) Create(cm *corev1.Pod) (*corev1.Pod, error) { - return nil, nil + resource := fmt.Sprintf("%s/%s/%s", c.namespace, model.ResourceTypePod, cm.Name) + podMsg := message.BuildMsg(modules.MetaGroup, "", modules.EdgedModuleName, resource, model.InsertOperation, *cm) + resp, err := c.send.SendSync(podMsg) + if err != nil { + return nil, fmt.Errorf("create pod failed, err: %v", err) + } + + content, err := resp.GetContentData() + if err != nil { + return nil, fmt.Errorf("parse message to pod failed, err: %v", err) + } + + return handlePodResp(resource, content) } func (c *pods) Update(cm *corev1.Pod) error { diff --git a/keadm/cmd/keadm/app/cmd/reset.go b/keadm/cmd/keadm/app/cmd/reset.go index a180bca7e..10e8fb052 100644 --- a/keadm/cmd/keadm/app/cmd/reset.go +++ b/keadm/cmd/keadm/app/cmd/reset.go @@ -21,6 +21,7 @@ import ( "fmt" "os" "strings" + "time" "github.com/spf13/cobra" phases "k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/reset" @@ -90,6 +91,24 @@ func NewKubeEdgeReset() *cobra.Command { return fmt.Errorf("aborted reset operation") } } + + // first cleanup edge node static pod directory to stop static and mirror pod + if isEdgeNode { + config, err := util.ParseEdgecoreConfig(common.EdgecoreConfigPath) + if err != nil { + return err + } + dir := config.Modules.Edged.TailoredKubeletConfig.StaticPodPath + if dir != "" { + if err := phases.CleanDir(dir); err != nil { + fmt.Printf("Failed to delete static pod directory %s: %v\n", dir, err) + } else { + time.Sleep(1 * time.Second) + fmt.Printf("Static pod directory has been removed!\n") + } + } + } + // 1. kill cloudcore/edgecore process. // For edgecore, don't delete node from K8S if err := TearDownKubeEdge(isEdgeNode, reset.Kubeconfig); err != nil { diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go index d18a38ad2..31e0857e3 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go @@ -163,6 +163,7 @@ func getDefaultEdgeControllerLoad(nodeLimit int32) *EdgeControllerLoad { QueryLeaseWorkers: constants.DefaultQueryLeaseWorkers, UpdateRuleStatusWorkers: constants.DefaultUpdateRuleStatusWorkers, ServiceAccountTokenWorkers: constants.DefaultServiceAccountTokenWorkers, + CreatePodWorks: constants.CreatePodWorks, } } @@ -190,6 +191,7 @@ func getDefaultEdgeControllerBuffer(nodeLimit int32) *EdgeControllerBuffer { CreateLease: 1024 + nodeLimit, QueryLease: constants.DefaultQueryLeaseBuffer, ServiceAccountToken: constants.DefaultServiceAccountTokenBuffer, + CreatePod: constants.DefaultCreatePodBuffer, } } diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go index 0319ecbd9..8d04c9359 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go @@ -285,6 +285,9 @@ type EdgeControllerBuffer struct { // ServiceAccount indicates the buffer of service account token // default 1024 ServiceAccountToken int32 `json:"serviceAccountToken,omitempty"` + // CreatePod indicates the buffer of patch pod + // default 1024 + CreatePod int32 `json:"createPod,omitempty"` } // EdgeControllerLoad indicates the EdgeController load @@ -340,6 +343,8 @@ type EdgeControllerLoad struct { // ServiceAccountTokenWorkers indicates the load of service account token // default 4 ServiceAccountTokenWorkers int32 `json:"ServiceAccountTokenWorkers,omitempty"` + // default 4 + CreatePodWorks int32 `json:"CreatePodWorks,omitempty"` } // DeviceController indicates the device controller diff --git a/pkg/apis/componentconfig/edgecore/v1alpha2/default_kubelet_configuration.go b/pkg/apis/componentconfig/edgecore/v1alpha2/default_kubelet_configuration.go index 5207d5550..ce382c6b6 100644 --- a/pkg/apis/componentconfig/edgecore/v1alpha2/default_kubelet_configuration.go +++ b/pkg/apis/componentconfig/edgecore/v1alpha2/default_kubelet_configuration.go @@ -98,4 +98,6 @@ func SetDefaultsKubeletConfiguration(obj *TailoredKubeletConfiguration) { obj.CgroupsPerQOS = utilpointer.Bool(DefaultCgroupsPerQOS) obj.ResolverConfig = utilpointer.String(DefaultResolverConfig) obj.CPUCFSQuota = utilpointer.Bool(DefaultCPUCFSQuota) + // Add static pod default path + obj.StaticPodPath = constants.DefaultManifestsDir } diff --git a/pkg/apis/componentconfig/edgecore/v1alpha2/types.go b/pkg/apis/componentconfig/edgecore/v1alpha2/types.go index 0628649ad..377d0cddc 100644 --- a/pkg/apis/componentconfig/edgecore/v1alpha2/types.go +++ b/pkg/apis/componentconfig/edgecore/v1alpha2/types.go @@ -696,6 +696,11 @@ type TailoredKubeletConfiguration struct { // Default: true // +optional RegisterNode *bool `json:"registerNode,omitempty"` + // staticPodPath is the path to the directory containing local (static) pods to + // run, or the path to a single static pod file. + // Default: "/etc/kubeedge/manifests" + // +optional + StaticPodPath string `json:"staticPodPath,omitempty"` } // TailoredKubeletFlag indicates the tailored kubelet flag |
