summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com>2023-10-09 10:11:54 +0800
committerGitHub <noreply@github.com>2023-10-09 10:11:54 +0800
commit3a8f9a83b628da65feae543a6017a6dc184f4b5d (patch)
tree107e8b7442014c5375a857736bc7e87549905dac
parentMerge pull request #4904 from ZhengXinwei-F/version (diff)
parentupdate resourcetype pod (diff)
downloadkubeedge-3a8f9a83b628da65feae543a6017a6dc184f4b5d.tar.gz
Merge pull request #4825 from luomengY/modify_mqtt_static
Add the function of supporting static pod on edge nodes.
-rw-r--r--cloud/pkg/cloudhub/dispatcher/message_dispatcher.go3
-rw-r--r--cloud/pkg/edgecontroller/controller/upstream.go60
-rw-r--r--common/constants/default.go5
-rw-r--r--edge/pkg/edged/config/config.go1
-rw-r--r--edge/pkg/edged/edged.go9
-rw-r--r--edge/pkg/edged/kubeclientbridge/typed/core/v1/pod_bridge.go4
-rw-r--r--edge/pkg/metamanager/client/pod.go14
-rw-r--r--keadm/cmd/keadm/app/cmd/reset.go19
-rw-r--r--pkg/apis/componentconfig/cloudcore/v1alpha1/default.go2
-rw-r--r--pkg/apis/componentconfig/cloudcore/v1alpha1/types.go5
-rw-r--r--pkg/apis/componentconfig/edgecore/v1alpha2/default_kubelet_configuration.go2
-rw-r--r--pkg/apis/componentconfig/edgecore/v1alpha2/types.go5
12 files changed, 125 insertions, 4 deletions
diff --git a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go
index cd71bacdd..dd953fa57 100644
--- a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go
+++ b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go
@@ -467,7 +467,8 @@ func noAckRequired(msg *beehivemodel.Message) bool {
resourceType == beehivemodel.ResourceTypeLease ||
resourceType == beehivemodel.ResourceTypeNodePatch ||
resourceType == beehivemodel.ResourceTypePodPatch ||
- resourceType == beehivemodel.ResourceTypePodStatus {
+ resourceType == beehivemodel.ResourceTypePodStatus ||
+ (resourceType == beehivemodel.ResourceTypePod && msg.GetOperation() == beehivemodel.ResponseOperation) {
return true
}
}
diff --git a/cloud/pkg/edgecontroller/controller/upstream.go b/cloud/pkg/edgecontroller/controller/upstream.go
index 8e38363a8..0f982ab6d 100644
--- a/cloud/pkg/edgecontroller/controller/upstream.go
+++ b/cloud/pkg/edgecontroller/controller/upstream.go
@@ -116,6 +116,7 @@ type UpstreamController struct {
ruleStatusChan chan model.Message
createLeaseChan chan model.Message
queryLeaseChan chan model.Message
+ createPodChan chan model.Message
// lister
podLister corelisters.PodLister
@@ -182,6 +183,9 @@ func (uc *UpstreamController) Start() error {
for i := 0; i < int(uc.config.Load.UpdateRuleStatusWorkers); i++ {
go uc.updateRuleStatus()
}
+ for i := 0; i < int(uc.config.Load.CreatePodWorks); i++ {
+ go uc.createPod()
+ }
return nil
}
@@ -243,9 +247,12 @@ func (uc *UpstreamController) dispatchMessage() {
case model.ResourceTypePodPatch:
uc.patchPodChan <- msg
case model.ResourceTypePod:
- if msg.GetOperation() == model.DeleteOperation {
+ switch msg.GetOperation() {
+ case model.DeleteOperation:
uc.podDeleteChan <- msg
- } else {
+ case model.InsertOperation:
+ uc.createPodChan <- msg
+ default:
klog.Errorf("message: %s, operation type: %s unsupported", msg.GetID(), msg.GetOperation())
}
case model.ResourceTypeRuleStatus:
@@ -1042,6 +1049,54 @@ func (uc *UpstreamController) patchPod() {
}
}
+func (uc *UpstreamController) createPod() {
+ for {
+ select {
+ case <-beehiveContext.Done():
+ klog.Warning("stop createPod")
+ return
+ case msg := <-uc.createPodChan:
+ klog.V(5).Infof("message: %s, operation is: %s, and resource is %s", msg.GetID(), msg.GetOperation(), msg.GetResource())
+ namespace, err := messagelayer.GetNamespace(msg)
+ if err != nil {
+ klog.Warningf("message: %s process failure, get namespace failed with error: %v", msg.GetID(), err)
+ continue
+ }
+ name, err := messagelayer.GetResourceName(msg)
+ if err != nil {
+ klog.Warningf("message: %s process failure, get resource name failed with error: %v", msg.GetID(), err)
+ continue
+ }
+
+ podBytes, err := msg.GetContentData()
+ if err != nil {
+ klog.Warningf("message: %s process failure, get data failed with error: %v", msg.GetID(), err)
+ continue
+ }
+ var pod v1.Pod
+ if err = json.Unmarshal(podBytes, &pod); err != nil {
+ klog.Errorf("unmarshal pod request failed with error: %v", err)
+ continue
+ }
+
+ createPod, err := uc.kubeClient.CoreV1().Pods(namespace).Create(context.TODO(), &pod, metaV1.CreateOptions{})
+ if err != nil {
+ klog.Errorf("message: %s process failure, create pod failed with error: %v, namespace: %s, name: %s", msg.GetID(), err, namespace, name)
+ }
+
+ resMsg := model.NewMessage(msg.GetID()).
+ SetResourceVersion(createPod.ResourceVersion).
+ FillBody(&edgeapi.ObjectResp{Object: createPod, Err: err}).
+ BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, msg.GetResource(), model.ResponseOperation)
+ if err = uc.messageLayer.Response(*resMsg); err != nil {
+ klog.Errorf("Message: %s process failure, response failed with error: %v", msg.GetID(), err)
+ continue
+ }
+ klog.V(4).Infof("message: %s, create pod successfully, namespace: %s, name: %s", msg.GetID(), namespace, name)
+ }
+ }
+}
+
func (uc *UpstreamController) deletePod() {
for {
select {
@@ -1378,6 +1433,7 @@ func NewUpstreamController(config *v1alpha1.EdgeController, factory k8sinformer.
uc.queryNodeChan = make(chan model.Message, config.Buffer.QueryNode)
uc.updateNodeChan = make(chan model.Message, config.Buffer.UpdateNode)
uc.patchPodChan = make(chan model.Message, config.Buffer.PatchPod)
+ uc.createPodChan = make(chan model.Message, config.Buffer.CreatePod)
uc.podDeleteChan = make(chan model.Message, config.Buffer.DeletePod)
uc.createLeaseChan = make(chan model.Message, config.Buffer.CreateLease)
uc.queryLeaseChan = make(chan model.Message, config.Buffer.QueryLease)
diff --git a/common/constants/default.go b/common/constants/default.go
index cb7231078..92a69f60e 100644
--- a/common/constants/default.go
+++ b/common/constants/default.go
@@ -84,6 +84,7 @@ const (
DefaultUpdateRuleStatusWorkers = 4
DefaultQueryLeaseWorkers = 100
DefaultServiceAccountTokenWorkers = 100
+ CreatePodWorks = 4
DefaultUpdatePodStatusBuffer = 1024
DefaultUpdateNodeStatusBuffer = 1024
@@ -98,6 +99,7 @@ const (
DefaultDeletePodBuffer = 1024
DefaultQueryLeaseBuffer = 1024
DefaultServiceAccountTokenBuffer = 1024
+ DefaultCreatePodBuffer = 1024
DefaultPodEventBuffer = 1
DefaultConfigMapEventBuffer = 1
@@ -150,4 +152,7 @@ const (
DeafultMosquittoContainerName = "mqtt-kubeedge"
DeployMqttContainerEnv = "DEPLOY_MQTT_CONTAINER"
+
+ // DefaultManifestsDir edge node default static pod path
+ DefaultManifestsDir = "/etc/kubeedge/manifests"
)
diff --git a/edge/pkg/edged/config/config.go b/edge/pkg/edged/config/config.go
index 52228dbad..7d217dbae 100644
--- a/edge/pkg/edged/config/config.go
+++ b/edge/pkg/edged/config/config.go
@@ -33,6 +33,7 @@ func InitConfigure(e *v1alpha2.Edged) {
}
func ConvertEdgedKubeletConfigurationToConfigKubeletConfiguration(in *v1alpha2.TailoredKubeletConfiguration, out *kubeletconfig.KubeletConfiguration, s conversion.Scope) error {
+ out.StaticPodPath = in.StaticPodPath
out.SyncFrequency = in.SyncFrequency
out.Address = in.Address
out.ReadOnlyPort = in.ReadOnlyPort
diff --git a/edge/pkg/edged/edged.go b/edge/pkg/edged/edged.go
index 3ca218682..66e477b72 100644
--- a/edge/pkg/edged/edged.go
+++ b/edge/pkg/edged/edged.go
@@ -170,6 +170,15 @@ func newEdged(enable bool, nodeName, namespace string) (*edged, error) {
KubeletFlags: kubeletFlags,
KubeletConfiguration: kubeletConfig,
}
+
+ if kubeletConfig.StaticPodPath != "" {
+ if err := os.MkdirAll(kubeletConfig.StaticPodPath, os.ModePerm); err != nil {
+ return nil, fmt.Errorf("create %s static pod path failed: %v", kubeletConfig.StaticPodPath, err)
+ }
+ } else {
+ klog.ErrorS(err, "static pod path is nil!")
+ }
+
nodestatus.KubeletVersion = fmt.Sprintf("%s-kubeedge-%s", constants.CurrentSupportK8sVersion, version.Get())
// use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := DefaultKubeletDeps(&kubeletServer, utilfeature.DefaultFeatureGate)
diff --git a/edge/pkg/edged/kubeclientbridge/typed/core/v1/pod_bridge.go b/edge/pkg/edged/kubeclientbridge/typed/core/v1/pod_bridge.go
index 62095753b..821902fd9 100644
--- a/edge/pkg/edged/kubeclientbridge/typed/core/v1/pod_bridge.go
+++ b/edge/pkg/edged/kubeclientbridge/typed/core/v1/pod_bridge.go
@@ -47,3 +47,7 @@ func (c *PodsBridge) Patch(ctx context.Context, name string, pt types.PatchType,
func (c *PodsBridge) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
return c.MetaClient.Pods(c.ns).Delete(name, opts)
}
+
+func (c *PodsBridge) Create(ctx context.Context, pod *corev1.Pod, opts metav1.CreateOptions) (result *corev1.Pod, err error) {
+ return c.MetaClient.Pods(c.ns).Create(pod)
+}
diff --git a/edge/pkg/metamanager/client/pod.go b/edge/pkg/metamanager/client/pod.go
index d6c9218b8..ce0c851b9 100644
--- a/edge/pkg/metamanager/client/pod.go
+++ b/edge/pkg/metamanager/client/pod.go
@@ -52,7 +52,19 @@ func newPods(namespace string, s SendInterface) *pods {
}
func (c *pods) Create(cm *corev1.Pod) (*corev1.Pod, error) {
- return nil, nil
+ resource := fmt.Sprintf("%s/%s/%s", c.namespace, model.ResourceTypePod, cm.Name)
+ podMsg := message.BuildMsg(modules.MetaGroup, "", modules.EdgedModuleName, resource, model.InsertOperation, *cm)
+ resp, err := c.send.SendSync(podMsg)
+ if err != nil {
+ return nil, fmt.Errorf("create pod failed, err: %v", err)
+ }
+
+ content, err := resp.GetContentData()
+ if err != nil {
+ return nil, fmt.Errorf("parse message to pod failed, err: %v", err)
+ }
+
+ return handlePodResp(resource, content)
}
func (c *pods) Update(cm *corev1.Pod) error {
diff --git a/keadm/cmd/keadm/app/cmd/reset.go b/keadm/cmd/keadm/app/cmd/reset.go
index a180bca7e..10e8fb052 100644
--- a/keadm/cmd/keadm/app/cmd/reset.go
+++ b/keadm/cmd/keadm/app/cmd/reset.go
@@ -21,6 +21,7 @@ import (
"fmt"
"os"
"strings"
+ "time"
"github.com/spf13/cobra"
phases "k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/reset"
@@ -90,6 +91,24 @@ func NewKubeEdgeReset() *cobra.Command {
return fmt.Errorf("aborted reset operation")
}
}
+
+ // first cleanup edge node static pod directory to stop static and mirror pod
+ if isEdgeNode {
+ config, err := util.ParseEdgecoreConfig(common.EdgecoreConfigPath)
+ if err != nil {
+ return err
+ }
+ dir := config.Modules.Edged.TailoredKubeletConfig.StaticPodPath
+ if dir != "" {
+ if err := phases.CleanDir(dir); err != nil {
+ fmt.Printf("Failed to delete static pod directory %s: %v\n", dir, err)
+ } else {
+ time.Sleep(1 * time.Second)
+ fmt.Printf("Static pod directory has been removed!\n")
+ }
+ }
+ }
+
// 1. kill cloudcore/edgecore process.
// For edgecore, don't delete node from K8S
if err := TearDownKubeEdge(isEdgeNode, reset.Kubeconfig); err != nil {
diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go
index d18a38ad2..31e0857e3 100644
--- a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go
+++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go
@@ -163,6 +163,7 @@ func getDefaultEdgeControllerLoad(nodeLimit int32) *EdgeControllerLoad {
QueryLeaseWorkers: constants.DefaultQueryLeaseWorkers,
UpdateRuleStatusWorkers: constants.DefaultUpdateRuleStatusWorkers,
ServiceAccountTokenWorkers: constants.DefaultServiceAccountTokenWorkers,
+ CreatePodWorks: constants.CreatePodWorks,
}
}
@@ -190,6 +191,7 @@ func getDefaultEdgeControllerBuffer(nodeLimit int32) *EdgeControllerBuffer {
CreateLease: 1024 + nodeLimit,
QueryLease: constants.DefaultQueryLeaseBuffer,
ServiceAccountToken: constants.DefaultServiceAccountTokenBuffer,
+ CreatePod: constants.DefaultCreatePodBuffer,
}
}
diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go
index 0319ecbd9..8d04c9359 100644
--- a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go
+++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go
@@ -285,6 +285,9 @@ type EdgeControllerBuffer struct {
// ServiceAccount indicates the buffer of service account token
// default 1024
ServiceAccountToken int32 `json:"serviceAccountToken,omitempty"`
+ // CreatePod indicates the buffer of patch pod
+ // default 1024
+ CreatePod int32 `json:"createPod,omitempty"`
}
// EdgeControllerLoad indicates the EdgeController load
@@ -340,6 +343,8 @@ type EdgeControllerLoad struct {
// ServiceAccountTokenWorkers indicates the load of service account token
// default 4
ServiceAccountTokenWorkers int32 `json:"ServiceAccountTokenWorkers,omitempty"`
+ // default 4
+ CreatePodWorks int32 `json:"CreatePodWorks,omitempty"`
}
// DeviceController indicates the device controller
diff --git a/pkg/apis/componentconfig/edgecore/v1alpha2/default_kubelet_configuration.go b/pkg/apis/componentconfig/edgecore/v1alpha2/default_kubelet_configuration.go
index 5207d5550..ce382c6b6 100644
--- a/pkg/apis/componentconfig/edgecore/v1alpha2/default_kubelet_configuration.go
+++ b/pkg/apis/componentconfig/edgecore/v1alpha2/default_kubelet_configuration.go
@@ -98,4 +98,6 @@ func SetDefaultsKubeletConfiguration(obj *TailoredKubeletConfiguration) {
obj.CgroupsPerQOS = utilpointer.Bool(DefaultCgroupsPerQOS)
obj.ResolverConfig = utilpointer.String(DefaultResolverConfig)
obj.CPUCFSQuota = utilpointer.Bool(DefaultCPUCFSQuota)
+ // Add static pod default path
+ obj.StaticPodPath = constants.DefaultManifestsDir
}
diff --git a/pkg/apis/componentconfig/edgecore/v1alpha2/types.go b/pkg/apis/componentconfig/edgecore/v1alpha2/types.go
index 0628649ad..377d0cddc 100644
--- a/pkg/apis/componentconfig/edgecore/v1alpha2/types.go
+++ b/pkg/apis/componentconfig/edgecore/v1alpha2/types.go
@@ -696,6 +696,11 @@ type TailoredKubeletConfiguration struct {
// Default: true
// +optional
RegisterNode *bool `json:"registerNode,omitempty"`
+ // staticPodPath is the path to the directory containing local (static) pods to
+ // run, or the path to a single static pod file.
+ // Default: "/etc/kubeedge/manifests"
+ // +optional
+ StaticPodPath string `json:"staticPodPath,omitempty"`
}
// TailoredKubeletFlag indicates the tailored kubelet flag