diff options
| author | KubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com> | 2024-01-15 21:18:26 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-01-15 21:18:26 +0800 |
| commit | 88e4b1ec243d9049de9be28c8ea4dd3c0612f07d (patch) | |
| tree | 770598173cdf13db42b9a61849e6651b2ad2749c | |
| parent | Merge pull request #5353 from WillardHu/approver (diff) | |
| parent | support enable imagePrePullController through helm (diff) | |
| download | kubeedge-88e4b1ec243d9049de9be28c8ea4dd3c0612f07d.tar.gz | |
Merge pull request #5331 from Shelley-BaoYue/image-prepull
support image prepull feature
47 files changed, 2460 insertions, 83 deletions
diff --git a/build/admission/clusterrole.yaml b/build/admission/clusterrole.yaml index 0f3519949..92d13bf9e 100644 --- a/build/admission/clusterrole.yaml +++ b/build/admission/clusterrole.yaml @@ -26,5 +26,5 @@ rules: resources: ["rules", "ruleendpoints"] verbs: ["get", "list"] - apiGroups: ["operations.kubeedge.io"] - resources: ["nodeupgradejobs"] + resources: ["nodeupgradejobs", "imageprepulljobs"] verbs: ["get", "list"] diff --git a/build/cloud/03-clusterrole.yaml b/build/cloud/03-clusterrole.yaml index 2bb3ac7df..19978bcdb 100644 --- a/build/cloud/03-clusterrole.yaml +++ b/build/cloud/03-clusterrole.yaml @@ -37,5 +37,5 @@ rules: resources: ["*"] verbs: ["get", "list", "watch"] - apiGroups: ["operations.kubeedge.io"] - resources: ["nodeupgradejobs", "nodeupgradejobs/status"] + resources: ["nodeupgradejobs", "nodeupgradejobs/status", "imageprepulljobs", "imageprepulljobs/status"] verbs: ["get", "list", "watch", "update", "patch"] diff --git a/build/crd-samples/operations/imageprepulljob.yaml b/build/crd-samples/operations/imageprepulljob.yaml new file mode 100644 index 000000000..cebfbe27f --- /dev/null +++ b/build/crd-samples/operations/imageprepulljob.yaml @@ -0,0 +1,14 @@ +apiVersion: operations.kubeedge.io/v1alpha1 +kind: ImagePrePullJob +metadata: + name: imageprepull-example + labels: + description: ImagePrePullLabel +spec: + imagePrePullTemplate: + images: + - busybox:latest + nodes: + - edgenode1 # Need to replaced with your own node name + timeoutSecondsOnEachNode: 300 + retryTimesOnEachNode: 1
\ No newline at end of file diff --git a/build/crd-samples/nodeupgradejob.yaml b/build/crd-samples/operations/nodeupgradejob.yaml index 5b97287d7..5b97287d7 100644 --- a/build/crd-samples/nodeupgradejob.yaml +++ b/build/crd-samples/operations/nodeupgradejob.yaml diff --git a/build/crds/operations/operations_v1alpha1_imageprepulljob.yaml b/build/crds/operations/operations_v1alpha1_imageprepulljob.yaml new file mode 100644 index 000000000..903516d32 --- /dev/null +++ b/build/crds/operations/operations_v1alpha1_imageprepulljob.yaml @@ -0,0 +1,204 @@ + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.6.2 + creationTimestamp: null + name: imageprepulljobs.operations.kubeedge.io +spec: + group: operations.kubeedge.io + names: + kind: ImagePrePullJob + listKind: ImagePrePullJobList + plural: imageprepulljobs + singular: imageprepulljob + scope: Cluster + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: ImagePrePullJob is used to prepull images on edge node. + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: Spec represents the specification of the desired behavior + of ImagePrePullJob. + properties: + imagePrePullTemplate: + description: ImagePrepullTemplate represents original templates of + imagePrePull + properties: + checkItems: + description: CheckItems specifies the items need to be checked + before the task is executed. The default CheckItems value is + disk. + items: + type: string + type: array + imageSecrets: + description: ImageSecret specifies the secret for image pull if + private registry used. Use {namespace}/{secretName} in format. + type: string + images: + description: Images is the image list to be prepull + items: + type: string + type: array + labelSelector: + description: LabelSelector is a filter to select member clusters + by labels. It must match a node's labels for the NodeUpgradeJob + to be operated on that node. Please note that sets of NodeNames + and LabelSelector are ORed. Users must set one and can only + set one. + properties: + matchExpressions: + description: matchExpressions is a list of label selector + requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector + that contains values, a key, and an operator that relates + the key and values. + properties: + key: + description: key is the label key that the selector + applies to. + type: string + operator: + description: operator represents a key's relationship + to a set of values. Valid operators are In, NotIn, + Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. If + the operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. This array is replaced + during a strategic merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. A + single {key,value} in the matchLabels map is equivalent + to an element of matchExpressions, whose key field is "key", + the operator is "In", and the values array contains only + "value". The requirements are ANDed. + type: object + type: object + nodeNames: + description: NodeNames is a request to select some specific nodes. + If it is non-empty, the upgrade job simply select these edge + nodes to do upgrade operation. Please note that sets of NodeNames + and LabelSelector are ORed. Users must set one and can only + set one. + items: + type: string + type: array + retryTimes: + description: RetryTimes specifies the retry times if image pull + failed on each edgenode. Default to 0 + format: int32 + type: integer + timeoutSecondsOnEachNode: + description: TimeoutSecondsOnEachNode limits the duration of the + image prepull job on each edgenode. Default to 360. If set to + 0, we'll use the default value 360. + format: int32 + type: integer + type: object + type: object + status: + description: Status represents the status of ImagePrePullJob. + properties: + state: + description: 'State represents for the state phase of the ImagePrePullJob. + There are four possible state values: "", prechecking, prepulling, + successful, failed.' + enum: + - prepulling + - successful + - failed + type: string + status: + description: Status contains image prepull status for each edge node. + items: + description: ImagePrePullStatus stores image prepull status for + each edge node. + properties: + imageStatus: + description: ImageStatus represents the prepull status for each + image + items: + description: ImageStatus stores the prepull status for each + image. + properties: + image: + description: Image is the name of the image + type: string + reason: + description: Reason represents the fail reason if image + pull failed + type: string + state: + description: 'State represents for the state phase of + this image pull on the edge node There are two possible + state values: successful, failed.' + enum: + - prepulling + - successful + - failed + type: string + type: object + type: array + nodeName: + description: NodeName is the name of edge node. + type: string + reason: + description: Reason represents the fail reason if images prepull + failed on the edge node + type: string + state: + description: 'State represents for the state phase of the ImagePrepullJob + on the edge node. There are five possible state values: "", + prepulling, successful, failed.' + enum: + - prepulling + - successful + - failed + type: string + type: object + type: array + type: object + required: + - spec + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/cloud/cmd/cloudcore/app/server.go b/cloud/cmd/cloudcore/app/server.go index e42f6cf3b..fd346cd72 100644 --- a/cloud/cmd/cloudcore/app/server.go +++ b/cloud/cmd/cloudcore/app/server.go @@ -48,6 +48,7 @@ import ( "github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller" "github.com/kubeedge/kubeedge/cloud/pkg/dynamiccontroller" "github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller" + "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller" "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller" "github.com/kubeedge/kubeedge/cloud/pkg/policycontroller" "github.com/kubeedge/kubeedge/cloud/pkg/router" @@ -158,6 +159,7 @@ func registerModules(c *v1alpha1.CloudCoreConfig) { cloudhub.Register(c.Modules.CloudHub) edgecontroller.Register(c.Modules.EdgeController) devicecontroller.Register(c.Modules.DeviceController) + imageprepullcontroller.Register(c.Modules.ImagePrePullController) nodeupgradejobcontroller.Register(c.Modules.NodeUpgradeJobController) synccontroller.Register(c.Modules.SyncController) cloudstream.Register(c.Modules.CloudStream, c.CommonConfig) diff --git a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go index dd953fa57..a236d3ebd 100644 --- a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go +++ b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go @@ -33,6 +33,7 @@ import ( "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/session" "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer" "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + imageprepullcontroller "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/controller" "github.com/kubeedge/kubeedge/cloud/pkg/synccontroller" commonconst "github.com/kubeedge/kubeedge/common/constants" v2 "github.com/kubeedge/kubeedge/edge/pkg/metamanager/dao/v2" @@ -183,6 +184,9 @@ func (md *messageDispatcher) DispatchUpstream(message *beehivemodel.Message, inf message.Router.Resource = fmt.Sprintf("node/%s/%s", info.NodeID, message.Router.Resource) beehivecontext.Send(modules.RouterModuleName, *message) + case message.GetOperation() == imageprepullcontroller.ImagePrePull: + beehivecontext.SendToGroup(modules.ImagePrePullControllerModuleGroup, *message) + default: err := md.PubToController(info, message) if err != nil { @@ -447,7 +451,7 @@ func noAckRequired(msg *beehivemodel.Message) bool { return true case msg.GetGroup() == modules.UserGroup: return true - case msg.GetSource() == modules.NodeUpgradeJobControllerModuleName: + case msg.GetSource() == modules.NodeUpgradeJobControllerModuleName || msg.GetSource() == modules.ImagePrePullControllerModuleName: return true case msg.GetOperation() == beehivemodel.ResponseOperation: content, ok := msg.Content.(string) diff --git a/cloud/pkg/common/messagelayer/context.go b/cloud/pkg/common/messagelayer/context.go index 7aeb2afdc..8a2e9d4bd 100644 --- a/cloud/pkg/common/messagelayer/context.go +++ b/cloud/pkg/common/messagelayer/context.go @@ -104,6 +104,14 @@ func NodeUpgradeJobControllerMessageLayer() MessageLayer { } } +func ImagePrePullControllerMessageLayer() MessageLayer { + return &ContextMessageLayer{ + SendModuleName: modules.CloudHubModuleName, + ReceiveModuleName: modules.ImagePrePullControllerModuleName, + ResponseModuleName: modules.CloudHubModuleName, + } +} + func PolicyControllerMessageLayer() MessageLayer { return &ContextMessageLayer{ SendModuleName: modules.CloudHubModuleName, diff --git a/cloud/pkg/common/modules/modules.go b/cloud/pkg/common/modules/modules.go index 76f748251..f458aee5e 100644 --- a/cloud/pkg/common/modules/modules.go +++ b/cloud/pkg/common/modules/modules.go @@ -16,6 +16,9 @@ const ( NodeUpgradeJobControllerModuleName = "nodeupgradejobcontroller" NodeUpgradeJobControllerModuleGroup = "nodeupgradejobcontroller" + ImagePrePullControllerModuleName = "imageprepullcontroller" + ImagePrePullControllerModuleGroup = "imageprepullcontroller" + SyncControllerModuleName = "synccontroller" SyncControllerModuleGroup = "synccontroller" diff --git a/cloud/pkg/common/util/util.go b/cloud/pkg/common/util/util.go new file mode 100644 index 000000000..a80ba1c08 --- /dev/null +++ b/cloud/pkg/common/util/util.go @@ -0,0 +1,55 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + metav1 "k8s.io/api/core/v1" + + "github.com/kubeedge/kubeedge/common/constants" +) + +// IsEdgeNode checks whether a node is an Edge Node +// only if label {"node-role.kubernetes.io/edge": ""} exists, it is an edge node +func IsEdgeNode(node *metav1.Node) bool { + if node.Labels == nil { + return false + } + if _, ok := node.Labels[constants.EdgeNodeRoleKey]; !ok { + return false + } + + if node.Labels[constants.EdgeNodeRoleKey] != constants.EdgeNodeRoleValue { + return false + } + + return true +} + +// RemoveDuplicateElement deduplicate +func RemoveDuplicateElement[T any](s []T) []T { + result := make([]T, 0, len(s)) + temp := make(map[any]struct{}, len(s)) + + for _, item := range s { + if _, ok := temp[item]; !ok { + temp[item] = struct{}{} + result = append(result, item) + } + } + + return result +} diff --git a/cloud/pkg/common/util/util_test.go b/cloud/pkg/common/util/util_test.go new file mode 100644 index 000000000..749d7596a --- /dev/null +++ b/cloud/pkg/common/util/util_test.go @@ -0,0 +1,55 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "reflect" + "testing" +) + +func TestRemoveDuplicateElement(t *testing.T) { + tests := []struct { + name string + input []string + expected []string + }{ + { + name: "case 1", + input: []string{"a", "b", "c"}, + expected: []string{"a", "b", "c"}, + }, + { + name: "case 2", + input: []string{"a", "a", "b", "c", "b", "a", "a"}, + expected: []string{"a", "b", "c"}, + }, + { + name: "case 3", + input: []string{}, + expected: []string{}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + result := RemoveDuplicateElement(test.input) + if !reflect.DeepEqual(result, test.expected) { + t.Errorf("Got = %v, Want = %v", result, test.expected) + } + }) + } +} diff --git a/cloud/pkg/imageprepullcontroller/config/config.go b/cloud/pkg/imageprepullcontroller/config/config.go new file mode 100644 index 000000000..88572a842 --- /dev/null +++ b/cloud/pkg/imageprepullcontroller/config/config.go @@ -0,0 +1,38 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "sync" + + "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" +) + +var Config Configure +var once sync.Once + +type Configure struct { + v1alpha1.ImagePrePullController +} + +func InitConfigure(dc *v1alpha1.ImagePrePullController) { + once.Do(func() { + Config = Configure{ + ImagePrePullController: *dc, + } + }) +} diff --git a/cloud/pkg/imageprepullcontroller/controller/downstream.go b/cloud/pkg/imageprepullcontroller/controller/downstream.go new file mode 100644 index 000000000..d82c58413 --- /dev/null +++ b/cloud/pkg/imageprepullcontroller/controller/downstream.go @@ -0,0 +1,268 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + k8sinformer "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + beehiveContext "github.com/kubeedge/beehive/pkg/core/context" + "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/cloud/pkg/common/client" + "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" + "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer" + "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + "github.com/kubeedge/kubeedge/cloud/pkg/common/util" + "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/manager" + commontypes "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" + crdinformers "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions" +) + +type DownstreamController struct { + kubeClient kubernetes.Interface + informer k8sinformer.SharedInformerFactory + crdClient crdClientset.Interface + messageLayer messagelayer.MessageLayer + + imagePrePullJobManager *manager.ImagePrePullJobManager +} + +// Start DownstreamController +func (dc *DownstreamController) Start() error { + klog.Info("Start ImagePrePullJob Downstream Controller") + go dc.syncImagePrePullJob() + return nil +} + +// syncImagePrePullJob is used to get events from informer +func (dc *DownstreamController) syncImagePrePullJob() { + for { + select { + case <-beehiveContext.Done(): + klog.Info("stop sync ImagePrePullJob") + return + case e := <-dc.imagePrePullJobManager.Events(): + imagePrePull, ok := e.Object.(*v1alpha1.ImagePrePullJob) + if !ok { + klog.Warningf("object type: %T unsupported", e.Object) + continue + } + switch e.Type { + case watch.Added: + dc.imagePrePullJobAdded(imagePrePull) + case watch.Deleted: + dc.imagePrePullJobDeleted(imagePrePull) + case watch.Modified: + dc.imagePrePullJobUpdate(imagePrePull) + default: + klog.Warningf("ImagePrePullJob event type: %s unsupported", e.Type) + } + } + } +} + +// imagePrePullJobAdded is used to process addition of new ImagePrePullJob in apiserver +func (dc *DownstreamController) imagePrePullJobAdded(imagePrePull *v1alpha1.ImagePrePullJob) { + klog.V(4).Infof("add ImagePrePullJob: %v", imagePrePull) + // store in cache map + dc.imagePrePullJobManager.ImagePrePullMap.Store(imagePrePull.Name, imagePrePull) + + // If imagePrePullJob is not initial state, we don't need to send message\ + if imagePrePull.Status.State != v1alpha1.PrePullInitialValue { + klog.Errorf("The ImagePrePullJob %s is already running or completed, don't send message again", imagePrePull.Name) + return + } + + // get node list that need prepull images + var nodesToPrePullImage []string + if len(imagePrePull.Spec.ImagePrePullTemplate.NodeNames) != 0 { + for _, node := range imagePrePull.Spec.ImagePrePullTemplate.NodeNames { + nodeInfo, err := dc.informer.Core().V1().Nodes().Lister().Get(node) + if err != nil { + klog.Errorf("Failed to get node(%s) info: %v", node, err) + continue + } + + if validateNode(nodeInfo) { + nodesToPrePullImage = append(nodesToPrePullImage, nodeInfo.Name) + } + } + } else if imagePrePull.Spec.ImagePrePullTemplate.LabelSelector != nil { + selector, err := metav1.LabelSelectorAsSelector(imagePrePull.Spec.ImagePrePullTemplate.LabelSelector) + if err != nil { + klog.Errorf("LabelSelector(%s) is not valid: %v", imagePrePull.Spec.ImagePrePullTemplate.LabelSelector, err) + return + } + + nodes, err := dc.informer.Core().V1().Nodes().Lister().List(selector) + if err != nil { + klog.Errorf("Failed to get nodes with label %s: %v", selector.String(), err) + return + } + + for _, node := range nodes { + if validateNode(node) { + nodesToPrePullImage = append(nodesToPrePullImage, node.Name) + } + } + } + + // deduplicate: remove duplicate nodes to avoid repeating prepull images on the same node + nodesToPrePullImage = util.RemoveDuplicateElement(nodesToPrePullImage) + + klog.Infof("Filtered finished, images will be prepulled on below nodes\n%v\n", nodesToPrePullImage) + + go func() { + for _, node := range nodesToPrePullImage { + dc.processPrePull(node, imagePrePull) + } + }() +} + +// imagePrePullJobDeleted is used to process deleted ImagePrePullJob in apiServer +func (dc *DownstreamController) imagePrePullJobDeleted(imagePrePull *v1alpha1.ImagePrePullJob) { + // delete drom cache map + dc.imagePrePullJobManager.ImagePrePullMap.Delete(imagePrePull.Name) +} + +// imagePrePullJobUpdate is used to process update of ImagePrePullJob in apiServer +// Now we don't allow update spec, so we only update the cache map in imagePrePullJobUpdate func. +func (dc *DownstreamController) imagePrePullJobUpdate(imagePrePull *v1alpha1.ImagePrePullJob) { + _, ok := dc.imagePrePullJobManager.ImagePrePullMap.Load(imagePrePull.Name) + // store in cache map + dc.imagePrePullJobManager.ImagePrePullMap.Store(imagePrePull.Name, imagePrePull) + if !ok { + klog.Infof("ImagePrePull Job %s not exist, and store it into first", imagePrePull.Name) + // If ImagePrePullJob not present in ImagePrePull map means it is not modified and added. + dc.imagePrePullJobAdded(imagePrePull) + } +} + +// processPrePull process prepull job added and send it to edge nodes. +func (dc *DownstreamController) processPrePull(node string, imagePrePull *v1alpha1.ImagePrePullJob) { + klog.V(4).Infof("begin to send imagePrePull message to %s", node) + + imagePrePullTemplateInfo := imagePrePull.Spec.ImagePrePullTemplate + imagePrePullRequest := commontypes.ImagePrePullJobRequest{ + Images: imagePrePullTemplateInfo.Images, + NodeName: node, + Secret: imagePrePullTemplateInfo.ImageSecret, + RetryTimes: imagePrePullTemplateInfo.RetryTimes, + CheckItems: imagePrePullTemplateInfo.CheckItems, + } + + // handle timeout: could not receive image prepull msg feedback from edge node + // send prepull timeout response message to upstream + go dc.handleImagePrePullJobTimeoutOnEachNode(node, imagePrePull.Name, imagePrePullTemplateInfo.TimeoutSecondsOnEachNode) + + // send prepull msg to edge node + msg := model.NewMessage("") + resource := buildPrePullResource(imagePrePull.Name, node) + msg.BuildRouter(modules.ImagePrePullControllerModuleName, modules.ImagePrePullControllerModuleGroup, resource, ImagePrePull). + FillBody(imagePrePullRequest) + + err := dc.messageLayer.Send(*msg) + if err != nil { + klog.Errorf("Failed to send prepull message %s due to error %v", msg.GetID(), err) + return + } + + // update imagePrePullJob status to prepulling + status := &v1alpha1.ImagePrePullStatus{ + NodeName: node, + State: v1alpha1.PrePulling, + } + err = patchImagePrePullStatus(dc.crdClient, imagePrePull, status) + if err != nil { + klog.Errorf("Failed to mark imagePrePullJob prepulling status: %v", err) + } +} + +// handleImagePrePullJobTimeoutOnEachNode is used to handle the situation that cloud don't receive prepull result +// from edge node within the timeout period. +// If so, the ImagePrePullJobState will update to timeout +func (dc *DownstreamController) handleImagePrePullJobTimeoutOnEachNode(node, jobName string, timeoutSecondsEachNode *uint32) { + var timeout uint32 = 360 + if timeoutSecondsEachNode != nil && *timeoutSecondsEachNode != 0 { + timeout = *timeoutSecondsEachNode + } + + receiveFeedback := false + + _ = wait.Poll(10*time.Second, time.Duration(timeout)*time.Second, func() (bool, error) { + cacheValue, ok := dc.imagePrePullJobManager.ImagePrePullMap.Load(jobName) + if !ok { + receiveFeedback = true + klog.Errorf("ImagePrePullJob %s is not exist", jobName) + return false, fmt.Errorf("imagePrePullJob %s is not exist", jobName) + } + imagePrePullValue := cacheValue.(*v1alpha1.ImagePrePullJob) + for _, statusValue := range imagePrePullValue.Status.Status { + if statusValue.NodeName == node && (statusValue.State == v1alpha1.PrePullSuccessful || statusValue.State == v1alpha1.PrePullFailed) { + receiveFeedback = true + return true, nil + } + break + } + return false, nil + }) + + if receiveFeedback { + return + } + klog.Errorf("TIMEOUT to receive image prepull %s response from edge node %s", jobName, node) + + // construct timeout image prepull response and send it to upstream controller + responseResource := buildPrePullResource(jobName, node) + resp := commontypes.ImagePrePullJobResponse{ + NodeName: node, + State: v1alpha1.PrePullFailed, + Reason: "timeout to receive response from edge", + } + + respMsg := model.NewMessage(""). + BuildRouter(modules.ImagePrePullControllerModuleName, modules.ImagePrePullControllerModuleGroup, responseResource, ImagePrePull). + FillBody(resp) + beehiveContext.Send(modules.ImagePrePullControllerModuleName, *respMsg) +} + +// NewDownstreamController new downstream controller to process downstream imageprepull msg to edge nodes. +func NewDownstreamController(crdInformerFactory crdinformers.SharedInformerFactory) (*DownstreamController, error) { + imagePrePullJobManager, err := manager.NewImagePrePullJobManager(crdInformerFactory.Operations().V1alpha1().ImagePrePullJobs().Informer()) + if err != nil { + klog.Warningf("Create ImagePrePullJob manager failed with error: %s", err) + return nil, err + } + + dc := &DownstreamController{ + kubeClient: client.GetKubeClient(), + informer: informers.GetInformersManager().GetKubeInformerFactory(), + crdClient: client.GetCRDClient(), + imagePrePullJobManager: imagePrePullJobManager, + messageLayer: messagelayer.ImagePrePullControllerMessageLayer(), + } + return dc, nil +} diff --git a/cloud/pkg/imageprepullcontroller/controller/upstream.go b/cloud/pkg/imageprepullcontroller/controller/upstream.go new file mode 100644 index 000000000..98fd28bea --- /dev/null +++ b/cloud/pkg/imageprepullcontroller/controller/upstream.go @@ -0,0 +1,147 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "encoding/json" + + k8sinformer "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + beehiveContext "github.com/kubeedge/beehive/pkg/core/context" + "github.com/kubeedge/beehive/pkg/core/model" + keclient "github.com/kubeedge/kubeedge/cloud/pkg/common/client" + "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" + "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer" + "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config" + "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" +) + +// UpstreamController subscribe messages from edge and sync to k8s api server +type UpstreamController struct { + // downstream controller to update ImagePrePullJob status in cache + dc *DownstreamController + + kubeClient kubernetes.Interface + informer k8sinformer.SharedInformerFactory + crdClient crdClientset.Interface + messageLayer messagelayer.MessageLayer + // message channel + imagePrePullJobStatusChan chan model.Message +} + +// Start imageprepull UpstreamController +func (uc *UpstreamController) Start() error { + klog.Info("Start ImagePrePullJob Upstream Controller") + + uc.imagePrePullJobStatusChan = make(chan model.Message, config.Config.Buffer.ImagePrePullJobStatus) + go uc.dispatchMessage() + + for i := 0; i < int(config.Config.Load.ImagePrePullJobWorkers); i++ { + go uc.updateImagePrePullJobStatus() + } + return nil +} + +// updateImagePrePullJobStatus update imagePrePullJob status +func (uc *UpstreamController) updateImagePrePullJobStatus() { + for { + select { + case <-beehiveContext.Done(): + klog.Info("Stop update ImagePrePullJob status") + return + case msg := <-uc.imagePrePullJobStatusChan: + klog.V(4).Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource()) + nodeName, jobName, err := parsePrePullresource(msg.GetResource()) + if err != nil { + klog.Errorf("message resource %s is not supported", msg.GetResource()) + continue + } + + oldValue, ok := uc.dc.imagePrePullJobManager.ImagePrePullMap.Load(jobName) + if !ok { + klog.Errorf("ImagePrePullJob %s not exist", jobName) + continue + } + + imagePrePull, ok := oldValue.(*v1alpha1.ImagePrePullJob) + if !ok { + klog.Errorf("ImagePrePullJob info %T is not valid", oldValue) + continue + } + + data, err := msg.GetContentData() + if err != nil { + klog.Errorf("failed to get image prepull content from response msg, err: %v", err) + continue + } + resp := &types.ImagePrePullJobResponse{} + err = json.Unmarshal(data, resp) + if err != nil { + klog.Errorf("Failed to unmarshal image prepull response: %v", err) + continue + } + + status := &v1alpha1.ImagePrePullStatus{ + NodeName: nodeName, + State: resp.State, + Reason: resp.Reason, + ImageStatus: resp.ImageStatus, + } + err = patchImagePrePullStatus(uc.crdClient, imagePrePull, status) + if err != nil { + klog.Errorf("Failed to patch ImagePrePullJob status, err: %v", err) + } + } + } +} + +// dispatchMessage receive the message from edge and write into imagePrePullJobStatusChan +func (uc *UpstreamController) dispatchMessage() { + for { + select { + case <-beehiveContext.Done(): + klog.Info("Stop dispatch ImagePrePullJob upstream message") + return + default: + } + + msg, err := uc.messageLayer.Receive() + if err != nil { + klog.Warningf("Receive message failed, %v", err) + continue + } + + klog.V(4).Infof("ImagePrePullJob upstream controller receive msg %#v", msg) + uc.imagePrePullJobStatusChan <- msg + } +} + +// NewUpstreamController create UpstreamController from config +func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) { + uc := &UpstreamController{ + kubeClient: keclient.GetKubeClient(), + informer: informers.GetInformersManager().GetKubeInformerFactory(), + crdClient: keclient.GetCRDClient(), + messageLayer: messagelayer.ImagePrePullControllerMessageLayer(), + dc: dc, + } + return uc, nil +} diff --git a/cloud/pkg/imageprepullcontroller/controller/util.go b/cloud/pkg/imageprepullcontroller/controller/util.go new file mode 100644 index 000000000..592164833 --- /dev/null +++ b/cloud/pkg/imageprepullcontroller/controller/util.go @@ -0,0 +1,131 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + jsonpatch "github.com/evanphx/json-patch" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apimachineryType "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/cloud/pkg/common/util" + "github.com/kubeedge/kubeedge/common/constants" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" +) + +const ImagePrePull = "prepull" + +func validateNode(node *v1.Node) bool { + if !util.IsEdgeNode(node) { + klog.Warningf("Node(%s) is not edge node", node.Name) + return false + } + + // if node is in NotReady state, cannot prepull images + for _, condition := range node.Status.Conditions { + if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue { + klog.Warningf("Node(%s) is in NotReady state", node.Name) + return false + } + } + + return true +} + +// buildPrePullResource build prepull resource in msg send to edge node +func buildPrePullResource(imagePrePullName, nodeName string) string { + resource := fmt.Sprintf("%s/%s/%s/%s", "node", nodeName, ImagePrePull, imagePrePullName) + return resource +} + +func parsePrePullresource(resource string) (string, string, error) { + var nodeName, jobName string + sli := strings.Split(resource, constants.ResourceSep) + if len(sli) != 4 { + return nodeName, jobName, fmt.Errorf("the resource %s is not the standard type", resource) + } + return sli[1], sli[3], nil +} + +func patchImagePrePullStatus(crdClient crdClientset.Interface, imagePrePull *v1alpha1.ImagePrePullJob, status *v1alpha1.ImagePrePullStatus) error { + oldValue := imagePrePull.DeepCopy() + newValue := updateNodeImagePrePullStatus(oldValue, status) + + var completeFlag int + var failedFlag bool + newValue.Status.State = v1alpha1.PrePulling + for _, statusValue := range newValue.Status.Status { + if statusValue.State == v1alpha1.PrePullFailed { + failedFlag = true + completeFlag++ + } + if statusValue.State == v1alpha1.PrePullSuccessful { + completeFlag++ + } + } + if completeFlag == len(newValue.Status.Status) { + if failedFlag { + newValue.Status.State = v1alpha1.PrePullFailed + } else { + newValue.Status.State = v1alpha1.PrePullSuccessful + } + } + + oldData, err := json.Marshal(oldValue) + if err != nil { + return fmt.Errorf("failed to marshal the old ImagePrePullJob(%s): %v", oldValue.Name, err) + } + + newData, err := json.Marshal(newValue) + if err != nil { + return fmt.Errorf("failed to marshal the new ImagePrePullJob(%s): %v", newValue.Name, err) + } + + patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return fmt.Errorf("failed to create a merge patch: %v", err) + } + + _, err = crdClient.OperationsV1alpha1().ImagePrePullJobs().Patch(context.TODO(), newValue.Name, apimachineryType.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") + if err != nil { + return fmt.Errorf("failed to patch update ImagePrePullJob status: %v", err) + } + + return nil +} + +func updateNodeImagePrePullStatus(imagePrePull *v1alpha1.ImagePrePullJob, status *v1alpha1.ImagePrePullStatus) *v1alpha1.ImagePrePullJob { + // return value imageprepull cannot populate the input parameter old + newValue := imagePrePull.DeepCopy() + + for index, nodeStatus := range newValue.Status.Status { + if nodeStatus.NodeName == status.NodeName { + newValue.Status.Status[index] = *status + return newValue + } + } + + newValue.Status.Status = append(newValue.Status.Status, *status) + return newValue +} diff --git a/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go b/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go new file mode 100644 index 000000000..4f772d258 --- /dev/null +++ b/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go @@ -0,0 +1,91 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package imageprepullcontroller + +import ( + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/beehive/pkg/core" + "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" + "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config" + "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/controller" + "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" +) + +// ImagePrePullController is controller for processing prepull images on edge nodes +type ImagePrePullController struct { + downstream *controller.DownstreamController + upstream *controller.UpstreamController + enable bool +} + +var _ core.Module = (*ImagePrePullController)(nil) + +func newImagePrePullController(enable bool) *ImagePrePullController { + if !enable { + return &ImagePrePullController{enable: enable} + } + downstream, err := controller.NewDownstreamController(informers.GetInformersManager().GetKubeEdgeInformerFactory()) + if err != nil { + klog.Exitf("New ImagePrePull Controller downstream failed with error: %s", err) + } + upstream, err := controller.NewUpstreamController(downstream) + if err != nil { + klog.Exitf("New ImagePrePull Controller upstream failed with error: %s", err) + } + return &ImagePrePullController{ + downstream: downstream, + upstream: upstream, + enable: enable, + } +} + +func Register(dc *v1alpha1.ImagePrePullController) { + config.InitConfigure(dc) + core.Register(newImagePrePullController(dc.Enable)) +} + +// Name of controller +func (uc *ImagePrePullController) Name() string { + return modules.ImagePrePullControllerModuleName +} + +// Group of controller +func (uc *ImagePrePullController) Group() string { + return modules.ImagePrePullControllerModuleGroup +} + +// Enable indicates whether enable this module +func (uc *ImagePrePullController) Enable() bool { + return uc.enable +} + +// Start controller +func (uc *ImagePrePullController) Start() { + if err := uc.downstream.Start(); err != nil { + klog.Exitf("start ImagePrePullJob controller downstream failed with error: %s", err) + } + // wait for downstream controller to start and load ImagePrePullJob + // TODO think about sync + time.Sleep(1 * time.Second) + if err := uc.upstream.Start(); err != nil { + klog.Exitf("start ImagePrePullJob controller upstream failed with error: %s", err) + } +} diff --git a/cloud/pkg/imageprepullcontroller/manager/common.go b/cloud/pkg/imageprepullcontroller/manager/common.go new file mode 100644 index 000000000..9f371875a --- /dev/null +++ b/cloud/pkg/imageprepullcontroller/manager/common.go @@ -0,0 +1,62 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package manager + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2" +) + +// Manager define the interface of a Manager, ImagePrePullJob Manager implement it +type Manager interface { + Events() chan watch.Event +} + +// CommonResourceEventHandler can be used by ImagePrePullJob Manager +type CommonResourceEventHandler struct { + events chan watch.Event +} + +func (c *CommonResourceEventHandler) obj2Event(t watch.EventType, obj interface{}) { + eventObj, ok := obj.(runtime.Object) + if !ok { + klog.Warningf("unknown type: %T, ignore", obj) + return + } + c.events <- watch.Event{Type: t, Object: eventObj} +} + +// OnAdd handle Add event +func (c *CommonResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) { + c.obj2Event(watch.Added, obj) +} + +// OnUpdate handle Update event +func (c *CommonResourceEventHandler) OnUpdate(oldObj, newObj interface{}) { + c.obj2Event(watch.Modified, newObj) +} + +// OnDelete handle Delete event +func (c *CommonResourceEventHandler) OnDelete(obj interface{}) { + c.obj2Event(watch.Deleted, obj) +} + +// NewCommonResourceEventHandler create CommonResourceEventHandler used by ImagePrePullJob Manager +func NewCommonResourceEventHandler(events chan watch.Event) *CommonResourceEventHandler { + return &CommonResourceEventHandler{events: events} +} diff --git a/cloud/pkg/imageprepullcontroller/manager/imageprepull.go b/cloud/pkg/imageprepullcontroller/manager/imageprepull.go new file mode 100644 index 000000000..cdb2eaccc --- /dev/null +++ b/cloud/pkg/imageprepullcontroller/manager/imageprepull.go @@ -0,0 +1,52 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package manager + +import ( + "sync" + + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + + "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config" +) + +// ImagePrePullJobManager is a manager watch ImagePrePullJob change event +type ImagePrePullJobManager struct { + // events from watch kubernetes api server + events chan watch.Event + + // ImagePrePullMap, key is ImagePrePullJob.Name, value is *v1alpha1.ImagePrePullJob{} + ImagePrePullMap sync.Map +} + +// Events return a channel, can receive all ImagePrePullJob event +func (dmm *ImagePrePullJobManager) Events() chan watch.Event { + return dmm.events +} + +// NewImagePrePullJobManager create ImagePrePullJobManager from config +func NewImagePrePullJobManager(si cache.SharedIndexInformer) (*ImagePrePullJobManager, error) { + events := make(chan watch.Event, config.Config.Buffer.ImagePrePullJobEvent) + rh := NewCommonResourceEventHandler(events) + _, err := si.AddEventHandler(rh) + if err != nil { + return nil, err + } + + return &ImagePrePullJobManager{events: events}, nil +} diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go b/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go index 8e4f96d98..6efee8012 100644 --- a/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go +++ b/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go @@ -38,6 +38,7 @@ import ( "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer" "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + "github.com/kubeedge/kubeedge/cloud/pkg/common/util" "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/manager" "github.com/kubeedge/kubeedge/common/constants" commontypes "github.com/kubeedge/kubeedge/common/types" @@ -143,7 +144,7 @@ func (dc *DownstreamController) nodeUpgradeJobAdded(upgrade *v1alpha1.NodeUpgrad } // deduplicate: remove duplicate nodes to avoid repeating upgrade to the same node - nodesToUpgrade = RemoveDuplicateElement(nodesToUpgrade) + nodesToUpgrade = util.RemoveDuplicateElement(nodesToUpgrade) klog.Infof("Filtered finished, the below nodes are to upgrade\n%v\n", nodesToUpgrade) @@ -200,7 +201,7 @@ func (dc *DownstreamController) processUpgrade(node string, upgrade *v1alpha1.No return } - // process time out: cloud did not receive upgrade feedback from edge + // process time out: could not receive upgrade feedback from edge // send upgrade timeout response message to upstream go dc.handleNodeUpgradeJobTimeout(node, upgrade.Name, upgrade.Spec.Version, upgradeReq.HistoryID, upgrade.Spec.TimeoutSeconds) @@ -303,7 +304,7 @@ func needUpgrade(node *v1.Node, upgradeVersion string) bool { } // we only care about edge nodes, so just remove not edge nodes - if !isEdgeNode(node) { + if !util.IsEdgeNode(node) { klog.Warningf("Node(%s) is not edge node", node.Name) return false } diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/util.go b/cloud/pkg/nodeupgradejobcontroller/controller/util.go index 68a8e95ce..8b80c4261 100644 --- a/cloud/pkg/nodeupgradejobcontroller/controller/util.go +++ b/cloud/pkg/nodeupgradejobcontroller/controller/util.go @@ -22,9 +22,7 @@ import ( "time" "github.com/distribution/distribution/v3/reference" - metav1 "k8s.io/api/core/v1" - "github.com/kubeedge/kubeedge/common/constants" "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" ) @@ -55,23 +53,6 @@ func filterVersion(version string, expected string) bool { return version[index+length:] == expected } -// isEdgeNode checks whether a node is an Edge Node -// only if label {"node-role.kubernetes.io/edge": ""} exists, it is an edge node -func isEdgeNode(node *metav1.Node) bool { - if node.Labels == nil { - return false - } - if _, ok := node.Labels[constants.EdgeNodeRoleKey]; !ok { - return false - } - - if node.Labels[constants.EdgeNodeRoleKey] != constants.EdgeNodeRoleValue { - return false - } - - return true -} - // isCompleted returns true only if some/all edge upgrade is upgrading or completed func isCompleted(upgrade *v1alpha1.NodeUpgradeJob) bool { // all edge node upgrade is upgrading or completed @@ -89,21 +70,6 @@ func isCompleted(upgrade *v1alpha1.NodeUpgradeJob) bool { return false } -// RemoveDuplicateElement deduplicate -func RemoveDuplicateElement(s []string) []string { - result := make([]string, 0, len(s)) - temp := make(map[string]struct{}, len(s)) - - for _, item := range s { - if _, ok := temp[item]; !ok { - temp[item] = struct{}{} - result = append(result, item) - } - } - - return result -} - // UpdateNodeUpgradeJobStatus updates the status // return the updated result func UpdateNodeUpgradeJobStatus(old *v1alpha1.NodeUpgradeJob, status *v1alpha1.UpgradeStatus) *v1alpha1.NodeUpgradeJob { diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go b/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go index bfc93c7eb..499da62fa 100644 --- a/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go +++ b/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go @@ -66,39 +66,6 @@ func TestFilterVersion(t *testing.T) { } } -func TestRemoveDuplicateElement(t *testing.T) { - tests := []struct { - name string - input []string - expected []string - }{ - { - name: "case 1", - input: []string{"a", "b", "c"}, - expected: []string{"a", "b", "c"}, - }, - { - name: "case 2", - input: []string{"a", "a", "b", "c", "b", "a", "a"}, - expected: []string{"a", "b", "c"}, - }, - { - name: "case 3", - input: []string{}, - expected: []string{}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - result := RemoveDuplicateElement(test.input) - if !reflect.DeepEqual(result, test.expected) { - t.Errorf("Got = %v, Want = %v", result, test.expected) - } - }) - } -} - func TestUpdateUpgradeStatus(t *testing.T) { upgrade := v1alpha1.NodeUpgradeJob{ Status: v1alpha1.NodeUpgradeJobStatus{ diff --git a/common/constants/default.go b/common/constants/default.go index d75d3c795..07b642816 100644 --- a/common/constants/default.go +++ b/common/constants/default.go @@ -118,6 +118,11 @@ const ( DefaultNodeUpgradeJobEventBuffer = 1 DefaultNodeUpgradeJobWorkers = 1 + // ImagePrePullController + DefaultImagePrePullJobStatusBuffer = 1024 + DefaultImagePrePullJobEventBuffer = 1 + DefaultImagePrePullJobWorkers = 1 + // Resource sep ResourceSep = "/" diff --git a/common/types/types.go b/common/types/types.go index 33a709bff..e03667a1b 100644 --- a/common/types/types.go +++ b/common/types/types.go @@ -5,6 +5,8 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" ) // PodStatusRequest is Message.Content which comes from edge @@ -53,3 +55,20 @@ type ObjectResp struct { Object metaV1.Object Err error } + +// ImagePrePullJobRequest is image prepull msg from cloud to edge +type ImagePrePullJobRequest struct { + Images []string + NodeName string + Secret string + RetryTimes int32 + CheckItems []string +} + +// ImagePrePullJobResponse is used to report status msg to cloudhub https service from each node +type ImagePrePullJobResponse struct { + NodeName string + State v1alpha1.PrePullState + Reason string + ImageStatus []v1alpha1.ImageStatus +} diff --git a/edge/pkg/edgehub/upgrade/image_prepull.go b/edge/pkg/edgehub/upgrade/image_prepull.go new file mode 100644 index 000000000..d02a4cdc9 --- /dev/null +++ b/edge/pkg/edgehub/upgrade/image_prepull.go @@ -0,0 +1,188 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package upgrade + +import ( + "encoding/json" + "fmt" + "strings" + + v1 "k8s.io/api/core/v1" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/klog/v2" + + beehiveContext "github.com/kubeedge/beehive/pkg/core/context" + "github.com/kubeedge/beehive/pkg/core/model" + cloudmodules "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + "github.com/kubeedge/kubeedge/common/constants" + commontypes "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options" + "github.com/kubeedge/kubeedge/edge/pkg/common/modules" + "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients" + "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler" + metaclient "github.com/kubeedge/kubeedge/edge/pkg/metamanager/client" + "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" +) + +func init() { + handler := &prepullHandler{} + msghandler.RegisterHandler(handler) +} + +type prepullHandler struct{} + +func (uh *prepullHandler) Filter(message *model.Message) bool { + name := message.GetGroup() + return name == cloudmodules.ImagePrePullControllerModuleGroup +} + +func (uh *prepullHandler) Process(message *model.Message, clientHub clients.Adapter) error { + // get edgecore config + edgecoreconfig := options.GetEdgeCoreConfig() + if edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint == "" { + edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint = edgecoreconfig.Modules.Edged.RemoteRuntimeEndpoint + } + nodeName := edgecoreconfig.Modules.Edged.HostnameOverride + + var errmsg, jobName string + defer func() { + if errmsg != "" { + sendResponseMessage(nodeName, jobName, errmsg, nil, v1alpha1.PrePullFailed) + } + }() + + jobName, err := parseJobName(message.GetResource()) + if err != nil { + errmsg = fmt.Sprintf("failed to parse prepull resource, err: %v", err) + return fmt.Errorf(errmsg) + } + + // parse message request + var prePullReq commontypes.ImagePrePullJobRequest + data, err := message.GetContentData() + if err != nil { + errmsg = fmt.Sprintf("failed to get message content, err: %v", err) + return fmt.Errorf(errmsg) + } + err = json.Unmarshal(data, &prePullReq) + if err != nil { + errmsg = fmt.Sprintf("failed to unmarshal message content, err: %v", err) + return fmt.Errorf(errmsg) + } + if nodeName != prePullReq.NodeName { + errmsg = fmt.Sprintf("request node name %s is not match with the edge node %s", prePullReq.NodeName, nodeName) + return fmt.Errorf(errmsg) + } + + // todo check items such as disk according to req.checkitem config + + // pull images + container, err := util.NewContainerRuntime(edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint, edgecoreconfig.Modules.Edged.TailoredKubeletConfig.CgroupDriver) + if err != nil { + errmsg = fmt.Sprintf("failed to new container runtime: %v", err) + return fmt.Errorf(errmsg) + } + + go func() { + pullMsg, resp, state := prePullImages(prePullReq, container) + sendResponseMessage(nodeName, jobName, pullMsg, resp, state) + }() + return nil +} + +func sendResponseMessage(nodeName, jobName, pullMsg string, imageStatus []v1alpha1.ImageStatus, state v1alpha1.PrePullState) { + resp := commontypes.ImagePrePullJobResponse{ + NodeName: nodeName, + State: state, + Reason: pullMsg, + ImageStatus: imageStatus, + } + msg := model.NewMessage("").SetRoute(modules.EdgeHubModuleName, modules.HubGroup). + SetResourceOperation(fmt.Sprintf("node/%s/prepull/%s", nodeName, jobName), "prepull").FillBody(resp) + beehiveContext.Send(modules.EdgeHubModuleName, *msg) +} + +func prePullImages(prePullReq commontypes.ImagePrePullJobRequest, container util.ContainerRuntime) (string, []v1alpha1.ImageStatus, v1alpha1.PrePullState) { + var errmsg string + authConfig, err := makeAuthConfig(prePullReq.Secret) + if err != nil { + errmsg = fmt.Sprintf("failed to get prepull secret, err: %v", err) + return errmsg, nil, v1alpha1.PrePullFailed + } + + state := v1alpha1.PrePullSuccessful + var imageStatus []v1alpha1.ImageStatus + for _, image := range prePullReq.Images { + prePullStatus := v1alpha1.ImageStatus{ + Image: image, + } + for i := 0; i < int(prePullReq.RetryTimes); i++ { + err = container.PullImage(image, authConfig, nil) + if err == nil { + break + } + } + if err != nil { + klog.Errorf("pull image %s failed, err: %v", image, err) + errmsg = "prepull images failed" + state = v1alpha1.PrePullFailed + prePullStatus.State = v1alpha1.PrePullFailed + prePullStatus.Reason = err.Error() + } else { + klog.Infof("pull image %s successfully!", image) + prePullStatus.State = v1alpha1.PrePullSuccessful + } + imageStatus = append(imageStatus, prePullStatus) + } + + return errmsg, imageStatus, state +} + +func parseJobName(resource string) (string, error) { + var jobName string + sli := strings.Split(resource, constants.ResourceSep) + if len(sli) != 2 { + return jobName, fmt.Errorf("the resource %s is not the standard type", resource) + } + return sli[1], nil +} + +func makeAuthConfig(pullsecret string) (*runtimeapi.AuthConfig, error) { + if pullsecret == "" { + return nil, nil + } + + secretSli := strings.Split(pullsecret, constants.ResourceSep) + if len(secretSli) != 2 { + return nil, fmt.Errorf("pull secret format is not correct") + } + + client := metaclient.New() + secret, err := client.Secrets(secretSli[0]).Get(secretSli[1]) + if err != nil { + return nil, fmt.Errorf("get secret %s failed, %v", secretSli[1], err) + } + + var auth runtimeapi.AuthConfig + err = json.Unmarshal(secret.Data[v1.DockerConfigJsonKey], &auth) + if err != nil { + return nil, fmt.Errorf("unmarshal secret %s to auth file failed, %v", secretSli[1], err) + } + + return &auth, nil +} diff --git a/hack/generate-crds.sh b/hack/generate-crds.sh index 295afa102..bc29ee1f4 100755 --- a/hack/generate-crds.sh +++ b/hack/generate-crds.sh @@ -86,6 +86,7 @@ function :copy:to:destination { mkdir -p ${CRD_OUTPUTS}/reliablesyncs mkdir -p ${CRD_OUTPUTS}/apps mkdir -p ${CRD_OUTPUTS}/policy + mkdir -p ${CRD_OUTPUTS}/operations for entry in `ls /tmp/crds/*.yaml`; do CRD_NAME=$(echo ${entry} | cut -d'.' -f3 | cut -d'_' -f2) @@ -108,7 +109,7 @@ function :copy:to:destination { elif [ "$CRD_NAME" == "objectsyncs" ]; then cp -v ${entry} ${CRD_OUTPUTS}/reliablesyncs/objectsync_${RELIABLESYNCS_VERSION}.yaml cp -v ${entry} ${HELM_CRDS_DIR}/objectsync_${RELIABLESYNCS_VERSION}.yaml - elif [ "$CRD_NAME" == "nodeupgradejobs" ]; then + elif [ "$CRD_NAME" == "nodeupgradejobs" ] || [ "$CRD_NAME" == "imageprepulljobs" ]; then CRD_NAME=$(remove_suffix_s "$CRD_NAME") cp -v ${entry} ${CRD_OUTPUTS}/operations/operations_${OPERATIONS_VERSION}_${CRD_NAME}.yaml cp -v ${entry} ${HELM_CRDS_DIR}/operations_${OPERATIONS_VERSION}_${CRD_NAME}.yaml diff --git a/hack/local-up-kubeedge.sh b/hack/local-up-kubeedge.sh index 750cf13bc..d008a082c 100755 --- a/hack/local-up-kubeedge.sh +++ b/hack/local-up-kubeedge.sh @@ -102,6 +102,7 @@ function create_rule_crd { function create_operation_crd { echo "creating the operation crd..." kubectl apply -f ${KUBEEDGE_ROOT}/build/crds/operations/operations_v1alpha1_nodeupgradejob.yaml + kubectl apply -f ${KUBEEDGE_ROOT}/build/crds/operations/operations_v1alpha1_imageprepulljob.yaml } function create_serviceaccountaccess_crd { diff --git a/keadm/cmd/keadm/app/cmd/util/image.go b/keadm/cmd/keadm/app/cmd/util/image.go index b92ed0799..34c81c89a 100644 --- a/keadm/cmd/keadm/app/cmd/util/image.go +++ b/keadm/cmd/keadm/app/cmd/util/image.go @@ -40,6 +40,7 @@ var mqttLabel = map[string]string{"io.kubeedge.edgecore/mqtt": image.EdgeMQTT} type ContainerRuntime interface { PullImages(images []string) error + PullImage(image string, authConfig *runtimeapi.AuthConfig, sandboxConfig *runtimeapi.PodSandboxConfig) error CopyResources(edgeImage string, files map[string]string) error RunMQTT(mqttImage string) error RemoveMQTT() error @@ -86,21 +87,28 @@ func convertCRIImage(image string) string { func (runtime *CRIRuntime) PullImages(images []string) error { for _, image := range images { - image = convertCRIImage(image) fmt.Printf("Pulling %s ...\n", image) - imageSpec := &runtimeapi.ImageSpec{Image: image} - status, err := runtime.ImageManagerService.ImageStatus(runtime.ctx, imageSpec, true) + err := runtime.PullImage(image, nil, nil) if err != nil { return err } - if status == nil || status.Image == nil { - if _, err := runtime.ImageManagerService.PullImage(runtime.ctx, imageSpec, nil, nil); err != nil { - return err - } - } fmt.Printf("Successfully pulled %s\n", image) } + return nil +} +func (runtime *CRIRuntime) PullImage(image string, authConfig *runtimeapi.AuthConfig, sandboxConfig *runtimeapi.PodSandboxConfig) error { + image = convertCRIImage(image) + imageSpec := &runtimeapi.ImageSpec{Image: image} + status, err := runtime.ImageManagerService.ImageStatus(runtime.ctx, imageSpec, true) + if err != nil { + return err + } + if status == nil || status.Image == nil { + if _, err := runtime.ImageManagerService.PullImage(runtime.ctx, imageSpec, authConfig, sandboxConfig); err != nil { + return err + } + } return nil } diff --git a/manifests/charts/cloudcore/crds/operations_v1alpha1_imageprepulljob.yaml b/manifests/charts/cloudcore/crds/operations_v1alpha1_imageprepulljob.yaml new file mode 100644 index 000000000..903516d32 --- /dev/null +++ b/manifests/charts/cloudcore/crds/operations_v1alpha1_imageprepulljob.yaml @@ -0,0 +1,204 @@ + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.6.2 + creationTimestamp: null + name: imageprepulljobs.operations.kubeedge.io +spec: + group: operations.kubeedge.io + names: + kind: ImagePrePullJob + listKind: ImagePrePullJobList + plural: imageprepulljobs + singular: imageprepulljob + scope: Cluster + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: ImagePrePullJob is used to prepull images on edge node. + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: Spec represents the specification of the desired behavior + of ImagePrePullJob. + properties: + imagePrePullTemplate: + description: ImagePrepullTemplate represents original templates of + imagePrePull + properties: + checkItems: + description: CheckItems specifies the items need to be checked + before the task is executed. The default CheckItems value is + disk. + items: + type: string + type: array + imageSecrets: + description: ImageSecret specifies the secret for image pull if + private registry used. Use {namespace}/{secretName} in format. + type: string + images: + description: Images is the image list to be prepull + items: + type: string + type: array + labelSelector: + description: LabelSelector is a filter to select member clusters + by labels. It must match a node's labels for the NodeUpgradeJob + to be operated on that node. Please note that sets of NodeNames + and LabelSelector are ORed. Users must set one and can only + set one. + properties: + matchExpressions: + description: matchExpressions is a list of label selector + requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector + that contains values, a key, and an operator that relates + the key and values. + properties: + key: + description: key is the label key that the selector + applies to. + type: string + operator: + description: operator represents a key's relationship + to a set of values. Valid operators are In, NotIn, + Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. If + the operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. This array is replaced + during a strategic merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. A + single {key,value} in the matchLabels map is equivalent + to an element of matchExpressions, whose key field is "key", + the operator is "In", and the values array contains only + "value". The requirements are ANDed. + type: object + type: object + nodeNames: + description: NodeNames is a request to select some specific nodes. + If it is non-empty, the upgrade job simply select these edge + nodes to do upgrade operation. Please note that sets of NodeNames + and LabelSelector are ORed. Users must set one and can only + set one. + items: + type: string + type: array + retryTimes: + description: RetryTimes specifies the retry times if image pull + failed on each edgenode. Default to 0 + format: int32 + type: integer + timeoutSecondsOnEachNode: + description: TimeoutSecondsOnEachNode limits the duration of the + image prepull job on each edgenode. Default to 360. If set to + 0, we'll use the default value 360. + format: int32 + type: integer + type: object + type: object + status: + description: Status represents the status of ImagePrePullJob. + properties: + state: + description: 'State represents for the state phase of the ImagePrePullJob. + There are four possible state values: "", prechecking, prepulling, + successful, failed.' + enum: + - prepulling + - successful + - failed + type: string + status: + description: Status contains image prepull status for each edge node. + items: + description: ImagePrePullStatus stores image prepull status for + each edge node. + properties: + imageStatus: + description: ImageStatus represents the prepull status for each + image + items: + description: ImageStatus stores the prepull status for each + image. + properties: + image: + description: Image is the name of the image + type: string + reason: + description: Reason represents the fail reason if image + pull failed + type: string + state: + description: 'State represents for the state phase of + this image pull on the edge node There are two possible + state values: successful, failed.' + enum: + - prepulling + - successful + - failed + type: string + type: object + type: array + nodeName: + description: NodeName is the name of edge node. + type: string + reason: + description: Reason represents the fail reason if images prepull + failed on the edge node + type: string + state: + description: 'State represents for the state phase of the ImagePrepullJob + on the edge node. There are five possible state values: "", + prepulling, successful, failed.' + enum: + - prepulling + - successful + - failed + type: string + type: object + type: array + type: object + required: + - spec + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/manifests/charts/cloudcore/templates/configmap_cloudcore.yaml b/manifests/charts/cloudcore/templates/configmap_cloudcore.yaml index 9edc48f78..2986cfd91 100644 --- a/manifests/charts/cloudcore/templates/configmap_cloudcore.yaml +++ b/manifests/charts/cloudcore/templates/configmap_cloudcore.yaml @@ -55,3 +55,5 @@ data: mode: {{ .Values.iptablesManager.mode }} nodeUpgradeJobController: enable: {{ .Values.cloudCore.modules.nodeUpgradeJobController.enable }} + imagePrePullController: + enable: {{ .Values.cloudCore.modules.imagePrePullController.enable }} diff --git a/manifests/charts/cloudcore/templates/rbac_cloudcore.yaml b/manifests/charts/cloudcore/templates/rbac_cloudcore.yaml index 7d4c26fa6..eba4ee29d 100644 --- a/manifests/charts/cloudcore/templates/rbac_cloudcore.yaml +++ b/manifests/charts/cloudcore/templates/rbac_cloudcore.yaml @@ -38,7 +38,7 @@ rules: resources: ["*"] verbs: ["get", "list", "watch"] - apiGroups: ["operations.kubeedge.io"] - resources: ["nodeupgradejobs", "nodeupgradejobs/status"] + resources: ["nodeupgradejobs", "nodeupgradejobs/status", "imageprepulljobs", "imageprepulljobs/status"] verbs: ["get", "list", "watch", "update", "patch"] --- diff --git a/manifests/charts/cloudcore/values.yaml b/manifests/charts/cloudcore/values.yaml index ac88a9376..d33dac1b6 100644 --- a/manifests/charts/cloudcore/values.yaml +++ b/manifests/charts/cloudcore/values.yaml @@ -59,6 +59,8 @@ cloudCore: enable: false nodeUpgradeJobController: enable: false + imagePrePullController: + enable: false service: enable: true type: "NodePort" diff --git a/manifests/profiles/version.yaml b/manifests/profiles/version.yaml index e382be923..a959dfb55 100644 --- a/manifests/profiles/version.yaml +++ b/manifests/profiles/version.yaml @@ -53,6 +53,8 @@ cloudCore: enable: false nodeUpgradeJobController: enable: false + imagePrePullController: + enable: false service: enable: false type: "NodePort" diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go index 31e0857e3..66128ec49 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go @@ -108,6 +108,16 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig { NodeUpgradeJobWorkers: constants.DefaultNodeUpgradeJobWorkers, }, }, + ImagePrePullController: &ImagePrePullController{ + Enable: false, + Buffer: &ImagePrePullControllerBuffer{ + ImagePrePullJobStatus: constants.DefaultImagePrePullJobStatusBuffer, + ImagePrePullJobEvent: constants.DefaultImagePrePullJobEventBuffer, + }, + Load: &ImagePrePullControllerLoad{ + ImagePrePullJobWorkers: constants.DefaultImagePrePullJobWorkers, + }, + }, SyncController: &SyncController{ Enable: true, }, diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go index 8d04c9359..955960fe0 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go @@ -90,6 +90,8 @@ type Modules struct { DeviceController *DeviceController `json:"deviceController,omitempty"` // NodeUpgradeJobController indicates NodeUpgradeJobController module config NodeUpgradeJobController *NodeUpgradeJobController `json:"nodeUpgradeJobController,omitempty"` + // ImagePrePullController indicates ImagePrePullController module config + ImagePrePullController *ImagePrePullController `json:"imagePrePullController,omitempty"` // SyncController indicates SyncController module config SyncController *SyncController `json:"syncController,omitempty"` // DynamicController indicates DynamicController module config @@ -408,6 +410,35 @@ type NodeUpgradeJobControllerLoad struct { NodeUpgradeJobWorkers int32 `json:"nodeUpgradeJobWorkers,omitempty"` } +// ImagePrePullController indicates the operations controller +type ImagePrePullController struct { + // Enable indicates whether ImagePrePullController is enabled, + // if set to false (for debugging etc.), skip checking other ImagePrePullController configs. + // default false + Enable bool `json:"enable"` + // Buffer indicates Operation Controller buffer + Buffer *ImagePrePullControllerBuffer `json:"buffer,omitempty"` + // Load indicates Operation Controller Load + Load *ImagePrePullControllerLoad `json:"load,omitempty"` +} + +// ImagePrePullControllerBuffer indicates ImagePrePullController buffer +type ImagePrePullControllerBuffer struct { + // ImagePrePullJobStatus indicates the buffer of update ImagePrePullJob status + // default 1024 + ImagePrePullJobStatus int32 `json:"imagePrePullJobStatus,omitempty"` + // ImagePrePullJobEvent indicates the buffer of ImagePrePullJob event + // default 1 + ImagePrePullJobEvent int32 `json:"imagePrePullJobEvent,omitempty"` +} + +// ImagePrePullControllerLoad indicates the ImagePrePullController load +type ImagePrePullControllerLoad struct { + // ImagePrePullJobWorkers indicates the load of update ImagePrePullJob workers + // default 1 + ImagePrePullJobWorkers int32 `json:"imagePrePullJobWorkers,omitempty"` +} + // SyncController indicates the sync controller type SyncController struct { // Enable indicates whether syncController is enabled, diff --git a/pkg/apis/operations/v1alpha1/imageprepull_types.go b/pkg/apis/operations/v1alpha1/imageprepull_types.go new file mode 100644 index 000000000..0ac53a975 --- /dev/null +++ b/pkg/apis/operations/v1alpha1/imageprepull_types.go @@ -0,0 +1,158 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +genclient +// +genclient:nonNamespaced +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ImagePrePullJob is used to prepull images on edge node. +// +k8s:openapi-gen=true +// +kubebuilder:subresource:status +// +kubebuilder:resource:scope=Cluster +type ImagePrePullJob struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Spec represents the specification of the desired behavior of ImagePrePullJob. + // +required + Spec ImagePrePullJobSpec `json:"spec"` + + // Status represents the status of ImagePrePullJob. + // +optional + Status ImagePrePullJobStatus `json:"status,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ImagePrePullJobList is a list of ImagePrePullJob. +type ImagePrePullJobList struct { + // Standard type metadata. + metav1.TypeMeta `json:",inline"` + + // Standard list metadata. + metav1.ListMeta `json:"metadata,omitempty"` + + // List of ImagePrePullJob. + Items []ImagePrePullJob `json:"items"` +} + +// ImagePrePullSpec represents the specification of the desired behavior of ImagePrePullJob. +type ImagePrePullJobSpec struct { + // ImagePrepullTemplate represents original templates of imagePrePull + ImagePrePullTemplate ImagePrePullTemplate `json:"imagePrePullTemplate,omitempty"` +} + +// ImagePrePullTemplate represents original templates of imagePrePull +type ImagePrePullTemplate struct { + // Images is the image list to be prepull + Images []string `json:"images,omitempty"` + + // NodeNames is a request to select some specific nodes. If it is non-empty, + // the upgrade job simply select these edge nodes to do upgrade operation. + // Please note that sets of NodeNames and LabelSelector are ORed. + // Users must set one and can only set one. + // +optional + NodeNames []string `json:"nodeNames,omitempty"` + // LabelSelector is a filter to select member clusters by labels. + // It must match a node's labels for the NodeUpgradeJob to be operated on that node. + // Please note that sets of NodeNames and LabelSelector are ORed. + // Users must set one and can only set one. + // +optional + LabelSelector *metav1.LabelSelector `json:"labelSelector,omitempty"` + + // CheckItems specifies the items need to be checked before the task is executed. + // The default CheckItems value is disk. + // +optional + CheckItems []string `json:"checkItems,omitempty"` + + // ImageSecret specifies the secret for image pull if private registry used. + // Use {namespace}/{secretName} in format. + // +optional + ImageSecret string `json:"imageSecrets,omitempty"` + + // TimeoutSecondsOnEachNode limits the duration of the image prepull job on each edgenode. + // Default to 360. + // If set to 0, we'll use the default value 360. + // +optional + TimeoutSecondsOnEachNode *uint32 `json:"timeoutSecondsOnEachNode,omitempty"` + + // RetryTimes specifies the retry times if image pull failed on each edgenode. + // Default to 0 + // +optional + RetryTimes int32 `json:"retryTimes,omitempty"` +} + +// PrePullState describe the PrePullState of image prepull operation on edge nodes. +// +kubebuilder:validation:Enum=prepulling;successful;failed +type PrePullState string + +// Valid values of PrepullState +const ( + PrePullInitialValue PrePullState = "" + PrePulling PrePullState = "prepulling" + PrePullSuccessful PrePullState = "successful" + PrePullFailed PrePullState = "failed" +) + +// ImagePrePullJobStatus stores the status of ImagePrePullJob. +// contains images prepull status on multiple edge nodes. +// +kubebuilder:validation:Type=object +type ImagePrePullJobStatus struct { + // State represents for the state phase of the ImagePrePullJob. + // There are four possible state values: "", prechecking, prepulling, successful, failed. + State PrePullState `json:"state,omitempty"` + + // Status contains image prepull status for each edge node. + Status []ImagePrePullStatus `json:"status,omitempty"` +} + +// ImagePrePullStatus stores image prepull status for each edge node. +// +kubebuilder:validation:Type=object +type ImagePrePullStatus struct { + // NodeName is the name of edge node. + NodeName string `json:"nodeName,omitempty"` + + // State represents for the state phase of the ImagePrepullJob on the edge node. + // There are five possible state values: "", prepulling, successful, failed. + State PrePullState `json:"state,omitempty"` + + // Reason represents the fail reason if images prepull failed on the edge node + Reason string `json:"reason,omitempty"` + + // ImageStatus represents the prepull status for each image + ImageStatus []ImageStatus `json:"imageStatus,omitempty"` +} + +// ImageStatus stores the prepull status for each image. +// +kubebuilder:validation:Type=object +type ImageStatus struct { + // Image is the name of the image + Image string `json:"image,omitempty"` + + // State represents for the state phase of this image pull on the edge node + // There are two possible state values: successful, failed. + State PrePullState `json:"state,omitempty"` + + // Reason represents the fail reason if image pull failed + // +optional + Reason string `json:"reason,omitempty"` +} diff --git a/pkg/apis/operations/v1alpha1/register.go b/pkg/apis/operations/v1alpha1/register.go index c6aa1325e..d78a1013d 100644 --- a/pkg/apis/operations/v1alpha1/register.go +++ b/pkg/apis/operations/v1alpha1/register.go @@ -65,6 +65,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { scheme.AddKnownTypes(SchemeGroupVersion, &NodeUpgradeJob{}, &NodeUpgradeJobList{}, + &ImagePrePullJob{}, + &ImagePrePullJobList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/operations/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/operations/v1alpha1/zz_generated.deepcopy.go index fb9863292..8b38fd4ec 100644 --- a/pkg/apis/operations/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/operations/v1alpha1/zz_generated.deepcopy.go @@ -43,6 +43,185 @@ func (in *History) DeepCopy() *History { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ImagePrePullJob) DeepCopyInto(out *ImagePrePullJob) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ImagePrePullJob. +func (in *ImagePrePullJob) DeepCopy() *ImagePrePullJob { + if in == nil { + return nil + } + out := new(ImagePrePullJob) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ImagePrePullJob) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ImagePrePullJobList) DeepCopyInto(out *ImagePrePullJobList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]ImagePrePullJob, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ImagePrePullJobList. +func (in *ImagePrePullJobList) DeepCopy() *ImagePrePullJobList { + if in == nil { + return nil + } + out := new(ImagePrePullJobList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ImagePrePullJobList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ImagePrePullJobSpec) DeepCopyInto(out *ImagePrePullJobSpec) { + *out = *in + in.ImagePrePullTemplate.DeepCopyInto(&out.ImagePrePullTemplate) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ImagePrePullJobSpec. +func (in *ImagePrePullJobSpec) DeepCopy() *ImagePrePullJobSpec { + if in == nil { + return nil + } + out := new(ImagePrePullJobSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ImagePrePullJobStatus) DeepCopyInto(out *ImagePrePullJobStatus) { + *out = *in + if in.Status != nil { + in, out := &in.Status, &out.Status + *out = make([]ImagePrePullStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ImagePrePullJobStatus. +func (in *ImagePrePullJobStatus) DeepCopy() *ImagePrePullJobStatus { + if in == nil { + return nil + } + out := new(ImagePrePullJobStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ImagePrePullStatus) DeepCopyInto(out *ImagePrePullStatus) { + *out = *in + if in.ImageStatus != nil { + in, out := &in.ImageStatus, &out.ImageStatus + *out = make([]ImageStatus, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ImagePrePullStatus. +func (in *ImagePrePullStatus) DeepCopy() *ImagePrePullStatus { + if in == nil { + return nil + } + out := new(ImagePrePullStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ImagePrePullTemplate) DeepCopyInto(out *ImagePrePullTemplate) { + *out = *in + if in.Images != nil { + in, out := &in.Images, &out.Images + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.NodeNames != nil { + in, out := &in.NodeNames, &out.NodeNames + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.LabelSelector != nil { + in, out := &in.LabelSelector, &out.LabelSelector + *out = new(v1.LabelSelector) + (*in).DeepCopyInto(*out) + } + if in.CheckItems != nil { + in, out := &in.CheckItems, &out.CheckItems + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.TimeoutSecondsOnEachNode != nil { + in, out := &in.TimeoutSecondsOnEachNode, &out.TimeoutSecondsOnEachNode + *out = new(uint32) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ImagePrePullTemplate. +func (in *ImagePrePullTemplate) DeepCopy() *ImagePrePullTemplate { + if in == nil { + return nil + } + out := new(ImagePrePullTemplate) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ImageStatus) DeepCopyInto(out *ImageStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ImageStatus. +func (in *ImageStatus) DeepCopy() *ImageStatus { + if in == nil { + return nil + } + out := new(ImageStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NodeUpgradeJob) DeepCopyInto(out *NodeUpgradeJob) { *out = *in out.TypeMeta = in.TypeMeta diff --git a/pkg/client/clientset/versioned/typed/operations/v1alpha1/fake/fake_imageprepulljob.go b/pkg/client/clientset/versioned/typed/operations/v1alpha1/fake/fake_imageprepulljob.go new file mode 100644 index 000000000..cb7a54910 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/operations/v1alpha1/fake/fake_imageprepulljob.go @@ -0,0 +1,132 @@ +/* +Copyright The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeImagePrePullJobs implements ImagePrePullJobInterface +type FakeImagePrePullJobs struct { + Fake *FakeOperationsV1alpha1 +} + +var imageprepulljobsResource = v1alpha1.SchemeGroupVersion.WithResource("imageprepulljobs") + +var imageprepulljobsKind = v1alpha1.SchemeGroupVersion.WithKind("ImagePrePullJob") + +// Get takes name of the imagePrePullJob, and returns the corresponding imagePrePullJob object, and an error if there is any. +func (c *FakeImagePrePullJobs) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.ImagePrePullJob, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootGetAction(imageprepulljobsResource, name), &v1alpha1.ImagePrePullJob{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ImagePrePullJob), err +} + +// List takes label and field selectors, and returns the list of ImagePrePullJobs that match those selectors. +func (c *FakeImagePrePullJobs) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.ImagePrePullJobList, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootListAction(imageprepulljobsResource, imageprepulljobsKind, opts), &v1alpha1.ImagePrePullJobList{}) + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.ImagePrePullJobList{ListMeta: obj.(*v1alpha1.ImagePrePullJobList).ListMeta} + for _, item := range obj.(*v1alpha1.ImagePrePullJobList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested imagePrePullJobs. +func (c *FakeImagePrePullJobs) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewRootWatchAction(imageprepulljobsResource, opts)) +} + +// Create takes the representation of a imagePrePullJob and creates it. Returns the server's representation of the imagePrePullJob, and an error, if there is any. +func (c *FakeImagePrePullJobs) Create(ctx context.Context, imagePrePullJob *v1alpha1.ImagePrePullJob, opts v1.CreateOptions) (result *v1alpha1.ImagePrePullJob, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootCreateAction(imageprepulljobsResource, imagePrePullJob), &v1alpha1.ImagePrePullJob{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ImagePrePullJob), err +} + +// Update takes the representation of a imagePrePullJob and updates it. Returns the server's representation of the imagePrePullJob, and an error, if there is any. +func (c *FakeImagePrePullJobs) Update(ctx context.Context, imagePrePullJob *v1alpha1.ImagePrePullJob, opts v1.UpdateOptions) (result *v1alpha1.ImagePrePullJob, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateAction(imageprepulljobsResource, imagePrePullJob), &v1alpha1.ImagePrePullJob{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ImagePrePullJob), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeImagePrePullJobs) UpdateStatus(ctx context.Context, imagePrePullJob *v1alpha1.ImagePrePullJob, opts v1.UpdateOptions) (*v1alpha1.ImagePrePullJob, error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateSubresourceAction(imageprepulljobsResource, "status", imagePrePullJob), &v1alpha1.ImagePrePullJob{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ImagePrePullJob), err +} + +// Delete takes name of the imagePrePullJob and deletes it. Returns an error if one occurs. +func (c *FakeImagePrePullJobs) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewRootDeleteActionWithOptions(imageprepulljobsResource, name, opts), &v1alpha1.ImagePrePullJob{}) + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeImagePrePullJobs) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewRootDeleteCollectionAction(imageprepulljobsResource, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.ImagePrePullJobList{}) + return err +} + +// Patch applies the patch and returns the patched imagePrePullJob. +func (c *FakeImagePrePullJobs) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.ImagePrePullJob, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootPatchSubresourceAction(imageprepulljobsResource, name, pt, data, subresources...), &v1alpha1.ImagePrePullJob{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ImagePrePullJob), err +} diff --git a/pkg/client/clientset/versioned/typed/operations/v1alpha1/fake/fake_operations_client.go b/pkg/client/clientset/versioned/typed/operations/v1alpha1/fake/fake_operations_client.go index 7a6025ff5..4e898107d 100644 --- a/pkg/client/clientset/versioned/typed/operations/v1alpha1/fake/fake_operations_client.go +++ b/pkg/client/clientset/versioned/typed/operations/v1alpha1/fake/fake_operations_client.go @@ -28,6 +28,10 @@ type FakeOperationsV1alpha1 struct { *testing.Fake } +func (c *FakeOperationsV1alpha1) ImagePrePullJobs() v1alpha1.ImagePrePullJobInterface { + return &FakeImagePrePullJobs{c} +} + func (c *FakeOperationsV1alpha1) NodeUpgradeJobs() v1alpha1.NodeUpgradeJobInterface { return &FakeNodeUpgradeJobs{c} } diff --git a/pkg/client/clientset/versioned/typed/operations/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/operations/v1alpha1/generated_expansion.go index e664010bf..3f5448bb5 100644 --- a/pkg/client/clientset/versioned/typed/operations/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/operations/v1alpha1/generated_expansion.go @@ -18,4 +18,6 @@ limitations under the License. package v1alpha1 +type ImagePrePullJobExpansion interface{} + type NodeUpgradeJobExpansion interface{} diff --git a/pkg/client/clientset/versioned/typed/operations/v1alpha1/imageprepulljob.go b/pkg/client/clientset/versioned/typed/operations/v1alpha1/imageprepulljob.go new file mode 100644 index 000000000..7dd77f40b --- /dev/null +++ b/pkg/client/clientset/versioned/typed/operations/v1alpha1/imageprepulljob.go @@ -0,0 +1,184 @@ +/* +Copyright The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + "time" + + v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + scheme "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// ImagePrePullJobsGetter has a method to return a ImagePrePullJobInterface. +// A group's client should implement this interface. +type ImagePrePullJobsGetter interface { + ImagePrePullJobs() ImagePrePullJobInterface +} + +// ImagePrePullJobInterface has methods to work with ImagePrePullJob resources. +type ImagePrePullJobInterface interface { + Create(ctx context.Context, imagePrePullJob *v1alpha1.ImagePrePullJob, opts v1.CreateOptions) (*v1alpha1.ImagePrePullJob, error) + Update(ctx context.Context, imagePrePullJob *v1alpha1.ImagePrePullJob, opts v1.UpdateOptions) (*v1alpha1.ImagePrePullJob, error) + UpdateStatus(ctx context.Context, imagePrePullJob *v1alpha1.ImagePrePullJob, opts v1.UpdateOptions) (*v1alpha1.ImagePrePullJob, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.ImagePrePullJob, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.ImagePrePullJobList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.ImagePrePullJob, err error) + ImagePrePullJobExpansion +} + +// imagePrePullJobs implements ImagePrePullJobInterface +type imagePrePullJobs struct { + client rest.Interface +} + +// newImagePrePullJobs returns a ImagePrePullJobs +func newImagePrePullJobs(c *OperationsV1alpha1Client) *imagePrePullJobs { + return &imagePrePullJobs{ + client: c.RESTClient(), + } +} + +// Get takes name of the imagePrePullJob, and returns the corresponding imagePrePullJob object, and an error if there is any. +func (c *imagePrePullJobs) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.ImagePrePullJob, err error) { + result = &v1alpha1.ImagePrePullJob{} + err = c.client.Get(). + Resource("imageprepulljobs"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of ImagePrePullJobs that match those selectors. +func (c *imagePrePullJobs) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.ImagePrePullJobList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.ImagePrePullJobList{} + err = c.client.Get(). + Resource("imageprepulljobs"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested imagePrePullJobs. +func (c *imagePrePullJobs) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Resource("imageprepulljobs"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a imagePrePullJob and creates it. Returns the server's representation of the imagePrePullJob, and an error, if there is any. +func (c *imagePrePullJobs) Create(ctx context.Context, imagePrePullJob *v1alpha1.ImagePrePullJob, opts v1.CreateOptions) (result *v1alpha1.ImagePrePullJob, err error) { + result = &v1alpha1.ImagePrePullJob{} + err = c.client.Post(). + Resource("imageprepulljobs"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(imagePrePullJob). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a imagePrePullJob and updates it. Returns the server's representation of the imagePrePullJob, and an error, if there is any. +func (c *imagePrePullJobs) Update(ctx context.Context, imagePrePullJob *v1alpha1.ImagePrePullJob, opts v1.UpdateOptions) (result *v1alpha1.ImagePrePullJob, err error) { + result = &v1alpha1.ImagePrePullJob{} + err = c.client.Put(). + Resource("imageprepulljobs"). + Name(imagePrePullJob.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(imagePrePullJob). + Do(ctx). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *imagePrePullJobs) UpdateStatus(ctx context.Context, imagePrePullJob *v1alpha1.ImagePrePullJob, opts v1.UpdateOptions) (result *v1alpha1.ImagePrePullJob, err error) { + result = &v1alpha1.ImagePrePullJob{} + err = c.client.Put(). + Resource("imageprepulljobs"). + Name(imagePrePullJob.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(imagePrePullJob). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the imagePrePullJob and deletes it. Returns an error if one occurs. +func (c *imagePrePullJobs) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Resource("imageprepulljobs"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *imagePrePullJobs) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Resource("imageprepulljobs"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched imagePrePullJob. +func (c *imagePrePullJobs) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.ImagePrePullJob, err error) { + result = &v1alpha1.ImagePrePullJob{} + err = c.client.Patch(pt). + Resource("imageprepulljobs"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/pkg/client/clientset/versioned/typed/operations/v1alpha1/operations_client.go b/pkg/client/clientset/versioned/typed/operations/v1alpha1/operations_client.go index 8d926b3cf..74af5f488 100644 --- a/pkg/client/clientset/versioned/typed/operations/v1alpha1/operations_client.go +++ b/pkg/client/clientset/versioned/typed/operations/v1alpha1/operations_client.go @@ -28,6 +28,7 @@ import ( type OperationsV1alpha1Interface interface { RESTClient() rest.Interface + ImagePrePullJobsGetter NodeUpgradeJobsGetter } @@ -36,6 +37,10 @@ type OperationsV1alpha1Client struct { restClient rest.Interface } +func (c *OperationsV1alpha1Client) ImagePrePullJobs() ImagePrePullJobInterface { + return newImagePrePullJobs(c) +} + func (c *OperationsV1alpha1Client) NodeUpgradeJobs() NodeUpgradeJobInterface { return newNodeUpgradeJobs(c) } diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index fa149a971..f56206105 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -70,6 +70,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Devices().V1beta1().DeviceModels().Informer()}, nil // Group=operations, Version=v1alpha1 + case operationsv1alpha1.SchemeGroupVersion.WithResource("imageprepulljobs"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Operations().V1alpha1().ImagePrePullJobs().Informer()}, nil case operationsv1alpha1.SchemeGroupVersion.WithResource("nodeupgradejobs"): return &genericInformer{resource: resource.GroupResource(), informer: f.Operations().V1alpha1().NodeUpgradeJobs().Informer()}, nil diff --git a/pkg/client/informers/externalversions/operations/v1alpha1/imageprepulljob.go b/pkg/client/informers/externalversions/operations/v1alpha1/imageprepulljob.go new file mode 100644 index 000000000..511d627a6 --- /dev/null +++ b/pkg/client/informers/externalversions/operations/v1alpha1/imageprepulljob.go @@ -0,0 +1,89 @@ +/* +Copyright The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + operationsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + versioned "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" + internalinterfaces "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/kubeedge/kubeedge/pkg/client/listers/operations/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// ImagePrePullJobInformer provides access to a shared informer and lister for +// ImagePrePullJobs. +type ImagePrePullJobInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.ImagePrePullJobLister +} + +type imagePrePullJobInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// NewImagePrePullJobInformer constructs a new informer for ImagePrePullJob type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewImagePrePullJobInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredImagePrePullJobInformer(client, resyncPeriod, indexers, nil) +} + +// NewFilteredImagePrePullJobInformer constructs a new informer for ImagePrePullJob type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredImagePrePullJobInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.OperationsV1alpha1().ImagePrePullJobs().List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.OperationsV1alpha1().ImagePrePullJobs().Watch(context.TODO(), options) + }, + }, + &operationsv1alpha1.ImagePrePullJob{}, + resyncPeriod, + indexers, + ) +} + +func (f *imagePrePullJobInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredImagePrePullJobInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *imagePrePullJobInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&operationsv1alpha1.ImagePrePullJob{}, f.defaultInformer) +} + +func (f *imagePrePullJobInformer) Lister() v1alpha1.ImagePrePullJobLister { + return v1alpha1.NewImagePrePullJobLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/informers/externalversions/operations/v1alpha1/interface.go b/pkg/client/informers/externalversions/operations/v1alpha1/interface.go index 10a3892ab..650ab553e 100644 --- a/pkg/client/informers/externalversions/operations/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/operations/v1alpha1/interface.go @@ -24,6 +24,8 @@ import ( // Interface provides access to all the informers in this group version. type Interface interface { + // ImagePrePullJobs returns a ImagePrePullJobInformer. + ImagePrePullJobs() ImagePrePullJobInformer // NodeUpgradeJobs returns a NodeUpgradeJobInformer. NodeUpgradeJobs() NodeUpgradeJobInformer } @@ -39,6 +41,11 @@ func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakList return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} } +// ImagePrePullJobs returns a ImagePrePullJobInformer. +func (v *version) ImagePrePullJobs() ImagePrePullJobInformer { + return &imagePrePullJobInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} +} + // NodeUpgradeJobs returns a NodeUpgradeJobInformer. func (v *version) NodeUpgradeJobs() NodeUpgradeJobInformer { return &nodeUpgradeJobInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} diff --git a/pkg/client/listers/operations/v1alpha1/expansion_generated.go b/pkg/client/listers/operations/v1alpha1/expansion_generated.go index 7e83d432d..7ff435cd6 100644 --- a/pkg/client/listers/operations/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/operations/v1alpha1/expansion_generated.go @@ -18,6 +18,10 @@ limitations under the License. package v1alpha1 +// ImagePrePullJobListerExpansion allows custom methods to be added to +// ImagePrePullJobLister. +type ImagePrePullJobListerExpansion interface{} + // NodeUpgradeJobListerExpansion allows custom methods to be added to // NodeUpgradeJobLister. type NodeUpgradeJobListerExpansion interface{} diff --git a/pkg/client/listers/operations/v1alpha1/imageprepulljob.go b/pkg/client/listers/operations/v1alpha1/imageprepulljob.go new file mode 100644 index 000000000..bb20ac4ac --- /dev/null +++ b/pkg/client/listers/operations/v1alpha1/imageprepulljob.go @@ -0,0 +1,68 @@ +/* +Copyright The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// ImagePrePullJobLister helps list ImagePrePullJobs. +// All objects returned here must be treated as read-only. +type ImagePrePullJobLister interface { + // List lists all ImagePrePullJobs in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.ImagePrePullJob, err error) + // Get retrieves the ImagePrePullJob from the index for a given name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.ImagePrePullJob, error) + ImagePrePullJobListerExpansion +} + +// imagePrePullJobLister implements the ImagePrePullJobLister interface. +type imagePrePullJobLister struct { + indexer cache.Indexer +} + +// NewImagePrePullJobLister returns a new ImagePrePullJobLister. +func NewImagePrePullJobLister(indexer cache.Indexer) ImagePrePullJobLister { + return &imagePrePullJobLister{indexer: indexer} +} + +// List lists all ImagePrePullJobs in the indexer. +func (s *imagePrePullJobLister) List(selector labels.Selector) (ret []*v1alpha1.ImagePrePullJob, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.ImagePrePullJob)) + }) + return ret, err +} + +// Get retrieves the ImagePrePullJob from the index for a given name. +func (s *imagePrePullJobLister) Get(name string) (*v1alpha1.ImagePrePullJob, error) { + obj, exists, err := s.indexer.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("imageprepulljob"), name) + } + return obj.(*v1alpha1.ImagePrePullJob), nil +} |
