diff options
| author | Shelley-BaoYue <baoyue2@huawei.com> | 2024-01-03 10:49:28 +0800 |
|---|---|---|
| committer | Shelley-BaoYue <baoyue2@huawei.com> | 2024-01-03 11:44:15 +0800 |
| commit | c5a307193a8a4494e921a25edca37216d8c14653 (patch) | |
| tree | 612ed95f31cd244023d2699a89ab40e20a849d77 /cloud | |
| parent | add image prepull api (diff) | |
| download | kubeedge-c5a307193a8a4494e921a25edca37216d8c14653.tar.gz | |
support imageprepull feature
Signed-off-by: Shelley-BaoYue <baoyue2@huawei.com>
Diffstat (limited to 'cloud')
| -rw-r--r-- | cloud/cmd/cloudcore/app/server.go | 2 | ||||
| -rw-r--r-- | cloud/pkg/cloudhub/dispatcher/message_dispatcher.go | 6 | ||||
| -rw-r--r-- | cloud/pkg/common/messagelayer/context.go | 8 | ||||
| -rw-r--r-- | cloud/pkg/common/modules/modules.go | 3 | ||||
| -rw-r--r-- | cloud/pkg/common/util/util.go | 55 | ||||
| -rw-r--r-- | cloud/pkg/common/util/util_test.go | 55 | ||||
| -rw-r--r-- | cloud/pkg/imageprepullcontroller/config/config.go | 38 | ||||
| -rw-r--r-- | cloud/pkg/imageprepullcontroller/controller/downstream.go | 268 | ||||
| -rw-r--r-- | cloud/pkg/imageprepullcontroller/controller/upstream.go | 147 | ||||
| -rw-r--r-- | cloud/pkg/imageprepullcontroller/controller/util.go | 131 | ||||
| -rw-r--r-- | cloud/pkg/imageprepullcontroller/imageprepullcontroller.go | 91 | ||||
| -rw-r--r-- | cloud/pkg/imageprepullcontroller/manager/common.go | 62 | ||||
| -rw-r--r-- | cloud/pkg/imageprepullcontroller/manager/imageprepull.go | 52 | ||||
| -rw-r--r-- | cloud/pkg/nodeupgradejobcontroller/controller/downstream.go | 7 | ||||
| -rw-r--r-- | cloud/pkg/nodeupgradejobcontroller/controller/util.go | 34 | ||||
| -rw-r--r-- | cloud/pkg/nodeupgradejobcontroller/controller/util_test.go | 33 |
16 files changed, 921 insertions, 71 deletions
diff --git a/cloud/cmd/cloudcore/app/server.go b/cloud/cmd/cloudcore/app/server.go index e42f6cf3b..fd346cd72 100644 --- a/cloud/cmd/cloudcore/app/server.go +++ b/cloud/cmd/cloudcore/app/server.go @@ -48,6 +48,7 @@ import ( "github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller" "github.com/kubeedge/kubeedge/cloud/pkg/dynamiccontroller" "github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller" + "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller" "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller" "github.com/kubeedge/kubeedge/cloud/pkg/policycontroller" "github.com/kubeedge/kubeedge/cloud/pkg/router" @@ -158,6 +159,7 @@ func registerModules(c *v1alpha1.CloudCoreConfig) { cloudhub.Register(c.Modules.CloudHub) edgecontroller.Register(c.Modules.EdgeController) devicecontroller.Register(c.Modules.DeviceController) + imageprepullcontroller.Register(c.Modules.ImagePrePullController) nodeupgradejobcontroller.Register(c.Modules.NodeUpgradeJobController) synccontroller.Register(c.Modules.SyncController) cloudstream.Register(c.Modules.CloudStream, c.CommonConfig) diff --git a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go index dd953fa57..a236d3ebd 100644 --- a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go +++ b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go @@ -33,6 +33,7 @@ import ( "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/session" "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer" "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + imageprepullcontroller "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/controller" "github.com/kubeedge/kubeedge/cloud/pkg/synccontroller" commonconst "github.com/kubeedge/kubeedge/common/constants" v2 "github.com/kubeedge/kubeedge/edge/pkg/metamanager/dao/v2" @@ -183,6 +184,9 @@ func (md *messageDispatcher) DispatchUpstream(message *beehivemodel.Message, inf message.Router.Resource = fmt.Sprintf("node/%s/%s", 
info.NodeID, message.Router.Resource) beehivecontext.Send(modules.RouterModuleName, *message) + case message.GetOperation() == imageprepullcontroller.ImagePrePull: + beehivecontext.SendToGroup(modules.ImagePrePullControllerModuleGroup, *message) + default: err := md.PubToController(info, message) if err != nil { @@ -447,7 +451,7 @@ func noAckRequired(msg *beehivemodel.Message) bool { return true case msg.GetGroup() == modules.UserGroup: return true - case msg.GetSource() == modules.NodeUpgradeJobControllerModuleName: + case msg.GetSource() == modules.NodeUpgradeJobControllerModuleName || msg.GetSource() == modules.ImagePrePullControllerModuleName: return true case msg.GetOperation() == beehivemodel.ResponseOperation: content, ok := msg.Content.(string) diff --git a/cloud/pkg/common/messagelayer/context.go b/cloud/pkg/common/messagelayer/context.go index 7aeb2afdc..8a2e9d4bd 100644 --- a/cloud/pkg/common/messagelayer/context.go +++ b/cloud/pkg/common/messagelayer/context.go @@ -104,6 +104,14 @@ func NodeUpgradeJobControllerMessageLayer() MessageLayer { } } +func ImagePrePullControllerMessageLayer() MessageLayer { + return &ContextMessageLayer{ + SendModuleName: modules.CloudHubModuleName, + ReceiveModuleName: modules.ImagePrePullControllerModuleName, + ResponseModuleName: modules.CloudHubModuleName, + } +} + func PolicyControllerMessageLayer() MessageLayer { return &ContextMessageLayer{ SendModuleName: modules.CloudHubModuleName, diff --git a/cloud/pkg/common/modules/modules.go b/cloud/pkg/common/modules/modules.go index 76f748251..f458aee5e 100644 --- a/cloud/pkg/common/modules/modules.go +++ b/cloud/pkg/common/modules/modules.go @@ -16,6 +16,9 @@ const ( NodeUpgradeJobControllerModuleName = "nodeupgradejobcontroller" NodeUpgradeJobControllerModuleGroup = "nodeupgradejobcontroller" + ImagePrePullControllerModuleName = "imageprepullcontroller" + ImagePrePullControllerModuleGroup = "imageprepullcontroller" + SyncControllerModuleName = "synccontroller" 
SyncControllerModuleGroup = "synccontroller" diff --git a/cloud/pkg/common/util/util.go b/cloud/pkg/common/util/util.go new file mode 100644 index 000000000..a80ba1c08 --- /dev/null +++ b/cloud/pkg/common/util/util.go @@ -0,0 +1,55 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + metav1 "k8s.io/api/core/v1" + + "github.com/kubeedge/kubeedge/common/constants" +) + +// IsEdgeNode checks whether a node is an Edge Node +// only if label {"node-role.kubernetes.io/edge": ""} exists, it is an edge node +func IsEdgeNode(node *metav1.Node) bool { + if node.Labels == nil { + return false + } + if _, ok := node.Labels[constants.EdgeNodeRoleKey]; !ok { + return false + } + + if node.Labels[constants.EdgeNodeRoleKey] != constants.EdgeNodeRoleValue { + return false + } + + return true +} + +// RemoveDuplicateElement deduplicate +func RemoveDuplicateElement[T any](s []T) []T { + result := make([]T, 0, len(s)) + temp := make(map[any]struct{}, len(s)) + + for _, item := range s { + if _, ok := temp[item]; !ok { + temp[item] = struct{}{} + result = append(result, item) + } + } + + return result +} diff --git a/cloud/pkg/common/util/util_test.go b/cloud/pkg/common/util/util_test.go new file mode 100644 index 000000000..749d7596a --- /dev/null +++ b/cloud/pkg/common/util/util_test.go @@ -0,0 +1,55 @@ +/* +Copyright 2023 The KubeEdge Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "reflect" + "testing" +) + +func TestRemoveDuplicateElement(t *testing.T) { + tests := []struct { + name string + input []string + expected []string + }{ + { + name: "case 1", + input: []string{"a", "b", "c"}, + expected: []string{"a", "b", "c"}, + }, + { + name: "case 2", + input: []string{"a", "a", "b", "c", "b", "a", "a"}, + expected: []string{"a", "b", "c"}, + }, + { + name: "case 3", + input: []string{}, + expected: []string{}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + result := RemoveDuplicateElement(test.input) + if !reflect.DeepEqual(result, test.expected) { + t.Errorf("Got = %v, Want = %v", result, test.expected) + } + }) + } +} diff --git a/cloud/pkg/imageprepullcontroller/config/config.go b/cloud/pkg/imageprepullcontroller/config/config.go new file mode 100644 index 000000000..88572a842 --- /dev/null +++ b/cloud/pkg/imageprepullcontroller/config/config.go @@ -0,0 +1,38 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "sync" + + "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" +) + +var Config Configure +var once sync.Once + +type Configure struct { + v1alpha1.ImagePrePullController +} + +func InitConfigure(dc *v1alpha1.ImagePrePullController) { + once.Do(func() { + Config = Configure{ + ImagePrePullController: *dc, + } + }) +} diff --git a/cloud/pkg/imageprepullcontroller/controller/downstream.go b/cloud/pkg/imageprepullcontroller/controller/downstream.go new file mode 100644 index 000000000..f8abfd1c1 --- /dev/null +++ b/cloud/pkg/imageprepullcontroller/controller/downstream.go @@ -0,0 +1,268 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "fmt" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + k8sinformer "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + beehiveContext "github.com/kubeedge/beehive/pkg/core/context" + "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/cloud/pkg/common/client" + "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" + "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer" + "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + "github.com/kubeedge/kubeedge/cloud/pkg/common/util" + "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/manager" + commontypes "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" + crdinformers "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions" +) + +type DownstreamController struct { + kubeClient kubernetes.Interface + informer k8sinformer.SharedInformerFactory + crdClient crdClientset.Interface + messageLayer messagelayer.MessageLayer + + imagePrePullJobManager *manager.ImagePrePullJobManager +} + +// Start DownstreamController +func (dc *DownstreamController) Start() error { + klog.Info("Start ImagePrePullJob Downstream Controller") + go dc.syncImagePrePullJob() + return nil +} + +// syncImagePrePullJob is used to get events from informer +func (dc *DownstreamController) syncImagePrePullJob() { + for { + select { + case <-beehiveContext.Done(): + klog.Info("stop sync ImagePrePullJob") + return + case e := <-dc.imagePrePullJobManager.Events(): + imagePrePull, ok := e.Object.(*v1alpha1.ImagePrePullJob) + if !ok { + klog.Warningf("object type: %T unsupported", e.Object) + continue + } + switch e.Type { + case watch.Added: + dc.imagePrePullJobAdded(imagePrePull) + case watch.Deleted: + 
dc.imagePrePullJobDeleted(imagePrePull) + case watch.Modified: + dc.imagePrePullJobUpdate(imagePrePull) + default: + klog.Warningf("ImagePrePullJob event type: %s unsupported", e.Type) + } + } + } +} + +// imagePrePullJobAdded is used to process addition of new ImagePrePullJob in apiserver +func (dc *DownstreamController) imagePrePullJobAdded(imagePrePull *v1alpha1.ImagePrePullJob) { + klog.V(4).Infof("add ImagePrePullJob: %v", imagePrePull) + // store in cache map + dc.imagePrePullJobManager.ImagePrePullMap.Store(imagePrePull.Name, imagePrePull) + + // If imagePrePullJob is not in its initial state, we don't need to send a message + if imagePrePull.Status.State != v1alpha1.PrePullInitialValue { + klog.Errorf("The ImagePrePullJob %s is already running or completed, don't send message again", imagePrePull.Name) + return + } + + // get node list that need prepull images + var nodesToPrePullImage []string + if len(imagePrePull.Spec.ImagePrePullTemplate.NodeNames) != 0 { + for _, node := range imagePrePull.Spec.ImagePrePullTemplate.NodeNames { + nodeInfo, err := dc.informer.Core().V1().Nodes().Lister().Get(node) + if err != nil { + klog.Errorf("Failed to get node(%s) info: %v", node, err) + continue + } + + if validateNode(nodeInfo) { + nodesToPrePullImage = append(nodesToPrePullImage, nodeInfo.Name) + } + } + } else if imagePrePull.Spec.ImagePrePullTemplate.LabelSelector != nil { + selector, err := metav1.LabelSelectorAsSelector(imagePrePull.Spec.ImagePrePullTemplate.LabelSelector) + if err != nil { + klog.Errorf("LabelSelector(%s) is not valid: %v", imagePrePull.Spec.ImagePrePullTemplate.LabelSelector, err) + return + } + + nodes, err := dc.informer.Core().V1().Nodes().Lister().List(selector) + if err != nil { + klog.Errorf("Failed to get nodes with label %s: %v", selector.String(), err) + return + } + + for _, node := range nodes { + if validateNode(node) { + nodesToPrePullImage = append(nodesToPrePullImage, node.Name) + } + } + } + + // deduplicate: remove duplicate nodes 
to avoid repeating prepull images on the same node + nodesToPrePullImage = util.RemoveDuplicateElement(nodesToPrePullImage) + + klog.Infof("Filtered finished, images will be prepulled on below nodes\n%v\n", nodesToPrePullImage) + + go func() { + for _, node := range nodesToPrePullImage { + dc.processPrePull(node, imagePrePull) + } + }() +} + +// imagePrePullJobDeleted is used to process deleted ImagePrePullJob in apiServer +func (dc *DownstreamController) imagePrePullJobDeleted(imagePrePull *v1alpha1.ImagePrePullJob) { + // delete from cache map + dc.imagePrePullJobManager.ImagePrePullMap.Delete(imagePrePull.Name) +} + +// imagePrePullJobUpdate is used to process update of ImagePrePullJob in apiServer +// Now we don't allow update spec, so we only update the cache map in imagePrePullJobUpdate func. +func (dc *DownstreamController) imagePrePullJobUpdate(imagePrePull *v1alpha1.ImagePrePullJob) { + _, ok := dc.imagePrePullJobManager.ImagePrePullMap.Load(imagePrePull.Name) + // store in cache map + dc.imagePrePullJobManager.ImagePrePullMap.Store(imagePrePull.Name, imagePrePull) + if !ok { + klog.Infof("ImagePrePull Job %s not exist, and store it into first", imagePrePull.Name) + // If ImagePrePullJob is not present in the ImagePrePull map, the event is an addition rather than a modification. + dc.imagePrePullJobAdded(imagePrePull) + } +} + +// processPrePull process prepull job added and send it to edge nodes. 
+func (dc *DownstreamController) processPrePull(node string, imagePrePull *v1alpha1.ImagePrePullJob) { + klog.V(4).Infof("begin to send imagePrePull message to %s", node) + + imagePrePullTemplateInfo := imagePrePull.Spec.ImagePrePullTemplate + imagePrePullRequest := commontypes.ImagePrePullJobRequest{ + Images: imagePrePullTemplateInfo.Images, + NodeName: node, + Secret: imagePrePullTemplateInfo.ImageSecret, + RetryTimes: imagePrePullTemplateInfo.RetryTimesOnEachNode, + CheckItems: imagePrePullTemplateInfo.CheckItems, + } + + // handle timeout: could not receive image prepull msg feedback from edge node + // send prepull timeout response message to upstream + go dc.handleImagePrePullJobTimeoutOnEachNode(node, imagePrePull.Name, imagePrePullTemplateInfo.TimeoutSecondsOnEachNode) + + // send prepull msg to edge node + msg := model.NewMessage("") + resource := buildPrePullResource(imagePrePull.Name, node) + msg.BuildRouter(modules.ImagePrePullControllerModuleName, modules.ImagePrePullControllerModuleGroup, resource, ImagePrePull). + FillBody(imagePrePullRequest) + + err := dc.messageLayer.Send(*msg) + if err != nil { + klog.Errorf("Failed to send prepull message %s due to error %v", msg.GetID(), err) + return + } + + // update imagePrePullJob status to prepulling + status := &v1alpha1.ImagePrePullStatus{ + NodeName: node, + State: v1alpha1.PrePulling, + } + err = patchImagePrePullStatus(dc.crdClient, imagePrePull, status) + if err != nil { + klog.Errorf("Failed to mark imagePrePullJob prepulling status: %v", err) + } +} + +// handleImagePrePullJobTimeoutOnEachNode is used to handle the situation where the cloud doesn't receive the prepull result +// from the edge node within the timeout period. 
+// If so, the ImagePrePullJobState will update to timeout +func (dc *DownstreamController) handleImagePrePullJobTimeoutOnEachNode(node, jobName string, timeoutSecondsEachNode *uint32) { + var timeout uint32 = 360 + if timeoutSecondsEachNode != nil && *timeoutSecondsEachNode != 0 { + timeout = *timeoutSecondsEachNode + } + + receiveFeedback := false + + _ = wait.Poll(10*time.Second, time.Duration(timeout)*time.Second, func() (bool, error) { + cacheValue, ok := dc.imagePrePullJobManager.ImagePrePullMap.Load(jobName) + if !ok { + receiveFeedback = true + klog.Errorf("ImagePrePullJob %s is not exist", jobName) + return false, fmt.Errorf("imagePrePullJob %s is not exist", jobName) + } + imagePrePullValue := cacheValue.(*v1alpha1.ImagePrePullJob) + for _, statusValue := range imagePrePullValue.Status.Status { + if statusValue.NodeName == node && (statusValue.State == v1alpha1.PrePullSuccessful || statusValue.State == v1alpha1.PrePullFailed) { + receiveFeedback = true + return true, nil + } + break + } + return false, nil + }) + + if receiveFeedback { + return + } + klog.Errorf("TIMEOUT to receive image prepull %s response from edge node %s", jobName, node) + + // construct timeout image prepull response and send it to upstream controller + responseResource := buildPrePullResource(jobName, node) + resp := commontypes.ImagePrePullJobResponse{ + NodeName: node, + State: v1alpha1.PrePullFailed, + Reason: "timeout to receive response from edge", + } + + respMsg := model.NewMessage(""). + BuildRouter(modules.ImagePrePullControllerModuleName, modules.ImagePrePullControllerModuleGroup, responseResource, ImagePrePull). + FillBody(resp) + beehiveContext.Send(modules.ImagePrePullControllerModuleName, *respMsg) +} + +// NewDownstreamController new downstream controller to process downstream imageprepull msg to edge nodes. 
+func NewDownstreamController(crdInformerFactory crdinformers.SharedInformerFactory) (*DownstreamController, error) { + imagePrePullJobManager, err := manager.NewImagePrePullJobManager(crdInformerFactory.Operations().V1alpha1().ImagePrePullJobs().Informer()) + if err != nil { + klog.Warningf("Create ImagePrePullJob manager failed with error: %s", err) + return nil, err + } + + dc := &DownstreamController{ + kubeClient: client.GetKubeClient(), + informer: informers.GetInformersManager().GetKubeInformerFactory(), + crdClient: client.GetCRDClient(), + imagePrePullJobManager: imagePrePullJobManager, + messageLayer: messagelayer.ImagePrePullControllerMessageLayer(), + } + return dc, nil +} diff --git a/cloud/pkg/imageprepullcontroller/controller/upstream.go b/cloud/pkg/imageprepullcontroller/controller/upstream.go new file mode 100644 index 000000000..98fd28bea --- /dev/null +++ b/cloud/pkg/imageprepullcontroller/controller/upstream.go @@ -0,0 +1,147 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "encoding/json" + + k8sinformer "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + beehiveContext "github.com/kubeedge/beehive/pkg/core/context" + "github.com/kubeedge/beehive/pkg/core/model" + keclient "github.com/kubeedge/kubeedge/cloud/pkg/common/client" + "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" + "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer" + "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config" + "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" +) + +// UpstreamController subscribe messages from edge and sync to k8s api server +type UpstreamController struct { + // downstream controller to update ImagePrePullJob status in cache + dc *DownstreamController + + kubeClient kubernetes.Interface + informer k8sinformer.SharedInformerFactory + crdClient crdClientset.Interface + messageLayer messagelayer.MessageLayer + // message channel + imagePrePullJobStatusChan chan model.Message +} + +// Start imageprepull UpstreamController +func (uc *UpstreamController) Start() error { + klog.Info("Start ImagePrePullJob Upstream Controller") + + uc.imagePrePullJobStatusChan = make(chan model.Message, config.Config.Buffer.ImagePrePullJobStatus) + go uc.dispatchMessage() + + for i := 0; i < int(config.Config.Load.ImagePrePullJobWorkers); i++ { + go uc.updateImagePrePullJobStatus() + } + return nil +} + +// updateImagePrePullJobStatus update imagePrePullJob status +func (uc *UpstreamController) updateImagePrePullJobStatus() { + for { + select { + case <-beehiveContext.Done(): + klog.Info("Stop update ImagePrePullJob status") + return + case msg := <-uc.imagePrePullJobStatusChan: + klog.V(4).Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource()) + nodeName, jobName, err := 
parsePrePullresource(msg.GetResource()) + if err != nil { + klog.Errorf("message resource %s is not supported", msg.GetResource()) + continue + } + + oldValue, ok := uc.dc.imagePrePullJobManager.ImagePrePullMap.Load(jobName) + if !ok { + klog.Errorf("ImagePrePullJob %s not exist", jobName) + continue + } + + imagePrePull, ok := oldValue.(*v1alpha1.ImagePrePullJob) + if !ok { + klog.Errorf("ImagePrePullJob info %T is not valid", oldValue) + continue + } + + data, err := msg.GetContentData() + if err != nil { + klog.Errorf("failed to get image prepull content from response msg, err: %v", err) + continue + } + resp := &types.ImagePrePullJobResponse{} + err = json.Unmarshal(data, resp) + if err != nil { + klog.Errorf("Failed to unmarshal image prepull response: %v", err) + continue + } + + status := &v1alpha1.ImagePrePullStatus{ + NodeName: nodeName, + State: resp.State, + Reason: resp.Reason, + ImageStatus: resp.ImageStatus, + } + err = patchImagePrePullStatus(uc.crdClient, imagePrePull, status) + if err != nil { + klog.Errorf("Failed to patch ImagePrePullJob status, err: %v", err) + } + } + } +} + +// dispatchMessage receive the message from edge and write into imagePrePullJobStatusChan +func (uc *UpstreamController) dispatchMessage() { + for { + select { + case <-beehiveContext.Done(): + klog.Info("Stop dispatch ImagePrePullJob upstream message") + return + default: + } + + msg, err := uc.messageLayer.Receive() + if err != nil { + klog.Warningf("Receive message failed, %v", err) + continue + } + + klog.V(4).Infof("ImagePrePullJob upstream controller receive msg %#v", msg) + uc.imagePrePullJobStatusChan <- msg + } +} + +// NewUpstreamController create UpstreamController from config +func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) { + uc := &UpstreamController{ + kubeClient: keclient.GetKubeClient(), + informer: informers.GetInformersManager().GetKubeInformerFactory(), + crdClient: keclient.GetCRDClient(), + messageLayer: 
messagelayer.ImagePrePullControllerMessageLayer(), + dc: dc, + } + return uc, nil +} diff --git a/cloud/pkg/imageprepullcontroller/controller/util.go b/cloud/pkg/imageprepullcontroller/controller/util.go new file mode 100644 index 000000000..592164833 --- /dev/null +++ b/cloud/pkg/imageprepullcontroller/controller/util.go @@ -0,0 +1,131 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + jsonpatch "github.com/evanphx/json-patch" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apimachineryType "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/cloud/pkg/common/util" + "github.com/kubeedge/kubeedge/common/constants" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" +) + +const ImagePrePull = "prepull" + +func validateNode(node *v1.Node) bool { + if !util.IsEdgeNode(node) { + klog.Warningf("Node(%s) is not edge node", node.Name) + return false + } + + // if node is in NotReady state, cannot prepull images + for _, condition := range node.Status.Conditions { + if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue { + klog.Warningf("Node(%s) is in NotReady state", node.Name) + return false + } + } + + return true +} + +// buildPrePullResource build prepull resource in msg send to edge node +func 
buildPrePullResource(imagePrePullName, nodeName string) string { + resource := fmt.Sprintf("%s/%s/%s/%s", "node", nodeName, ImagePrePull, imagePrePullName) + return resource +} + +func parsePrePullresource(resource string) (string, string, error) { + var nodeName, jobName string + sli := strings.Split(resource, constants.ResourceSep) + if len(sli) != 4 { + return nodeName, jobName, fmt.Errorf("the resource %s is not the standard type", resource) + } + return sli[1], sli[3], nil +} + +func patchImagePrePullStatus(crdClient crdClientset.Interface, imagePrePull *v1alpha1.ImagePrePullJob, status *v1alpha1.ImagePrePullStatus) error { + oldValue := imagePrePull.DeepCopy() + newValue := updateNodeImagePrePullStatus(oldValue, status) + + var completeFlag int + var failedFlag bool + newValue.Status.State = v1alpha1.PrePulling + for _, statusValue := range newValue.Status.Status { + if statusValue.State == v1alpha1.PrePullFailed { + failedFlag = true + completeFlag++ + } + if statusValue.State == v1alpha1.PrePullSuccessful { + completeFlag++ + } + } + if completeFlag == len(newValue.Status.Status) { + if failedFlag { + newValue.Status.State = v1alpha1.PrePullFailed + } else { + newValue.Status.State = v1alpha1.PrePullSuccessful + } + } + + oldData, err := json.Marshal(oldValue) + if err != nil { + return fmt.Errorf("failed to marshal the old ImagePrePullJob(%s): %v", oldValue.Name, err) + } + + newData, err := json.Marshal(newValue) + if err != nil { + return fmt.Errorf("failed to marshal the new ImagePrePullJob(%s): %v", newValue.Name, err) + } + + patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return fmt.Errorf("failed to create a merge patch: %v", err) + } + + _, err = crdClient.OperationsV1alpha1().ImagePrePullJobs().Patch(context.TODO(), newValue.Name, apimachineryType.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") + if err != nil { + return fmt.Errorf("failed to patch update ImagePrePullJob status: %v", err) + } + + 
return nil +} + +func updateNodeImagePrePullStatus(imagePrePull *v1alpha1.ImagePrePullJob, status *v1alpha1.ImagePrePullStatus) *v1alpha1.ImagePrePullJob { + // return value imageprepull cannot populate the input parameter old + newValue := imagePrePull.DeepCopy() + + for index, nodeStatus := range newValue.Status.Status { + if nodeStatus.NodeName == status.NodeName { + newValue.Status.Status[index] = *status + return newValue + } + } + + newValue.Status.Status = append(newValue.Status.Status, *status) + return newValue +} diff --git a/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go b/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go new file mode 100644 index 000000000..4f772d258 --- /dev/null +++ b/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go @@ -0,0 +1,91 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package imageprepullcontroller
+
+import (
+	"time"
+
+	"k8s.io/klog/v2"
+
+	"github.com/kubeedge/beehive/pkg/core"
+	"github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
+	"github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+	"github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config"
+	"github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/controller"
+	"github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1"
+)
+
+// ImagePrePullController is the module that handles image pre-pull on edge nodes.
+// It bundles a downstream controller, an upstream controller and the enable flag.
+type ImagePrePullController struct {
+	downstream *controller.DownstreamController
+	upstream   *controller.UpstreamController
+	enable     bool
+}
+
+// compile-time assertion that *ImagePrePullController satisfies core.Module
+var _ core.Module = (*ImagePrePullController)(nil)
+
+// newImagePrePullController builds the module. When enable is false only the
+// flag is set and no sub-controller is created. Otherwise the downstream and
+// upstream controllers are constructed in that order; a construction error is
+// reported through klog.Exitf.
+func newImagePrePullController(enable bool) *ImagePrePullController {
+	ctrl := &ImagePrePullController{enable: enable}
+	if !enable {
+		return ctrl
+	}
+
+	var err error
+	kubeEdgeInformers := informers.GetInformersManager().GetKubeEdgeInformerFactory()
+	if ctrl.downstream, err = controller.NewDownstreamController(kubeEdgeInformers); err != nil {
+		klog.Exitf("New ImagePrePull Controller downstream failed with error: %s", err)
+	}
+	// the upstream controller is built on top of the downstream one
+	if ctrl.upstream, err = controller.NewUpstreamController(ctrl.downstream); err != nil {
+		klog.Exitf("New ImagePrePull Controller upstream failed with error: %s", err)
+	}
+	return ctrl
+}
+
+// Register initializes the package configuration from dc and registers a new
+// ImagePrePullController, enabled according to dc.Enable, with beehive core.
+func Register(dc *v1alpha1.ImagePrePullController) {
+	config.InitConfigure(dc)
+	module := newImagePrePullController(dc.Enable)
+	core.Register(module)
+}
+
+// Name returns modules.ImagePrePullControllerModuleName.
+func (ic *ImagePrePullController) Name() string {
+	return modules.ImagePrePullControllerModuleName
+}
+
+// Group returns modules.ImagePrePullControllerModuleGroup.
+func (ic *ImagePrePullController) Group() string {
+	return modules.ImagePrePullControllerModuleGroup
+}
+
+// Enable reports whether this module is enabled.
+func (ic *ImagePrePullController) Enable() bool {
+	return ic.enable
+}
+
+// Start starts the downstream controller, sleeps for one second, and then
+// starts the upstream controller. A start error is reported through klog.Exitf.
+func (ic *ImagePrePullController) Start() {
+	err := ic.downstream.Start()
+	if err != nil {
+		klog.Exitf("start ImagePrePullJob controller downstream failed with error: %s", err)
+	}
+
+	// give the downstream controller time to start and load ImagePrePullJob
+	// TODO think about sync
+	time.Sleep(time.Second)
+
+	err = ic.upstream.Start()
+	if err != nil {
+		klog.Exitf("start ImagePrePullJob controller upstream failed with error: %s", err)
+	}
+}
diff --git a/cloud/pkg/imageprepullcontroller/manager/common.go b/cloud/pkg/imageprepullcontroller/manager/common.go
new file mode 100644
index 000000000..9f371875a
--- /dev/null
+++ b/cloud/pkg/imageprepullcontroller/manager/common.go
@@ -0,0 +1,62 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package manager
+
+import (
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/klog/v2"
+)
+
+// Manager is the interface implemented by the ImagePrePullJob manager; it
+// exposes the channel on which watch events are delivered.
+type Manager interface {
+	Events() chan watch.Event
+}
+
+// CommonResourceEventHandler turns informer callbacks into watch.Event values
+// and writes them to its events channel. It is used by the ImagePrePullJob manager.
+type CommonResourceEventHandler struct {
+	events chan watch.Event
+}
+
+// obj2Event wraps obj in a watch.Event of type t and sends it on the events
+// channel. An obj that is not a runtime.Object is logged and dropped.
+func (h *CommonResourceEventHandler) obj2Event(t watch.EventType, obj interface{}) {
+	if eventObj, ok := obj.(runtime.Object); ok {
+		h.events <- watch.Event{Type: t, Object: eventObj}
+		return
+	}
+	klog.Warningf("unknown type: %T, ignore", obj)
+}
+
+// OnAdd forwards obj as a watch.Added event; isInInitialList is not used.
+func (h *CommonResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) {
+	h.obj2Event(watch.Added, obj)
+}
+
+// OnUpdate forwards newObj as a watch.Modified event; oldObj is not used.
+func (h *CommonResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
+	h.obj2Event(watch.Modified, newObj)
+}
+
+// OnDelete forwards obj as a watch.Deleted event.
+func (h *CommonResourceEventHandler) OnDelete(obj interface{}) {
+	h.obj2Event(watch.Deleted, obj)
+}
+
+// NewCommonResourceEventHandler returns a CommonResourceEventHandler that
+// writes to the given events channel.
+func NewCommonResourceEventHandler(events chan watch.Event) *CommonResourceEventHandler {
+	handler := new(CommonResourceEventHandler)
+	handler.events = events
+	return handler
+}
diff --git a/cloud/pkg/imageprepullcontroller/manager/imageprepull.go b/cloud/pkg/imageprepullcontroller/manager/imageprepull.go
new file mode 100644
index 000000000..cdb2eaccc
--- /dev/null
+++ b/cloud/pkg/imageprepullcontroller/manager/imageprepull.go
@@ -0,0 +1,52 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package manager
+
+import (
+	"sync"
+
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+
+	"github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config"
+)
+
+// ImagePrePullJobManager watches ImagePrePullJob change events.
+type ImagePrePullJobManager struct {
+	// events carries the watch events produced by the registered event handler
+	events chan watch.Event
+
+	// ImagePrePullMap is keyed by ImagePrePullJob.Name; values are *v1alpha1.ImagePrePullJob
+	ImagePrePullMap sync.Map
+}
+
+// Events returns the channel on which all ImagePrePullJob events are received.
+func (m *ImagePrePullJobManager) Events() chan watch.Event {
+	return m.events
+}
+
+// NewImagePrePullJobManager creates an ImagePrePullJobManager whose event
+// channel is buffered to config.Config.Buffer.ImagePrePullJobEvent entries and
+// fed by a CommonResourceEventHandler registered on si. The error returned by
+// si.AddEventHandler, if any, is passed back to the caller.
+func NewImagePrePullJobManager(si cache.SharedIndexInformer) (*ImagePrePullJobManager, error) {
+	events := make(chan watch.Event, config.Config.Buffer.ImagePrePullJobEvent)
+	if _, err := si.AddEventHandler(NewCommonResourceEventHandler(events)); err != nil {
+		return nil, err
+	}
+
+	return &ImagePrePullJobManager{events: events}, nil
+}
diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go b/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go
index 8e4f96d98..6efee8012 100644
--- a/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go
+++ b/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go
@@ -38,6 +38,7 @@ import (
 	"github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
 	"github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer"
 	"github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+
"github.com/kubeedge/kubeedge/cloud/pkg/common/util" "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/manager" "github.com/kubeedge/kubeedge/common/constants" commontypes "github.com/kubeedge/kubeedge/common/types" @@ -143,7 +144,7 @@ func (dc *DownstreamController) nodeUpgradeJobAdded(upgrade *v1alpha1.NodeUpgrad } // deduplicate: remove duplicate nodes to avoid repeating upgrade to the same node - nodesToUpgrade = RemoveDuplicateElement(nodesToUpgrade) + nodesToUpgrade = util.RemoveDuplicateElement(nodesToUpgrade) klog.Infof("Filtered finished, the below nodes are to upgrade\n%v\n", nodesToUpgrade) @@ -200,7 +201,7 @@ func (dc *DownstreamController) processUpgrade(node string, upgrade *v1alpha1.No return } - // process time out: cloud did not receive upgrade feedback from edge + // process time out: could not receive upgrade feedback from edge // send upgrade timeout response message to upstream go dc.handleNodeUpgradeJobTimeout(node, upgrade.Name, upgrade.Spec.Version, upgradeReq.HistoryID, upgrade.Spec.TimeoutSeconds) @@ -303,7 +304,7 @@ func needUpgrade(node *v1.Node, upgradeVersion string) bool { } // we only care about edge nodes, so just remove not edge nodes - if !isEdgeNode(node) { + if !util.IsEdgeNode(node) { klog.Warningf("Node(%s) is not edge node", node.Name) return false } diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/util.go b/cloud/pkg/nodeupgradejobcontroller/controller/util.go index 68a8e95ce..8b80c4261 100644 --- a/cloud/pkg/nodeupgradejobcontroller/controller/util.go +++ b/cloud/pkg/nodeupgradejobcontroller/controller/util.go @@ -22,9 +22,7 @@ import ( "time" "github.com/distribution/distribution/v3/reference" - metav1 "k8s.io/api/core/v1" - "github.com/kubeedge/kubeedge/common/constants" "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" ) @@ -55,23 +53,6 @@ func filterVersion(version string, expected string) bool { return version[index+length:] == expected } -// isEdgeNode checks whether a node is an 
Edge Node -// only if label {"node-role.kubernetes.io/edge": ""} exists, it is an edge node -func isEdgeNode(node *metav1.Node) bool { - if node.Labels == nil { - return false - } - if _, ok := node.Labels[constants.EdgeNodeRoleKey]; !ok { - return false - } - - if node.Labels[constants.EdgeNodeRoleKey] != constants.EdgeNodeRoleValue { - return false - } - - return true -} - // isCompleted returns true only if some/all edge upgrade is upgrading or completed func isCompleted(upgrade *v1alpha1.NodeUpgradeJob) bool { // all edge node upgrade is upgrading or completed @@ -89,21 +70,6 @@ func isCompleted(upgrade *v1alpha1.NodeUpgradeJob) bool { return false } -// RemoveDuplicateElement deduplicate -func RemoveDuplicateElement(s []string) []string { - result := make([]string, 0, len(s)) - temp := make(map[string]struct{}, len(s)) - - for _, item := range s { - if _, ok := temp[item]; !ok { - temp[item] = struct{}{} - result = append(result, item) - } - } - - return result -} - // UpdateNodeUpgradeJobStatus updates the status // return the updated result func UpdateNodeUpgradeJobStatus(old *v1alpha1.NodeUpgradeJob, status *v1alpha1.UpgradeStatus) *v1alpha1.NodeUpgradeJob { diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go b/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go index bfc93c7eb..499da62fa 100644 --- a/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go +++ b/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go @@ -66,39 +66,6 @@ func TestFilterVersion(t *testing.T) { } } -func TestRemoveDuplicateElement(t *testing.T) { - tests := []struct { - name string - input []string - expected []string - }{ - { - name: "case 1", - input: []string{"a", "b", "c"}, - expected: []string{"a", "b", "c"}, - }, - { - name: "case 2", - input: []string{"a", "a", "b", "c", "b", "a", "a"}, - expected: []string{"a", "b", "c"}, - }, - { - name: "case 3", - input: []string{}, - expected: []string{}, - }, - } - - for _, test := range tests { - 
t.Run(test.name, func(t *testing.T) { - result := RemoveDuplicateElement(test.input) - if !reflect.DeepEqual(result, test.expected) { - t.Errorf("Got = %v, Want = %v", result, test.expected) - } - }) - } -} - func TestUpdateUpgradeStatus(t *testing.T) { upgrade := v1alpha1.NodeUpgradeJob{ Status: v1alpha1.NodeUpgradeJobStatus{ |
