summaryrefslogtreecommitdiff
path: root/cloud
diff options
context:
space:
mode:
authorShelley-BaoYue <baoyue2@huawei.com>2024-01-03 10:49:28 +0800
committerShelley-BaoYue <baoyue2@huawei.com>2024-01-03 11:44:15 +0800
commitc5a307193a8a4494e921a25edca37216d8c14653 (patch)
tree612ed95f31cd244023d2699a89ab40e20a849d77 /cloud
parentadd image prepull api (diff)
downloadkubeedge-c5a307193a8a4494e921a25edca37216d8c14653.tar.gz
support imageprepull feature
Signed-off-by: Shelley-BaoYue <baoyue2@huawei.com>
Diffstat (limited to 'cloud')
-rw-r--r--cloud/cmd/cloudcore/app/server.go2
-rw-r--r--cloud/pkg/cloudhub/dispatcher/message_dispatcher.go6
-rw-r--r--cloud/pkg/common/messagelayer/context.go8
-rw-r--r--cloud/pkg/common/modules/modules.go3
-rw-r--r--cloud/pkg/common/util/util.go55
-rw-r--r--cloud/pkg/common/util/util_test.go55
-rw-r--r--cloud/pkg/imageprepullcontroller/config/config.go38
-rw-r--r--cloud/pkg/imageprepullcontroller/controller/downstream.go268
-rw-r--r--cloud/pkg/imageprepullcontroller/controller/upstream.go147
-rw-r--r--cloud/pkg/imageprepullcontroller/controller/util.go131
-rw-r--r--cloud/pkg/imageprepullcontroller/imageprepullcontroller.go91
-rw-r--r--cloud/pkg/imageprepullcontroller/manager/common.go62
-rw-r--r--cloud/pkg/imageprepullcontroller/manager/imageprepull.go52
-rw-r--r--cloud/pkg/nodeupgradejobcontroller/controller/downstream.go7
-rw-r--r--cloud/pkg/nodeupgradejobcontroller/controller/util.go34
-rw-r--r--cloud/pkg/nodeupgradejobcontroller/controller/util_test.go33
16 files changed, 921 insertions, 71 deletions
diff --git a/cloud/cmd/cloudcore/app/server.go b/cloud/cmd/cloudcore/app/server.go
index e42f6cf3b..fd346cd72 100644
--- a/cloud/cmd/cloudcore/app/server.go
+++ b/cloud/cmd/cloudcore/app/server.go
@@ -48,6 +48,7 @@ import (
"github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/dynamiccontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller"
+ "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/policycontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/router"
@@ -158,6 +159,7 @@ func registerModules(c *v1alpha1.CloudCoreConfig) {
cloudhub.Register(c.Modules.CloudHub)
edgecontroller.Register(c.Modules.EdgeController)
devicecontroller.Register(c.Modules.DeviceController)
+ imageprepullcontroller.Register(c.Modules.ImagePrePullController)
nodeupgradejobcontroller.Register(c.Modules.NodeUpgradeJobController)
synccontroller.Register(c.Modules.SyncController)
cloudstream.Register(c.Modules.CloudStream, c.CommonConfig)
diff --git a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go
index dd953fa57..a236d3ebd 100644
--- a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go
+++ b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go
@@ -33,6 +33,7 @@ import (
"github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/session"
"github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer"
"github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+ imageprepullcontroller "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/controller"
"github.com/kubeedge/kubeedge/cloud/pkg/synccontroller"
commonconst "github.com/kubeedge/kubeedge/common/constants"
v2 "github.com/kubeedge/kubeedge/edge/pkg/metamanager/dao/v2"
@@ -183,6 +184,9 @@ func (md *messageDispatcher) DispatchUpstream(message *beehivemodel.Message, inf
message.Router.Resource = fmt.Sprintf("node/%s/%s", info.NodeID, message.Router.Resource)
beehivecontext.Send(modules.RouterModuleName, *message)
+ case message.GetOperation() == imageprepullcontroller.ImagePrePull:
+ beehivecontext.SendToGroup(modules.ImagePrePullControllerModuleGroup, *message)
+
default:
err := md.PubToController(info, message)
if err != nil {
@@ -447,7 +451,7 @@ func noAckRequired(msg *beehivemodel.Message) bool {
return true
case msg.GetGroup() == modules.UserGroup:
return true
- case msg.GetSource() == modules.NodeUpgradeJobControllerModuleName:
+ case msg.GetSource() == modules.NodeUpgradeJobControllerModuleName || msg.GetSource() == modules.ImagePrePullControllerModuleName:
return true
case msg.GetOperation() == beehivemodel.ResponseOperation:
content, ok := msg.Content.(string)
diff --git a/cloud/pkg/common/messagelayer/context.go b/cloud/pkg/common/messagelayer/context.go
index 7aeb2afdc..8a2e9d4bd 100644
--- a/cloud/pkg/common/messagelayer/context.go
+++ b/cloud/pkg/common/messagelayer/context.go
@@ -104,6 +104,14 @@ func NodeUpgradeJobControllerMessageLayer() MessageLayer {
}
}
+func ImagePrePullControllerMessageLayer() MessageLayer {
+ return &ContextMessageLayer{
+ SendModuleName: modules.CloudHubModuleName,
+ ReceiveModuleName: modules.ImagePrePullControllerModuleName,
+ ResponseModuleName: modules.CloudHubModuleName,
+ }
+}
+
func PolicyControllerMessageLayer() MessageLayer {
return &ContextMessageLayer{
SendModuleName: modules.CloudHubModuleName,
diff --git a/cloud/pkg/common/modules/modules.go b/cloud/pkg/common/modules/modules.go
index 76f748251..f458aee5e 100644
--- a/cloud/pkg/common/modules/modules.go
+++ b/cloud/pkg/common/modules/modules.go
@@ -16,6 +16,9 @@ const (
NodeUpgradeJobControllerModuleName = "nodeupgradejobcontroller"
NodeUpgradeJobControllerModuleGroup = "nodeupgradejobcontroller"
+ ImagePrePullControllerModuleName = "imageprepullcontroller"
+ ImagePrePullControllerModuleGroup = "imageprepullcontroller"
+
SyncControllerModuleName = "synccontroller"
SyncControllerModuleGroup = "synccontroller"
diff --git a/cloud/pkg/common/util/util.go b/cloud/pkg/common/util/util.go
new file mode 100644
index 000000000..a80ba1c08
--- /dev/null
+++ b/cloud/pkg/common/util/util.go
@@ -0,0 +1,55 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+ metav1 "k8s.io/api/core/v1"
+
+ "github.com/kubeedge/kubeedge/common/constants"
+)
+
+// IsEdgeNode checks whether a node is an Edge Node
+// only if label {"node-role.kubernetes.io/edge": ""} exists, it is an edge node
+func IsEdgeNode(node *metav1.Node) bool {
+ if node.Labels == nil {
+ return false
+ }
+ if _, ok := node.Labels[constants.EdgeNodeRoleKey]; !ok {
+ return false
+ }
+
+ if node.Labels[constants.EdgeNodeRoleKey] != constants.EdgeNodeRoleValue {
+ return false
+ }
+
+ return true
+}
+
+// RemoveDuplicateElement deduplicate
+func RemoveDuplicateElement[T any](s []T) []T {
+ result := make([]T, 0, len(s))
+ temp := make(map[any]struct{}, len(s))
+
+ for _, item := range s {
+ if _, ok := temp[item]; !ok {
+ temp[item] = struct{}{}
+ result = append(result, item)
+ }
+ }
+
+ return result
+}
diff --git a/cloud/pkg/common/util/util_test.go b/cloud/pkg/common/util/util_test.go
new file mode 100644
index 000000000..749d7596a
--- /dev/null
+++ b/cloud/pkg/common/util/util_test.go
@@ -0,0 +1,55 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+ "reflect"
+ "testing"
+)
+
+func TestRemoveDuplicateElement(t *testing.T) {
+ tests := []struct {
+ name string
+ input []string
+ expected []string
+ }{
+ {
+ name: "case 1",
+ input: []string{"a", "b", "c"},
+ expected: []string{"a", "b", "c"},
+ },
+ {
+ name: "case 2",
+ input: []string{"a", "a", "b", "c", "b", "a", "a"},
+ expected: []string{"a", "b", "c"},
+ },
+ {
+ name: "case 3",
+ input: []string{},
+ expected: []string{},
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ result := RemoveDuplicateElement(test.input)
+ if !reflect.DeepEqual(result, test.expected) {
+ t.Errorf("Got = %v, Want = %v", result, test.expected)
+ }
+ })
+ }
+}
diff --git a/cloud/pkg/imageprepullcontroller/config/config.go b/cloud/pkg/imageprepullcontroller/config/config.go
new file mode 100644
index 000000000..88572a842
--- /dev/null
+++ b/cloud/pkg/imageprepullcontroller/config/config.go
@@ -0,0 +1,38 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package config
+
+import (
+ "sync"
+
+ "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1"
+)
+
+var Config Configure
+var once sync.Once
+
+type Configure struct {
+ v1alpha1.ImagePrePullController
+}
+
+func InitConfigure(dc *v1alpha1.ImagePrePullController) {
+ once.Do(func() {
+ Config = Configure{
+ ImagePrePullController: *dc,
+ }
+ })
+}
diff --git a/cloud/pkg/imageprepullcontroller/controller/downstream.go b/cloud/pkg/imageprepullcontroller/controller/downstream.go
new file mode 100644
index 000000000..f8abfd1c1
--- /dev/null
+++ b/cloud/pkg/imageprepullcontroller/controller/downstream.go
@@ -0,0 +1,268 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "fmt"
+ "time"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/apimachinery/pkg/watch"
+ k8sinformer "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/klog/v2"
+
+ beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
+ "github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/util"
+ "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/manager"
+ commontypes "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+ crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned"
+ crdinformers "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions"
+)
+
+type DownstreamController struct {
+ kubeClient kubernetes.Interface
+ informer k8sinformer.SharedInformerFactory
+ crdClient crdClientset.Interface
+ messageLayer messagelayer.MessageLayer
+
+ imagePrePullJobManager *manager.ImagePrePullJobManager
+}
+
+// Start DownstreamController
+func (dc *DownstreamController) Start() error {
+ klog.Info("Start ImagePrePullJob Downstream Controller")
+ go dc.syncImagePrePullJob()
+ return nil
+}
+
+// syncImagePrePullJob is used to get events from informer
+func (dc *DownstreamController) syncImagePrePullJob() {
+ for {
+ select {
+ case <-beehiveContext.Done():
+ klog.Info("stop sync ImagePrePullJob")
+ return
+ case e := <-dc.imagePrePullJobManager.Events():
+ imagePrePull, ok := e.Object.(*v1alpha1.ImagePrePullJob)
+ if !ok {
+ klog.Warningf("object type: %T unsupported", e.Object)
+ continue
+ }
+ switch e.Type {
+ case watch.Added:
+ dc.imagePrePullJobAdded(imagePrePull)
+ case watch.Deleted:
+ dc.imagePrePullJobDeleted(imagePrePull)
+ case watch.Modified:
+ dc.imagePrePullJobUpdate(imagePrePull)
+ default:
+ klog.Warningf("ImagePrePullJob event type: %s unsupported", e.Type)
+ }
+ }
+ }
+}
+
+// imagePrePullJobAdded is used to process addition of new ImagePrePullJob in apiserver
+func (dc *DownstreamController) imagePrePullJobAdded(imagePrePull *v1alpha1.ImagePrePullJob) {
+ klog.V(4).Infof("add ImagePrePullJob: %v", imagePrePull)
+ // store in cache map
+ dc.imagePrePullJobManager.ImagePrePullMap.Store(imagePrePull.Name, imagePrePull)
+
+ // If imagePrePullJob is not initial state, we don't need to send message\
+ if imagePrePull.Status.State != v1alpha1.PrePullInitialValue {
+ klog.Errorf("The ImagePrePullJob %s is already running or completed, don't send message again", imagePrePull.Name)
+ return
+ }
+
+ // get node list that need prepull images
+ var nodesToPrePullImage []string
+ if len(imagePrePull.Spec.ImagePrePullTemplate.NodeNames) != 0 {
+ for _, node := range imagePrePull.Spec.ImagePrePullTemplate.NodeNames {
+ nodeInfo, err := dc.informer.Core().V1().Nodes().Lister().Get(node)
+ if err != nil {
+ klog.Errorf("Failed to get node(%s) info: %v", node, err)
+ continue
+ }
+
+ if validateNode(nodeInfo) {
+ nodesToPrePullImage = append(nodesToPrePullImage, nodeInfo.Name)
+ }
+ }
+ } else if imagePrePull.Spec.ImagePrePullTemplate.LabelSelector != nil {
+ selector, err := metav1.LabelSelectorAsSelector(imagePrePull.Spec.ImagePrePullTemplate.LabelSelector)
+ if err != nil {
+ klog.Errorf("LabelSelector(%s) is not valid: %v", imagePrePull.Spec.ImagePrePullTemplate.LabelSelector, err)
+ return
+ }
+
+ nodes, err := dc.informer.Core().V1().Nodes().Lister().List(selector)
+ if err != nil {
+ klog.Errorf("Failed to get nodes with label %s: %v", selector.String(), err)
+ return
+ }
+
+ for _, node := range nodes {
+ if validateNode(node) {
+ nodesToPrePullImage = append(nodesToPrePullImage, node.Name)
+ }
+ }
+ }
+
+ // deduplicate: remove duplicate nodes to avoid repeating prepull images on the same node
+ nodesToPrePullImage = util.RemoveDuplicateElement(nodesToPrePullImage)
+
+ klog.Infof("Filtered finished, images will be prepulled on below nodes\n%v\n", nodesToPrePullImage)
+
+ go func() {
+ for _, node := range nodesToPrePullImage {
+ dc.processPrePull(node, imagePrePull)
+ }
+ }()
+}
+
+// imagePrePullJobDeleted is used to process deleted ImagePrePullJob in apiServer
+func (dc *DownstreamController) imagePrePullJobDeleted(imagePrePull *v1alpha1.ImagePrePullJob) {
+ // delete drom cache map
+ dc.imagePrePullJobManager.ImagePrePullMap.Delete(imagePrePull.Name)
+}
+
+// imagePrePullJobUpdate is used to process update of ImagePrePullJob in apiServer
+// Now we don't allow update spec, so we only update the cache map in imagePrePullJobUpdate func.
+func (dc *DownstreamController) imagePrePullJobUpdate(imagePrePull *v1alpha1.ImagePrePullJob) {
+ _, ok := dc.imagePrePullJobManager.ImagePrePullMap.Load(imagePrePull.Name)
+ // store in cache map
+ dc.imagePrePullJobManager.ImagePrePullMap.Store(imagePrePull.Name, imagePrePull)
+ if !ok {
+ klog.Infof("ImagePrePull Job %s not exist, and store it into first", imagePrePull.Name)
+ // If ImagePrePullJob not present in ImagePrePull map means it is not modified and added.
+ dc.imagePrePullJobAdded(imagePrePull)
+ }
+}
+
+// processPrePull process prepull job added and send it to edge nodes.
+func (dc *DownstreamController) processPrePull(node string, imagePrePull *v1alpha1.ImagePrePullJob) {
+ klog.V(4).Infof("begin to send imagePrePull message to %s", node)
+
+ imagePrePullTemplateInfo := imagePrePull.Spec.ImagePrePullTemplate
+ imagePrePullRequest := commontypes.ImagePrePullJobRequest{
+ Images: imagePrePullTemplateInfo.Images,
+ NodeName: node,
+ Secret: imagePrePullTemplateInfo.ImageSecret,
+ RetryTimes: imagePrePullTemplateInfo.RetryTimesOnEachNode,
+ CheckItems: imagePrePullTemplateInfo.CheckItems,
+ }
+
+ // handle timeout: could not receive image prepull msg feedback from edge node
+ // send prepull timeout response message to upstream
+ go dc.handleImagePrePullJobTimeoutOnEachNode(node, imagePrePull.Name, imagePrePullTemplateInfo.TimeoutSecondsOnEachNode)
+
+ // send prepull msg to edge node
+ msg := model.NewMessage("")
+ resource := buildPrePullResource(imagePrePull.Name, node)
+ msg.BuildRouter(modules.ImagePrePullControllerModuleName, modules.ImagePrePullControllerModuleGroup, resource, ImagePrePull).
+ FillBody(imagePrePullRequest)
+
+ err := dc.messageLayer.Send(*msg)
+ if err != nil {
+ klog.Errorf("Failed to send prepull message %s due to error %v", msg.GetID(), err)
+ return
+ }
+
+ // update imagePrePullJob status to prepulling
+ status := &v1alpha1.ImagePrePullStatus{
+ NodeName: node,
+ State: v1alpha1.PrePulling,
+ }
+ err = patchImagePrePullStatus(dc.crdClient, imagePrePull, status)
+ if err != nil {
+ klog.Errorf("Failed to mark imagePrePullJob prepulling status: %v", err)
+ }
+}
+
+// handleImagePrePullJobTimeoutOnEachNode is used to handle the situation that cloud don't receive prepull result
+// from edge node within the timeout period.
+// If so, the ImagePrePullJobState will update to timeout
+func (dc *DownstreamController) handleImagePrePullJobTimeoutOnEachNode(node, jobName string, timeoutSecondsEachNode *uint32) {
+ var timeout uint32 = 360
+ if timeoutSecondsEachNode != nil && *timeoutSecondsEachNode != 0 {
+ timeout = *timeoutSecondsEachNode
+ }
+
+ receiveFeedback := false
+
+ _ = wait.Poll(10*time.Second, time.Duration(timeout)*time.Second, func() (bool, error) {
+ cacheValue, ok := dc.imagePrePullJobManager.ImagePrePullMap.Load(jobName)
+ if !ok {
+ receiveFeedback = true
+ klog.Errorf("ImagePrePullJob %s is not exist", jobName)
+ return false, fmt.Errorf("imagePrePullJob %s is not exist", jobName)
+ }
+ imagePrePullValue := cacheValue.(*v1alpha1.ImagePrePullJob)
+ for _, statusValue := range imagePrePullValue.Status.Status {
+ if statusValue.NodeName == node && (statusValue.State == v1alpha1.PrePullSuccessful || statusValue.State == v1alpha1.PrePullFailed) {
+ receiveFeedback = true
+ return true, nil
+ }
+ break
+ }
+ return false, nil
+ })
+
+ if receiveFeedback {
+ return
+ }
+ klog.Errorf("TIMEOUT to receive image prepull %s response from edge node %s", jobName, node)
+
+ // construct timeout image prepull response and send it to upstream controller
+ responseResource := buildPrePullResource(jobName, node)
+ resp := commontypes.ImagePrePullJobResponse{
+ NodeName: node,
+ State: v1alpha1.PrePullFailed,
+ Reason: "timeout to receive response from edge",
+ }
+
+ respMsg := model.NewMessage("").
+ BuildRouter(modules.ImagePrePullControllerModuleName, modules.ImagePrePullControllerModuleGroup, responseResource, ImagePrePull).
+ FillBody(resp)
+ beehiveContext.Send(modules.ImagePrePullControllerModuleName, *respMsg)
+}
+
+// NewDownstreamController new downstream controller to process downstream imageprepull msg to edge nodes.
+func NewDownstreamController(crdInformerFactory crdinformers.SharedInformerFactory) (*DownstreamController, error) {
+ imagePrePullJobManager, err := manager.NewImagePrePullJobManager(crdInformerFactory.Operations().V1alpha1().ImagePrePullJobs().Informer())
+ if err != nil {
+ klog.Warningf("Create ImagePrePullJob manager failed with error: %s", err)
+ return nil, err
+ }
+
+ dc := &DownstreamController{
+ kubeClient: client.GetKubeClient(),
+ informer: informers.GetInformersManager().GetKubeInformerFactory(),
+ crdClient: client.GetCRDClient(),
+ imagePrePullJobManager: imagePrePullJobManager,
+ messageLayer: messagelayer.ImagePrePullControllerMessageLayer(),
+ }
+ return dc, nil
+}
diff --git a/cloud/pkg/imageprepullcontroller/controller/upstream.go b/cloud/pkg/imageprepullcontroller/controller/upstream.go
new file mode 100644
index 000000000..98fd28bea
--- /dev/null
+++ b/cloud/pkg/imageprepullcontroller/controller/upstream.go
@@ -0,0 +1,147 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "encoding/json"
+
+ k8sinformer "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/klog/v2"
+
+ beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
+ "github.com/kubeedge/beehive/pkg/core/model"
+ keclient "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer"
+ "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config"
+ "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+ crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned"
+)
+
+// UpstreamController subscribe messages from edge and sync to k8s api server
+type UpstreamController struct {
+ // downstream controller to update ImagePrePullJob status in cache
+ dc *DownstreamController
+
+ kubeClient kubernetes.Interface
+ informer k8sinformer.SharedInformerFactory
+ crdClient crdClientset.Interface
+ messageLayer messagelayer.MessageLayer
+ // message channel
+ imagePrePullJobStatusChan chan model.Message
+}
+
+// Start imageprepull UpstreamController
+func (uc *UpstreamController) Start() error {
+ klog.Info("Start ImagePrePullJob Upstream Controller")
+
+ uc.imagePrePullJobStatusChan = make(chan model.Message, config.Config.Buffer.ImagePrePullJobStatus)
+ go uc.dispatchMessage()
+
+ for i := 0; i < int(config.Config.Load.ImagePrePullJobWorkers); i++ {
+ go uc.updateImagePrePullJobStatus()
+ }
+ return nil
+}
+
+// updateImagePrePullJobStatus update imagePrePullJob status
+func (uc *UpstreamController) updateImagePrePullJobStatus() {
+ for {
+ select {
+ case <-beehiveContext.Done():
+ klog.Info("Stop update ImagePrePullJob status")
+ return
+ case msg := <-uc.imagePrePullJobStatusChan:
+ klog.V(4).Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource())
+ nodeName, jobName, err := parsePrePullresource(msg.GetResource())
+ if err != nil {
+ klog.Errorf("message resource %s is not supported", msg.GetResource())
+ continue
+ }
+
+ oldValue, ok := uc.dc.imagePrePullJobManager.ImagePrePullMap.Load(jobName)
+ if !ok {
+ klog.Errorf("ImagePrePullJob %s not exist", jobName)
+ continue
+ }
+
+ imagePrePull, ok := oldValue.(*v1alpha1.ImagePrePullJob)
+ if !ok {
+ klog.Errorf("ImagePrePullJob info %T is not valid", oldValue)
+ continue
+ }
+
+ data, err := msg.GetContentData()
+ if err != nil {
+ klog.Errorf("failed to get image prepull content from response msg, err: %v", err)
+ continue
+ }
+ resp := &types.ImagePrePullJobResponse{}
+ err = json.Unmarshal(data, resp)
+ if err != nil {
+ klog.Errorf("Failed to unmarshal image prepull response: %v", err)
+ continue
+ }
+
+ status := &v1alpha1.ImagePrePullStatus{
+ NodeName: nodeName,
+ State: resp.State,
+ Reason: resp.Reason,
+ ImageStatus: resp.ImageStatus,
+ }
+ err = patchImagePrePullStatus(uc.crdClient, imagePrePull, status)
+ if err != nil {
+ klog.Errorf("Failed to patch ImagePrePullJob status, err: %v", err)
+ }
+ }
+ }
+}
+
+// dispatchMessage receive the message from edge and write into imagePrePullJobStatusChan
+func (uc *UpstreamController) dispatchMessage() {
+ for {
+ select {
+ case <-beehiveContext.Done():
+ klog.Info("Stop dispatch ImagePrePullJob upstream message")
+ return
+ default:
+ }
+
+ msg, err := uc.messageLayer.Receive()
+ if err != nil {
+ klog.Warningf("Receive message failed, %v", err)
+ continue
+ }
+
+ klog.V(4).Infof("ImagePrePullJob upstream controller receive msg %#v", msg)
+ uc.imagePrePullJobStatusChan <- msg
+ }
+}
+
+// NewUpstreamController create UpstreamController from config
+func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) {
+ uc := &UpstreamController{
+ kubeClient: keclient.GetKubeClient(),
+ informer: informers.GetInformersManager().GetKubeInformerFactory(),
+ crdClient: keclient.GetCRDClient(),
+ messageLayer: messagelayer.ImagePrePullControllerMessageLayer(),
+ dc: dc,
+ }
+ return uc, nil
+}
diff --git a/cloud/pkg/imageprepullcontroller/controller/util.go b/cloud/pkg/imageprepullcontroller/controller/util.go
new file mode 100644
index 000000000..592164833
--- /dev/null
+++ b/cloud/pkg/imageprepullcontroller/controller/util.go
@@ -0,0 +1,131 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ jsonpatch "github.com/evanphx/json-patch"
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ apimachineryType "k8s.io/apimachinery/pkg/types"
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/util"
+ "github.com/kubeedge/kubeedge/common/constants"
+ "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+ crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned"
+)
+
+const ImagePrePull = "prepull"
+
+func validateNode(node *v1.Node) bool {
+ if !util.IsEdgeNode(node) {
+ klog.Warningf("Node(%s) is not edge node", node.Name)
+ return false
+ }
+
+ // if node is in NotReady state, cannot prepull images
+ for _, condition := range node.Status.Conditions {
+ if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue {
+ klog.Warningf("Node(%s) is in NotReady state", node.Name)
+ return false
+ }
+ }
+
+ return true
+}
+
+// buildPrePullResource build prepull resource in msg send to edge node
+func buildPrePullResource(imagePrePullName, nodeName string) string {
+ resource := fmt.Sprintf("%s/%s/%s/%s", "node", nodeName, ImagePrePull, imagePrePullName)
+ return resource
+}
+
+func parsePrePullresource(resource string) (string, string, error) {
+ var nodeName, jobName string
+ sli := strings.Split(resource, constants.ResourceSep)
+ if len(sli) != 4 {
+ return nodeName, jobName, fmt.Errorf("the resource %s is not the standard type", resource)
+ }
+ return sli[1], sli[3], nil
+}
+
+func patchImagePrePullStatus(crdClient crdClientset.Interface, imagePrePull *v1alpha1.ImagePrePullJob, status *v1alpha1.ImagePrePullStatus) error {
+ oldValue := imagePrePull.DeepCopy()
+ newValue := updateNodeImagePrePullStatus(oldValue, status)
+
+ var completeFlag int
+ var failedFlag bool
+ newValue.Status.State = v1alpha1.PrePulling
+ for _, statusValue := range newValue.Status.Status {
+ if statusValue.State == v1alpha1.PrePullFailed {
+ failedFlag = true
+ completeFlag++
+ }
+ if statusValue.State == v1alpha1.PrePullSuccessful {
+ completeFlag++
+ }
+ }
+ if completeFlag == len(newValue.Status.Status) {
+ if failedFlag {
+ newValue.Status.State = v1alpha1.PrePullFailed
+ } else {
+ newValue.Status.State = v1alpha1.PrePullSuccessful
+ }
+ }
+
+ oldData, err := json.Marshal(oldValue)
+ if err != nil {
+ return fmt.Errorf("failed to marshal the old ImagePrePullJob(%s): %v", oldValue.Name, err)
+ }
+
+ newData, err := json.Marshal(newValue)
+ if err != nil {
+ return fmt.Errorf("failed to marshal the new ImagePrePullJob(%s): %v", newValue.Name, err)
+ }
+
+ patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
+ if err != nil {
+ return fmt.Errorf("failed to create a merge patch: %v", err)
+ }
+
+ _, err = crdClient.OperationsV1alpha1().ImagePrePullJobs().Patch(context.TODO(), newValue.Name, apimachineryType.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+ if err != nil {
+ return fmt.Errorf("failed to patch update ImagePrePullJob status: %v", err)
+ }
+
+ return nil
+}
+
+func updateNodeImagePrePullStatus(imagePrePull *v1alpha1.ImagePrePullJob, status *v1alpha1.ImagePrePullStatus) *v1alpha1.ImagePrePullJob {
+ // return value imageprepull cannot populate the input parameter old
+ newValue := imagePrePull.DeepCopy()
+
+ for index, nodeStatus := range newValue.Status.Status {
+ if nodeStatus.NodeName == status.NodeName {
+ newValue.Status.Status[index] = *status
+ return newValue
+ }
+ }
+
+ newValue.Status.Status = append(newValue.Status.Status, *status)
+ return newValue
+}
diff --git a/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go b/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go
new file mode 100644
index 000000000..4f772d258
--- /dev/null
+++ b/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go
@@ -0,0 +1,91 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package imageprepullcontroller
+
+import (
+ "time"
+
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/beehive/pkg/core"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+ "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config"
+ "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/controller"
+ "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1"
+)
+
+// ImagePrePullController is controller for processing prepull images on edge nodes
+type ImagePrePullController struct {
+ downstream *controller.DownstreamController
+ upstream *controller.UpstreamController
+ enable bool
+}
+
+var _ core.Module = (*ImagePrePullController)(nil)
+
+func newImagePrePullController(enable bool) *ImagePrePullController {
+ if !enable {
+ return &ImagePrePullController{enable: enable}
+ }
+ downstream, err := controller.NewDownstreamController(informers.GetInformersManager().GetKubeEdgeInformerFactory())
+ if err != nil {
+ klog.Exitf("New ImagePrePull Controller downstream failed with error: %s", err)
+ }
+ upstream, err := controller.NewUpstreamController(downstream)
+ if err != nil {
+ klog.Exitf("New ImagePrePull Controller upstream failed with error: %s", err)
+ }
+ return &ImagePrePullController{
+ downstream: downstream,
+ upstream: upstream,
+ enable: enable,
+ }
+}
+
+func Register(dc *v1alpha1.ImagePrePullController) {
+ config.InitConfigure(dc)
+ core.Register(newImagePrePullController(dc.Enable))
+}
+
+// Name of controller
+func (uc *ImagePrePullController) Name() string {
+ return modules.ImagePrePullControllerModuleName
+}
+
+// Group of controller
+func (uc *ImagePrePullController) Group() string {
+ return modules.ImagePrePullControllerModuleGroup
+}
+
+// Enable indicates whether enable this module
+func (uc *ImagePrePullController) Enable() bool {
+ return uc.enable
+}
+
+// Start controller
+func (uc *ImagePrePullController) Start() {
+ if err := uc.downstream.Start(); err != nil {
+ klog.Exitf("start ImagePrePullJob controller downstream failed with error: %s", err)
+ }
+ // wait for downstream controller to start and load ImagePrePullJob
+ // TODO think about sync
+ time.Sleep(1 * time.Second)
+ if err := uc.upstream.Start(); err != nil {
+ klog.Exitf("start ImagePrePullJob controller upstream failed with error: %s", err)
+ }
+}
diff --git a/cloud/pkg/imageprepullcontroller/manager/common.go b/cloud/pkg/imageprepullcontroller/manager/common.go
new file mode 100644
index 000000000..9f371875a
--- /dev/null
+++ b/cloud/pkg/imageprepullcontroller/manager/common.go
@@ -0,0 +1,62 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package manager
+
+import (
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/klog/v2"
+)
+
+// Manager define the interface of a Manager, ImagePrePullJob Manager implement it
+type Manager interface {
+ Events() chan watch.Event
+}
+
+// CommonResourceEventHandler can be used by ImagePrePullJob Manager
+type CommonResourceEventHandler struct {
+ events chan watch.Event
+}
+
+func (c *CommonResourceEventHandler) obj2Event(t watch.EventType, obj interface{}) {
+ eventObj, ok := obj.(runtime.Object)
+ if !ok {
+ klog.Warningf("unknown type: %T, ignore", obj)
+ return
+ }
+ c.events <- watch.Event{Type: t, Object: eventObj}
+}
+
+// OnAdd handle Add event
+func (c *CommonResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) {
+ c.obj2Event(watch.Added, obj)
+}
+
+// OnUpdate handle Update event
+func (c *CommonResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
+ c.obj2Event(watch.Modified, newObj)
+}
+
+// OnDelete handle Delete event
+func (c *CommonResourceEventHandler) OnDelete(obj interface{}) {
+ c.obj2Event(watch.Deleted, obj)
+}
+
+// NewCommonResourceEventHandler create CommonResourceEventHandler used by ImagePrePullJob Manager
+func NewCommonResourceEventHandler(events chan watch.Event) *CommonResourceEventHandler {
+ return &CommonResourceEventHandler{events: events}
+}
diff --git a/cloud/pkg/imageprepullcontroller/manager/imageprepull.go b/cloud/pkg/imageprepullcontroller/manager/imageprepull.go
new file mode 100644
index 000000000..cdb2eaccc
--- /dev/null
+++ b/cloud/pkg/imageprepullcontroller/manager/imageprepull.go
@@ -0,0 +1,52 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package manager
+
+import (
+ "sync"
+
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/tools/cache"
+
+ "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config"
+)
+
+// ImagePrePullJobManager is a manager watch ImagePrePullJob change event
+type ImagePrePullJobManager struct {
+ // events from watch kubernetes api server
+ events chan watch.Event
+
+ // ImagePrePullMap, key is ImagePrePullJob.Name, value is *v1alpha1.ImagePrePullJob{}
+ ImagePrePullMap sync.Map
+}
+
+// Events return a channel, can receive all ImagePrePullJob event
+func (dmm *ImagePrePullJobManager) Events() chan watch.Event {
+ return dmm.events
+}
+
+// NewImagePrePullJobManager create ImagePrePullJobManager from config
+func NewImagePrePullJobManager(si cache.SharedIndexInformer) (*ImagePrePullJobManager, error) {
+ events := make(chan watch.Event, config.Config.Buffer.ImagePrePullJobEvent)
+ rh := NewCommonResourceEventHandler(events)
+ _, err := si.AddEventHandler(rh)
+ if err != nil {
+ return nil, err
+ }
+
+ return &ImagePrePullJobManager{events: events}, nil
+}
diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go b/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go
index 8e4f96d98..6efee8012 100644
--- a/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go
+++ b/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go
@@ -38,6 +38,7 @@ import (
"github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
"github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer"
"github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/util"
"github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/manager"
"github.com/kubeedge/kubeedge/common/constants"
commontypes "github.com/kubeedge/kubeedge/common/types"
@@ -143,7 +144,7 @@ func (dc *DownstreamController) nodeUpgradeJobAdded(upgrade *v1alpha1.NodeUpgrad
}
// deduplicate: remove duplicate nodes to avoid repeating upgrade to the same node
- nodesToUpgrade = RemoveDuplicateElement(nodesToUpgrade)
+ nodesToUpgrade = util.RemoveDuplicateElement(nodesToUpgrade)
klog.Infof("Filtered finished, the below nodes are to upgrade\n%v\n", nodesToUpgrade)
@@ -200,7 +201,7 @@ func (dc *DownstreamController) processUpgrade(node string, upgrade *v1alpha1.No
return
}
- // process time out: cloud did not receive upgrade feedback from edge
+ // process time out: could not receive upgrade feedback from edge
// send upgrade timeout response message to upstream
go dc.handleNodeUpgradeJobTimeout(node, upgrade.Name, upgrade.Spec.Version, upgradeReq.HistoryID, upgrade.Spec.TimeoutSeconds)
@@ -303,7 +304,7 @@ func needUpgrade(node *v1.Node, upgradeVersion string) bool {
}
// we only care about edge nodes, so just remove not edge nodes
- if !isEdgeNode(node) {
+ if !util.IsEdgeNode(node) {
klog.Warningf("Node(%s) is not edge node", node.Name)
return false
}
diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/util.go b/cloud/pkg/nodeupgradejobcontroller/controller/util.go
index 68a8e95ce..8b80c4261 100644
--- a/cloud/pkg/nodeupgradejobcontroller/controller/util.go
+++ b/cloud/pkg/nodeupgradejobcontroller/controller/util.go
@@ -22,9 +22,7 @@ import (
"time"
"github.com/distribution/distribution/v3/reference"
- metav1 "k8s.io/api/core/v1"
- "github.com/kubeedge/kubeedge/common/constants"
"github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
)
@@ -55,23 +53,6 @@ func filterVersion(version string, expected string) bool {
return version[index+length:] == expected
}
-// isEdgeNode checks whether a node is an Edge Node
-// only if label {"node-role.kubernetes.io/edge": ""} exists, it is an edge node
-func isEdgeNode(node *metav1.Node) bool {
- if node.Labels == nil {
- return false
- }
- if _, ok := node.Labels[constants.EdgeNodeRoleKey]; !ok {
- return false
- }
-
- if node.Labels[constants.EdgeNodeRoleKey] != constants.EdgeNodeRoleValue {
- return false
- }
-
- return true
-}
-
// isCompleted returns true only if some/all edge upgrade is upgrading or completed
func isCompleted(upgrade *v1alpha1.NodeUpgradeJob) bool {
// all edge node upgrade is upgrading or completed
@@ -89,21 +70,6 @@ func isCompleted(upgrade *v1alpha1.NodeUpgradeJob) bool {
return false
}
-// RemoveDuplicateElement deduplicate
-func RemoveDuplicateElement(s []string) []string {
- result := make([]string, 0, len(s))
- temp := make(map[string]struct{}, len(s))
-
- for _, item := range s {
- if _, ok := temp[item]; !ok {
- temp[item] = struct{}{}
- result = append(result, item)
- }
- }
-
- return result
-}
-
// UpdateNodeUpgradeJobStatus updates the status
// return the updated result
func UpdateNodeUpgradeJobStatus(old *v1alpha1.NodeUpgradeJob, status *v1alpha1.UpgradeStatus) *v1alpha1.NodeUpgradeJob {
diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go b/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go
index bfc93c7eb..499da62fa 100644
--- a/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go
+++ b/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go
@@ -66,39 +66,6 @@ func TestFilterVersion(t *testing.T) {
}
}
-func TestRemoveDuplicateElement(t *testing.T) {
- tests := []struct {
- name string
- input []string
- expected []string
- }{
- {
- name: "case 1",
- input: []string{"a", "b", "c"},
- expected: []string{"a", "b", "c"},
- },
- {
- name: "case 2",
- input: []string{"a", "a", "b", "c", "b", "a", "a"},
- expected: []string{"a", "b", "c"},
- },
- {
- name: "case 3",
- input: []string{},
- expected: []string{},
- },
- }
-
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- result := RemoveDuplicateElement(test.input)
- if !reflect.DeepEqual(result, test.expected) {
- t.Errorf("Got = %v, Want = %v", result, test.expected)
- }
- })
- }
-}
-
func TestUpdateUpgradeStatus(t *testing.T) {
upgrade := v1alpha1.NodeUpgradeJob{
Status: v1alpha1.NodeUpgradeJobStatus{