| author | KubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com> | 2024-01-17 17:50:27 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-01-17 17:50:27 +0800 |
| commit | dadbeb0e72565822afca48cb3f66a17473d7d585 (patch) | |
| tree | 253eded79262e2f0d83a3da66ae945c9985eec0f | |
| parent | Merge pull request #5321 from luomengY/ci-cri (diff) | |
| parent | support edge nodes upgrade and image pre pull (diff) | |
| download | kubeedge-dadbeb0e72565822afca48cb3f66a17473d7d585.tar.gz | |
Merge pull request #5330 from ZhengXinwei-F/task-manager
Implement a task manager to complete cloud-edge task execution
68 files changed, 3981 insertions, 2462 deletions
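For reference, the updated NodeUpgradeJob sample under build/crd-samples/operations/ looks roughly like the following after this change. This is a sketch assembled from the nodeupgradejob.yaml hunk below; the apiVersion, kind, and metadata name are assumed (they sit outside the changed hunk), and the concrete version, labels, and tolerances will differ per cluster.

```yaml
# Sketch of build/crd-samples/operations/nodeupgradejob.yaml after this change.
# apiVersion, kind, and metadata.name are assumptions; the spec fields come from the hunk below.
apiVersion: operations.kubeedge.io/v1alpha1
kind: NodeUpgradeJob
metadata:
  name: upgrade-example        # hypothetical name
  labels:
    description: upgrade-label
spec:
  version: "v1.16.0"
  checkItems:                  # pre-upgrade checks introduced by this PR
    - "cpu"
    - "mem"
    - "disk"
  failureTolerate: "0.3"       # tolerated failure ratio across the selected nodes
  concurrency: 2               # upgrade at most 2 edge nodes at a time
  timeoutSeconds: 180
  labelSelector:
    matchLabels:
      "node-role.kubernetes.io/edge": ""
      node-role.kubernetes.io/agent: ""
```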
diff --git a/build/crd-samples/operations/imageprepulljob.yaml b/build/crd-samples/operations/imageprepulljob.yaml
index cebfbe27f..940a99975 100644
--- a/build/crd-samples/operations/imageprepulljob.yaml
+++ b/build/crd-samples/operations/imageprepulljob.yaml
@@ -11,4 +11,4 @@ spec:
     nodes:
       - edgenode1 # Need to replaced with your own node name
     timeoutSecondsOnEachNode: 300
-    retryTimesOnEachNode: 1
\ No newline at end of file
+    retryTimes: 1
\ No newline at end of file diff --git a/build/crd-samples/operations/nodeupgradejob.yaml b/build/crd-samples/operations/nodeupgradejob.yaml index 5b97287d7..2f5f730c8 100644 --- a/build/crd-samples/operations/nodeupgradejob.yaml +++ b/build/crd-samples/operations/nodeupgradejob.yaml @@ -5,10 +5,15 @@ metadata: labels: description: upgrade-label spec: - version: "v1.10.0" - timeoutSeconds: 60 + version: "v1.16.0" + checkItems: + - "cpu" + - "mem" + - "disk" + failureTolerate: "0.3" + concurrency: 2 + timeoutSeconds: 180 labelSelector: matchLabels: "node-role.kubernetes.io/edge": "" node-role.kubernetes.io/agent: "" - diff --git a/build/crds/operations/operations_v1alpha1_imageprepulljob.yaml b/build/crds/operations/operations_v1alpha1_imageprepulljob.yaml index 903516d32..a044a1532 100644 --- a/build/crds/operations/operations_v1alpha1_imageprepulljob.yaml +++ b/build/crds/operations/operations_v1alpha1_imageprepulljob.yaml @@ -48,6 +48,16 @@ spec: items: type: string type: array + concurrency: + description: Concurrency specifies the maximum number of edge + nodes that can pull images at the same time. The default Concurrency + value is 1. + format: int32 + type: integer + failureTolerate: + description: FailureTolerate specifies the task tolerance failure + ratio. The default FailureTolerate value is 0.1. + type: string imageSecrets: description: ImageSecret specifies the secret for image pull if private registry used. Use {namespace}/{secretName} in format. @@ -119,10 +129,10 @@ spec: failed on each edgenode. Default to 0 format: int32 type: integer - timeoutSecondsOnEachNode: - description: TimeoutSecondsOnEachNode limits the duration of the - image prepull job on each edgenode. Default to 360. If set to - 0, we'll use the default value 360. + timeoutSeconds: + description: TimeoutSeconds limits the duration of the node prepull + job on each edgenode. Default to 300. If set to 0, we'll use + the default value 300. format: int32 type: integer type: object @@ -130,14 +140,21 @@ spec: status: description: Status represents the status of ImagePrePullJob. properties: + action: + description: 'Action represents for the action of the ImagePrePullJob. + There are two possible action values: Success, Failure.' + type: string + event: + description: 'Event represents for the event of the ImagePrePullJob. + There are four possible event values: Init, Check, Pull, TimeOut.' + type: string + reason: + description: Reason represents for the reason of the ImagePrePullJob. + type: string state: description: 'State represents for the state phase of the ImagePrePullJob. - There are four possible state values: "", prechecking, prepulling, - successful, failed.' - enum: - - prepulling - - successful - - failed + There are five possible state values: "", checking, pulling, successful, + failed.' type: string status: description: Status contains image prepull status for each edge node. @@ -163,31 +180,42 @@ spec: description: 'State represents for the state phase of this image pull on the edge node There are two possible state values: successful, failed.' - enum: - - prepulling - - successful - - failed type: string type: object type: array - nodeName: - description: NodeName is the name of edge node. - type: string - reason: - description: Reason represents the fail reason if images prepull - failed on the edge node - type: string - state: - description: 'State represents for the state phase of the ImagePrepullJob - on the edge node. 
There are five possible state values: "", - prepulling, successful, failed.' - enum: - - prepulling - - successful - - failed - type: string + nodeStatus: + description: TaskStatus represents the status for each node + properties: + action: + description: 'Action represents for the action of the ImagePrePullJob. + There are three possible action values: Success, Failure, + TimeOut.' + type: string + event: + description: 'Event represents for the event of the ImagePrePullJob. + There are three possible event values: Init, Check, Pull.' + type: string + nodeName: + description: NodeName is the name of edge node. + type: string + reason: + description: Reason represents for the reason of the ImagePrePullJob. + type: string + state: + description: 'State represents for the upgrade state phase + of the edge node. There are several possible state values: + "", Upgrading, BackingUp, RollingBack and Checking.' + type: string + time: + description: Time represents for the running time of the + ImagePrePullJob. + type: string + type: object type: object type: array + time: + description: Time represents for the running time of the ImagePrePullJob. + type: string type: object required: - spec diff --git a/build/crds/operations/operations_v1alpha1_nodeupgradejob.yaml b/build/crds/operations/operations_v1alpha1_nodeupgradejob.yaml index 130df65ac..958272424 100644 --- a/build/crds/operations/operations_v1alpha1_nodeupgradejob.yaml +++ b/build/crds/operations/operations_v1alpha1_nodeupgradejob.yaml @@ -36,12 +36,22 @@ spec: spec: description: Specification of the desired behavior of NodeUpgradeJob. properties: + checkItems: + description: CheckItems specifies the items need to be checked before + the task is executed. The default CheckItems value is nil. + items: + type: string + type: array concurrency: description: Concurrency specifies the max number of edge nodes that can be upgraded at the same time. The default Concurrency value is 1. format: int32 type: integer + failureTolerate: + description: FailureTolerate specifies the task tolerance failure + ratio. The default FailureTolerate value is 0.1. + type: string image: description: 'Image specifies a container image name, the image contains: keadm and edgecore. keadm is used as upgradetool, to install the @@ -111,75 +121,71 @@ spec: job. Default to 300. If set to 0, we'll use the default value 300. format: int32 type: integer - upgradeTool: - description: UpgradeTool is a request to decide use which upgrade - tool. If it is empty, the upgrade job simply use default upgrade - tool keadm to do upgrade operation. - type: string version: type: string type: object status: description: Most recently observed status of the NodeUpgradeJob. properties: - state: - description: 'State represents for the state phase of the NodeUpgradeJob. - There are three possible state values: "", upgrading and completed.' - enum: - - upgrading - - completed + action: + description: 'Action represents for the action of the ImagePrePullJob. + There are two possible action values: Success, Failure.' type: string - status: + currentVersion: + description: CurrentVersion represents for the current status of the + EdgeCore. + type: string + event: + description: 'Event represents for the event of the ImagePrePullJob. + There are six possible event values: Init, Check, BackUp, Upgrade, + TimeOut, Rollback.' + type: string + historicVersion: + description: HistoricVersion represents for the historic status of + the EdgeCore. 
+ type: string + nodeStatus: description: Status contains upgrade Status for each edge node. items: - description: UpgradeStatus stores the status of Upgrade for each - edge node. + description: TaskStatus stores the status of Upgrade for each edge + node. properties: - history: - description: History is the last upgrade result of the edge - node. - properties: - fromVersion: - description: FromVersion is the version which the edge node - is upgraded from. - type: string - historyID: - description: HistoryID is to uniquely identify an Upgrade - Operation. - type: string - reason: - description: Reason is the error reason of Upgrade failure. - If the upgrade is successful, this reason is an empty - string. - type: string - result: - description: Result represents the result of upgrade. - enum: - - upgrade_success - - upgrade_failed_rollback_success - - upgrade_failed_rollback_failed - type: string - toVersion: - description: ToVersion is the version which the edge node - is upgraded to. - type: string - upgradeTime: - description: UpgradeTime is the time of this Upgrade. - type: string - type: object + action: + description: 'Action represents for the action of the ImagePrePullJob. + There are three possible action values: Success, Failure, + TimeOut.' + type: string + event: + description: 'Event represents for the event of the ImagePrePullJob. + There are three possible event values: Init, Check, Pull.' + type: string nodeName: description: NodeName is the name of edge node. type: string + reason: + description: Reason represents for the reason of the ImagePrePullJob. + type: string state: description: 'State represents for the upgrade state phase of - the edge node. There are three possible state values: "", - upgrading and completed.' - enum: - - upgrading - - completed + the edge node. There are several possible state values: "", + Upgrading, BackingUp, RollingBack and Checking.' + type: string + time: + description: Time represents for the running time of the ImagePrePullJob. type: string type: object type: array + reason: + description: Reason represents for the reason of the ImagePrePullJob. + type: string + state: + description: 'State represents for the state phase of the NodeUpgradeJob. + There are several possible state values: "", Upgrading, BackingUp, + RollingBack and Checking.' + type: string + time: + description: Time represents for the running time of the ImagePrePullJob. 
+ type: string type: object type: object served: true diff --git a/cloud/cmd/cloudcore/app/server.go b/cloud/cmd/cloudcore/app/server.go index fd346cd72..c4a7a2388 100644 --- a/cloud/cmd/cloudcore/app/server.go +++ b/cloud/cmd/cloudcore/app/server.go @@ -48,11 +48,10 @@ import ( "github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller" "github.com/kubeedge/kubeedge/cloud/pkg/dynamiccontroller" "github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller" - "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller" - "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller" "github.com/kubeedge/kubeedge/cloud/pkg/policycontroller" "github.com/kubeedge/kubeedge/cloud/pkg/router" "github.com/kubeedge/kubeedge/cloud/pkg/synccontroller" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager" "github.com/kubeedge/kubeedge/common/constants" "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1/validation" @@ -159,8 +158,7 @@ func registerModules(c *v1alpha1.CloudCoreConfig) { cloudhub.Register(c.Modules.CloudHub) edgecontroller.Register(c.Modules.EdgeController) devicecontroller.Register(c.Modules.DeviceController) - imageprepullcontroller.Register(c.Modules.ImagePrePullController) - nodeupgradejobcontroller.Register(c.Modules.NodeUpgradeJobController) + taskmanager.Register(c.Modules.TaskManager) synccontroller.Register(c.Modules.SyncController) cloudstream.Register(c.Modules.CloudStream, c.CommonConfig) router.Register(c.Modules.Router) diff --git a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go index a236d3ebd..18382f13b 100644 --- a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go +++ b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go @@ -33,8 +33,8 @@ import ( "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/session" "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer" "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" - imageprepullcontroller "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/controller" "github.com/kubeedge/kubeedge/cloud/pkg/synccontroller" + taskutil "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util" commonconst "github.com/kubeedge/kubeedge/common/constants" v2 "github.com/kubeedge/kubeedge/edge/pkg/metamanager/dao/v2" "github.com/kubeedge/kubeedge/pkg/apis/reliablesyncs/v1alpha1" @@ -184,8 +184,9 @@ func (md *messageDispatcher) DispatchUpstream(message *beehivemodel.Message, inf message.Router.Resource = fmt.Sprintf("node/%s/%s", info.NodeID, message.Router.Resource) beehivecontext.Send(modules.RouterModuleName, *message) - case message.GetOperation() == imageprepullcontroller.ImagePrePull: - beehivecontext.SendToGroup(modules.ImagePrePullControllerModuleGroup, *message) + case message.GetOperation() == taskutil.TaskPrePull || + message.GetOperation() == taskutil.TaskUpgrade: + beehivecontext.SendToGroup(modules.TaskManagerModuleGroup, *message) default: err := md.PubToController(info, message) @@ -451,7 +452,9 @@ func noAckRequired(msg *beehivemodel.Message) bool { return true case msg.GetGroup() == modules.UserGroup: return true - case msg.GetSource() == modules.NodeUpgradeJobControllerModuleName || msg.GetSource() == modules.ImagePrePullControllerModuleName: + case msg.GetSource() == modules.TaskManagerModuleName: + return true + case msg.GetSource() == modules.NodeUpgradeJobControllerModuleName: return true case msg.GetOperation() == beehivemodel.ResponseOperation: 
content, ok := msg.Content.(string) diff --git a/cloud/pkg/cloudhub/servers/httpserver/report_task_status.go b/cloud/pkg/cloudhub/servers/httpserver/report_task_status.go new file mode 100644 index 000000000..784470502 --- /dev/null +++ b/cloud/pkg/cloudhub/servers/httpserver/report_task_status.go @@ -0,0 +1,80 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package httpserver + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/emicklei/go-restful" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/klog/v2" + + beehiveContext "github.com/kubeedge/beehive/pkg/core/context" + beehiveModel "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + "github.com/kubeedge/kubeedge/common/types" +) + +const ( + millionByte = int64(3 * 1024 * 1024) +) + +// reportTaskStatus report the status of task +func reportTaskStatus(request *restful.Request, response *restful.Response) { + resp := types.NodeTaskResponse{} + taskID := request.PathParameter("taskID") + taskType := request.PathParameter("taskType") + nodeID := request.PathParameter("nodeID") + + lr := &io.LimitedReader{ + R: request.Request.Body, + N: millionByte + 1, + } + body, err := io.ReadAll(lr) + if err != nil { + err = response.WriteError(http.StatusBadRequest, fmt.Errorf("failed to get req body: %v", err)) + if err != nil { + klog.Warning(err.Error()) + } + return + } + if lr.N <= 0 { + err = response.WriteError(http.StatusBadRequest, errors.NewRequestEntityTooLargeError("the request body can only be up to 1MB in size")) + if err != nil { + klog.Warning(err.Error()) + } + return + } + if err = json.Unmarshal(body, &resp); err != nil { + err = response.WriteError(http.StatusBadRequest, fmt.Errorf("failed to unmarshal task info: %v", err)) + if err != nil { + klog.Warning(err.Error()) + } + return + } + + msg := beehiveModel.NewMessage("").SetRoute(modules.CloudHubModuleName, modules.CloudHubModuleGroup). 
+ SetResourceOperation(fmt.Sprintf("task/%s/node/%s", taskID, nodeID), taskType).FillBody(resp) + beehiveContext.Send(modules.TaskManagerModuleName, *msg) + + if _, err = response.Write([]byte("ok")); err != nil { + klog.Errorf("failed to send the task resp to edge , err: %v", err) + } +} diff --git a/cloud/pkg/cloudhub/servers/httpserver/server.go b/cloud/pkg/cloudhub/servers/httpserver/server.go index 1ae061770..0b8e61433 100644 --- a/cloud/pkg/cloudhub/servers/httpserver/server.go +++ b/cloud/pkg/cloudhub/servers/httpserver/server.go @@ -47,6 +47,7 @@ func StartHTTPServer() { ws.Route(ws.GET(constants.DefaultCertURL).To(edgeCoreClientCert)) ws.Route(ws.GET(constants.DefaultCAURL).To(getCA)) ws.Route(ws.POST(constants.DefaultNodeUpgradeURL).To(upgradeEdge)) + ws.Route(ws.POST(constants.DefaultTaskStateReportURL).To(reportTaskStatus)) serverContainer.Add(ws) addr := fmt.Sprintf("%s:%d", hubconfig.Config.HTTPS.Address, hubconfig.Config.HTTPS.Port) diff --git a/cloud/pkg/cloudhub/servers/httpserver/upgrade.go b/cloud/pkg/cloudhub/servers/httpserver/upgrade.go index c8b8eb65f..2dc343714 100644 --- a/cloud/pkg/cloudhub/servers/httpserver/upgrade.go +++ b/cloud/pkg/cloudhub/servers/httpserver/upgrade.go @@ -17,6 +17,7 @@ import ( "encoding/json" "fmt" "io" + "net/http" "github.com/emicklei/go-restful" "k8s.io/apimachinery/pkg/api/errors" @@ -25,40 +26,73 @@ import ( beehiveContext "github.com/kubeedge/beehive/pkg/core/context" beehiveModel "github.com/kubeedge/beehive/pkg/core/model" "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" - "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/controller" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util" commontypes "github.com/kubeedge/kubeedge/common/types" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" +) + +const ( + UpgradeSuccess = "upgrade_success" + UpgradeFailedRollbackSuccess = "upgrade_failed_rollback_success" + UpgradeFailedRollbackFailed = "upgrade_failed_rollback_failed" ) // upgradeEdge upgrade the edgecore version func upgradeEdge(request *restful.Request, response *restful.Response) { resp := commontypes.NodeUpgradeJobResponse{} - defer func() { - if _, err := response.Write([]byte("ok")); err != nil { - klog.Errorf("failed to send upgrade edge resp, err: %v", err) - } - }() + taskID := resp.UpgradeID + taskType := util.TaskUpgrade + nodeID := resp.NodeName - limit := int64(3 * 1024 * 1024) lr := &io.LimitedReader{ R: request.Request.Body, - N: limit + 1, + N: millionByte + 1, } body, err := io.ReadAll(lr) if err != nil { - klog.Errorf("failed to get req body: %v", err) + err = response.WriteError(http.StatusBadRequest, fmt.Errorf("failed to get req body: %v", err)) + if err != nil { + klog.Warning(err.Error()) + } return } if lr.N <= 0 { - klog.Errorf("%v", errors.NewRequestEntityTooLargeError(fmt.Sprintf("limit is %d", limit))) + err = response.WriteError(http.StatusBadRequest, errors.NewRequestEntityTooLargeError("the request body can only be up to 1MB in size")) + if err != nil { + klog.Warning(err.Error()) + } return } - if err := json.Unmarshal(body, &resp); err != nil { - klog.Errorf("failed to marshal upgrade info: %v", err) + if err = json.Unmarshal(body, &resp); err != nil { + err = response.WriteError(http.StatusBadRequest, fmt.Errorf("failed to marshal task info: %v", err)) + if err != nil { + klog.Warning(err.Error()) + } return } + newResp := commontypes.NodeTaskResponse{ + NodeName: resp.NodeName, + Event: "Upgrade", + Action: api.ActionSuccess, + Reason: resp.Reason, + } - msg 
:= beehiveModel.NewMessage("").SetRoute(modules.CloudHubModuleName, modules.CloudHubModuleName). - SetResourceOperation(fmt.Sprintf("%s/%s/node/%s", controller.NodeUpgrade, resp.UpgradeID, resp.NodeName), controller.NodeUpgrade).FillBody(resp) - beehiveContext.Send(modules.NodeUpgradeJobControllerModuleName, *msg) + if resp.Status == UpgradeFailedRollbackSuccess { + newResp.Event = "Rollback" + newResp.Action = api.ActionFailure + } + + if resp.Status == UpgradeFailedRollbackFailed { + newResp.Event = "Rollback" + newResp.Action = api.ActionSuccess + } + + msg := beehiveModel.NewMessage("").SetRoute(modules.CloudHubModuleName, modules.CloudHubModuleGroup). + SetResourceOperation(fmt.Sprintf("task/%s/node/%s", taskID, nodeID), taskType).FillBody(newResp) + beehiveContext.Send(modules.TaskManagerModuleName, *msg) + + if _, err = response.Write([]byte("ok")); err != nil { + klog.Errorf("failed to send the task resp to edge , err: %v", err) + } } diff --git a/cloud/pkg/common/messagelayer/context.go b/cloud/pkg/common/messagelayer/context.go index 8a2e9d4bd..58bee37a7 100644 --- a/cloud/pkg/common/messagelayer/context.go +++ b/cloud/pkg/common/messagelayer/context.go @@ -96,18 +96,10 @@ func DynamicControllerMessageLayer() MessageLayer { } } -func NodeUpgradeJobControllerMessageLayer() MessageLayer { +func TaskManagerMessageLayer() MessageLayer { return &ContextMessageLayer{ SendModuleName: modules.CloudHubModuleName, - ReceiveModuleName: modules.NodeUpgradeJobControllerModuleName, - ResponseModuleName: modules.CloudHubModuleName, - } -} - -func ImagePrePullControllerMessageLayer() MessageLayer { - return &ContextMessageLayer{ - SendModuleName: modules.CloudHubModuleName, - ReceiveModuleName: modules.ImagePrePullControllerModuleName, + ReceiveModuleName: modules.TaskManagerModuleName, ResponseModuleName: modules.CloudHubModuleName, } } diff --git a/cloud/pkg/common/modules/modules.go b/cloud/pkg/common/modules/modules.go index f458aee5e..12fa9927e 100644 --- a/cloud/pkg/common/modules/modules.go +++ b/cloud/pkg/common/modules/modules.go @@ -16,8 +16,8 @@ const ( NodeUpgradeJobControllerModuleName = "nodeupgradejobcontroller" NodeUpgradeJobControllerModuleGroup = "nodeupgradejobcontroller" - ImagePrePullControllerModuleName = "imageprepullcontroller" - ImagePrePullControllerModuleGroup = "imageprepullcontroller" + TaskManagerModuleName = "taskmanager" + TaskManagerModuleGroup = "taskmanager" SyncControllerModuleName = "synccontroller" SyncControllerModuleGroup = "synccontroller" diff --git a/cloud/pkg/imageprepullcontroller/controller/downstream.go b/cloud/pkg/imageprepullcontroller/controller/downstream.go deleted file mode 100644 index d82c58413..000000000 --- a/cloud/pkg/imageprepullcontroller/controller/downstream.go +++ /dev/null @@ -1,268 +0,0 @@ -/* -Copyright 2023 The KubeEdge Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package controller - -import ( - "fmt" - "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - k8sinformer "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" - - beehiveContext "github.com/kubeedge/beehive/pkg/core/context" - "github.com/kubeedge/beehive/pkg/core/model" - "github.com/kubeedge/kubeedge/cloud/pkg/common/client" - "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" - "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer" - "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" - "github.com/kubeedge/kubeedge/cloud/pkg/common/util" - "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/manager" - commontypes "github.com/kubeedge/kubeedge/common/types" - "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" - crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" - crdinformers "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions" -) - -type DownstreamController struct { - kubeClient kubernetes.Interface - informer k8sinformer.SharedInformerFactory - crdClient crdClientset.Interface - messageLayer messagelayer.MessageLayer - - imagePrePullJobManager *manager.ImagePrePullJobManager -} - -// Start DownstreamController -func (dc *DownstreamController) Start() error { - klog.Info("Start ImagePrePullJob Downstream Controller") - go dc.syncImagePrePullJob() - return nil -} - -// syncImagePrePullJob is used to get events from informer -func (dc *DownstreamController) syncImagePrePullJob() { - for { - select { - case <-beehiveContext.Done(): - klog.Info("stop sync ImagePrePullJob") - return - case e := <-dc.imagePrePullJobManager.Events(): - imagePrePull, ok := e.Object.(*v1alpha1.ImagePrePullJob) - if !ok { - klog.Warningf("object type: %T unsupported", e.Object) - continue - } - switch e.Type { - case watch.Added: - dc.imagePrePullJobAdded(imagePrePull) - case watch.Deleted: - dc.imagePrePullJobDeleted(imagePrePull) - case watch.Modified: - dc.imagePrePullJobUpdate(imagePrePull) - default: - klog.Warningf("ImagePrePullJob event type: %s unsupported", e.Type) - } - } - } -} - -// imagePrePullJobAdded is used to process addition of new ImagePrePullJob in apiserver -func (dc *DownstreamController) imagePrePullJobAdded(imagePrePull *v1alpha1.ImagePrePullJob) { - klog.V(4).Infof("add ImagePrePullJob: %v", imagePrePull) - // store in cache map - dc.imagePrePullJobManager.ImagePrePullMap.Store(imagePrePull.Name, imagePrePull) - - // If imagePrePullJob is not initial state, we don't need to send message\ - if imagePrePull.Status.State != v1alpha1.PrePullInitialValue { - klog.Errorf("The ImagePrePullJob %s is already running or completed, don't send message again", imagePrePull.Name) - return - } - - // get node list that need prepull images - var nodesToPrePullImage []string - if len(imagePrePull.Spec.ImagePrePullTemplate.NodeNames) != 0 { - for _, node := range imagePrePull.Spec.ImagePrePullTemplate.NodeNames { - nodeInfo, err := dc.informer.Core().V1().Nodes().Lister().Get(node) - if err != nil { - klog.Errorf("Failed to get node(%s) info: %v", node, err) - continue - } - - if validateNode(nodeInfo) { - nodesToPrePullImage = append(nodesToPrePullImage, nodeInfo.Name) - } - } - } else if imagePrePull.Spec.ImagePrePullTemplate.LabelSelector != nil { - selector, err := metav1.LabelSelectorAsSelector(imagePrePull.Spec.ImagePrePullTemplate.LabelSelector) - if err != nil { - klog.Errorf("LabelSelector(%s) is not valid: 
%v", imagePrePull.Spec.ImagePrePullTemplate.LabelSelector, err) - return - } - - nodes, err := dc.informer.Core().V1().Nodes().Lister().List(selector) - if err != nil { - klog.Errorf("Failed to get nodes with label %s: %v", selector.String(), err) - return - } - - for _, node := range nodes { - if validateNode(node) { - nodesToPrePullImage = append(nodesToPrePullImage, node.Name) - } - } - } - - // deduplicate: remove duplicate nodes to avoid repeating prepull images on the same node - nodesToPrePullImage = util.RemoveDuplicateElement(nodesToPrePullImage) - - klog.Infof("Filtered finished, images will be prepulled on below nodes\n%v\n", nodesToPrePullImage) - - go func() { - for _, node := range nodesToPrePullImage { - dc.processPrePull(node, imagePrePull) - } - }() -} - -// imagePrePullJobDeleted is used to process deleted ImagePrePullJob in apiServer -func (dc *DownstreamController) imagePrePullJobDeleted(imagePrePull *v1alpha1.ImagePrePullJob) { - // delete drom cache map - dc.imagePrePullJobManager.ImagePrePullMap.Delete(imagePrePull.Name) -} - -// imagePrePullJobUpdate is used to process update of ImagePrePullJob in apiServer -// Now we don't allow update spec, so we only update the cache map in imagePrePullJobUpdate func. -func (dc *DownstreamController) imagePrePullJobUpdate(imagePrePull *v1alpha1.ImagePrePullJob) { - _, ok := dc.imagePrePullJobManager.ImagePrePullMap.Load(imagePrePull.Name) - // store in cache map - dc.imagePrePullJobManager.ImagePrePullMap.Store(imagePrePull.Name, imagePrePull) - if !ok { - klog.Infof("ImagePrePull Job %s not exist, and store it into first", imagePrePull.Name) - // If ImagePrePullJob not present in ImagePrePull map means it is not modified and added. - dc.imagePrePullJobAdded(imagePrePull) - } -} - -// processPrePull process prepull job added and send it to edge nodes. -func (dc *DownstreamController) processPrePull(node string, imagePrePull *v1alpha1.ImagePrePullJob) { - klog.V(4).Infof("begin to send imagePrePull message to %s", node) - - imagePrePullTemplateInfo := imagePrePull.Spec.ImagePrePullTemplate - imagePrePullRequest := commontypes.ImagePrePullJobRequest{ - Images: imagePrePullTemplateInfo.Images, - NodeName: node, - Secret: imagePrePullTemplateInfo.ImageSecret, - RetryTimes: imagePrePullTemplateInfo.RetryTimes, - CheckItems: imagePrePullTemplateInfo.CheckItems, - } - - // handle timeout: could not receive image prepull msg feedback from edge node - // send prepull timeout response message to upstream - go dc.handleImagePrePullJobTimeoutOnEachNode(node, imagePrePull.Name, imagePrePullTemplateInfo.TimeoutSecondsOnEachNode) - - // send prepull msg to edge node - msg := model.NewMessage("") - resource := buildPrePullResource(imagePrePull.Name, node) - msg.BuildRouter(modules.ImagePrePullControllerModuleName, modules.ImagePrePullControllerModuleGroup, resource, ImagePrePull). - FillBody(imagePrePullRequest) - - err := dc.messageLayer.Send(*msg) - if err != nil { - klog.Errorf("Failed to send prepull message %s due to error %v", msg.GetID(), err) - return - } - - // update imagePrePullJob status to prepulling - status := &v1alpha1.ImagePrePullStatus{ - NodeName: node, - State: v1alpha1.PrePulling, - } - err = patchImagePrePullStatus(dc.crdClient, imagePrePull, status) - if err != nil { - klog.Errorf("Failed to mark imagePrePullJob prepulling status: %v", err) - } -} - -// handleImagePrePullJobTimeoutOnEachNode is used to handle the situation that cloud don't receive prepull result -// from edge node within the timeout period. 
-// If so, the ImagePrePullJobState will update to timeout -func (dc *DownstreamController) handleImagePrePullJobTimeoutOnEachNode(node, jobName string, timeoutSecondsEachNode *uint32) { - var timeout uint32 = 360 - if timeoutSecondsEachNode != nil && *timeoutSecondsEachNode != 0 { - timeout = *timeoutSecondsEachNode - } - - receiveFeedback := false - - _ = wait.Poll(10*time.Second, time.Duration(timeout)*time.Second, func() (bool, error) { - cacheValue, ok := dc.imagePrePullJobManager.ImagePrePullMap.Load(jobName) - if !ok { - receiveFeedback = true - klog.Errorf("ImagePrePullJob %s is not exist", jobName) - return false, fmt.Errorf("imagePrePullJob %s is not exist", jobName) - } - imagePrePullValue := cacheValue.(*v1alpha1.ImagePrePullJob) - for _, statusValue := range imagePrePullValue.Status.Status { - if statusValue.NodeName == node && (statusValue.State == v1alpha1.PrePullSuccessful || statusValue.State == v1alpha1.PrePullFailed) { - receiveFeedback = true - return true, nil - } - break - } - return false, nil - }) - - if receiveFeedback { - return - } - klog.Errorf("TIMEOUT to receive image prepull %s response from edge node %s", jobName, node) - - // construct timeout image prepull response and send it to upstream controller - responseResource := buildPrePullResource(jobName, node) - resp := commontypes.ImagePrePullJobResponse{ - NodeName: node, - State: v1alpha1.PrePullFailed, - Reason: "timeout to receive response from edge", - } - - respMsg := model.NewMessage(""). - BuildRouter(modules.ImagePrePullControllerModuleName, modules.ImagePrePullControllerModuleGroup, responseResource, ImagePrePull). - FillBody(resp) - beehiveContext.Send(modules.ImagePrePullControllerModuleName, *respMsg) -} - -// NewDownstreamController new downstream controller to process downstream imageprepull msg to edge nodes. -func NewDownstreamController(crdInformerFactory crdinformers.SharedInformerFactory) (*DownstreamController, error) { - imagePrePullJobManager, err := manager.NewImagePrePullJobManager(crdInformerFactory.Operations().V1alpha1().ImagePrePullJobs().Informer()) - if err != nil { - klog.Warningf("Create ImagePrePullJob manager failed with error: %s", err) - return nil, err - } - - dc := &DownstreamController{ - kubeClient: client.GetKubeClient(), - informer: informers.GetInformersManager().GetKubeInformerFactory(), - crdClient: client.GetCRDClient(), - imagePrePullJobManager: imagePrePullJobManager, - messageLayer: messagelayer.ImagePrePullControllerMessageLayer(), - } - return dc, nil -} diff --git a/cloud/pkg/imageprepullcontroller/controller/util.go b/cloud/pkg/imageprepullcontroller/controller/util.go deleted file mode 100644 index 592164833..000000000 --- a/cloud/pkg/imageprepullcontroller/controller/util.go +++ /dev/null @@ -1,131 +0,0 @@ -/* -Copyright 2023 The KubeEdge Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package controller - -import ( - "context" - "encoding/json" - "fmt" - "strings" - - jsonpatch "github.com/evanphx/json-patch" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - apimachineryType "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" - - "github.com/kubeedge/kubeedge/cloud/pkg/common/util" - "github.com/kubeedge/kubeedge/common/constants" - "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" - crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" -) - -const ImagePrePull = "prepull" - -func validateNode(node *v1.Node) bool { - if !util.IsEdgeNode(node) { - klog.Warningf("Node(%s) is not edge node", node.Name) - return false - } - - // if node is in NotReady state, cannot prepull images - for _, condition := range node.Status.Conditions { - if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue { - klog.Warningf("Node(%s) is in NotReady state", node.Name) - return false - } - } - - return true -} - -// buildPrePullResource build prepull resource in msg send to edge node -func buildPrePullResource(imagePrePullName, nodeName string) string { - resource := fmt.Sprintf("%s/%s/%s/%s", "node", nodeName, ImagePrePull, imagePrePullName) - return resource -} - -func parsePrePullresource(resource string) (string, string, error) { - var nodeName, jobName string - sli := strings.Split(resource, constants.ResourceSep) - if len(sli) != 4 { - return nodeName, jobName, fmt.Errorf("the resource %s is not the standard type", resource) - } - return sli[1], sli[3], nil -} - -func patchImagePrePullStatus(crdClient crdClientset.Interface, imagePrePull *v1alpha1.ImagePrePullJob, status *v1alpha1.ImagePrePullStatus) error { - oldValue := imagePrePull.DeepCopy() - newValue := updateNodeImagePrePullStatus(oldValue, status) - - var completeFlag int - var failedFlag bool - newValue.Status.State = v1alpha1.PrePulling - for _, statusValue := range newValue.Status.Status { - if statusValue.State == v1alpha1.PrePullFailed { - failedFlag = true - completeFlag++ - } - if statusValue.State == v1alpha1.PrePullSuccessful { - completeFlag++ - } - } - if completeFlag == len(newValue.Status.Status) { - if failedFlag { - newValue.Status.State = v1alpha1.PrePullFailed - } else { - newValue.Status.State = v1alpha1.PrePullSuccessful - } - } - - oldData, err := json.Marshal(oldValue) - if err != nil { - return fmt.Errorf("failed to marshal the old ImagePrePullJob(%s): %v", oldValue.Name, err) - } - - newData, err := json.Marshal(newValue) - if err != nil { - return fmt.Errorf("failed to marshal the new ImagePrePullJob(%s): %v", newValue.Name, err) - } - - patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) - if err != nil { - return fmt.Errorf("failed to create a merge patch: %v", err) - } - - _, err = crdClient.OperationsV1alpha1().ImagePrePullJobs().Patch(context.TODO(), newValue.Name, apimachineryType.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") - if err != nil { - return fmt.Errorf("failed to patch update ImagePrePullJob status: %v", err) - } - - return nil -} - -func updateNodeImagePrePullStatus(imagePrePull *v1alpha1.ImagePrePullJob, status *v1alpha1.ImagePrePullStatus) *v1alpha1.ImagePrePullJob { - // return value imageprepull cannot populate the input parameter old - newValue := imagePrePull.DeepCopy() - - for index, nodeStatus := range newValue.Status.Status { - if nodeStatus.NodeName == status.NodeName { - newValue.Status.Status[index] = *status - return newValue - } - } - - newValue.Status.Status = 
append(newValue.Status.Status, *status) - return newValue -} diff --git a/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go b/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go deleted file mode 100644 index 4f772d258..000000000 --- a/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go +++ /dev/null @@ -1,91 +0,0 @@ -/* -Copyright 2023 The KubeEdge Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package imageprepullcontroller - -import ( - "time" - - "k8s.io/klog/v2" - - "github.com/kubeedge/beehive/pkg/core" - "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" - "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" - "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config" - "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/controller" - "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" -) - -// ImagePrePullController is controller for processing prepull images on edge nodes -type ImagePrePullController struct { - downstream *controller.DownstreamController - upstream *controller.UpstreamController - enable bool -} - -var _ core.Module = (*ImagePrePullController)(nil) - -func newImagePrePullController(enable bool) *ImagePrePullController { - if !enable { - return &ImagePrePullController{enable: enable} - } - downstream, err := controller.NewDownstreamController(informers.GetInformersManager().GetKubeEdgeInformerFactory()) - if err != nil { - klog.Exitf("New ImagePrePull Controller downstream failed with error: %s", err) - } - upstream, err := controller.NewUpstreamController(downstream) - if err != nil { - klog.Exitf("New ImagePrePull Controller upstream failed with error: %s", err) - } - return &ImagePrePullController{ - downstream: downstream, - upstream: upstream, - enable: enable, - } -} - -func Register(dc *v1alpha1.ImagePrePullController) { - config.InitConfigure(dc) - core.Register(newImagePrePullController(dc.Enable)) -} - -// Name of controller -func (uc *ImagePrePullController) Name() string { - return modules.ImagePrePullControllerModuleName -} - -// Group of controller -func (uc *ImagePrePullController) Group() string { - return modules.ImagePrePullControllerModuleGroup -} - -// Enable indicates whether enable this module -func (uc *ImagePrePullController) Enable() bool { - return uc.enable -} - -// Start controller -func (uc *ImagePrePullController) Start() { - if err := uc.downstream.Start(); err != nil { - klog.Exitf("start ImagePrePullJob controller downstream failed with error: %s", err) - } - // wait for downstream controller to start and load ImagePrePullJob - // TODO think about sync - time.Sleep(1 * time.Second) - if err := uc.upstream.Start(); err != nil { - klog.Exitf("start ImagePrePullJob controller upstream failed with error: %s", err) - } -} diff --git a/cloud/pkg/imageprepullcontroller/manager/common.go b/cloud/pkg/imageprepullcontroller/manager/common.go deleted file mode 100644 index 9f371875a..000000000 --- a/cloud/pkg/imageprepullcontroller/manager/common.go +++ /dev/null @@ -1,62 
+0,0 @@ -/* -Copyright 2023 The KubeEdge Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package manager - -import ( - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/klog/v2" -) - -// Manager define the interface of a Manager, ImagePrePullJob Manager implement it -type Manager interface { - Events() chan watch.Event -} - -// CommonResourceEventHandler can be used by ImagePrePullJob Manager -type CommonResourceEventHandler struct { - events chan watch.Event -} - -func (c *CommonResourceEventHandler) obj2Event(t watch.EventType, obj interface{}) { - eventObj, ok := obj.(runtime.Object) - if !ok { - klog.Warningf("unknown type: %T, ignore", obj) - return - } - c.events <- watch.Event{Type: t, Object: eventObj} -} - -// OnAdd handle Add event -func (c *CommonResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) { - c.obj2Event(watch.Added, obj) -} - -// OnUpdate handle Update event -func (c *CommonResourceEventHandler) OnUpdate(oldObj, newObj interface{}) { - c.obj2Event(watch.Modified, newObj) -} - -// OnDelete handle Delete event -func (c *CommonResourceEventHandler) OnDelete(obj interface{}) { - c.obj2Event(watch.Deleted, obj) -} - -// NewCommonResourceEventHandler create CommonResourceEventHandler used by ImagePrePullJob Manager -func NewCommonResourceEventHandler(events chan watch.Event) *CommonResourceEventHandler { - return &CommonResourceEventHandler{events: events} -} diff --git a/cloud/pkg/imageprepullcontroller/manager/imageprepull.go b/cloud/pkg/imageprepullcontroller/manager/imageprepull.go deleted file mode 100644 index cdb2eaccc..000000000 --- a/cloud/pkg/imageprepullcontroller/manager/imageprepull.go +++ /dev/null @@ -1,52 +0,0 @@ -/* -Copyright 2023 The KubeEdge Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package manager - -import ( - "sync" - - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" - - "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config" -) - -// ImagePrePullJobManager is a manager watch ImagePrePullJob change event -type ImagePrePullJobManager struct { - // events from watch kubernetes api server - events chan watch.Event - - // ImagePrePullMap, key is ImagePrePullJob.Name, value is *v1alpha1.ImagePrePullJob{} - ImagePrePullMap sync.Map -} - -// Events return a channel, can receive all ImagePrePullJob event -func (dmm *ImagePrePullJobManager) Events() chan watch.Event { - return dmm.events -} - -// NewImagePrePullJobManager create ImagePrePullJobManager from config -func NewImagePrePullJobManager(si cache.SharedIndexInformer) (*ImagePrePullJobManager, error) { - events := make(chan watch.Event, config.Config.Buffer.ImagePrePullJobEvent) - rh := NewCommonResourceEventHandler(events) - _, err := si.AddEventHandler(rh) - if err != nil { - return nil, err - } - - return &ImagePrePullJobManager{events: events}, nil -} diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go b/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go deleted file mode 100644 index 6efee8012..000000000 --- a/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go +++ /dev/null @@ -1,447 +0,0 @@ -/* -Copyright 2022 The KubeEdge Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package controller - -import ( - "context" - "encoding/json" - "fmt" - "time" - - "github.com/google/uuid" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - apimachineryType "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - k8sinformer "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" - - beehiveContext "github.com/kubeedge/beehive/pkg/core/context" - "github.com/kubeedge/beehive/pkg/core/model" - "github.com/kubeedge/kubeedge/cloud/pkg/common/client" - "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" - "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer" - "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" - "github.com/kubeedge/kubeedge/cloud/pkg/common/util" - "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/manager" - "github.com/kubeedge/kubeedge/common/constants" - commontypes "github.com/kubeedge/kubeedge/common/types" - "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" - crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" - crdinformers "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions" -) - -type DownstreamController struct { - kubeClient kubernetes.Interface - informer k8sinformer.SharedInformerFactory - crdClient crdClientset.Interface - messageLayer messagelayer.MessageLayer - - nodeUpgradeJobManager *manager.NodeUpgradeJobManager -} - -// Start DownstreamController -func (dc *DownstreamController) Start() error { - klog.Info("Start NodeUpgradeJob Downstream Controller") - - go dc.syncNodeUpgradeJob() - - return nil -} - -// syncNodeUpgradeJob is used to get events from informer -func (dc *DownstreamController) syncNodeUpgradeJob() { - for { - select { - case <-beehiveContext.Done(): - klog.Info("stop sync NodeUpgradeJob") - return - case e := <-dc.nodeUpgradeJobManager.Events(): - upgrade, ok := e.Object.(*v1alpha1.NodeUpgradeJob) - if !ok { - klog.Warningf("object type: %T unsupported", e.Object) - continue - } - switch e.Type { - case watch.Added: - dc.nodeUpgradeJobAdded(upgrade) - case watch.Deleted: - dc.nodeUpgradeJobDeleted(upgrade) - case watch.Modified: - dc.nodeUpgradeJobUpdated(upgrade) - default: - klog.Warningf("NodeUpgradeJob event type: %s unsupported", e.Type) - } - } - } -} - -func buildUpgradeResource(upgradeID, nodeID string) string { - resource := fmt.Sprintf("%s%s%s%s%s%s%s", NodeUpgrade, constants.ResourceSep, upgradeID, constants.ResourceSep, "node", constants.ResourceSep, nodeID) - return resource -} - -// nodeUpgradeJobAdded is used to process addition of new NodeUpgradeJob in apiserver -func (dc *DownstreamController) nodeUpgradeJobAdded(upgrade *v1alpha1.NodeUpgradeJob) { - klog.V(4).Infof("add NodeUpgradeJob: %v", upgrade) - // store in cache map - dc.nodeUpgradeJobManager.UpgradeMap.Store(upgrade.Name, upgrade) - - // If all or partial edge nodes upgrade is upgrading or completed, we don't need to send upgrade message - if isCompleted(upgrade) { - klog.Errorf("The nodeUpgradeJob is already running or completed, don't send upgrade message again") - return - } - - // get node list that need upgrading - var nodesToUpgrade []string - if len(upgrade.Spec.NodeNames) != 0 { - for _, node := range upgrade.Spec.NodeNames { - nodeInfo, err := dc.informer.Core().V1().Nodes().Lister().Get(node) - if err != nil { - klog.Errorf("Failed to get node(%s) info: %v", node, err) - continue - } - - if needUpgrade(nodeInfo, upgrade.Spec.Version) { - nodesToUpgrade = 
append(nodesToUpgrade, nodeInfo.Name) - } - } - } else if upgrade.Spec.LabelSelector != nil { - selector, err := metav1.LabelSelectorAsSelector(upgrade.Spec.LabelSelector) - if err != nil { - klog.Errorf("LabelSelector(%s) is not valid: %v", upgrade.Spec.LabelSelector, err) - return - } - - nodes, err := dc.informer.Core().V1().Nodes().Lister().List(selector) - if err != nil { - klog.Errorf("Failed to get nodes with label %s: %v", selector.String(), err) - return - } - - for _, node := range nodes { - if needUpgrade(node, upgrade.Spec.Version) { - nodesToUpgrade = append(nodesToUpgrade, node.Name) - } - } - } - - // deduplicate: remove duplicate nodes to avoid repeating upgrade to the same node - nodesToUpgrade = util.RemoveDuplicateElement(nodesToUpgrade) - - klog.Infof("Filtered finished, the below nodes are to upgrade\n%v\n", nodesToUpgrade) - - // upgrade most `UpgradeJob.Spec.Concurrency` nodes once a time - nodesChan := make(chan string, upgrade.Spec.Concurrency) - - // select nodes to do upgrade operation - go dc.selectConcurrentNodes(nodesChan, nodesToUpgrade, upgrade) - - go func() { - for node := range nodesChan { - dc.processUpgrade(node, upgrade) - } - }() -} - -// processUpgrade do the upgrade operation on node -func (dc *DownstreamController) processUpgrade(node string, upgrade *v1alpha1.NodeUpgradeJob) { - klog.V(4).Infof("begin to upgrade node %s", node) - // if users specify Image, we'll use upgrade Version as its image tag, even though Image contains tag. - // if not, we'll use default image: kubeedge/installation-package:${Version} - var repo string - var err error - repo = "kubeedge/installation-package" - if upgrade.Spec.Image != "" { - repo, err = GetImageRepo(upgrade.Spec.Image) - if err != nil { - klog.Errorf("Image format is not right: %v", err) - return - } - } - imageTag := upgrade.Spec.Version - image := fmt.Sprintf("%s:%s", repo, imageTag) - - // send upgrade msg to edge node - msg := model.NewMessage("") - - resource := buildUpgradeResource(upgrade.Name, node) - - upgradeReq := commontypes.NodeUpgradeJobRequest{ - UpgradeID: upgrade.Name, - HistoryID: uuid.New().String(), - UpgradeTool: upgrade.Spec.UpgradeTool, - Version: upgrade.Spec.Version, - Image: image, - } - - msg.BuildRouter(modules.NodeUpgradeJobControllerModuleName, modules.NodeUpgradeJobControllerModuleGroup, resource, NodeUpgrade). 
- FillBody(upgradeReq) - - err = dc.messageLayer.Send(*msg) - if err != nil { - klog.Errorf("Failed to send upgrade message %v due to error %v", msg.GetID(), err) - return - } - - // process time out: could not receive upgrade feedback from edge - // send upgrade timeout response message to upstream - go dc.handleNodeUpgradeJobTimeout(node, upgrade.Name, upgrade.Spec.Version, upgradeReq.HistoryID, upgrade.Spec.TimeoutSeconds) - - // mark Upgrade state upgrading - status := &v1alpha1.UpgradeStatus{ - NodeName: node, - State: v1alpha1.Upgrading, - History: v1alpha1.History{ - HistoryID: upgradeReq.HistoryID, - UpgradeTime: time.Now().Format(ISO8601UTC), - }, - } - err = patchNodeUpgradeJobStatus(dc.crdClient, upgrade, status) - if err != nil { - // not return, continue to mark node unschedulable - klog.Errorf("Failed to mark Upgrade upgrading status: %v", err) - } - - // mark edge node unschedulable - // the effect is like running cmd: kubectl drain <node-to-drain> --ignore-daemonsets - unscheduleNode := v1.Node{} - unscheduleNode.Spec.Unschedulable = true - - // add a upgrade label - unscheduleNode.Labels = map[string]string{NodeUpgradeJobStatusKey: NodeUpgradeJobStatusValue} - byteNode, err := json.Marshal(unscheduleNode) - if err != nil { - klog.Warningf("marshal data failed: %v", err) - return - } - - _, err = dc.kubeClient.CoreV1().Nodes().Patch(context.Background(), node, apimachineryType.StrategicMergePatchType, byteNode, metav1.PatchOptions{}) - if err != nil { - klog.Errorf("failed to drain node %s: %v", node, err) - return - } -} - -// selectConcurrentNodes select the nodes to do upgrade operation, and put it into channel nodesChan -func (dc *DownstreamController) selectConcurrentNodes(nodesChan chan string, allNodes []string, upgrade *v1alpha1.NodeUpgradeJob) { - // the default concurrency is 1 - // this means that we will upgrade nodes one by one - // only when the last one node upgrade finished, we'll continue to upgrade the next one node - concurrency := 1 - if upgrade.Spec.Concurrency != 0 { - concurrency = int(upgrade.Spec.Concurrency) - } - - timeout := int(*upgrade.Spec.TimeoutSeconds) * (len(allNodes) + 1) / concurrency - - err := wait.Poll(10*time.Second, time.Duration(timeout)*time.Second, func() (bool, error) { - upgradeJob, err := dc.crdClient.OperationsV1alpha1().NodeUpgradeJobs().Get(context.TODO(), upgrade.Name, metav1.GetOptions{}) - if err != nil { - return false, nil - } - - // if all the nodes upgrade is completed, close channel to inform that the upgrade is finished - if upgradeJob.Status.State == v1alpha1.Completed { - klog.Infof("all the nodes upgrade status are completed") - close(nodesChan) - return true, nil - } - - // calculate the number of nodes in upgrading operation - upgradingNum := 0 - for _, status := range upgradeJob.Status.Status { - if status.State == v1alpha1.Upgrading { - upgradingNum++ - } - } - - // ensure the max number of upgrading nodes is Concurrency - if upgradingNum < concurrency { - for i := 0; i < concurrency-upgradingNum; i++ { - nodesChan <- allNodes[i] - } - - allNodes = allNodes[:int(concurrency)-upgradingNum] - } - - if len(allNodes) == 0 { - // all the nodes are to upgrade - klog.Infof("The number of nodes need to be upgraded reach 0") - close(nodesChan) - return true, nil - } - return false, nil - }) - - if err != nil { - klog.Errorf("failed to select all the related nodes to do upgrade operation: %v", err) - close(nodesChan) - } -} - -func needUpgrade(node *v1.Node, upgradeVersion string) bool { - if 
filterVersion(node.Status.NodeInfo.KubeletVersion, upgradeVersion) { - klog.Warningf("Node(%s) version(%s) already on the expected version %s.", node.Name, node.Status.NodeInfo.KubeletVersion, upgradeVersion) - return false - } - - // we only care about edge nodes, so just remove not edge nodes - if !util.IsEdgeNode(node) { - klog.Warningf("Node(%s) is not edge node", node.Name) - return false - } - - // if node is in Upgrading state, don't need upgrade - if _, ok := node.Labels[NodeUpgradeJobStatusKey]; ok { - klog.Warningf("Node(%s) is in upgrade state", node.Name) - return false - } - - // if node is in NotReady state, don't need upgrade - for _, condition := range node.Status.Conditions { - if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue { - klog.Warningf("Node(%s) is in NotReady state", node.Name) - return false - } - } - - return true -} - -// handleNodeUpgradeJobTimeout is used to handle the situation that cloud don't receive upgrade result from edge node -// within the timeout period -func (dc *DownstreamController) handleNodeUpgradeJobTimeout(node string, upgradeID string, upgradeVersion string, historyID string, timeoutSeconds *uint32) { - // by default, if we don't receive upgrade response in 300s, we think it's timeout - // if we have specified the timeout in Upgrade, we'll use it as the timeout time - var timeout uint32 = 300 - if timeoutSeconds != nil && *timeoutSeconds != 0 { - timeout = *timeoutSeconds - } - - receiveFeedback := false - - // check whether edgecore report to the cloud about the upgrade result - // we don't care about function Poll return error - // if we don't receive Upgrade response, Poll function also return error: timed out waiting for the condition - // we only care about variable: receiveFeedback - _ = wait.Poll(10*time.Second, time.Duration(timeout)*time.Second, func() (bool, error) { - v, ok := dc.nodeUpgradeJobManager.UpgradeMap.Load(upgradeID) - if !ok { - // we think it's receiveFeedback to avoid construct timeout response by ourselves - receiveFeedback = true - klog.Errorf("NodeUpgradeJob %v not exist", upgradeID) - return false, fmt.Errorf("nodeUpgrade %v not exist", upgradeID) - } - upgradeValue := v.(*v1alpha1.NodeUpgradeJob) - for index := range upgradeValue.Status.Status { - if upgradeValue.Status.Status[index].NodeName == node { - // if HistoryID matches and state is v1alpha1.Completed - // it means we've received the specified Upgrade Operation - if upgradeValue.Status.Status[index].History.HistoryID == historyID && - upgradeValue.Status.Status[index].State == v1alpha1.Completed { - receiveFeedback = true - return true, nil - } - break - } - } - return false, nil - }) - - if receiveFeedback { - // if already receive edge upgrade feedback, do nothing - return - } - - klog.Errorf("NOT receive node(%s) upgrade(%s) feedback response", node, upgradeID) - - // construct timeout upgrade response - // and send it to upgrade controller upstream - upgradeResource := buildUpgradeResource(upgradeID, node) - resp := commontypes.NodeUpgradeJobResponse{ - UpgradeID: upgradeID, - HistoryID: historyID, - NodeName: node, - FromVersion: "", - ToVersion: upgradeVersion, - Status: string(v1alpha1.UpgradeFailedRollbackSuccess), - Reason: "timeout to get upgrade response from edge, maybe error due to cloud or edge", - } - - updateMsg := model.NewMessage(""). - BuildRouter(modules.NodeUpgradeJobControllerModuleName, modules.NodeUpgradeJobControllerModuleGroup, upgradeResource, NodeUpgrade). 
- FillBody(resp) - - // send upgrade resp message to upgrade controller upstream directly - // let upgrade controller upstream to update upgrade status - beehiveContext.Send(modules.NodeUpgradeJobControllerModuleName, *updateMsg) -} - -// nodeUpgradeJobDeleted is used to process deleted NodeUpgradeJob in apiserver -func (dc *DownstreamController) nodeUpgradeJobDeleted(upgrade *v1alpha1.NodeUpgradeJob) { - // just need to delete from cache map - dc.nodeUpgradeJobManager.UpgradeMap.Delete(upgrade.Name) -} - -// upgradeAdded is used to process update of new NodeUpgradeJob in apiserver -func (dc *DownstreamController) nodeUpgradeJobUpdated(upgrade *v1alpha1.NodeUpgradeJob) { - oldValue, ok := dc.nodeUpgradeJobManager.UpgradeMap.Load(upgrade.Name) - // store in cache map - dc.nodeUpgradeJobManager.UpgradeMap.Store(upgrade.Name, upgrade) - if !ok { - klog.Infof("Upgrade %s not exist, and store it first", upgrade.Name) - // If Upgrade not present in Upgrade map means it is not modified and added. - dc.nodeUpgradeJobAdded(upgrade) - return - } - - old := oldValue.(*v1alpha1.NodeUpgradeJob) - if !isUpgradeUpdated(upgrade, old) { - klog.V(4).Infof("Upgrade %s no need to update", upgrade.Name) - return - } - - dc.nodeUpgradeJobAdded(upgrade) -} - -// isUpgradeUpdated checks Upgrade is actually updated or not -func isUpgradeUpdated(new *v1alpha1.NodeUpgradeJob, old *v1alpha1.NodeUpgradeJob) bool { - // now we don't allow update spec fields - // so always return false to avoid sending Upgrade msg to edge again when status fields changed - return false -} - -func NewDownstreamController(crdInformerFactory crdinformers.SharedInformerFactory) (*DownstreamController, error) { - nodeUpgradeJobManager, err := manager.NewNodeUpgradeJobManager(crdInformerFactory.Operations().V1alpha1().NodeUpgradeJobs().Informer()) - if err != nil { - klog.Warningf("Create NodeUpgradeJob manager failed with error: %s", err) - return nil, err - } - - dc := &DownstreamController{ - kubeClient: client.GetKubeClient(), - informer: informers.GetInformersManager().GetKubeInformerFactory(), - crdClient: client.GetCRDClient(), - nodeUpgradeJobManager: nodeUpgradeJobManager, - messageLayer: messagelayer.NodeUpgradeJobControllerMessageLayer(), - } - return dc, nil -} diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/upstream.go b/cloud/pkg/nodeupgradejobcontroller/controller/upstream.go deleted file mode 100644 index 6b3e53ffb..000000000 --- a/cloud/pkg/nodeupgradejobcontroller/controller/upstream.go +++ /dev/null @@ -1,244 +0,0 @@ -/* -Copyright 2022 The KubeEdge Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package controller - -import ( - "context" - "encoding/json" - "fmt" - "strings" - - jsonpatch "github.com/evanphx/json-patch" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - apimachineryType "k8s.io/apimachinery/pkg/types" - k8sinformer "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" - - beehiveContext "github.com/kubeedge/beehive/pkg/core/context" - "github.com/kubeedge/beehive/pkg/core/model" - keclient "github.com/kubeedge/kubeedge/cloud/pkg/common/client" - "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" - "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer" - "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/config" - "github.com/kubeedge/kubeedge/common/types" - "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" - crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" -) - -// UpstreamController subscribe messages from edge and sync to k8s api server -type UpstreamController struct { - // downstream controller to update NodeUpgradeJob status in cache - dc *DownstreamController - - kubeClient kubernetes.Interface - informer k8sinformer.SharedInformerFactory - crdClient crdClientset.Interface - messageLayer messagelayer.MessageLayer - // message channel - nodeUpgradeJobStatusChan chan model.Message -} - -// Start UpstreamController -func (uc *UpstreamController) Start() error { - klog.Info("Start NodeUpgradeJob Upstream Controller") - - uc.nodeUpgradeJobStatusChan = make(chan model.Message, config.Config.Buffer.UpdateNodeUpgradeJobStatus) - go uc.dispatchMessage() - - for i := 0; i < int(config.Config.Load.NodeUpgradeJobWorkers); i++ { - go uc.updateNodeUpgradeJobStatus() - } - return nil -} - -// Start UpstreamController -func (uc *UpstreamController) dispatchMessage() { - for { - select { - case <-beehiveContext.Done(): - klog.Info("Stop dispatch NodeUpgradeJob upstream message") - return - default: - } - - msg, err := uc.messageLayer.Receive() - if err != nil { - klog.Warningf("Receive message failed, %v", err) - continue - } - - klog.V(4).Infof("NodeUpgradeJob upstream controller receive msg %#v", msg) - - uc.nodeUpgradeJobStatusChan <- msg - } -} - -// updateNodeUpgradeJobStatus update NodeUpgradeJob status field -func (uc *UpstreamController) updateNodeUpgradeJobStatus() { - for { - select { - case <-beehiveContext.Done(): - klog.Info("Stop update NodeUpgradeJob status") - return - case msg := <-uc.nodeUpgradeJobStatusChan: - klog.V(4).Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource()) - - // get nodeID and upgradeID from Upgrade msg: - nodeID := getNodeName(msg.GetResource()) - upgradeID := getUpgradeID(msg.GetResource()) - - oldValue, ok := uc.dc.nodeUpgradeJobManager.UpgradeMap.Load(upgradeID) - if !ok { - klog.Errorf("NodeUpgradeJob %s not exist", upgradeID) - continue - } - - upgrade, ok := oldValue.(*v1alpha1.NodeUpgradeJob) - if !ok { - klog.Errorf("NodeUpgradeJob info %T is not valid", oldValue) - continue - } - - data, err := msg.GetContentData() - if err != nil { - klog.Errorf("failed to get node upgrade content data: %v", err) - continue - } - resp := &types.NodeUpgradeJobResponse{} - err = json.Unmarshal(data, resp) - if err != nil { - klog.Errorf("Failed to unmarshal node upgrade response: %v", err) - continue - } - - status := &v1alpha1.UpgradeStatus{ - NodeName: nodeID, - State: v1alpha1.Completed, - History: v1alpha1.History{ - HistoryID: resp.HistoryID, - FromVersion: resp.FromVersion, - ToVersion: 
resp.ToVersion, - Result: v1alpha1.UpgradeResult(resp.Status), - Reason: resp.Reason, - }, - } - err = patchNodeUpgradeJobStatus(uc.crdClient, upgrade, status) - if err != nil { - klog.Errorf("Failed to mark NodeUpgradeJob status to completed: %v", err) - } - - // The below are to mark edge node schedulable - // And to keep a successful record in node annotation only when upgrade is successful - // like: nodeupgradejob.operations.kubeedge.io/history: "v1.9.0->v1.10.0->v1.11.1" - nodeInfo, err := uc.informer.Core().V1().Nodes().Lister().Get(nodeID) - if err != nil { - klog.Errorf("Failed to get node info: %v", err) - continue - } - - // mark edge node schedulable - // the effect is like running cmd: kubectl uncordon <node-to-drain> - if nodeInfo.Labels != nil { - if value, ok := nodeInfo.Labels[NodeUpgradeJobStatusKey]; ok { - if value == NodeUpgradeJobStatusValue { - nodeInfo.Spec.Unschedulable = false - delete(nodeInfo.Labels, NodeUpgradeJobStatusKey) - } - } - } - // record upgrade logs in node annotation - if v1alpha1.UpgradeResult(resp.Status) == v1alpha1.UpgradeSuccess { - if nodeInfo.Annotations == nil { - nodeInfo.Annotations = make(map[string]string) - } - nodeInfo.Annotations[NodeUpgradeHistoryKey] = mergeAnnotationUpgradeHistory(nodeInfo.Annotations[NodeUpgradeHistoryKey], resp.FromVersion, resp.ToVersion) - } - _, err = uc.kubeClient.CoreV1().Nodes().Update(context.Background(), nodeInfo, metav1.UpdateOptions{}) - if err != nil { - // just log, and continue to process the next step - klog.Errorf("Failed to mark node schedulable and add upgrade record: %v", err) - } - } - } -} - -// patchNodeUpgradeJobStatus call patch api to patch update NodeUpgradeJob status -func patchNodeUpgradeJobStatus(crdClient crdClientset.Interface, upgrade *v1alpha1.NodeUpgradeJob, status *v1alpha1.UpgradeStatus) error { - oldValue := upgrade.DeepCopy() - - newValue := UpdateNodeUpgradeJobStatus(oldValue, status) - - // after mark each node upgrade state, we also need to judge whether all edge node upgrade is completed - // if all edge node is in completed state, we should set the total state to completed - var completed int - for _, v := range newValue.Status.Status { - if v.State == v1alpha1.Completed { - completed++ - } - } - if completed == len(newValue.Status.Status) { - newValue.Status.State = v1alpha1.Completed - } else { - newValue.Status.State = v1alpha1.Upgrading - } - - oldData, err := json.Marshal(oldValue) - if err != nil { - return fmt.Errorf("failed to marshal the old NodeUpgradeJob(%s): %v", oldValue.Name, err) - } - - newData, err := json.Marshal(newValue) - if err != nil { - return fmt.Errorf("failed to marshal the new NodeUpgradeJob(%s): %v", newValue.Name, err) - } - - patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) - if err != nil { - return fmt.Errorf("failed to create a merge patch: %v", err) - } - - _, err = crdClient.OperationsV1alpha1().NodeUpgradeJobs().Patch(context.TODO(), newValue.Name, apimachineryType.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") - if err != nil { - return fmt.Errorf("failed to patch update NodeUpgradeJob status: %v", err) - } - - return nil -} - -func getNodeName(resource string) string { - // upgrade/${UpgradeID}/node/${NodeID} - s := strings.Split(resource, "/") - return s[3] -} -func getUpgradeID(resource string) string { - // upgrade/${UpgradeID}/node/${NodeID} - s := strings.Split(resource, "/") - return s[1] -} - -// NewUpstreamController create UpstreamController from config -func NewUpstreamController(dc 
*DownstreamController) (*UpstreamController, error) { - uc := &UpstreamController{ - kubeClient: keclient.GetKubeClient(), - informer: informers.GetInformersManager().GetKubeInformerFactory(), - crdClient: keclient.GetCRDClient(), - messageLayer: messagelayer.NodeUpgradeJobControllerMessageLayer(), - dc: dc, - } - return uc, nil -} diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/util.go b/cloud/pkg/nodeupgradejobcontroller/controller/util.go deleted file mode 100644 index 8b80c4261..000000000 --- a/cloud/pkg/nodeupgradejobcontroller/controller/util.go +++ /dev/null @@ -1,124 +0,0 @@ -/* -Copyright 2022 The KubeEdge Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "fmt" - "strings" - "time" - - "github.com/distribution/distribution/v3/reference" - - "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" -) - -const ( - NodeUpgradeJobStatusKey = "nodeupgradejob.operations.kubeedge.io/status" - NodeUpgradeJobStatusValue = "" - NodeUpgradeHistoryKey = "nodeupgradejob.operations.kubeedge.io/history" -) - -const ( - NodeUpgrade = "upgrade" - - ISO8601UTC = "2006-01-02T15:04:05Z" -) - -// filterVersion returns true only if the edge node version already on the upgrade req -// version is like: v1.22.6-kubeedge-v1.10.0-beta.0.185+95378fb019912a, expected is like v1.10.0 -func filterVersion(version string, expected string) bool { - // if not correct version format, also return true - index := strings.Index(version, "-kubeedge-") - if index == -1 { - return false - } - - length := len("-kubeedge-") - - // filter nodes that already in the required version - return version[index+length:] == expected -} - -// isCompleted returns true only if some/all edge upgrade is upgrading or completed -func isCompleted(upgrade *v1alpha1.NodeUpgradeJob) bool { - // all edge node upgrade is upgrading or completed - if upgrade.Status.State != v1alpha1.InitialValue { - return true - } - - // partial edge node upgrade is upgrading or completed - for _, status := range upgrade.Status.Status { - if status.State != v1alpha1.InitialValue { - return true - } - } - - return false -} - -// UpdateNodeUpgradeJobStatus updates the status -// return the updated result -func UpdateNodeUpgradeJobStatus(old *v1alpha1.NodeUpgradeJob, status *v1alpha1.UpgradeStatus) *v1alpha1.NodeUpgradeJob { - // return value upgrade cannot populate the input parameter old - upgrade := old.DeepCopy() - - for index := range upgrade.Status.Status { - // If Node's Upgrade info exist, just overwrite - if upgrade.Status.Status[index].NodeName == status.NodeName { - // The input status no upgradeTime, we need set it with old value - status.History.UpgradeTime = upgrade.Status.Status[index].History.UpgradeTime - upgrade.Status.Status[index] = *status - return upgrade - } - } - - // if Node's Upgrade info not exist, just append - if status.History.UpgradeTime == "" { - // If upgrade time is blank, set to the current time - status.History.UpgradeTime = time.Now().Format(ISO8601UTC) - } - upgrade.Status.Status = 
append(upgrade.Status.Status, *status) - - return upgrade -} - -// mergeAnnotationUpgradeHistory constructs the new history based on the origin history -// and we'll only keep 3 records -func mergeAnnotationUpgradeHistory(origin, fromVersion, toVersion string) string { - newHistory := fmt.Sprintf("%s->%s", fromVersion, toVersion) - if origin == "" { - return newHistory - } - - sets := strings.Split(origin, ";") - if len(sets) > 2 { - sets = sets[1:] - } - - sets = append(sets, newHistory) - return strings.Join(sets, ";") -} - -// GetImageRepo gets repo from a container image -func GetImageRepo(image string) (string, error) { - named, err := reference.ParseNormalizedNamed(image) - if err != nil { - return "", fmt.Errorf("failed to parse image name: %v", err) - } - - return named.Name(), nil -} diff --git a/cloud/pkg/nodeupgradejobcontroller/nodeupgradejobcontroller.go b/cloud/pkg/nodeupgradejobcontroller/nodeupgradejobcontroller.go deleted file mode 100644 index d9c4d8e4c..000000000 --- a/cloud/pkg/nodeupgradejobcontroller/nodeupgradejobcontroller.go +++ /dev/null @@ -1,91 +0,0 @@ -/* -Copyright 2022 The KubeEdge Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package nodeupgradejobcontroller - -import ( - "time" - - "k8s.io/klog/v2" - - "github.com/kubeedge/beehive/pkg/core" - "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" - "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" - "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/config" - "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/controller" - "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" -) - -// NodeUpgradeJobController is controller for processing upgrading edge node from cloud -type NodeUpgradeJobController struct { - downstream *controller.DownstreamController - upstream *controller.UpstreamController - enable bool -} - -var _ core.Module = (*NodeUpgradeJobController)(nil) - -func newNodeUpgradeJobController(enable bool) *NodeUpgradeJobController { - if !enable { - return &NodeUpgradeJobController{enable: enable} - } - downstream, err := controller.NewDownstreamController(informers.GetInformersManager().GetKubeEdgeInformerFactory()) - if err != nil { - klog.Exitf("New NodeUpgradeJob Controller downstream failed with error: %s", err) - } - upstream, err := controller.NewUpstreamController(downstream) - if err != nil { - klog.Exitf("New NodeUpgradeJob Controller upstream failed with error: %s", err) - } - return &NodeUpgradeJobController{ - downstream: downstream, - upstream: upstream, - enable: enable, - } -} - -func Register(dc *v1alpha1.NodeUpgradeJobController) { - config.InitConfigure(dc) - core.Register(newNodeUpgradeJobController(dc.Enable)) -} - -// Name of controller -func (uc *NodeUpgradeJobController) Name() string { - return modules.NodeUpgradeJobControllerModuleName -} - -// Group of controller -func (uc *NodeUpgradeJobController) Group() string { - return modules.NodeUpgradeJobControllerModuleGroup -} - -// Enable indicates whether enable this module 
-func (uc *NodeUpgradeJobController) Enable() bool { - return uc.enable -} - -// Start controller -func (uc *NodeUpgradeJobController) Start() { - if err := uc.downstream.Start(); err != nil { - klog.Exitf("start NodeUpgradeJob controller downstream failed with error: %s", err) - } - // wait for downstream controller to start and load NodeUpgradeJob - // TODO think about sync - time.Sleep(1 * time.Second) - if err := uc.upstream.Start(); err != nil { - klog.Exitf("start NodeUpgradeJob controller upstream failed with error: %s", err) - } -} diff --git a/cloud/pkg/imageprepullcontroller/config/config.go b/cloud/pkg/taskmanager/config/config.go index 88572a842..1940f14f8 100644 --- a/cloud/pkg/imageprepullcontroller/config/config.go +++ b/cloud/pkg/taskmanager/config/config.go @@ -26,13 +26,13 @@ var Config Configure var once sync.Once type Configure struct { - v1alpha1.ImagePrePullController + v1alpha1.TaskManager } -func InitConfigure(dc *v1alpha1.ImagePrePullController) { +func InitConfigure(tm *v1alpha1.TaskManager) { once.Do(func() { Config = Configure{ - ImagePrePullController: *dc, + TaskManager: *tm, } }) } diff --git a/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_controller.go b/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_controller.go new file mode 100644 index 000000000..31dc525aa --- /dev/null +++ b/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_controller.go @@ -0,0 +1,338 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package imageprepullcontroller + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strconv" + "sync" + "time" + + jsonpatch "github.com/evanphx/json-patch" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apimachineryType "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2" + + beehiveContext "github.com/kubeedge/beehive/pkg/core/context" + "github.com/kubeedge/kubeedge/cloud/pkg/common/client" + keclient "github.com/kubeedge/kubeedge/cloud/pkg/common/client" + "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/controller" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/manager" + commontypes "github.com/kubeedge/kubeedge/common/types" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +type ImagePrePullController struct { + sync.Mutex + *controller.BaseController +} + +var cache *manager.TaskCache + +func NewImagePrePullController(messageChan chan util.TaskMessage) (*ImagePrePullController, error) { + var err error + cache, err = manager.NewTaskCache( + informers.GetInformersManager().GetKubeEdgeInformerFactory().Operations().V1alpha1().ImagePrePullJobs().Informer()) + if err != nil { + klog.Warningf("Create image pre pull controller failed with error: %s", err) + return nil, err + } + return &ImagePrePullController{ + BaseController: &controller.BaseController{ + Informer: informers.GetInformersManager().GetKubeInformerFactory(), + TaskManager: cache, + MessageChan: messageChan, + CrdClient: client.GetCRDClient(), + KubeClient: keclient.GetKubeClient(), + }, + }, nil +} + +func (ndc *ImagePrePullController) ReportNodeStatus(taskID, nodeID string, event fsm.Event) (api.State, error) { + nodeFSM := NewImagePrePullNodeFSM(taskID, nodeID) + err := nodeFSM.AllowTransit(event) + if err != nil { + return "", err + } + state, err := nodeFSM.CurrentState() + if err != nil { + return "", err + } + ndc.Lock() + defer ndc.Unlock() + err = nodeFSM.Transit(event) + if err != nil { + return "", err + } + checkStatusChanged(nodeFSM, state) + state, err = nodeFSM.CurrentState() + if err != nil { + return "", err + } + return state, nil +} + +func checkStatusChanged(nodeFSM *fsm.FSM, state api.State) { + err := wait.Poll(100*time.Millisecond, time.Second, func() (bool, error) { + nowState, err := nodeFSM.CurrentState() + if err != nil { + return false, nil + } + if nowState == state { + return false, nil + } + return true, err + }) + if err != nil { + klog.V(4).Infof("check status changed failed: %s", err.Error()) + } +} + +func (ndc *ImagePrePullController) ReportTaskStatus(taskID string, event fsm.Event) (api.State, error) { + taskFSM := NewImagePrePullTaskFSM(taskID) + state, err := taskFSM.CurrentState() + if err != nil { + return "", err + } + err = taskFSM.AllowTransit(event) + if err != nil { + return "", err + } + err = taskFSM.Transit(event) + if err != nil { + return "", err + } + checkStatusChanged(taskFSM, state) + return taskFSM.CurrentState() +} + +func (ndc *ImagePrePullController) StageCompleted(taskID string, state api.State) bool { + taskFSM := NewImagePrePullTaskFSM(taskID) + return taskFSM.TaskStagCompleted(state) +} + +func (ndc *ImagePrePullController) GetNodeStatus(name 
string) ([]v1alpha1.TaskStatus, error) { + imagePrePull, err := ndc.CrdClient.OperationsV1alpha1().ImagePrePullJobs().Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + statusList := make([]v1alpha1.TaskStatus, len(imagePrePull.Status.Status)) + for i, status := range imagePrePull.Status.Status { + if status.TaskStatus == nil { + statusList[i] = v1alpha1.TaskStatus{} + continue + } + statusList[i] = *status.TaskStatus + } + return statusList, nil +} + +func (ndc *ImagePrePullController) UpdateNodeStatus(name string, nodeStatus []v1alpha1.TaskStatus) error { + imagePrePull, err := ndc.CrdClient.OperationsV1alpha1().ImagePrePullJobs().Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return err + } + status := imagePrePull.Status + statusList := make([]v1alpha1.ImagePrePullStatus, len(nodeStatus)) + for i := 0; i < len(nodeStatus); i++ { + statusList[i].TaskStatus = &nodeStatus[i] + } + status.Status = statusList + err = patchStatus(imagePrePull, status, ndc.CrdClient) + if err != nil { + return err + } + return nil +} + +func patchStatus(imagePrePullJob *v1alpha1.ImagePrePullJob, status v1alpha1.ImagePrePullJobStatus, crdClient crdClientset.Interface) error { + oldData, err := json.Marshal(imagePrePullJob) + if err != nil { + return fmt.Errorf("failed to marshal the old ImagePrePullJob(%s): %v", imagePrePullJob.Name, err) + } + imagePrePullJob.Status = status + newData, err := json.Marshal(imagePrePullJob) + if err != nil { + return fmt.Errorf("failed to marshal the new ImagePrePullJob(%s): %v", imagePrePullJob.Name, err) + } + + patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return fmt.Errorf("failed to create a merge patch: %v", err) + } + + result, err := crdClient.OperationsV1alpha1().ImagePrePullJobs().Patch(context.TODO(), imagePrePullJob.Name, apimachineryType.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") + if err != nil { + return fmt.Errorf("failed to patch update ImagePrePullJob status: %v", err) + } + klog.V(4).Info("patch update task status result: ", result) + return nil +} + +func (ndc *ImagePrePullController) Start() error { + go ndc.startSync() + return nil +} + +func (ndc *ImagePrePullController) startSync() { + imagePrePullList, err := ndc.CrdClient.OperationsV1alpha1().ImagePrePullJobs().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + klog.Errorf(err.Error()) + os.Exit(2) + } + for _, imagePrePull := range imagePrePullList.Items { + if fsm.TaskFinish(imagePrePull.Status.State) { + continue + } + ndc.imagePrePullJobAdded(&imagePrePull) + } + for { + select { + case <-beehiveContext.Done(): + klog.Info("stop sync ImagePrePullJob") + return + case e := <-ndc.TaskManager.Events(): + prePull, ok := e.Object.(*v1alpha1.ImagePrePullJob) + if !ok { + klog.Warningf("object type: %T unsupported", e.Object) + continue + } + switch e.Type { + case watch.Added: + ndc.imagePrePullJobAdded(prePull) + case watch.Deleted: + ndc.imagePrePullJobDeleted(prePull) + case watch.Modified: + ndc.imagePrePullJobUpdated(prePull) + default: + klog.Warningf("ImagePrePullJob event type: %s unsupported", e.Type) + } + } + } +} + +// imagePrePullJobAdded is used to process addition of new ImagePrePullJob in apiserver +func (ndc *ImagePrePullController) imagePrePullJobAdded(imagePrePull *v1alpha1.ImagePrePullJob) { + klog.V(4).Infof("add ImagePrePullJob: %v", imagePrePull) + // store in cache map + ndc.TaskManager.CacheMap.Store(imagePrePull.Name, imagePrePull) + + // If all or 
partial edge nodes image pull is pulling or completed, we don't need to send pull message + if fsm.TaskFinish(imagePrePull.Status.State) { + klog.Warning("The ImagePrePullJob is completed, don't send pull message again") + return + } + + ndc.processPrePull(imagePrePull) +} + +// processPrePull do the pre pull operation on node +func (ndc *ImagePrePullController) processPrePull(imagePrePull *v1alpha1.ImagePrePullJob) { + imagePrePullTemplateInfo := imagePrePull.Spec.ImagePrePullTemplate + imagePrePullRequest := commontypes.ImagePrePullJobRequest{ + Images: imagePrePullTemplateInfo.Images, + Secret: imagePrePullTemplateInfo.ImageSecret, + RetryTimes: imagePrePullTemplateInfo.RetryTimes, + CheckItems: imagePrePullTemplateInfo.CheckItems, + } + tolerate, err := strconv.ParseFloat(imagePrePull.Spec.ImagePrePullTemplate.FailureTolerate, 64) + if err != nil { + klog.Errorf("convert FailureTolerate to float64 failed: %v", err) + tolerate = 0.1 + } + + concurrency := imagePrePull.Spec.ImagePrePullTemplate.Concurrency + if concurrency <= 0 { + concurrency = 1 + } + klog.V(4).Infof("deal task message: %v", imagePrePull) + ndc.MessageChan <- util.TaskMessage{ + Type: util.TaskPrePull, + CheckItem: imagePrePull.Spec.ImagePrePullTemplate.CheckItems, + Name: imagePrePull.Name, + TimeOutSeconds: imagePrePull.Spec.ImagePrePullTemplate.TimeoutSeconds, + Concurrency: concurrency, + FailureTolerate: tolerate, + NodeNames: imagePrePull.Spec.ImagePrePullTemplate.NodeNames, + LabelSelector: imagePrePull.Spec.ImagePrePullTemplate.LabelSelector, + Status: v1alpha1.TaskStatus{}, + Msg: imagePrePullRequest, + } +} + +// imagePrePullJobDeleted is used to process deleted ImagePrePullJob in apiserver +func (ndc *ImagePrePullController) imagePrePullJobDeleted(imagePrePull *v1alpha1.ImagePrePullJob) { + // just need to delete from cache map + ndc.TaskManager.CacheMap.Delete(imagePrePull.Name) + klog.Errorf("image pre pull job %s delete", imagePrePull.Name) + ndc.MessageChan <- util.TaskMessage{ + Type: util.TaskPrePull, + Name: imagePrePull.Name, + ShutDown: true, + } +} + +// imagePrePullJobUpdated is used to process update of new ImagePrePullJob in apiserver +func (ndc *ImagePrePullController) imagePrePullJobUpdated(pullJob *v1alpha1.ImagePrePullJob) { + oldValue, ok := ndc.TaskManager.CacheMap.Load(pullJob.Name) + old := oldValue.(*v1alpha1.ImagePrePullJob) + if !ok { + klog.Infof("Update %s not exist, and store it first", pullJob.Name) + // If PrePull not present in PrePull map means it is not modified and added. 
+ ndc.imagePrePullJobAdded(pullJob) + return + } + + // store in cache map + ndc.TaskManager.CacheMap.Store(pullJob.Name, pullJob) + + node := checkUpdateNode(old, pullJob) + if node == nil { + klog.Info("none node update") + return + } + + ndc.MessageChan <- util.TaskMessage{ + Type: util.TaskPrePull, + Name: pullJob.Name, + Status: *node, + } +} + +func checkUpdateNode(old, new *v1alpha1.ImagePrePullJob) *v1alpha1.TaskStatus { + if len(old.Status.Status) == 0 { + return nil + } + for i, updateNode := range new.Status.Status { + oldNode := old.Status.Status[i] + if !util.NodeUpdated(*oldNode.TaskStatus, *updateNode.TaskStatus) { + continue + } + return updateNode.TaskStatus + } + return nil +} diff --git a/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_task.go b/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_task.go new file mode 100644 index 000000000..3778a0ed8 --- /dev/null +++ b/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_task.go @@ -0,0 +1,133 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package imageprepullcontroller + +import ( + "encoding/json" + "fmt" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/cloud/pkg/common/client" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util" + fsmapi "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + v1alpha12 "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +func currentPrePullNodeState(id, nodeName string) (v1alpha12.State, error) { + v, ok := cache.CacheMap.Load(id) + if !ok { + return "", fmt.Errorf("can not find task %s", id) + } + task := v.(*v1alpha1.ImagePrePullJob) + var state v1alpha12.State + for _, status := range task.Status.Status { + if status.NodeName == nodeName { + state = status.State + break + } + } + if state == "" { + state = v1alpha12.TaskInit + } + return state, nil +} + +func updatePrePullNodeState(id, nodeName string, state v1alpha12.State, event fsm.Event) error { + v, ok := cache.CacheMap.Load(id) + if !ok { + return fmt.Errorf("can not find task %s", id) + } + task := v.(*v1alpha1.ImagePrePullJob) + newTask := task.DeepCopy() + status := newTask.Status.DeepCopy() + for i, nodeStatus := range status.Status { + if nodeStatus.NodeName == nodeName { + var imagesStatus []v1alpha1.ImageStatus + err := json.Unmarshal([]byte(event.ExternalMessage), &imagesStatus) + if err != nil { + klog.Warningf("Failed to unmarshal images status: %v", err) + } + status.Status[i] = v1alpha1.ImagePrePullStatus{ + TaskStatus: &v1alpha1.TaskStatus{ + NodeName: nodeName, + State: state, + Event: event.Type, + Action: event.Action, + Time: time.Now().Format(util.ISO8601UTC), + Reason: event.Msg, + }, + ImageStatus: imagesStatus, + } + break + } + } + err := patchStatus(newTask, *status, client.GetCRDClient()) + if err != nil { + return err + } + return nil +} + +func NewImagePrePullNodeFSM(taskName, nodeName string) *fsm.FSM { + fsm := 
&fsm.FSM{} + return fsm.NodeName(nodeName).ID(taskName).Guard(fsmapi.PrePullRule).StageSequence(fsmapi.PrePullStageSequence).CurrentFunc(currentPrePullNodeState).UpdateFunc(updatePrePullNodeState) +} + +func NewImagePrePullTaskFSM(taskName string) *fsm.FSM { + fsm := &fsm.FSM{} + return fsm.ID(taskName).Guard(fsmapi.PrePullRule).StageSequence(fsmapi.PrePullStageSequence).CurrentFunc(currentPrePullTaskState).UpdateFunc(updateUpgradeTaskState) +} + +func currentPrePullTaskState(id, _ string) (v1alpha12.State, error) { + v, ok := cache.CacheMap.Load(id) + if !ok { + return "", fmt.Errorf("can not find task %s", id) + } + task := v.(*v1alpha1.ImagePrePullJob) + state := task.Status.State + if state == "" { + state = v1alpha12.TaskInit + } + return state, nil +} + +func updateUpgradeTaskState(id, _ string, state v1alpha12.State, event fsm.Event) error { + v, ok := cache.CacheMap.Load(id) + if !ok { + return fmt.Errorf("can not find task %s", id) + } + task := v.(*v1alpha1.ImagePrePullJob) + newTask := task.DeepCopy() + status := newTask.Status.DeepCopy() + + status.Event = event.Type + status.Action = event.Action + status.Reason = event.Msg + status.State = state + status.Time = time.Now().Format(util.ISO8601UTC) + + err := patchStatus(newTask, *status, client.GetCRDClient()) + + if err != nil { + return err + } + return nil +} diff --git a/cloud/pkg/taskmanager/manager/downstream.go b/cloud/pkg/taskmanager/manager/downstream.go new file mode 100644 index 000000000..9e8e47263 --- /dev/null +++ b/cloud/pkg/taskmanager/manager/downstream.go @@ -0,0 +1,64 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package manager + +import ( + "k8s.io/klog/v2" + + beehiveContext "github.com/kubeedge/beehive/pkg/core/context" + "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer" +) + +type DownstreamController struct { + downStreamChan chan model.Message + messageLayer messagelayer.MessageLayer +} + +// Start DownstreamController +func (dc *DownstreamController) Start() error { + klog.Info("Start TaskManager Downstream Controller") + + go dc.syncTask() + + return nil +} + +// syncTask is used to get events from informer +func (dc *DownstreamController) syncTask() { + for { + select { + case <-beehiveContext.Done(): + klog.Info("stop sync tasks") + return + case msg := <-dc.downStreamChan: + err := dc.messageLayer.Send(msg) + if err != nil { + klog.Errorf("Failed to send upgrade message %v due to error %v", msg.GetID(), err) + return + } + } + } +} + +func NewDownstreamController(messageChan chan model.Message) (*DownstreamController, error) { + dc := &DownstreamController{ + downStreamChan: messageChan, + messageLayer: messagelayer.TaskManagerMessageLayer(), + } + return dc, nil +} diff --git a/cloud/pkg/taskmanager/manager/executor.go b/cloud/pkg/taskmanager/manager/executor.go new file mode 100644 index 000000000..d3a7aec45 --- /dev/null +++ b/cloud/pkg/taskmanager/manager/executor.go @@ -0,0 +1,433 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package manager + +import ( + "fmt" + "reflect" + "strings" + "sync" + "time" + + "github.com/google/uuid" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + beehiveContext "github.com/kubeedge/beehive/pkg/core/context" + "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/cloud/pkg/common/client" + "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/nodeupgradecontroller" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/controller" + "github.com/kubeedge/kubeedge/common/constants" + commontypes "github.com/kubeedge/kubeedge/common/types" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +const TimeOutSecond = 300 + +type Executor struct { + task util.TaskMessage + statusChan chan *v1alpha1.TaskStatus + nodes []v1alpha1.TaskStatus + controller controller.Controller + maxFailedNodes float64 + failedNodes map[string]bool + workers workers +} + +func NewExecutorMachine(messageChan chan util.TaskMessage, downStreamChan chan model.Message) (*ExecutorMachine, error) { + executorMachine = &ExecutorMachine{ + kubeClient: client.GetKubeClient(), + executors: map[string]*Executor{}, + messageChan: messageChan, + downStreamChan: downStreamChan, + } + return executorMachine, nil +} + +func GetExecutorMachine() *ExecutorMachine { + return executorMachine +} + +// Start ExecutorMachine +func (em *ExecutorMachine) Start() error { + klog.Info("Start ExecutorMachine") + + go em.syncTask() + + return nil +} + +// syncTask is used to get events from informer +func (em *ExecutorMachine) syncTask() { + for { + select { + case <-beehiveContext.Done(): + klog.Info("stop sync tasks") + return + case msg := <-em.messageChan: + if msg.ShutDown { + klog.Errorf("delete executor %s ", msg.Name) + DeleteExecutor(msg) + break + } + err := GetExecutor(msg).HandleMessage(msg.Status) + if err != nil { + klog.Errorf("Failed to handel %s message due to error %s", msg.Type, err.Error()) + break + } + } + } +} + +type ExecutorMachine struct { + kubeClient kubernetes.Interface + executors map[string]*Executor + messageChan chan util.TaskMessage + downStreamChan chan model.Message + sync.Mutex +} + +var executorMachine *ExecutorMachine + +func GetExecutor(msg util.TaskMessage) *Executor { + executorMachine.Lock() + e, ok := executorMachine.executors[fmt.Sprintf("%s::%s", msg.Type, msg.Name)] + executorMachine.Unlock() + if ok && e != nil { + return e + } + e, err := initExecutor(msg) + if err != nil { + klog.Errorf("executor init failed, error: %s", err.Error()) + return nil + } + return e +} + +func DeleteExecutor(msg util.TaskMessage) { + executorMachine.Lock() + defer executorMachine.Unlock() + delete(executorMachine.executors, fmt.Sprintf("%s::%s", msg.Type, msg.Name)) +} + +func (e *Executor) HandleMessage(status v1alpha1.TaskStatus) error { + if e == nil { + return fmt.Errorf("executor is nil") + } + e.statusChan <- &status + return nil +} + +func (e *Executor) initMessage(node v1alpha1.TaskStatus) *model.Message { + // delete it in 1.18 + if e.task.Type == util.TaskUpgrade { + msg := e.initHistoryMessage(node) + if msg != nil { + klog.Warningf("send history message to node") + return msg + } + } + + msg := model.NewMessage("") + resource := buildTaskResource(e.task.Type, e.task.Name, node.NodeName) + + 
taskReq := commontypes.NodeTaskRequest{ + TaskID: e.task.Name, + Type: e.task.Type, + State: string(node.State), + } + taskReq.Item = e.task.Msg + if node.State == api.TaskChecking { + taskReq.Item = commontypes.NodePreCheckRequest{ + CheckItem: e.task.CheckItem, + } + } + msg.BuildRouter(modules.TaskManagerModuleName, modules.TaskManagerModuleGroup, resource, e.task.Type). + FillBody(taskReq) + return msg +} + +func (e *Executor) initHistoryMessage(node v1alpha1.TaskStatus) *model.Message { + resource := buildUpgradeResource(e.task.Name, node.NodeName) + req := e.task.Msg.(commontypes.NodeUpgradeJobRequest) + upgradeController := e.controller.(*nodeupgradecontroller.NodeUpgradeController) + edgeVersion, err := upgradeController.GetNodeVersion(node.NodeName) + if err != nil { + klog.Errorf("get node version failed: %s", err.Error()) + return nil + } + less, err := util.VersionLess(edgeVersion, "v1.16.0") + if err != nil { + klog.Errorf("version less failed: %s", err.Error()) + return nil + } + if !less { + return nil + } + klog.Warningf("edge version is %s, is less than version %s", edgeVersion, "v1.16.0") + upgradeReq := commontypes.NodeUpgradeJobRequest{ + UpgradeID: e.task.Name, + HistoryID: uuid.New().String(), + UpgradeTool: "keadm", + Version: req.Version, + Image: req.Image, + } + msg := model.NewMessage("") + msg.BuildRouter(modules.NodeUpgradeJobControllerModuleName, modules.NodeUpgradeJobControllerModuleGroup, resource, util.TaskUpgrade). + FillBody(upgradeReq) + return msg +} + +func initExecutor(message util.TaskMessage) (*Executor, error) { + controller, err := controller.GetController(message.Type) + if err != nil { + return nil, err + } + nodeStatus, err := controller.GetNodeStatus(message.Name) + if err != nil { + return nil, err + } + if len(nodeStatus) == 0 { + nodeList := controller.ValidateNode(message) + if len(nodeList) == 0 { + return nil, fmt.Errorf("no node need to be upgrade") + } + nodeStatus = make([]v1alpha1.TaskStatus, len(nodeList)) + for i, node := range nodeList { + nodeStatus[i] = v1alpha1.TaskStatus{NodeName: node.Name} + } + err = controller.UpdateNodeStatus(message.Name, nodeStatus) + if err != nil { + return nil, err + } + } + e := &Executor{ + task: message, + statusChan: make(chan *v1alpha1.TaskStatus, 10), + nodes: nodeStatus, + controller: controller, + maxFailedNodes: float64(len(nodeStatus)) * (message.FailureTolerate), + failedNodes: map[string]bool{}, + workers: workers{ + number: int(message.Concurrency), + jobs: make(map[string]int), + shuttingDown: false, + Mutex: sync.Mutex{}, + }, + } + go e.start() + executorMachine.executors[fmt.Sprintf("%s::%s", message.Type, message.Name)] = e + return e, nil +} + +func (e *Executor) start() { + index, err := e.initWorker(0) + if err != nil { + klog.Errorf(err.Error()) + return + } + for { + select { + case <-beehiveContext.Done(): + klog.Info("stop sync tasks") + return + case status := <-e.statusChan: + if reflect.DeepEqual(*status, v1alpha1.TaskStatus{}) { + break + } + if !e.controller.StageCompleted(e.task.Name, status.State) { + break + } + var endNode int + endNode, err = e.workers.endJob(status.NodeName) + if err != nil { + klog.Errorf(err.Error()) + break + } + + e.nodes[endNode] = *status + err = e.dealFailedNode(*status) + if err != nil { + klog.Warning(err.Error()) + break + } + + if index >= len(e.nodes) { + if len(e.workers.jobs) != 0 { + break + } + var state api.State + state, err = e.completedTaskStage() + if err != nil { + klog.Errorf(err.Error()) + break + } + if 
fsm.TaskFinish(state) { + DeleteExecutor(e.task) + klog.Infof("task %s is finish", e.task.Name) + return + } + + // next stage + index = 0 + } + + index, err = e.initWorker(index) + if err != nil { + klog.Errorf(err.Error()) + } + } + } +} + +func (e *Executor) dealFailedNode(node v1alpha1.TaskStatus) error { + if node.State == api.TaskFailed { + e.failedNodes[node.NodeName] = true + } + if float64(len(e.failedNodes)) < e.maxFailedNodes { + return nil + } + e.workers.shuttingDown = true + if len(e.workers.jobs) > 0 { + klog.Warningf("wait for all workers(%d/%d) for task %s to finish running ", len(e.workers.jobs), e.workers.number, e.task.Name) + return nil + } + + errMsg := fmt.Sprintf("the number of failed nodes is %d/%d, which exceeds the failure tolerance threshold.", len(e.failedNodes), len(e.nodes)) + _, err := e.controller.ReportTaskStatus(e.task.Name, fsm.Event{ + Type: node.Event, + Action: api.ActionFailure, + Msg: errMsg, + }) + if err != nil { + return fmt.Errorf("%s, report status failed, %s", errMsg, err.Error()) + } + return fmt.Errorf(errMsg) +} + +func (e *Executor) completedTaskStage() (api.State, error) { + var event = e.nodes[0].Event + for _, node := range e.nodes { + if node.State != api.TaskFailed { + event = node.Event + break + } + } + state, err := e.controller.ReportTaskStatus(e.task.Name, fsm.Event{ + Type: event, + Action: api.ActionSuccess, + }) + if err != nil { + return "", err + } + return state, nil +} + +func (e *Executor) initWorker(index int) (int, error) { + for ; index < len(e.nodes); index++ { + node := e.nodes[index] + if e.controller.StageCompleted(e.task.Name, node.State) { + err := e.dealFailedNode(node) + if err != nil { + return 0, err + } + continue + } + err := e.workers.addJob(node, index, e) + if err != nil { + klog.V(4).Info(err.Error()) + break + } + } + return index, nil +} + +type workers struct { + number int + jobs map[string]int + sync.Mutex + shuttingDown bool +} + +func (w *workers) addJob(node v1alpha1.TaskStatus, index int, e *Executor) error { + if w.shuttingDown { + return fmt.Errorf("workers is stopped") + } + w.Lock() + if len(w.jobs) >= w.number { + w.Unlock() + return fmt.Errorf("workers are all running, %v/%v", len(w.jobs), w.number) + } + w.jobs[node.NodeName] = index + w.Unlock() + msg := e.initMessage(node) + go e.handelTimeOutJob(index) + executorMachine.downStreamChan <- *msg + return nil +} + +func (e *Executor) handelTimeOutJob(index int) { + lastState := e.nodes[index].State + timeoutSecond := *e.task.TimeOutSeconds + if timeoutSecond == 0 { + timeoutSecond = TimeOutSecond + } + err := wait.Poll(1*time.Second, time.Duration(timeoutSecond)*time.Second, func() (bool, error) { + if lastState != e.nodes[index].State || fsm.TaskFinish(e.nodes[index].State) { + return true, nil + } + klog.V(4).Infof("node %s stage is not completed", e.nodes[index].NodeName) + return false, nil + }) + if err != nil { + _, err = e.controller.ReportNodeStatus(e.task.Name, e.nodes[index].NodeName, fsm.Event{ + Type: api.EventTimeOut, + Action: api.ActionFailure, + Msg: fmt.Sprintf("node task %s execution timeout, %s", lastState, err.Error()), + }) + if err != nil { + klog.Warningf(err.Error()) + } + } +} + +func (w *workers) endJob(job string) (int, error) { + index, ok := w.jobs[job] + if !ok { + return index, fmt.Errorf("end job %s error, job not exist", job) + } + w.Lock() + delete(w.jobs, job) + w.Unlock() + return index, nil +} + +func buildTaskResource(task, taskID, nodeID string) string { + resource := strings.Join([]string{task, 
taskID, "node", nodeID}, constants.ResourceSep) + return resource +} + +func buildUpgradeResource(upgradeID, nodeID string) string { + resource := strings.Join([]string{util.TaskUpgrade, upgradeID, "node", nodeID}, constants.ResourceSep) + return resource +} diff --git a/cloud/pkg/imageprepullcontroller/controller/upstream.go b/cloud/pkg/taskmanager/manager/upstream.go index 98fd28bea..b94370cf3 100644 --- a/cloud/pkg/imageprepullcontroller/controller/upstream.go +++ b/cloud/pkg/taskmanager/manager/upstream.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller +package manager import ( "encoding/json" @@ -28,15 +28,17 @@ import ( keclient "github.com/kubeedge/kubeedge/cloud/pkg/common/client" "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer" - "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/config" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/controller" "github.com/kubeedge/kubeedge/common/types" - "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" + "github.com/kubeedge/kubeedge/pkg/util/fsm" ) // UpstreamController subscribe messages from edge and sync to k8s api server type UpstreamController struct { - // downstream controller to update ImagePrePullJob status in cache + // downstream controller to update NodeUpgradeJob status in cache dc *DownstreamController kubeClient kubernetes.Interface @@ -44,103 +46,99 @@ type UpstreamController struct { crdClient crdClientset.Interface messageLayer messagelayer.MessageLayer // message channel - imagePrePullJobStatusChan chan model.Message + taskStatusChan chan model.Message } -// Start imageprepull UpstreamController +// Start UpstreamController func (uc *UpstreamController) Start() error { - klog.Info("Start ImagePrePullJob Upstream Controller") + klog.Info("Start Task Upstream Controller") - uc.imagePrePullJobStatusChan = make(chan model.Message, config.Config.Buffer.ImagePrePullJobStatus) + uc.taskStatusChan = make(chan model.Message, config.Config.Buffer.TaskStatus) go uc.dispatchMessage() - for i := 0; i < int(config.Config.Load.ImagePrePullJobWorkers); i++ { - go uc.updateImagePrePullJobStatus() + for i := 0; i < int(config.Config.Load.TaskWorkers); i++ { + go uc.updateTaskStatus() } return nil } -// updateImagePrePullJobStatus update imagePrePullJob status -func (uc *UpstreamController) updateImagePrePullJobStatus() { +// Start UpstreamController +func (uc *UpstreamController) dispatchMessage() { + for { + select { + case <-beehiveContext.Done(): + klog.Info("Stop dispatch task upstream message") + return + default: + } + + msg, err := uc.messageLayer.Receive() + if err != nil { + klog.Warningf("Receive message failed, %v", err) + continue + } + + klog.V(4).Infof("task upstream controller receive msg %#v", msg) + + uc.taskStatusChan <- msg + } +} + +// updateTaskStatus update NodeUpgradeJob status field +func (uc *UpstreamController) updateTaskStatus() { for { select { case <-beehiveContext.Done(): - klog.Info("Stop update ImagePrePullJob status") + klog.Info("Stop update NodeUpgradeJob status") return - case msg := <-uc.imagePrePullJobStatusChan: + case msg := <-uc.taskStatusChan: klog.V(4).Infof("Message: %s, operation is: %s, and resource is: %s", 
msg.GetID(), msg.GetOperation(), msg.GetResource()) - nodeName, jobName, err := parsePrePullresource(msg.GetResource()) - if err != nil { - klog.Errorf("message resource %s is not supported", msg.GetResource()) - continue - } - oldValue, ok := uc.dc.imagePrePullJobManager.ImagePrePullMap.Load(jobName) - if !ok { - klog.Errorf("ImagePrePullJob %s not exist", jobName) - continue - } + // get nodeID and upgradeID from Upgrade msg: + nodeID := util.GetNodeName(msg.GetResource()) + taskID := util.GetTaskID(msg.GetResource()) - imagePrePull, ok := oldValue.(*v1alpha1.ImagePrePullJob) - if !ok { - klog.Errorf("ImagePrePullJob info %T is not valid", oldValue) + data, err := msg.GetContentData() + if err != nil { + klog.Errorf("failed to get node upgrade content data: %v", err) continue } - data, err := msg.GetContentData() + c, err := controller.GetController(msg.GetOperation()) if err != nil { - klog.Errorf("failed to get image prepull content from response msg, err: %v", err) + klog.Errorf("Failed to get controller: %v", err) continue } - resp := &types.ImagePrePullJobResponse{} - err = json.Unmarshal(data, resp) + + resp := types.NodeTaskResponse{} + err = json.Unmarshal(data, &resp) if err != nil { - klog.Errorf("Failed to unmarshal image prepull response: %v", err) + klog.Errorf("Failed to unmarshal node upgrade response: %v", err) continue } - - status := &v1alpha1.ImagePrePullStatus{ - NodeName: nodeName, - State: resp.State, - Reason: resp.Reason, - ImageStatus: resp.ImageStatus, + event := fsm.Event{ + Type: resp.Event, + Action: resp.Action, + Msg: resp.Reason, + ExternalMessage: resp.ExternalMessage, } - err = patchImagePrePullStatus(uc.crdClient, imagePrePull, status) + + _, err = c.ReportNodeStatus(taskID, nodeID, event) if err != nil { - klog.Errorf("Failed to patch ImagePrePullJob status, err: %v", err) + klog.Errorf("Failed to report status: %v", err) + continue } } } } -// dispatchMessage receive the message from edge and write into imagePrePullJobStatusChan -func (uc *UpstreamController) dispatchMessage() { - for { - select { - case <-beehiveContext.Done(): - klog.Info("Stop dispatch ImagePrePullJob upstream message") - return - default: - } - - msg, err := uc.messageLayer.Receive() - if err != nil { - klog.Warningf("Receive message failed, %v", err) - continue - } - - klog.V(4).Infof("ImagePrePullJob upstream controller receive msg %#v", msg) - uc.imagePrePullJobStatusChan <- msg - } -} - // NewUpstreamController create UpstreamController from config func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) { uc := &UpstreamController{ kubeClient: keclient.GetKubeClient(), informer: informers.GetInformersManager().GetKubeInformerFactory(), crdClient: keclient.GetCRDClient(), - messageLayer: messagelayer.ImagePrePullControllerMessageLayer(), + messageLayer: messagelayer.TaskManagerMessageLayer(), dc: dc, } return uc, nil diff --git a/cloud/pkg/taskmanager/nodeupgradecontroller/node_upgrade_controller.go b/cloud/pkg/taskmanager/nodeupgradecontroller/node_upgrade_controller.go new file mode 100644 index 000000000..649b95cbb --- /dev/null +++ b/cloud/pkg/taskmanager/nodeupgradecontroller/node_upgrade_controller.go @@ -0,0 +1,386 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeupgradecontroller + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strconv" + "strings" + "sync" + "time" + + jsonpatch "github.com/evanphx/json-patch" + "github.com/google/uuid" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apimachineryType "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2" + + beehiveContext "github.com/kubeedge/beehive/pkg/core/context" + "github.com/kubeedge/kubeedge/cloud/pkg/common/client" + keclient "github.com/kubeedge/kubeedge/cloud/pkg/common/client" + "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/controller" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/manager" + commontypes "github.com/kubeedge/kubeedge/common/types" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +const NodeUpgrade = "NodeUpgradeController" + +type NodeUpgradeController struct { + sync.Mutex + *controller.BaseController +} + +var cache *manager.TaskCache + +func NewNodeUpgradeController(messageChan chan util.TaskMessage) (*NodeUpgradeController, error) { + var err error + cache, err = manager.NewTaskCache( + informers.GetInformersManager().GetKubeEdgeInformerFactory().Operations().V1alpha1().NodeUpgradeJobs().Informer()) + if err != nil { + klog.Warningf("Create NodeUpgradeJob manager failed with error: %s", err) + return nil, err + } + return &NodeUpgradeController{ + BaseController: &controller.BaseController{ + Informer: informers.GetInformersManager().GetKubeInformerFactory(), + TaskManager: cache, + MessageChan: messageChan, + CrdClient: client.GetCRDClient(), + KubeClient: keclient.GetKubeClient(), + }, + }, nil +} + +func (ndc *NodeUpgradeController) ReportNodeStatus(taskID, nodeID string, event fsm.Event) (api.State, error) { + nodeFSM := NewUpgradeNodeFSM(taskID, nodeID) + err := nodeFSM.AllowTransit(event) + if err != nil { + return "", err + } + state, err := nodeFSM.CurrentState() + if err != nil { + return "", err + } + ndc.Lock() + defer ndc.Unlock() + err = nodeFSM.Transit(event) + if err != nil { + return "", err + } + checkStatusChanged(nodeFSM, state) + state, err = nodeFSM.CurrentState() + if err != nil { + return "", err + } + return state, nil +} + +func checkStatusChanged(nodeFSM *fsm.FSM, state api.State) { + err := wait.Poll(100*time.Millisecond, time.Second, func() (bool, error) { + nowState, err := nodeFSM.CurrentState() + if err != nil { + return false, nil + } + if nowState == state { + return false, nil + } + return true, err + }) + if err != nil { + klog.V(4).Infof("check status changed failed: %s", err.Error()) + } +} + +func (ndc *NodeUpgradeController) ReportTaskStatus(taskID string, event fsm.Event) (api.State, error) { + taskFSM := NewUpgradeTaskFSM(taskID) + state, err := taskFSM.CurrentState() + 
if err != nil { + return "", err + } + err = taskFSM.AllowTransit(event) + if err != nil { + return "", err + } + err = taskFSM.Transit(event) + if err != nil { + return "", err + } + checkStatusChanged(taskFSM, state) + return taskFSM.CurrentState() +} + +func (ndc *NodeUpgradeController) ValidateNode(taskMessage util.TaskMessage) []v1.Node { + var validateNodes []v1.Node + nodes := ndc.BaseController.ValidateNode(taskMessage) + req, ok := taskMessage.Msg.(commontypes.NodeUpgradeJobRequest) + if !ok { + klog.Errorf("convert message to commontypes.NodeUpgradeJobRequest failed") + return nil + } + for _, node := range nodes { + if needUpgrade(node, req.Version) { + validateNodes = append(validateNodes, node) + } + } + return validateNodes +} + +func (ndc *NodeUpgradeController) StageCompleted(taskID string, state api.State) bool { + taskFSM := NewUpgradeTaskFSM(taskID) + return taskFSM.TaskStagCompleted(state) +} + +func (ndc *NodeUpgradeController) GetNodeStatus(name string) ([]v1alpha1.TaskStatus, error) { + nodeUpgrade, err := ndc.CrdClient.OperationsV1alpha1().NodeUpgradeJobs().Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return nodeUpgrade.Status.Status, nil +} + +func (ndc *NodeUpgradeController) GetNodeVersion(name string) (string, error) { + node, err := ndc.Informer.Core().V1().Nodes().Lister().Get(name) + if err != nil { + return "", err + } + strs := strings.Split(node.Status.NodeInfo.KubeletVersion, "-") + return strs[2], nil +} + +func (ndc *NodeUpgradeController) UpdateNodeStatus(name string, nodeStatus []v1alpha1.TaskStatus) error { + nodeUpgrade, err := ndc.CrdClient.OperationsV1alpha1().NodeUpgradeJobs().Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return err + } + status := nodeUpgrade.Status + status.Status = nodeStatus + err = patchStatus(nodeUpgrade, status, ndc.CrdClient) + if err != nil { + return err + } + return nil +} + +func patchStatus(nodeUpgrade *v1alpha1.NodeUpgradeJob, status v1alpha1.NodeUpgradeJobStatus, crdClient crdClientset.Interface) error { + oldData, err := json.Marshal(nodeUpgrade) + if err != nil { + return fmt.Errorf("failed to marshal the old NodeUpgradeJob(%s): %v", nodeUpgrade.Name, err) + } + nodeUpgrade.Status = status + newData, err := json.Marshal(nodeUpgrade) + if err != nil { + return fmt.Errorf("failed to marshal the new NodeUpgradeJob(%s): %v", nodeUpgrade.Name, err) + } + + patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return fmt.Errorf("failed to create a merge patch: %v", err) + } + + result, err := crdClient.OperationsV1alpha1().NodeUpgradeJobs().Patch(context.TODO(), nodeUpgrade.Name, apimachineryType.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") + if err != nil { + return fmt.Errorf("failed to patch update NodeUpgradeJob status: %v", err) + } + klog.V(4).Info("patch upgrade task status result: ", result) + return nil +} + +func (ndc *NodeUpgradeController) Start() error { + go ndc.startSync() + return nil +} + +func (ndc *NodeUpgradeController) startSync() { + nodeUpgradeList, err := ndc.CrdClient.OperationsV1alpha1().NodeUpgradeJobs().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + klog.Errorf(err.Error()) + os.Exit(2) + } + for _, nodeUpgrade := range nodeUpgradeList.Items { + if fsm.TaskFinish(nodeUpgrade.Status.State) { + continue + } + ndc.nodeUpgradeJobAdded(&nodeUpgrade) + } + for { + select { + case <-beehiveContext.Done(): + klog.Info("stop sync NodeUpgradeJob") + return + case e := 
<-ndc.TaskManager.Events(): + upgrade, ok := e.Object.(*v1alpha1.NodeUpgradeJob) + if !ok { + klog.Warningf("object type: %T unsupported", e.Object) + continue + } + switch e.Type { + case watch.Added: + ndc.nodeUpgradeJobAdded(upgrade) + case watch.Deleted: + ndc.nodeUpgradeJobDeleted(upgrade) + case watch.Modified: + ndc.nodeUpgradeJobUpdated(upgrade) + default: + klog.Warningf("NodeUpgradeJob event type: %s unsupported", e.Type) + } + } + } +} + +// nodeUpgradeJobAdded is used to process addition of new NodeUpgradeJob in apiserver +func (ndc *NodeUpgradeController) nodeUpgradeJobAdded(upgrade *v1alpha1.NodeUpgradeJob) { + klog.V(4).Infof("add NodeUpgradeJob: %v", upgrade) + // store in cache map + ndc.TaskManager.CacheMap.Store(upgrade.Name, upgrade) + + // If all or partial edge nodes upgrade is upgrading or completed, we don't need to send upgrade message + if fsm.TaskFinish(upgrade.Status.State) { + klog.Warning("The nodeUpgradeJob is completed, don't send upgrade message again") + return + } + + ndc.processUpgrade(upgrade) +} + +// processUpgrade do the upgrade operation on node +func (ndc *NodeUpgradeController) processUpgrade(upgrade *v1alpha1.NodeUpgradeJob) { + // if users specify Image, we'll use upgrade Version as its image tag, even though Image contains tag. + // if not, we'll use default image: kubeedge/installation-package:${Version} + var repo string + var err error + repo = "kubeedge/installation-package" + if upgrade.Spec.Image != "" { + repo, err = util.GetImageRepo(upgrade.Spec.Image) + if err != nil { + klog.Errorf("Image format is not right: %v", err) + return + } + } + imageTag := upgrade.Spec.Version + image := fmt.Sprintf("%s:%s", repo, imageTag) + + upgradeReq := commontypes.NodeUpgradeJobRequest{ + UpgradeID: upgrade.Name, + HistoryID: uuid.New().String(), + Version: upgrade.Spec.Version, + Image: image, + } + + tolerate, err := strconv.ParseFloat(upgrade.Spec.FailureTolerate, 64) + if err != nil { + klog.Errorf("convert FailureTolerate to float64 failed: %v", err) + tolerate = 0.1 + } + + concurrency := upgrade.Spec.Concurrency + if concurrency <= 0 { + concurrency = 1 + } + klog.V(4).Infof("deal task message: %v", upgrade) + ndc.MessageChan <- util.TaskMessage{ + Type: util.TaskUpgrade, + CheckItem: upgrade.Spec.CheckItems, + Name: upgrade.Name, + TimeOutSeconds: upgrade.Spec.TimeoutSeconds, + Concurrency: concurrency, + FailureTolerate: tolerate, + NodeNames: upgrade.Spec.NodeNames, + LabelSelector: upgrade.Spec.LabelSelector, + Status: v1alpha1.TaskStatus{}, + Msg: upgradeReq, + } +} + +func needUpgrade(node v1.Node, upgradeVersion string) bool { + if util.FilterVersion(node.Status.NodeInfo.KubeletVersion, upgradeVersion) { + klog.Warningf("Node(%s) version(%s) already on the expected version %s.", node.Name, node.Status.NodeInfo.KubeletVersion, upgradeVersion) + return false + } + + // if node is in Upgrading state, don't need upgrade + if _, ok := node.Labels[util.NodeUpgradeJobStatusKey]; ok { + klog.Warningf("Node(%s) is in upgrade state", node.Name) + return false + } + + return true +} + +// nodeUpgradeJobDeleted is used to process deleted NodeUpgradeJob in apiserver +func (ndc *NodeUpgradeController) nodeUpgradeJobDeleted(upgrade *v1alpha1.NodeUpgradeJob) { + // just need to delete from cache map + ndc.TaskManager.CacheMap.Delete(upgrade.Name) + klog.Errorf("upgrade job %s delete", upgrade.Name) + ndc.MessageChan <- util.TaskMessage{ + Type: util.TaskUpgrade, + Name: upgrade.Name, + ShutDown: true, + } +} + +// upgradeAdded is used to process update 
of new NodeUpgradeJob in apiserver +func (ndc *NodeUpgradeController) nodeUpgradeJobUpdated(upgrade *v1alpha1.NodeUpgradeJob) { + oldValue, ok := ndc.TaskManager.CacheMap.Load(upgrade.Name) + if !ok { + klog.Infof("Upgrade %s does not exist, store it first", upgrade.Name) + // If the Upgrade is not present in the cache map, it is newly added rather than modified. + ndc.nodeUpgradeJobAdded(upgrade) + return + } + old := oldValue.(*v1alpha1.NodeUpgradeJob) + + // store in cache map + ndc.TaskManager.CacheMap.Store(upgrade.Name, upgrade) + + node := checkUpdateNode(old, upgrade) + if node == nil { + klog.Info("no node updated") + return + } + + ndc.MessageChan <- util.TaskMessage{ + Type: util.TaskUpgrade, + Name: upgrade.Name, + Status: *node, + } +} + +func checkUpdateNode(old, new *v1alpha1.NodeUpgradeJob) *v1alpha1.TaskStatus { + if len(old.Status.Status) == 0 { + return nil + } + for i, updateNode := range new.Status.Status { + oldNode := old.Status.Status[i] + if !util.NodeUpdated(oldNode, updateNode) { + continue + } + return &updateNode + } + return nil +} diff --git a/cloud/pkg/taskmanager/nodeupgradecontroller/upgrade_task.go b/cloud/pkg/taskmanager/nodeupgradecontroller/upgrade_task.go new file mode 100644 index 000000000..6f28eb9ae --- /dev/null +++ b/cloud/pkg/taskmanager/nodeupgradecontroller/upgrade_task.go @@ -0,0 +1,122 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package nodeupgradecontroller + +import ( + "fmt" + "time" + + "github.com/kubeedge/kubeedge/cloud/pkg/common/client" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util" + fsmapi "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + v1alpha12 "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +func currentUpgradeNodeState(id, nodeName string) (v1alpha12.State, error) { + v, ok := cache.CacheMap.Load(id) + if !ok { + return "", fmt.Errorf("can not find task %s", id) + } + task := v.(*v1alpha1.NodeUpgradeJob) + var state v1alpha12.State + for _, status := range task.Status.Status { + if status.NodeName == nodeName { + state = status.State + break + } + } + if state == "" { + state = v1alpha12.TaskInit + } + return state, nil +} + +func updateUpgradeNodeState(id, nodeName string, state v1alpha12.State, event fsm.Event) error { + v, ok := cache.CacheMap.Load(id) + if !ok { + return fmt.Errorf("can not find task %s", id) + } + task := v.(*v1alpha1.NodeUpgradeJob) + newTask := task.DeepCopy() + status := newTask.Status.DeepCopy() + for i, nodeStatus := range status.Status { + if nodeStatus.NodeName == nodeName { + status.Status[i] = v1alpha1.TaskStatus{ + NodeName: nodeName, + State: state, + Event: event.Type, + Action: event.Action, + Time: time.Now().Format(util.ISO8601UTC), + Reason: event.Msg, + } + break + } + } + err := patchStatus(newTask, *status, client.GetCRDClient()) + if err != nil { + return err + } + return nil +} + +func NewUpgradeNodeFSM(taskName, nodeName string) *fsm.FSM { + fsm := &fsm.FSM{} + return fsm.NodeName(nodeName).ID(taskName).Guard(fsmapi.UpgradeRule).StageSequence(fsmapi.UpdateStageSequence).CurrentFunc(currentUpgradeNodeState).UpdateFunc(updateUpgradeNodeState) +} + +func NewUpgradeTaskFSM(taskName string) *fsm.FSM { + fsm := &fsm.FSM{} + return fsm.ID(taskName).Guard(fsmapi.UpgradeRule).StageSequence(fsmapi.UpdateStageSequence).CurrentFunc(currentUpgradeTaskState).UpdateFunc(updateUpgradeTaskState) +} + +func currentUpgradeTaskState(id, _ string) (v1alpha12.State, error) { + v, ok := cache.CacheMap.Load(id) + if !ok { + return "", fmt.Errorf("can not find task %s", id) + } + task := v.(*v1alpha1.NodeUpgradeJob) + state := task.Status.State + if state == "" { + state = v1alpha12.TaskInit + } + return state, nil +} + +func updateUpgradeTaskState(id, _ string, state v1alpha12.State, event fsm.Event) error { + v, ok := cache.CacheMap.Load(id) + if !ok { + return fmt.Errorf("can not find task %s", id) + } + task := v.(*v1alpha1.NodeUpgradeJob) + newTask := task.DeepCopy() + status := newTask.Status.DeepCopy() + + status.Event = event.Type + status.Action = event.Action + status.Reason = event.Msg + status.State = state + status.Time = time.Now().Format(util.ISO8601UTC) + + err := patchStatus(newTask, *status, client.GetCRDClient()) + + if err != nil { + return err + } + return nil +} diff --git a/cloud/pkg/taskmanager/task_manager.go b/cloud/pkg/taskmanager/task_manager.go new file mode 100644 index 000000000..425ecff64 --- /dev/null +++ b/cloud/pkg/taskmanager/task_manager.go @@ -0,0 +1,123 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package taskmanager + +import ( + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/beehive/pkg/core" + "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/config" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/imageprepullcontroller" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/manager" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/nodeupgradecontroller" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/controller" + "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" +) + +type TaskManager struct { + downstream *manager.DownstreamController + executorMachine *manager.ExecutorMachine + upstream *manager.UpstreamController + enable bool +} + +var _ core.Module = (*TaskManager)(nil) + +func newTaskManager(enable bool) *TaskManager { + if !enable { + return &TaskManager{enable: enable} + } + taskMessage := make(chan util.TaskMessage, 10) + downStreamMessage := make(chan model.Message, 10) + downstream, err := manager.NewDownstreamController(downStreamMessage) + if err != nil { + klog.Exitf("New task manager downstream failed with error: %s", err) + } + upstream, err := manager.NewUpstreamController(downstream) + if err != nil { + klog.Exitf("New task manager upstream failed with error: %s", err) + } + executorMachine, err := manager.NewExecutorMachine(taskMessage, downStreamMessage) + if err != nil { + klog.Exitf("New executor machine failed with error: %s", err) + } + + upgradeNodeController, err := nodeupgradecontroller.NewNodeUpgradeController(taskMessage) + if err != nil { + klog.Exitf("New upgrade node controller failed with error: %s", err) + } + + imagePrePullController, err := imageprepullcontroller.NewImagePrePullController(taskMessage) + if err != nil { + klog.Exitf("New image prepull controller failed with error: %s", err) + } + controller.Register(util.TaskUpgrade, upgradeNodeController) + controller.Register(util.TaskPrePull, imagePrePullController) + + return &TaskManager{ + downstream: downstream, + executorMachine: executorMachine, + upstream: upstream, + enable: enable, + } +} + +func Register(dc *v1alpha1.TaskManager) { + config.InitConfigure(dc) + core.Register(newTaskManager(dc.Enable)) + //core.Register(newNodeUpgradeJobController()) +} + +// Name of controller +func (uc *TaskManager) Name() string { + return modules.TaskManagerModuleName +} + +// Group of controller +func (uc *TaskManager) Group() string { + return modules.TaskManagerModuleGroup +} + +// Enable indicates whether to enable this module +func (uc *TaskManager) Enable() bool { + return uc.enable +} + +// Start controller +func (uc *TaskManager) Start() { + if err := uc.downstream.Start(); err != nil { + klog.Exitf("start task manager downstream failed with error: %s", err) + } + // wait for downstream controller to start and load NodeUpgradeJob + // TODO think about sync + time.Sleep(1 * time.Second) + if err := uc.upstream.Start(); err != nil { + klog.Exitf("start task manager upstream 
failed with error: %s", err) + } + if err := uc.executorMachine.Start(); err != nil { + klog.Exitf("start task manager executorMachine failed with error: %s", err) + } + + if err := controller.StartAllController(); err != nil { + klog.Exitf("start controller failed with error: %s", err) + } +} diff --git a/cloud/pkg/taskmanager/util/controller/controller.go b/cloud/pkg/taskmanager/util/controller/controller.go new file mode 100644 index 000000000..78259d3ab --- /dev/null +++ b/cloud/pkg/taskmanager/util/controller/controller.go @@ -0,0 +1,169 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sinformer "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/manager" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +type Controller interface { + Name() string + Start() error + ReportNodeStatus(string, string, fsm.Event) (api.State, error) + ReportTaskStatus(string, fsm.Event) (api.State, error) + ValidateNode(util.TaskMessage) []v1.Node + GetNodeStatus(string) ([]v1alpha1.TaskStatus, error) + UpdateNodeStatus(string, []v1alpha1.TaskStatus) error + StageCompleted(taskID string, state api.State) bool +} + +type BaseController struct { + name string + Informer k8sinformer.SharedInformerFactory + TaskManager *manager.TaskCache + MessageChan chan util.TaskMessage + KubeClient kubernetes.Interface + CrdClient crdClientset.Interface +} + +func (bc *BaseController) Name() string { + return bc.name +} + +func (bc *BaseController) Start() error { + return fmt.Errorf("controller not implemented") +} + +func (bc *BaseController) StageCompleted(taskID string, state api.State) bool { + return false +} + +func (bc *BaseController) ValidateNode(taskMessage util.TaskMessage) []v1.Node { + var validateNodes []v1.Node + nodes, err := bc.getNodeList(taskMessage.NodeNames, taskMessage.LabelSelector) + if err != nil { + klog.Warningf("get node list error: %s", err.Error()) + return nil + } + for _, node := range nodes { + if !util.IsEdgeNode(node) { + klog.Warningf("Node(%s) is not edge node", node.Name) + continue + } + ready := isNodeReady(node) + if !ready { + continue + } + validateNodes = append(validateNodes, *node) + } + return validateNodes +} + +func (bc *BaseController) GetNodeStatus(name string) ([]v1alpha1.TaskStatus, error) { + return nil, fmt.Errorf("function GetNodeStatus need to be init") +} + +func (bc *BaseController) UpdateNodeStatus(name string, status []v1alpha1.TaskStatus) error { + return fmt.Errorf("function UpdateNodeStatus need to be init") +} + +func isNodeReady(node *v1.Node) bool { + for _, condition := 
range node.Status.Conditions { + if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue { + klog.Warningf("Node(%s) is in NotReady state", node.Name) + return false + } + } + return true +} + +func (bc *BaseController) ReportNodeStatus(taskID string, nodeID string, event fsm.Event) (api.State, error) { + return "", fmt.Errorf("function ReportNodeStatus need to be init") +} + +func (bc *BaseController) ReportTaskStatus(taskID string, event fsm.Event) (api.State, error) { + return "", fmt.Errorf("function ReportTaskStatus need to be init") +} + +var ( + controllers = map[string]Controller{} +) + +func Register(name string, controller Controller) { + if _, ok := controllers[name]; ok { + klog.Warning("controller %s exists ", name) + } + controllers[name] = controller +} + +func StartAllController() error { + for name, controller := range controllers { + err := controller.Start() + if err != nil { + return fmt.Errorf("start %s controller failed: %s", name, err.Error()) + } + } + return nil +} + +func GetController(name string) (Controller, error) { + controller, ok := controllers[name] + if !ok { + return nil, fmt.Errorf("controller %s is not registered", name) + } + return controller, nil +} + +func (bc *BaseController) getNodeList(nodeNames []string, labelSelector *metav1.LabelSelector) ([]*v1.Node, error) { + var nodesToUpgrade []*v1.Node + + if len(nodeNames) != 0 { + for _, name := range nodeNames { + node, err := bc.Informer.Core().V1().Nodes().Lister().Get(name) + if err != nil { + return nil, fmt.Errorf("failed to get node with name %s: %v", name, err) + } + nodesToUpgrade = append(nodesToUpgrade, node) + } + } else if labelSelector != nil { + selector, err := metav1.LabelSelectorAsSelector(labelSelector) + if err != nil { + return nil, fmt.Errorf("labelSelector(%s) is not valid: %v", labelSelector, err) + } + + nodes, err := bc.Informer.Core().V1().Nodes().Lister().List(selector) + if err != nil { + return nil, fmt.Errorf("failed to get nodes with label %s: %v", selector.String(), err) + } + nodesToUpgrade = nodes + } + + return nodesToUpgrade, nil +} diff --git a/cloud/pkg/nodeupgradejobcontroller/manager/common.go b/cloud/pkg/taskmanager/util/manager/common.go index 6e8008abf..59200daa3 100644 --- a/cloud/pkg/nodeupgradejobcontroller/manager/common.go +++ b/cloud/pkg/taskmanager/util/manager/common.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The KubeEdge Authors. +Copyright 2023 The KubeEdge Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/cloud/pkg/nodeupgradejobcontroller/manager/nodeupgrade.go b/cloud/pkg/taskmanager/util/manager/task_cache.go index 7cdb78e1a..9614c7732 100644 --- a/cloud/pkg/nodeupgradejobcontroller/manager/nodeupgrade.go +++ b/cloud/pkg/taskmanager/util/manager/task_cache.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The KubeEdge Authors. +Copyright 2023 The KubeEdge Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -22,31 +22,31 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" - "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/config" + "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/config" ) -// NodeUpgradeJobManager is a manager watch NodeUpgradeJob change event -type NodeUpgradeJobManager struct { +// TaskCache is a manager watch CRD change event +type TaskCache struct { // events from watch kubernetes api server events chan watch.Event - // UpgradeMap, key is NodeUpgradeJob.Name, value is *v1alpha1.NodeUpgradeJob{} - UpgradeMap sync.Map + // CacheMap, key is NodeUpgradeJob.Name, value is *v1alpha1.NodeUpgradeJob{} + CacheMap sync.Map } // Events return a channel, can receive all NodeUpgradeJob event -func (dmm *NodeUpgradeJobManager) Events() chan watch.Event { +func (dmm *TaskCache) Events() chan watch.Event { return dmm.events } -// NewNodeUpgradeJobManager create NodeUpgradeJobManager from config -func NewNodeUpgradeJobManager(si cache.SharedIndexInformer) (*NodeUpgradeJobManager, error) { - events := make(chan watch.Event, config.Config.Buffer.NodeUpgradeJobEvent) +// NewTaskCache create TaskCache from config +func NewTaskCache(si cache.SharedIndexInformer) (*TaskCache, error) { + events := make(chan watch.Event, config.Config.Buffer.TaskEvent) rh := NewCommonResourceEventHandler(events) _, err := si.AddEventHandler(rh) if err != nil { return nil, err } - return &NodeUpgradeJobManager{events: events}, nil + return &TaskCache{events: events}, nil } diff --git a/cloud/pkg/taskmanager/util/util.go b/cloud/pkg/taskmanager/util/util.go new file mode 100644 index 000000000..bf3aea571 --- /dev/null +++ b/cloud/pkg/taskmanager/util/util.go @@ -0,0 +1,181 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package util + +import ( + "fmt" + "strings" + + "github.com/distribution/distribution/v3/reference" + metav1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" + versionutil "k8s.io/apimachinery/pkg/util/version" + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/common/constants" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" +) + +const ( + NodeUpgradeJobStatusKey = "nodeupgradejob.operations.kubeedge.io/status" + NodeUpgradeJobStatusValue = "" + NodeUpgradeHistoryKey = "nodeupgradejob.operations.kubeedge.io/history" +) + +const ( + TaskUpgrade = "upgrade" + TaskRollback = "rollback" + TaskBackup = "backup" + TaskPrePull = "prepull" + + ISO8601UTC = "2006-01-02T15:04:05Z" +) + +type TaskMessage struct { + Type string + Name string + TimeOutSeconds *uint32 + ShutDown bool + CheckItem []string + Concurrency int32 + FailureTolerate float64 + NodeNames []string + LabelSelector *v1.LabelSelector + Status v1alpha1.TaskStatus + Msg interface{} +} + +// FilterVersion returns true only if the edge node version already on the upgrade req +// version is like: v1.22.6-kubeedge-v1.10.0-beta.0.185+95378fb019912a, expected is like v1.10.0 +func FilterVersion(version string, expected string) bool { + // if not correct version format, also return true + strs := strings.Split(version, "-") + if len(strs) < 3 { + klog.Warningf("version format should be {k8s version}-kubeedge-{edgecore version}, but got : %s", version) + return true + } + + // filter nodes that already in the required version + less, err := VersionLess(strs[2], expected) + if err != nil { + klog.Warningf("version filter failed: %s", err.Error()) + less = false + } + return !less +} + +// IsEdgeNode checks whether a node is an Edge Node +// only if label {"node-role.kubernetes.io/edge": ""} exists, it is an edge node +func IsEdgeNode(node *metav1.Node) bool { + if node.Labels == nil { + return false + } + if _, ok := node.Labels[constants.EdgeNodeRoleKey]; !ok { + return false + } + + if node.Labels[constants.EdgeNodeRoleKey] != constants.EdgeNodeRoleValue { + return false + } + + return true +} + +// RemoveDuplicateElement deduplicate +func RemoveDuplicateElement(s []string) []string { + result := make([]string, 0, len(s)) + temp := make(map[string]struct{}, len(s)) + + for _, item := range s { + if _, ok := temp[item]; !ok { + temp[item] = struct{}{} + result = append(result, item) + } + } + + return result +} + +// MergeAnnotationUpgradeHistory constructs the new history based on the origin history +// and we'll only keep 3 records +func MergeAnnotationUpgradeHistory(origin, fromVersion, toVersion string) string { + newHistory := fmt.Sprintf("%s->%s", fromVersion, toVersion) + if origin == "" { + return newHistory + } + + sets := strings.Split(origin, ";") + if len(sets) > 2 { + sets = sets[1:] + } + + sets = append(sets, newHistory) + return strings.Join(sets, ";") +} + +// GetImageRepo gets repo from a container image +func GetImageRepo(image string) (string, error) { + named, err := reference.ParseNormalizedNamed(image) + if err != nil { + return "", fmt.Errorf("failed to parse image name: %v", err) + } + + return named.Name(), nil +} + +func GetNodeName(resource string) string { + // task/${TaskID}/node/${NodeID} + s := strings.Split(resource, "/") + return s[3] +} +func GetTaskID(resource string) string { + // task/${TaskID}/node/${NodeID} + s := strings.Split(resource, "/") + return s[1] +} + +func VersionLess(version1, version2 string) (bool, error) { + less := false + ver1, err := 
versionutil.ParseGeneric(version1) + if err != nil { + return less, fmt.Errorf("version1 error: %v", err) + } + ver2, err := versionutil.ParseGeneric(version2) + if err != nil { + return less, fmt.Errorf("version2 error: %v", err) + } + // If the remote Major version is bigger or if the Major versions are the same, + // but the remote Minor is bigger use the client version release. This handles Major bumps too. + if ver1.Major() < ver2.Major() || + (ver1.Major() == ver2.Major()) && ver1.Minor() < ver2.Minor() || + (ver1.Major() == ver2.Major() && ver1.Minor() == ver2.Minor()) && ver1.Patch() < ver2.Patch() { + less = true + } + return less, nil +} + +func NodeUpdated(old, new v1alpha1.TaskStatus) bool { + if old.NodeName != new.NodeName { + klog.V(4).Infof("old node %s and new node %s is not same", old.NodeName, new.NodeName) + return false + } + if old.State == new.State || new.State == "" { + klog.V(4).Infof("node %s state is not change", old.NodeName) + return false + } + return true +} diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go b/cloud/pkg/taskmanager/util/util_test.go index 499da62fa..9a13e2295 100644 --- a/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go +++ b/cloud/pkg/taskmanager/util/util_test.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The KubeEdge Authors. +Copyright 2023 The KubeEdge Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,13 +14,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller +package util import ( "reflect" "testing" - - "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" ) func TestFilterVersion(t *testing.T) { @@ -40,13 +38,13 @@ func TestFilterVersion(t *testing.T) { name: "not match expected version", version: "v1.22.6-kubeedge-v1.10.0-beta.0.194+77ea462f402efb", expected: "v1.10.0", - expectResult: false, + expectResult: true, }, { name: "no right format version", version: "v1.22.6", expected: "v1.10.0", - expectResult: false, + expectResult: true, }, { name: "match expected version", @@ -58,7 +56,7 @@ func TestFilterVersion(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - result := filterVersion(test.version, test.expected) + result := FilterVersion(test.version, test.expected) if result != test.expectResult { t.Errorf("Got = %v, Want = %v", result, test.expectResult) } @@ -66,84 +64,34 @@ func TestFilterVersion(t *testing.T) { } } -func TestUpdateUpgradeStatus(t *testing.T) { - upgrade := v1alpha1.NodeUpgradeJob{ - Status: v1alpha1.NodeUpgradeJobStatus{ - Status: []v1alpha1.UpgradeStatus{ - { - NodeName: "edge-node", - State: v1alpha1.Completed, - History: v1alpha1.History{ - Reason: "the first upgrade", - UpgradeTime: "2023-09-22T17:33:00Z", - }, - }, - }, - }, - } - upgrade2 := upgrade.DeepCopy() - upgrade2.Status.Status[0].History.Reason = "the second upgrade" - - upgrade3 := upgrade.DeepCopy() - upgrade3.Status.Status = append(upgrade3.Status.Status, v1alpha1.UpgradeStatus{ - NodeName: "edge-node2", - State: v1alpha1.Completed, - History: v1alpha1.History{ - Reason: "the first upgrade", - UpgradeTime: "2023-09-22T17:35:00Z", - }, - }) - +func TestRemoveDuplicateElement(t *testing.T) { tests := []struct { name string - upgrade *v1alpha1.NodeUpgradeJob - status *v1alpha1.UpgradeStatus - expected *v1alpha1.NodeUpgradeJob + input []string + expected []string }{ { - name: "case1: first add one node", - upgrade: 
&v1alpha1.NodeUpgradeJob{}, - status: &v1alpha1.UpgradeStatus{ - NodeName: "edge-node", - State: v1alpha1.Completed, - History: v1alpha1.History{ - Reason: "the first upgrade", - UpgradeTime: "2023-09-22T17:33:00Z", - }, - }, - expected: upgrade.DeepCopy(), + name: "case 1", + input: []string{"a", "b", "c"}, + expected: []string{"a", "b", "c"}, }, { - name: "case2: add to one NOT exist node record", - upgrade: upgrade.DeepCopy(), - status: &v1alpha1.UpgradeStatus{ - NodeName: "edge-node2", - State: v1alpha1.Completed, - History: v1alpha1.History{ - Reason: "the first upgrade", - UpgradeTime: "2023-09-22T17:35:00Z", - }, - }, - expected: upgrade3, + name: "case 2", + input: []string{"a", "a", "b", "c", "b", "a", "a"}, + expected: []string{"a", "b", "c"}, }, { - name: "case3: add to one exist node record", - upgrade: upgrade.DeepCopy(), - status: &v1alpha1.UpgradeStatus{ - NodeName: "edge-node", - State: v1alpha1.Completed, - History: v1alpha1.History{ - Reason: "the second upgrade", - }, - }, - expected: upgrade2, + name: "case 3", + input: []string{}, + expected: []string{}, }, } + for _, test := range tests { t.Run(test.name, func(t *testing.T) { - newValue := UpdateNodeUpgradeJobStatus(test.upgrade, test.status) - if !reflect.DeepEqual(newValue, test.expected) { - t.Errorf("Got = %v, Want = %v", newValue, test.expected) + result := RemoveDuplicateElement(test.input) + if !reflect.DeepEqual(result, test.expected) { + t.Errorf("Got = %v, Want = %v", result, test.expected) } }) } @@ -182,7 +130,7 @@ func TestMergeAnnotationUpgradeHistory(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - result := mergeAnnotationUpgradeHistory(test.origin, test.fromVersion, test.toVersion) + result := MergeAnnotationUpgradeHistory(test.origin, test.fromVersion, test.toVersion) if result != test.expected { t.Errorf("Got = %v, Want = %v", result, test.expected) } diff --git a/common/constants/default.go b/common/constants/default.go index fab460eee..6b1e5c9da 100644 --- a/common/constants/default.go +++ b/common/constants/default.go @@ -28,6 +28,7 @@ const ( DefaultCAURL = "/ca.crt" DefaultCertURL = "/edge.crt" DefaultNodeUpgradeURL = "/nodeupgrade" + DefaultTaskStateReportURL = "/task/{taskType}/name/{taskID}/node/{nodeID}/status" DefaultServiceAccountIssuer = "https://kubernetes.default.svc.cluster.local" // Edged @@ -112,7 +113,7 @@ const ( DefaultDeviceModelEventBuffer = 1 DefaultUpdateDeviceStatusWorkers = 1 - // NodeUpgradeJobController + // TaskManager DefaultNodeUpgradeJobStatusBuffer = 1024 DefaultNodeUpgradeJobEventBuffer = 1 DefaultNodeUpgradeJobWorkers = 1 diff --git a/common/types/types.go b/common/types/types.go index e03667a1b..dfe14dad1 100644 --- a/common/types/types.go +++ b/common/types/types.go @@ -6,6 +6,7 @@ import ( metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" ) @@ -50,6 +51,38 @@ type NodeUpgradeJobResponse struct { Reason string } +// NodePreCheckRequest is pre-check msg coming from cloud to edge +type NodePreCheckRequest struct { + CheckItem []string +} + +type NodeTaskRequest struct { + TaskID string + Type string + State string + Item interface{} +} + +type NodeTaskResponse struct { + // NodeName is the name of edge node. + NodeName string + // State represents for the upgrade state phase of the edge node. 
+ // There are several possible state values: "", Upgrading, BackingUp, RollingBack and Checking. + State api.State + // Event represents for the event of the ImagePrePullJob. + // There are three possible event values: Init, Check, Pull. + Event string + // Action represents for the action of the ImagePrePullJob. + // There are three possible action values: Success, Failure, TimeOut. + Action api.Action + // Reason represents for the reason of the ImagePrePullJob. + Reason string + // Time represents for the running time of the ImagePrePullJob. + Time string + + ExternalMessage string +} + // ObjectResp is the object that api-server response type ObjectResp struct { Object metaV1.Object @@ -68,7 +101,7 @@ type ImagePrePullJobRequest struct { // ImagePrePullJobResponse is used to report status msg to cloudhub https service from each node type ImagePrePullJobResponse struct { NodeName string - State v1alpha1.PrePullState + State string Reason string ImageStatus []v1alpha1.ImageStatus } diff --git a/edge/pkg/common/util/util.go b/edge/pkg/common/util/util.go new file mode 100644 index 000000000..ec978c191 --- /dev/null +++ b/edge/pkg/common/util/util.go @@ -0,0 +1,32 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + + beehiveContext "github.com/kubeedge/beehive/pkg/core/context" + "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/edge/pkg/common/modules" +) + +func ReportTaskResult(taskType, taskID string, resp types.NodeTaskResponse) { + msg := model.NewMessage("").SetRoute(modules.EdgeHubModuleName, modules.HubGroup). + SetResourceOperation(fmt.Sprintf("task/%s/node/%s", taskID, resp.NodeName), taskType).FillBody(resp) + beehiveContext.Send(modules.EdgeHubModuleName, *msg) +} diff --git a/edge/pkg/edgehub/edgehub.go b/edge/pkg/edgehub/edgehub.go index 2498d07f2..bf63f1f9e 100644 --- a/edge/pkg/edgehub/edgehub.go +++ b/edge/pkg/edgehub/edgehub.go @@ -13,8 +13,8 @@ import ( "github.com/kubeedge/kubeedge/edge/pkg/edgehub/certificate" "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients" "github.com/kubeedge/kubeedge/edge/pkg/edgehub/config" - // register Upgrade handler - _ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/upgrade" + // register Task handler + _ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/task" "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2" ) diff --git a/edge/pkg/edgehub/task/task_handler.go b/edge/pkg/edgehub/task/task_handler.go new file mode 100644 index 000000000..ac42d64bb --- /dev/null +++ b/edge/pkg/edgehub/task/task_handler.go @@ -0,0 +1,78 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package task + +import ( + "encoding/json" + "fmt" + + "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + commontypes "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options" + "github.com/kubeedge/kubeedge/edge/pkg/common/util" + "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients" + "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler" + "github.com/kubeedge/kubeedge/edge/pkg/edgehub/task/taskexecutor" +) + +func init() { + handler := &taskHandler{} + msghandler.RegisterHandler(handler) +} + +type taskHandler struct{} + +func (th *taskHandler) Filter(message *model.Message) bool { + name := message.GetGroup() + return name == modules.TaskManagerModuleName +} + +func (th *taskHandler) Process(message *model.Message, clientHub clients.Adapter) error { + taskReq := &commontypes.NodeTaskRequest{} + data, err := message.GetContentData() + if err != nil { + return fmt.Errorf("failed to get content data: %v", err) + } + err = json.Unmarshal(data, taskReq) + if err != nil { + return fmt.Errorf("unmarshal failed: %v", err) + } + executor, err := taskexecutor.GetExecutor(taskReq.Type) + if err != nil { + return err + } + event, err := executor.Do(*taskReq) + if err != nil { + return err + } + + // use external tool like keadm + //Or the task needs to use the goroutine to control the message reply itself. + if event.Action == "" { + return nil + } + + resp := commontypes.NodeTaskResponse{ + NodeName: options.GetEdgeCoreConfig().Modules.Edged.HostnameOverride, + Event: event.Type, + Action: event.Action, + Reason: event.Msg, + } + util.ReportTaskResult(taskReq.Type, taskReq.TaskID, resp) + return nil +} diff --git a/edge/pkg/edgehub/task/taskexecutor/image_prepull.go b/edge/pkg/edgehub/task/taskexecutor/image_prepull.go new file mode 100644 index 000000000..a2b19c8e7 --- /dev/null +++ b/edge/pkg/edgehub/task/taskexecutor/image_prepull.go @@ -0,0 +1,184 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package taskexecutor + +import ( + "encoding/json" + "fmt" + "strings" + + v1 "k8s.io/api/core/v1" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/common/constants" + "github.com/kubeedge/kubeedge/common/types" + commontypes "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options" + edgeutil "github.com/kubeedge/kubeedge/edge/pkg/common/util" + metaclient "github.com/kubeedge/kubeedge/edge/pkg/metamanager/client" + "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +const ( + TaskPrePull = "prepull" +) + +type PrePull struct { + *BaseExecutor +} + +func (p *PrePull) Name() string { + return p.name +} + +func NewPrePullExecutor() Executor { + methods := map[string]func(types.NodeTaskRequest) fsm.Event{ + string(api.TaskChecking): preCheck, + string(api.TaskInit): emptyInit, + "": emptyInit, + string(api.PullingState): pullImages, + } + return &PrePull{ + BaseExecutor: NewBaseExecutor(TaskPrePull, methods), + } +} + +func pullImages(taskReq types.NodeTaskRequest) fsm.Event { + event := fsm.Event{ + Type: "Pull", + Action: api.ActionSuccess, + } + + // get edgecore config + edgeCoreConfig := options.GetEdgeCoreConfig() + if edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint == "" { + edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint = edgeCoreConfig.Modules.Edged.RemoteRuntimeEndpoint + } + + // parse message request + prePullReq, err := getImagePrePullJobRequest(taskReq) + if err != nil { + event.Msg = err.Error() + event.Action = api.ActionFailure + return event + } + + // pull images + container, err := util.NewContainerRuntime(edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint, edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.CgroupDriver) + if err != nil { + event.Msg = err.Error() + event.Action = api.ActionFailure + return event + } + + go func() { + errorStr, imageStatus := prePullImages(*prePullReq, container) + if errorStr != "" { + event.Action = api.ActionFailure + event.Msg = errorStr + } + + data, err := json.Marshal(imageStatus) + if err != nil { + klog.Warningf("marshal imageStatus failed: %v", err) + } + resp := commontypes.NodeTaskResponse{ + NodeName: edgeCoreConfig.Modules.Edged.HostnameOverride, + Event: event.Type, + Action: event.Action, + Reason: event.Msg, + ExternalMessage: string(data), + } + edgeutil.ReportTaskResult(taskReq.Type, taskReq.TaskID, resp) + }() + return fsm.Event{} +} + +func getImagePrePullJobRequest(taskReq commontypes.NodeTaskRequest) (*commontypes.ImagePrePullJobRequest, error) { + var prePullReq commontypes.ImagePrePullJobRequest + data, err := json.Marshal(taskReq.Item) + if err != nil { + return nil, err + } + err = json.Unmarshal(data, &prePullReq) + if err != nil { + return nil, err + } + return &prePullReq, err +} + +func prePullImages(prePullReq commontypes.ImagePrePullJobRequest, container util.ContainerRuntime) (string, []v1alpha1.ImageStatus) { + errorStr := "" + authConfig, err := makeAuthConfig(prePullReq.Secret) + if err != nil { + return errorStr, []v1alpha1.ImageStatus{} + } + + var imageStatus []v1alpha1.ImageStatus + for _, image := range prePullReq.Images { + prePullStatus := v1alpha1.ImageStatus{ + Image: image, + } + for i := 0; i < int(prePullReq.RetryTimes); i++ 
{ + err = container.PullImage(image, authConfig, nil) + if err == nil { + break + } + } + if err != nil { + klog.Errorf("pull image %s failed, err: %v", image, err) + errorStr = fmt.Sprintf("pull image failed, err: %v", err) + prePullStatus.State = api.TaskFailed + prePullStatus.Reason = err.Error() + } else { + klog.Infof("pull image %s successfully!", image) + prePullStatus.State = api.TaskSuccessful + } + imageStatus = append(imageStatus, prePullStatus) + } + + return errorStr, imageStatus +} + +func makeAuthConfig(pullsecret string) (*runtimeapi.AuthConfig, error) { + if pullsecret == "" { + return nil, nil + } + + secretSli := strings.Split(pullsecret, constants.ResourceSep) + if len(secretSli) != 2 { + return nil, fmt.Errorf("pull secret format is not correct") + } + + client := metaclient.New() + secret, err := client.Secrets(secretSli[0]).Get(secretSli[1]) + if err != nil { + return nil, fmt.Errorf("get secret %s failed, %v", secretSli[1], err) + } + + var auth runtimeapi.AuthConfig + err = json.Unmarshal(secret.Data[v1.DockerConfigJsonKey], &auth) + if err != nil { + return nil, fmt.Errorf("unmarshal secret %s to auth file failed, %v", secretSli[1], err) + } + + return &auth, nil +} diff --git a/edge/pkg/edgehub/task/taskexecutor/node_backup.go b/edge/pkg/edgehub/task/taskexecutor/node_backup.go new file mode 100644 index 000000000..1d4583978 --- /dev/null +++ b/edge/pkg/edgehub/task/taskexecutor/node_backup.go @@ -0,0 +1,106 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package taskexecutor + +import ( + "fmt" + "io" + "os" + "path/filepath" + + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/common/constants" + commontypes "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options" + "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" + "github.com/kubeedge/kubeedge/pkg/version" +) + +func backupNode(taskReq commontypes.NodeTaskRequest) (event fsm.Event) { + event = fsm.Event{ + Type: "Backup", + Action: api.ActionSuccess, + } + var err error + defer func() { + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + } + }() + backupPath := filepath.Join(util.KubeEdgeBackupPath, version.Get().String()) + err = backup(backupPath) + if err != nil { + cleanErr := os.Remove(backupPath) + if cleanErr != nil { + klog.Warningf("clean backup path failed: %s", err.Error()) + } + return + } + return event +} + +func backup(backupPath string) error { + config := options.GetEdgeCoreConfig() + klog.Infof("backup start, backup path: %s", backupPath) + if err := os.MkdirAll(backupPath, 0750); err != nil { + return fmt.Errorf("mkdirall failed: %v", err) + } + + // backup edgecore.db: copy from origin path to backup path + if err := copy(config.DataBase.DataSource, filepath.Join(backupPath, "edgecore.db")); err != nil { + return fmt.Errorf("failed to backup db: %v", err) + } + // backup edgecore.yaml: copy from origin path to backup path + if err := copy(constants.DefaultConfigDir+"edgecore.yaml", filepath.Join(backupPath, "edgecore.yaml")); err != nil { + return fmt.Errorf("failed to back config: %v", err) + } + // backup edgecore: copy from origin path to backup path + if err := copy(filepath.Join(util.KubeEdgeUsrBinPath, util.KubeEdgeBinaryName), filepath.Join(backupPath, util.KubeEdgeBinaryName)); err != nil { + return fmt.Errorf("failed to backup edgecore: %v", err) + } + return nil +} + +func copy(src, dst string) error { + sourceFileStat, err := os.Stat(src) + if err != nil { + return err + } + + if !sourceFileStat.Mode().IsRegular() { + return fmt.Errorf("%s is not a regular file", src) + } + + source, err := os.Open(src) + if err != nil { + return err + } + defer source.Close() + + // copy file using src file mode + destination, err := os.OpenFile(dst, os.O_RDWR|os.O_CREATE|os.O_TRUNC, sourceFileStat.Mode()) + if err != nil { + return err + } + defer destination.Close() + _, err = io.Copy(destination, source) + return err +} diff --git a/edge/pkg/edgehub/task/taskexecutor/node_rollback.go b/edge/pkg/edgehub/task/taskexecutor/node_rollback.go new file mode 100644 index 000000000..85233dc45 --- /dev/null +++ b/edge/pkg/edgehub/task/taskexecutor/node_rollback.go @@ -0,0 +1,72 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package taskexecutor + +import ( + "fmt" + "os/exec" + + "k8s.io/klog/v2" + + commontypes "github.com/kubeedge/kubeedge/common/types" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" + "github.com/kubeedge/kubeedge/pkg/version" +) + +func rollbackNode(taskReq commontypes.NodeTaskRequest) (event fsm.Event) { + event = fsm.Event{ + Type: "Rollback", + Action: api.ActionSuccess, + } + var err error + defer func() { + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + } + }() + + var upgradeReq *commontypes.NodeUpgradeJobRequest + upgradeReq, err = getTaskRequest(taskReq) + if err != nil { + return + } + + err = rollback(upgradeReq) + if err != nil { + return + } + return event +} + +func rollback(upgradeReq *commontypes.NodeUpgradeJobRequest) error { + klog.Infof("Begin to run rollback command") + rollBackCmd := fmt.Sprintf("keadm rollback edge --name %s --history %s >> /tmp/keadm.log 2>&1", + upgradeReq.UpgradeID, version.Get()) + + // run upgrade cmd to upgrade edge node + // use nohup command to start a child progress + command := fmt.Sprintf("nohup %s &", rollBackCmd) + cmd := exec.Command("bash", "-c", command) + s, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("run rollback command %s failed: %v, %s", command, err, s) + } + klog.Infof("!!! Finish rollback ") + return nil +} diff --git a/edge/pkg/edgehub/upgrade/upgrade.go b/edge/pkg/edgehub/task/taskexecutor/node_upgrade.go index e26cd50ca..8546ab96d 100644 --- a/edge/pkg/edgehub/upgrade/upgrade.go +++ b/edge/pkg/edgehub/task/taskexecutor/node_upgrade.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The KubeEdge Authors. +Copyright 2023 The KubeEdge Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,100 +14,137 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package upgrade +package taskexecutor import ( "encoding/json" "fmt" "os/exec" "path/filepath" - "strings" - "sync" "k8s.io/klog/v2" - "github.com/kubeedge/beehive/pkg/core/model" - "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + "github.com/kubeedge/kubeedge/common/types" commontypes "github.com/kubeedge/kubeedge/common/types" "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options" - "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients" - "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler" "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" "github.com/kubeedge/kubeedge/pkg/version" ) -func init() { - handler := &upgradeHandler{} - msghandler.RegisterHandler(handler) +const ( + TaskUpgrade = "upgrade" +) - // register upgrade tool: keadm - // if not specify it, also use default upgrade tool: keadm - RegisterUpgradeProvider("", &keadmUpgrade{}) - RegisterUpgradeProvider("keadm", &keadmUpgrade{}) +type Upgrade struct { + *BaseExecutor } -type upgradeHandler struct{} - -func (uh *upgradeHandler) Filter(message *model.Message) bool { - name := message.GetGroup() - return name == modules.NodeUpgradeJobControllerModuleGroup +func (u *Upgrade) Name() string { + return u.name } -func (uh *upgradeHandler) Process(message *model.Message, clientHub clients.Adapter) error { - upgradeReq := &commontypes.NodeUpgradeJobRequest{} - data, err := message.GetContentData() - if err != nil { - return fmt.Errorf("failed to get content data: %v", err) +func NewUpgradeExecutor() Executor { + methods := map[string]func(types.NodeTaskRequest) fsm.Event{ + string(api.TaskChecking): preCheck, + string(api.TaskInit): initUpgrade, + "": initUpgrade, + string(api.BackingUpState): backupNode, + string(api.RollingBackState): rollbackNode, + string(api.UpgradingState): upgrade, } - err = json.Unmarshal(data, upgradeReq) - if err != nil { - return fmt.Errorf("unmarshal failed: %v", err) + return &Upgrade{ + BaseExecutor: NewBaseExecutor(TaskUpgrade, methods), } +} - err = validateUpgrade(upgradeReq) - if err != nil { - return fmt.Errorf("upgrade request is not valid: %v", err) +func initUpgrade(taskReq types.NodeTaskRequest) (event fsm.Event) { + event = fsm.Event{ + Type: "Init", + Action: api.ActionSuccess, } + var err error + defer func() { + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + } + }() - tool := strings.ToLower(upgradeReq.UpgradeTool) - if _, ok := upgradeToolProviders[tool]; !ok { - return fmt.Errorf("not supported upgrade tool type: %v", upgradeReq.UpgradeTool) + var upgradeReq *commontypes.NodeUpgradeJobRequest + upgradeReq, err = getTaskRequest(taskReq) + if err != nil { + return } - return upgradeToolProviders[tool].Upgrade(upgradeReq) + if upgradeReq.UpgradeID == "" { + err = fmt.Errorf("upgradeID cannot be empty") + return + } + if upgradeReq.Version == version.Get().String() { + return fsm.Event{ + Type: "Upgrading", + Action: api.ActionSuccess, + } + } + err = prepareKeadm(upgradeReq) + if err != nil { + return + } + return event } -func validateUpgrade(upgrade *commontypes.NodeUpgradeJobRequest) error { - if upgrade.UpgradeID == "" { - return fmt.Errorf("upgradeID cannot be empty") +func getTaskRequest(taskReq commontypes.NodeTaskRequest) (*commontypes.NodeUpgradeJobRequest, error) { + data, err := json.Marshal(taskReq.Item) + if err != nil { + return nil, err } - if upgrade.Version == version.Get().String() { - return 
fmt.Errorf("edge node already on specific version, no need to upgrade") + var upgradeReq commontypes.NodeUpgradeJobRequest + err = json.Unmarshal(data, &upgradeReq) + if err != nil { + return nil, err } - - return nil + return &upgradeReq, err } -type Provider interface { - Upgrade(upgrade *commontypes.NodeUpgradeJobRequest) error +func upgrade(taskReq types.NodeTaskRequest) (event fsm.Event) { + opts := options.GetEdgeCoreOptions() + event = fsm.Event{ + Type: "Upgrade", + } + upgradeReq, err := getTaskRequest(taskReq) + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + return + } + err = keadmUpgrade(*upgradeReq, opts) + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + } + return } -var ( - upgradeToolProviders = make(map[string]Provider) - mutex sync.Mutex -) +func keadmUpgrade(upgradeReq commontypes.NodeUpgradeJobRequest, opts *options.EdgeCoreOptions) error { + klog.Infof("Begin to run upgrade command") + upgradeCmd := fmt.Sprintf("keadm upgrade --upgradeID %s --historyID %s --fromVersion %s --toVersion %s --config %s --image %s > /tmp/keadm.log 2>&1", + upgradeReq.UpgradeID, upgradeReq.HistoryID, version.Get(), upgradeReq.Version, opts.ConfigFile, upgradeReq.Image) -func RegisterUpgradeProvider(upgradeToolType string, provider Provider) { - mutex.Lock() - defer mutex.Unlock() - upgradeToolProviders[upgradeToolType] = provider + // run upgrade cmd to upgrade edge node + // use nohup command to start a child progress + command := fmt.Sprintf("nohup %s &", upgradeCmd) + cmd := exec.Command("bash", "-c", command) + s, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("run upgrade command %s failed: %v, %s", command, err, s) + } + klog.Infof("!!! Finish upgrade from Version %s to %s ...", version.Get(), upgradeReq.Version) + return nil } -type keadmUpgrade struct{} - -func (*keadmUpgrade) Upgrade(upgradeReq *commontypes.NodeUpgradeJobRequest) error { - // get edgecore start options and config - opts := options.GetEdgeCoreOptions() +func prepareKeadm(upgradeReq *commontypes.NodeUpgradeJobRequest) error { config := options.GetEdgeCoreConfig() // install the requested installer keadm from docker image @@ -119,7 +156,6 @@ func (*keadmUpgrade) Upgrade(upgradeReq *commontypes.NodeUpgradeJobRequest) erro if err != nil { return fmt.Errorf("failed to new container runtime: %v", err) } - image := upgradeReq.Image // TODO: do some verification 1.sha256(pass in using CRD) 2.image signature verification @@ -135,22 +171,5 @@ func (*keadmUpgrade) Upgrade(upgradeReq *commontypes.NodeUpgradeJobRequest) erro if err != nil { return fmt.Errorf("failed to cp file from image to host: %v", err) } - - klog.Infof("Begin to run upgrade command") - upgradeCmd := fmt.Sprintf("keadm upgrade --upgradeID %s --historyID %s --fromVersion %s --toVersion %s --config %s --image %s > /tmp/keadm.log 2>&1", - upgradeReq.UpgradeID, upgradeReq.HistoryID, version.Get(), upgradeReq.Version, opts.ConfigFile, image) - - // run upgrade cmd to upgrade edge node - // use nohup command to start a child progress - command := fmt.Sprintf("nohup %s &", upgradeCmd) - cmd := exec.Command("bash", "-c", command) - s, err := cmd.CombinedOutput() - if err != nil { - klog.Errorf("run upgrade command %s failed: %v, %s", command, err, s) - return fmt.Errorf("run upgrade command %s failed: %v, %s", command, err, s) - } - - klog.Infof("!!! 
Begin to upgrade from Version %s to %s ...", version.Get(), upgradeReq.Version) - return nil } diff --git a/edge/pkg/edgehub/task/taskexecutor/pre_check.go b/edge/pkg/edgehub/task/taskexecutor/pre_check.go new file mode 100644 index 000000000..da68a2077 --- /dev/null +++ b/edge/pkg/edgehub/task/taskexecutor/pre_check.go @@ -0,0 +1,158 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package taskexecutor + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/disk" + "github.com/shirou/gopsutil/v3/mem" + + "github.com/kubeedge/kubeedge/common/types" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +const ( + MaxCPUUsage float64 = 80 + MaxMemUsage float64 = 80 + MaxDiskUsage float64 = 80 +) + +func preCheck(taskReq types.NodeTaskRequest) fsm.Event { + event := fsm.Event{ + Type: "Check", + Action: api.ActionSuccess, + Msg: "", + } + + data, err := json.Marshal(taskReq.Item) + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + return event + } + var checkItems types.NodePreCheckRequest + err = json.Unmarshal(data, &checkItems) + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + return event + } + + var failed bool + var checkResult = map[string]string{} + var checkFunc = map[string]func() error{ + "cpu": checkCPU, + "mem": checkMem, + "disk": checkDisk, + } + for _, item := range checkItems.CheckItem { + f, ok := checkFunc[item] + if !ok { + checkResult[item] = "check item not support" + continue + } + err = f() + if err != nil { + failed = true + checkResult[item] = err.Error() + continue + } + checkResult[item] = "ok" + } + if !failed { + return event + } + event.Action = api.ActionFailure + result, err := json.Marshal(checkResult) + if err != nil { + event.Msg = err.Error() + return event + } + event.Msg = string(result) + return event +} + +func checkCPU() error { + cpuUsage, err := cpu.Percent(100*time.Millisecond, false) + if err != nil { + return err + } + var usage float64 + for _, percpu := range cpuUsage { + usage += percpu / float64(len(cpuUsage)) + } + if usage > MaxCPUUsage { + return fmt.Errorf("current cpu usage is %f, which exceeds the maximum allowed usage %f", usage, MaxCPUUsage) + } + return nil +} + +func checkMem() error { + memInfo, err := mem.VirtualMemory() + if err != nil { + return err + } + memUsedPercent := memInfo.UsedPercent + if memUsedPercent > MaxMemUsage { + return fmt.Errorf("current mem usage is %f, which exceeds the maximum allowed usage %f", memUsedPercent, MaxMemUsage) + } + return nil +} + +func checkDisk() error { + partitions, err := disk.Partitions(true) + if err != nil { + return err + } + var failed bool + var diskUsages = map[string]string{} + for _, part := range partitions { + usage, err := disk.Usage(part.Mountpoint) + if err != nil { + failed = true + diskUsages[part.Device] = err.Error() + continue + } + if usage.UsedPercent > MaxDiskUsage { + 
failed = true + diskUsages[part.Device] = fmt.Sprintf("current disk usage is %f, which exceeds the maximum allowed usage %f", usage.UsedPercent, MaxMemUsage) + continue + } + diskUsages[part.Device] = fmt.Sprintf("%f", usage.UsedPercent) + } + if !failed { + return nil + } + result, err := json.Marshal(diskUsages) + if err != nil { + return err + } + return fmt.Errorf(string(result)) +} + +func normalInit(types.NodeTaskRequest) fsm.Event { + return fsm.Event{ + Type: "Init", + Action: api.ActionSuccess, + Msg: "", + } +} diff --git a/edge/pkg/edgehub/task/taskexecutor/task_executor.go b/edge/pkg/edgehub/task/taskexecutor/task_executor.go new file mode 100644 index 000000000..fd3be1f88 --- /dev/null +++ b/edge/pkg/edgehub/task/taskexecutor/task_executor.go @@ -0,0 +1,93 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package taskexecutor + +import ( + "fmt" + + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +func init() { + Register(TaskUpgrade, NewUpgradeExecutor()) + Register(TaskPrePull, NewPrePullExecutor()) +} + +type Executor interface { + Name() string + Do(types.NodeTaskRequest) (fsm.Event, error) +} + +type BaseExecutor struct { + name string + methods map[string]func(types.NodeTaskRequest) fsm.Event +} + +func (be *BaseExecutor) Name() string { + return be.name +} + +func NewBaseExecutor(name string, methods map[string]func(types.NodeTaskRequest) fsm.Event) *BaseExecutor { + return &BaseExecutor{ + name: name, + methods: methods, + } +} + +func (be *BaseExecutor) Do(taskReq types.NodeTaskRequest) (fsm.Event, error) { + method, ok := be.methods[taskReq.State] + if !ok { + err := fmt.Errorf("method %s in executor %s is not implemented", taskReq.State, taskReq.Type) + klog.Warning(err.Error()) + return fsm.Event{}, err + } + return method(taskReq), nil +} + +var ( + executors = make(map[string]Executor) + CommonMethods = map[string]func(types.NodeTaskRequest) fsm.Event{ + string(v1alpha1.TaskChecking): preCheck, + string(v1alpha1.TaskInit): normalInit, + } +) + +func Register(name string, executor Executor) { + if _, ok := executors[name]; ok { + klog.Warning("executor %s exists ", name) + } + executors[name] = executor +} + +func GetExecutor(name string) (Executor, error) { + executor, ok := executors[name] + if !ok { + return nil, fmt.Errorf("executor %s is not registered", name) + } + return executor, nil +} + +func emptyInit(_ types.NodeTaskRequest) (event fsm.Event) { + return fsm.Event{ + Type: "Init", + Action: v1alpha1.ActionSuccess, + } +} diff --git a/edge/pkg/edgehub/upgrade/image_prepull.go b/edge/pkg/edgehub/upgrade/image_prepull.go deleted file mode 100644 index d02a4cdc9..000000000 --- a/edge/pkg/edgehub/upgrade/image_prepull.go +++ /dev/null @@ -1,188 +0,0 @@ -/* -Copyright 2023 The KubeEdge Authors. 
- -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package upgrade - -import ( - "encoding/json" - "fmt" - "strings" - - v1 "k8s.io/api/core/v1" - runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" - "k8s.io/klog/v2" - - beehiveContext "github.com/kubeedge/beehive/pkg/core/context" - "github.com/kubeedge/beehive/pkg/core/model" - cloudmodules "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" - "github.com/kubeedge/kubeedge/common/constants" - commontypes "github.com/kubeedge/kubeedge/common/types" - "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options" - "github.com/kubeedge/kubeedge/edge/pkg/common/modules" - "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients" - "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler" - metaclient "github.com/kubeedge/kubeedge/edge/pkg/metamanager/client" - "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util" - "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" -) - -func init() { - handler := &prepullHandler{} - msghandler.RegisterHandler(handler) -} - -type prepullHandler struct{} - -func (uh *prepullHandler) Filter(message *model.Message) bool { - name := message.GetGroup() - return name == cloudmodules.ImagePrePullControllerModuleGroup -} - -func (uh *prepullHandler) Process(message *model.Message, clientHub clients.Adapter) error { - // get edgecore config - edgecoreconfig := options.GetEdgeCoreConfig() - if edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint == "" { - edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint = edgecoreconfig.Modules.Edged.RemoteRuntimeEndpoint - } - nodeName := edgecoreconfig.Modules.Edged.HostnameOverride - - var errmsg, jobName string - defer func() { - if errmsg != "" { - sendResponseMessage(nodeName, jobName, errmsg, nil, v1alpha1.PrePullFailed) - } - }() - - jobName, err := parseJobName(message.GetResource()) - if err != nil { - errmsg = fmt.Sprintf("failed to parse prepull resource, err: %v", err) - return fmt.Errorf(errmsg) - } - - // parse message request - var prePullReq commontypes.ImagePrePullJobRequest - data, err := message.GetContentData() - if err != nil { - errmsg = fmt.Sprintf("failed to get message content, err: %v", err) - return fmt.Errorf(errmsg) - } - err = json.Unmarshal(data, &prePullReq) - if err != nil { - errmsg = fmt.Sprintf("failed to unmarshal message content, err: %v", err) - return fmt.Errorf(errmsg) - } - if nodeName != prePullReq.NodeName { - errmsg = fmt.Sprintf("request node name %s is not match with the edge node %s", prePullReq.NodeName, nodeName) - return fmt.Errorf(errmsg) - } - - // todo check items such as disk according to req.checkitem config - - // pull images - container, err := util.NewContainerRuntime(edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint, edgecoreconfig.Modules.Edged.TailoredKubeletConfig.CgroupDriver) - if err != nil { - errmsg = fmt.Sprintf("failed to new container runtime: %v", err) - return fmt.Errorf(errmsg) - } - - go func() { - pullMsg, resp, state := 
prePullImages(prePullReq, container) - sendResponseMessage(nodeName, jobName, pullMsg, resp, state) - }() - return nil -} - -func sendResponseMessage(nodeName, jobName, pullMsg string, imageStatus []v1alpha1.ImageStatus, state v1alpha1.PrePullState) { - resp := commontypes.ImagePrePullJobResponse{ - NodeName: nodeName, - State: state, - Reason: pullMsg, - ImageStatus: imageStatus, - } - msg := model.NewMessage("").SetRoute(modules.EdgeHubModuleName, modules.HubGroup). - SetResourceOperation(fmt.Sprintf("node/%s/prepull/%s", nodeName, jobName), "prepull").FillBody(resp) - beehiveContext.Send(modules.EdgeHubModuleName, *msg) -} - -func prePullImages(prePullReq commontypes.ImagePrePullJobRequest, container util.ContainerRuntime) (string, []v1alpha1.ImageStatus, v1alpha1.PrePullState) { - var errmsg string - authConfig, err := makeAuthConfig(prePullReq.Secret) - if err != nil { - errmsg = fmt.Sprintf("failed to get prepull secret, err: %v", err) - return errmsg, nil, v1alpha1.PrePullFailed - } - - state := v1alpha1.PrePullSuccessful - var imageStatus []v1alpha1.ImageStatus - for _, image := range prePullReq.Images { - prePullStatus := v1alpha1.ImageStatus{ - Image: image, - } - for i := 0; i < int(prePullReq.RetryTimes); i++ { - err = container.PullImage(image, authConfig, nil) - if err == nil { - break - } - } - if err != nil { - klog.Errorf("pull image %s failed, err: %v", image, err) - errmsg = "prepull images failed" - state = v1alpha1.PrePullFailed - prePullStatus.State = v1alpha1.PrePullFailed - prePullStatus.Reason = err.Error() - } else { - klog.Infof("pull image %s successfully!", image) - prePullStatus.State = v1alpha1.PrePullSuccessful - } - imageStatus = append(imageStatus, prePullStatus) - } - - return errmsg, imageStatus, state -} - -func parseJobName(resource string) (string, error) { - var jobName string - sli := strings.Split(resource, constants.ResourceSep) - if len(sli) != 2 { - return jobName, fmt.Errorf("the resource %s is not the standard type", resource) - } - return sli[1], nil -} - -func makeAuthConfig(pullsecret string) (*runtimeapi.AuthConfig, error) { - if pullsecret == "" { - return nil, nil - } - - secretSli := strings.Split(pullsecret, constants.ResourceSep) - if len(secretSli) != 2 { - return nil, fmt.Errorf("pull secret format is not correct") - } - - client := metaclient.New() - secret, err := client.Secrets(secretSli[0]).Get(secretSli[1]) - if err != nil { - return nil, fmt.Errorf("get secret %s failed, %v", secretSli[1], err) - } - - var auth runtimeapi.AuthConfig - err = json.Unmarshal(secret.Data[v1.DockerConfigJsonKey], &auth) - if err != nil { - return nil, fmt.Errorf("unmarshal secret %s to auth file failed, %v", secretSli[1], err) - } - - return &auth, nil -} diff --git a/edge/pkg/edgehub/upgrade/upgrade_test.go b/edge/pkg/edgehub/upgrade/upgrade_test.go deleted file mode 100644 index 2660f9d79..000000000 --- a/edge/pkg/edgehub/upgrade/upgrade_test.go +++ /dev/null @@ -1,56 +0,0 @@ -/* -Copyright 2022 The KubeEdge Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package upgrade - -import ( - "testing" - - "github.com/kubeedge/beehive/pkg/core/model" -) - -func TestFilter(t *testing.T) { - uh := &upgradeHandler{} - - tests := []struct { - msg *model.Message - want bool - name string - }{ - { - msg: &model.Message{ - Router: model.MessageRoute{Group: "nodeupgradejobcontroller"}, - }, - want: true, - name: "Node upgrade job controlled", - }, - { - msg: &model.Message{ - Router: model.MessageRoute{Group: "devicecontroller"}, - }, - want: false, - name: "Device controller", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := uh.Filter(tt.msg); got != tt.want { - t.Errorf("upgradeHandler.Filter() retuned unexpected result. got = %v, want = %v", got, tt.want) - } - }) - } -} diff --git a/keadm/cmd/keadm/app/cmd/cmd_others.go b/keadm/cmd/keadm/app/cmd/cmd_others.go index 71c699d52..56c394608 100644 --- a/keadm/cmd/keadm/app/cmd/cmd_others.go +++ b/keadm/cmd/keadm/app/cmd/cmd_others.go @@ -89,5 +89,7 @@ func NewKubeedgeCommand() *cobra.Command { cmds.AddCommand(beta.NewBeta()) cmds.AddCommand(edge.NewEdgeUpgrade()) + cmds.AddCommand(edge.NewEdgeRollback()) + return cmds } diff --git a/keadm/cmd/keadm/app/cmd/edge/rollback.go b/keadm/cmd/keadm/app/cmd/edge/rollback.go new file mode 100644 index 000000000..cacd279a3 --- /dev/null +++ b/keadm/cmd/keadm/app/cmd/edge/rollback.go @@ -0,0 +1,113 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package edge + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + "k8s.io/klog/v2" + "sigs.k8s.io/yaml" + + "github.com/kubeedge/kubeedge/common/constants" + "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util" + "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" + "github.com/kubeedge/kubeedge/pkg/version" +) + +// NewEdgeUpgrade returns KubeEdge edge upgrade command. +func NewEdgeRollback() *cobra.Command { + rollbackOptions := newRollbackOptions() + + cmd := &cobra.Command{ + Use: "rollback", + Short: "rollback edge component. Rollback the edge node to the desired version.", + Long: "Rollback edge component. Rollback the edge node to the desired version.", + RunE: func(cmd *cobra.Command, args []string) error { + // rollback edge core + return rollbackEdgeCore(rollbackOptions) + }, + } + + addRollbackFlags(cmd, rollbackOptions) + return cmd +} + +// newJoinOptions returns a struct ready for being used for creating cmd join flags. 
+func newRollbackOptions() *RollbackOptions { + opts := &RollbackOptions{} + opts.HistoryVersion = version.Get().String() + opts.Config = constants.DefaultConfigDir + "edgecore.yaml" + + return opts +} + +func rollbackEdgeCore(ro *RollbackOptions) error { + // get EdgeCore configuration from edgecore.yaml config file + data, err := os.ReadFile(ro.Config) + if err != nil { + return fmt.Errorf("failed to read config file %s: %v", ro.Config, err) + } + + configure := &v1alpha2.EdgeCoreConfig{} + err = yaml.Unmarshal(data, configure) + if err != nil { + return fmt.Errorf("failed to unmarshal config file %s: %v", ro.Config, err) + } + event := &fsm.Event{ + Type: "RollBack", + Action: api.ActionSuccess, + } + defer func() { + // report upgrade result to cloudhub + if err = util.ReportTaskResult(configure, ro.TaskType, ro.TaskName, *event); err != nil { + klog.Warning("failed to report upgrade result to cloud: %v", err) + } + }() + + rbErr := rollback(ro.HistoryVersion, configure.DataBase.DataSource, ro.Config) + if rbErr != nil { + event.Action = api.ActionFailure + event.Msg = fmt.Sprintf("upgrade error: %v, rollback error: %v", err, rbErr) + } + + return nil +} + +type RollbackOptions struct { + HistoryVersion string + TaskType string + TaskName string + Config string +} + +func addRollbackFlags(cmd *cobra.Command, rollbackOptions *RollbackOptions) { + cmd.Flags().StringVar(&rollbackOptions.HistoryVersion, "history", rollbackOptions.HistoryVersion, + "Use this key to specify the origin version before upgrade") + + cmd.Flags().StringVar(&rollbackOptions.Config, "config", rollbackOptions.Config, + "Use this key to specify the path to the edgecore configuration file.") + + cmd.Flags().StringVar(&rollbackOptions.TaskType, "type", "rollback", + "Use this key to specify the task type for reporting status.") + + cmd.Flags().StringVar(&rollbackOptions.TaskName, "name", rollbackOptions.TaskName, + "Use this key to specify the task name for reporting status.") +} diff --git a/keadm/cmd/keadm/app/cmd/edge/upgrade.go b/keadm/cmd/keadm/app/cmd/edge/upgrade.go index 001323679..2d5f934d8 100644 --- a/keadm/cmd/keadm/app/cmd/edge/upgrade.go +++ b/keadm/cmd/keadm/app/cmd/edge/upgrade.go @@ -17,28 +17,21 @@ limitations under the License. 
package edge import ( - "bytes" - "crypto/tls" - "crypto/x509" - "encoding/json" "fmt" "io" - "net" - "net/http" "os" "path/filepath" - "time" "github.com/spf13/cobra" "k8s.io/klog/v2" "sigs.k8s.io/yaml" "github.com/kubeedge/kubeedge/common/constants" - commontypes "github.com/kubeedge/kubeedge/common/types" "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/common" "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util" "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2" - upgradev1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" ) var ( @@ -93,91 +86,96 @@ func (up *UpgradeOptions) upgrade() error { HistoryID: up.HistoryID, FromVersion: up.FromVersion, ToVersion: up.ToVersion, + TaskType: up.TaskType, Image: up.Image, + DisableBackup: up.DisableBackup, ConfigFilePath: up.Config, EdgeCoreConfig: configure, } + event := &fsm.Event{ + Type: "Upgrade", + Action: api.ActionSuccess, + } defer func() { // report upgrade result to cloudhub - if err := upgrade.reportUpgradeResult(); err != nil { + if err = util.ReportTaskResult(configure, upgrade.TaskType, upgrade.UpgradeID, *event); err != nil { klog.Errorf("failed to report upgrade result to cloud: %v", err) } // cleanup idempotency record - if err := os.Remove(idempotencyRecord); err != nil { + if err = os.Remove(idempotencyRecord); err != nil { klog.Errorf("failed to remove idempotency_record file(%s): %v", idempotencyRecord, err) } }() // only allow upgrade when last upgrade finished if util.FileExists(idempotencyRecord) { - upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeFailedRollbackSuccess)) - upgrade.UpdateFailureReason("last upgrade not finished, not allowed upgrade again") + event.Action = api.ActionFailure + event.Msg = "last upgrade not finished, not allowed upgrade again" return fmt.Errorf("last upgrade not finished, not allowed upgrade again") } // create idempotency_record file if err := os.MkdirAll(filepath.Dir(idempotencyRecord), 0750); err != nil { - upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeFailedRollbackSuccess)) reason := fmt.Sprintf("failed to create idempotency_record dir: %v", err) - upgrade.UpdateFailureReason(reason) + event.Action = api.ActionFailure + event.Msg = reason return fmt.Errorf(reason) } if _, err := os.Create(idempotencyRecord); err != nil { - upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeFailedRollbackSuccess)) reason := fmt.Sprintf("failed to create idempotency_record file: %v", err) - upgrade.UpdateFailureReason(reason) + event.Action = api.ActionFailure + event.Msg = reason return fmt.Errorf(reason) } // run script to do upgrade operation err = upgrade.PreProcess() if err != nil { - upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeFailedRollbackSuccess)) - upgrade.UpdateFailureReason(fmt.Sprintf("upgrade error: %v", err)) + event.Action = api.ActionFailure + event.Msg = fmt.Sprintf("upgrade pre process failed: %v", err) return fmt.Errorf("upgrade pre process failed: %v", err) } err = upgrade.Process() if err != nil { + event.Type = "Rollback" rbErr := upgrade.Rollback() if rbErr != nil { - upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeFailedRollbackFailed)) - upgrade.UpdateFailureReason(fmt.Sprintf("upgrade error: %v, rollback error: %v", err, rbErr)) + event.Action = api.ActionFailure + event.Msg = rbErr.Error() } else { - upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeFailedRollbackSuccess)) - 
upgrade.UpdateFailureReason(fmt.Sprintf("upgrade error: %v", err)) + event.Msg = err.Error() } return fmt.Errorf("upgrade process failed: %v", err) } - upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeSuccess)) - return nil } func (up *Upgrade) PreProcess() error { - klog.Infof("upgrade preprocess start") - backupPath := filepath.Join(util.KubeEdgeBackupPath, up.FromVersion) - if err := os.MkdirAll(backupPath, 0750); err != nil { - return fmt.Errorf("mkdirall failed: %v", err) - } + // download the request version edgecore + klog.Infof("Begin to download version %s edgecore", up.ToVersion) + if !up.DisableBackup { + backupPath := filepath.Join(util.KubeEdgeBackupPath, up.FromVersion) + if err := os.MkdirAll(backupPath, 0750); err != nil { + return fmt.Errorf("mkdirall failed: %v", err) + } - // backup edgecore.db: copy from origin path to backup path - if err := copy(up.EdgeCoreConfig.DataBase.DataSource, filepath.Join(backupPath, "edgecore.db")); err != nil { - return fmt.Errorf("failed to backup db: %v", err) - } - // backup edgecore.yaml: copy from origin path to backup path - if err := copy(up.ConfigFilePath, filepath.Join(backupPath, "edgecore.yaml")); err != nil { - return fmt.Errorf("failed to back config: %v", err) - } - // backup edgecore: copy from origin path to backup path - if err := copy(filepath.Join(util.KubeEdgeUsrBinPath, util.KubeEdgeBinaryName), filepath.Join(backupPath, util.KubeEdgeBinaryName)); err != nil { - return fmt.Errorf("failed to backup edgecore: %v", err) + // backup edgecore.db: copy from origin path to backup path + if err := copy(up.EdgeCoreConfig.DataBase.DataSource, filepath.Join(backupPath, "edgecore.db")); err != nil { + return fmt.Errorf("failed to backup db: %v", err) + } + // backup edgecore.yaml: copy from origin path to backup path + if err := copy(up.ConfigFilePath, filepath.Join(backupPath, "edgecore.yaml")); err != nil { + return fmt.Errorf("failed to back config: %v", err) + } + // backup edgecore: copy from origin path to backup path + if err := copy(filepath.Join(util.KubeEdgeUsrBinPath, util.KubeEdgeBinaryName), filepath.Join(backupPath, util.KubeEdgeBinaryName)); err != nil { + return fmt.Errorf("failed to backup edgecore: %v", err) + } } - // download the request version edgecore - klog.Infof("Begin to download version %s edgecore", up.ToVersion) upgradePath := filepath.Join(util.KubeEdgeUpgradePath, up.ToVersion) if up.EdgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint == "" { up.EdgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint = up.EdgeCoreConfig.Modules.Edged.RemoteRuntimeEndpoint @@ -266,6 +264,10 @@ func (up *Upgrade) Process() error { } func (up *Upgrade) Rollback() error { + return rollback(up.FromVersion, up.EdgeCoreConfig.DataBase.DataSource, up.ConfigFilePath) +} + +func rollback(HistoryVersion, dataSource, configFilePath string) error { klog.Infof("upgrade rollback process start") // stop edgecore @@ -277,12 +279,12 @@ func (up *Upgrade) Rollback() error { // rollback origin config/db/binary // backup edgecore.db: copy from backup path to origin path - backupPath := filepath.Join(util.KubeEdgeBackupPath, up.FromVersion) - if err := copy(filepath.Join(backupPath, "edgecore.db"), up.EdgeCoreConfig.DataBase.DataSource); err != nil { + backupPath := filepath.Join(util.KubeEdgeBackupPath, HistoryVersion) + if err := copy(filepath.Join(backupPath, "edgecore.db"), dataSource); err != nil { return fmt.Errorf("failed to rollback db: %v", err) } // backup edgecore.yaml: copy from 
backup path to origin path - if err := copy(filepath.Join(backupPath, "edgecore.yaml"), up.ConfigFilePath); err != nil { + if err := copy(filepath.Join(backupPath, "edgecore.yaml"), configFilePath); err != nil { return fmt.Errorf("failed to back config: %v", err) } // backup edgecore: copy from backup path to origin path @@ -292,7 +294,7 @@ func (up *Upgrade) Rollback() error { // generate edgecore.service if util.HasSystemd() { - err = common.GenerateServiceFile(util.KubeEdgeBinaryName, fmt.Sprintf("%s --config %s", filepath.Join(util.KubeEdgeUsrBinPath, util.KubeEdgeBinaryName), up.ConfigFilePath), false) + err = common.GenerateServiceFile(util.KubeEdgeBinaryName, fmt.Sprintf("%s --config %s", filepath.Join(util.KubeEdgeUsrBinPath, util.KubeEdgeBinaryName), configFilePath), false) if err != nil { return fmt.Errorf("failed to create edgecore.service file: %v", err) } @@ -303,7 +305,6 @@ func (up *Upgrade) Rollback() error { if err != nil { return fmt.Errorf("failed to start origin edgecore: %v", err) } - return nil } @@ -315,67 +316,15 @@ func (up *Upgrade) UpdateFailureReason(reason string) { up.Reason = reason } -func (up *Upgrade) reportUpgradeResult() error { - resp := &commontypes.NodeUpgradeJobResponse{ - UpgradeID: up.UpgradeID, - HistoryID: up.HistoryID, - NodeName: up.EdgeCoreConfig.Modules.Edged.HostnameOverride, - FromVersion: up.FromVersion, - ToVersion: up.ToVersion, - Status: up.Status, - Reason: up.Reason, - } - - var caCrt []byte - caCertPath := up.EdgeCoreConfig.Modules.EdgeHub.TLSCAFile - caCrt, err := os.ReadFile(caCertPath) - if err != nil { - return fmt.Errorf("failed to read ca: %v", err) - } - - rootCAs := x509.NewCertPool() - rootCAs.AppendCertsFromPEM(caCrt) - - certFile := up.EdgeCoreConfig.Modules.EdgeHub.TLSCertFile - keyFile := up.EdgeCoreConfig.Modules.EdgeHub.TLSPrivateKeyFile - cliCrt, err := tls.LoadX509KeyPair(certFile, keyFile) - - transport := &http.Transport{ - DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext, - // use TLS configuration - TLSClientConfig: &tls.Config{ - RootCAs: rootCAs, - InsecureSkipVerify: false, - Certificates: []tls.Certificate{cliCrt}, - }, - } - - client := &http.Client{Transport: transport, Timeout: 30 * time.Second} - - respData, err := json.Marshal(resp) - if err != nil { - return fmt.Errorf("marshal failed: %v", err) - } - url := up.EdgeCoreConfig.Modules.EdgeHub.HTTPServer + constants.DefaultNodeUpgradeURL - result, err := client.Post(url, "application/json", bytes.NewReader(respData)) - if err != nil { - return fmt.Errorf("post http request failed: %v", err) - } - defer result.Body.Close() - - return nil -} - type UpgradeOptions struct { - UpgradeID string - HistoryID string - FromVersion string - ToVersion string - Config string - Image string + UpgradeID string + HistoryID string + FromVersion string + ToVersion string + Config string + Image string + DisableBackup bool + TaskType string } type Upgrade struct { @@ -384,7 +333,9 @@ type Upgrade struct { FromVersion string ToVersion string Image string + DisableBackup bool ConfigFilePath string + TaskType string EdgeCoreConfig *v1alpha2.EdgeCoreConfig Status string @@ -409,4 +360,10 @@ func addUpgradeFlags(cmd *cobra.Command, upgradeOptions *UpgradeOptions) { cmd.Flags().StringVar(&upgradeOptions.Image, "image", upgradeOptions.Image, "Use this key to specify installation image to download.") + + cmd.Flags().StringVar(&upgradeOptions.TaskType, "type", "upgrade", + "Use this key to specify the task type for reporting 
status.") + + cmd.Flags().BoolVar(&upgradeOptions.DisableBackup, "disable-backup", upgradeOptions.DisableBackup, + "Use this key to specify the backup enable for upgrade.") } diff --git a/keadm/cmd/keadm/app/cmd/util/common.go b/keadm/cmd/keadm/app/cmd/util/common.go index 4cbc2c243..a5065484e 100644 --- a/keadm/cmd/keadm/app/cmd/util/common.go +++ b/keadm/cmd/keadm/app/cmd/util/common.go @@ -18,16 +18,22 @@ package util import ( "archive/tar" + "bytes" "compress/gzip" "crypto/sha512" + "crypto/tls" + "crypto/x509" + "encoding/json" "fmt" "io" + "net" "net/http" "os" "path/filepath" "regexp" "strconv" "strings" + "time" "github.com/blang/semver" "github.com/spf13/pflag" @@ -39,8 +45,11 @@ import ( "k8s.io/klog/v2" "github.com/kubeedge/kubeedge/common/constants" + commontypes "github.com/kubeedge/kubeedge/common/types" types "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/common" + "github.com/kubeedge/kubeedge/pkg/apis" "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2" + "github.com/kubeedge/kubeedge/pkg/util/fsm" pkgversion "github.com/kubeedge/kubeedge/pkg/version" ) @@ -639,3 +648,57 @@ func downloadServiceFile(componentType types.ComponentType, version semver.Versi } return nil } + +func ReportTaskResult(config *v1alpha2.EdgeCoreConfig, taskType, taskID string, event fsm.Event) error { + resp := &commontypes.NodeTaskResponse{ + NodeName: config.Modules.Edged.HostnameOverride, + Event: event.Type, + Action: event.Action, + Time: time.Now().Format(apis.ISO8601UTC), + Reason: event.Msg, + } + edgeHub := config.Modules.EdgeHub + var caCrt []byte + caCertPath := edgeHub.TLSCAFile + caCrt, err := os.ReadFile(caCertPath) + if err != nil { + return fmt.Errorf("failed to read ca: %v", err) + } + + rootCAs := x509.NewCertPool() + rootCAs.AppendCertsFromPEM(caCrt) + + certFile := edgeHub.TLSCertFile + keyFile := edgeHub.TLSPrivateKeyFile + cliCrt, err := tls.LoadX509KeyPair(certFile, keyFile) + + transport := &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + // use TLS configuration + TLSClientConfig: &tls.Config{ + RootCAs: rootCAs, + InsecureSkipVerify: false, + Certificates: []tls.Certificate{cliCrt}, + }, + } + + client := &http.Client{Transport: transport, Timeout: 30 * time.Second} + + respData, err := json.Marshal(resp) + if err != nil { + return fmt.Errorf("marshal failed: %v", err) + } + url := edgeHub.HTTPServer + fmt.Sprintf("/task/%s/name/%s/node/%s/status", taskType, taskID, config.Modules.Edged.HostnameOverride) + result, err := client.Post(url, "application/json", bytes.NewReader(respData)) + + if err != nil { + return fmt.Errorf("post http request failed: %v", err) + } + klog.Error("report result ", result) + defer result.Body.Close() + + return nil +} diff --git a/manifests/charts/cloudcore/crds/operations_v1alpha1_imageprepulljob.yaml b/manifests/charts/cloudcore/crds/operations_v1alpha1_imageprepulljob.yaml index 903516d32..a044a1532 100644 --- a/manifests/charts/cloudcore/crds/operations_v1alpha1_imageprepulljob.yaml +++ b/manifests/charts/cloudcore/crds/operations_v1alpha1_imageprepulljob.yaml @@ -48,6 +48,16 @@ spec: items: type: string type: array + concurrency: + description: Concurrency specifies the maximum number of edge + nodes that can pull images at the same time. The default Concurrency + value is 1. + format: int32 + type: integer + failureTolerate: + description: FailureTolerate specifies the task tolerance failure + ratio. 
The default FailureTolerate value is 0.1. + type: string imageSecrets: description: ImageSecret specifies the secret for image pull if private registry used. Use {namespace}/{secretName} in format. @@ -119,10 +129,10 @@ spec: failed on each edgenode. Default to 0 format: int32 type: integer - timeoutSecondsOnEachNode: - description: TimeoutSecondsOnEachNode limits the duration of the - image prepull job on each edgenode. Default to 360. If set to - 0, we'll use the default value 360. + timeoutSeconds: + description: TimeoutSeconds limits the duration of the node prepull + job on each edgenode. Default to 300. If set to 0, we'll use + the default value 300. format: int32 type: integer type: object @@ -130,14 +140,21 @@ spec: status: description: Status represents the status of ImagePrePullJob. properties: + action: + description: 'Action represents for the action of the ImagePrePullJob. + There are two possible action values: Success, Failure.' + type: string + event: + description: 'Event represents for the event of the ImagePrePullJob. + There are four possible event values: Init, Check, Pull, TimeOut.' + type: string + reason: + description: Reason represents for the reason of the ImagePrePullJob. + type: string state: description: 'State represents for the state phase of the ImagePrePullJob. - There are four possible state values: "", prechecking, prepulling, - successful, failed.' - enum: - - prepulling - - successful - - failed + There are five possible state values: "", checking, pulling, successful, + failed.' type: string status: description: Status contains image prepull status for each edge node. @@ -163,31 +180,42 @@ spec: description: 'State represents for the state phase of this image pull on the edge node There are two possible state values: successful, failed.' - enum: - - prepulling - - successful - - failed type: string type: object type: array - nodeName: - description: NodeName is the name of edge node. - type: string - reason: - description: Reason represents the fail reason if images prepull - failed on the edge node - type: string - state: - description: 'State represents for the state phase of the ImagePrepullJob - on the edge node. There are five possible state values: "", - prepulling, successful, failed.' - enum: - - prepulling - - successful - - failed - type: string + nodeStatus: + description: TaskStatus represents the status for each node + properties: + action: + description: 'Action represents for the action of the ImagePrePullJob. + There are three possible action values: Success, Failure, + TimeOut.' + type: string + event: + description: 'Event represents for the event of the ImagePrePullJob. + There are three possible event values: Init, Check, Pull.' + type: string + nodeName: + description: NodeName is the name of edge node. + type: string + reason: + description: Reason represents for the reason of the ImagePrePullJob. + type: string + state: + description: 'State represents for the upgrade state phase + of the edge node. There are several possible state values: + "", Upgrading, BackingUp, RollingBack and Checking.' + type: string + time: + description: Time represents for the running time of the + ImagePrePullJob. + type: string + type: object type: object type: array + time: + description: Time represents for the running time of the ImagePrePullJob. 
+ type: string type: object required: - spec diff --git a/manifests/charts/cloudcore/crds/operations_v1alpha1_nodeupgradejob.yaml b/manifests/charts/cloudcore/crds/operations_v1alpha1_nodeupgradejob.yaml index 130df65ac..958272424 100644 --- a/manifests/charts/cloudcore/crds/operations_v1alpha1_nodeupgradejob.yaml +++ b/manifests/charts/cloudcore/crds/operations_v1alpha1_nodeupgradejob.yaml @@ -36,12 +36,22 @@ spec: spec: description: Specification of the desired behavior of NodeUpgradeJob. properties: + checkItems: + description: CheckItems specifies the items need to be checked before + the task is executed. The default CheckItems value is nil. + items: + type: string + type: array concurrency: description: Concurrency specifies the max number of edge nodes that can be upgraded at the same time. The default Concurrency value is 1. format: int32 type: integer + failureTolerate: + description: FailureTolerate specifies the task tolerance failure + ratio. The default FailureTolerate value is 0.1. + type: string image: description: 'Image specifies a container image name, the image contains: keadm and edgecore. keadm is used as upgradetool, to install the @@ -111,75 +121,71 @@ spec: job. Default to 300. If set to 0, we'll use the default value 300. format: int32 type: integer - upgradeTool: - description: UpgradeTool is a request to decide use which upgrade - tool. If it is empty, the upgrade job simply use default upgrade - tool keadm to do upgrade operation. - type: string version: type: string type: object status: description: Most recently observed status of the NodeUpgradeJob. properties: - state: - description: 'State represents for the state phase of the NodeUpgradeJob. - There are three possible state values: "", upgrading and completed.' - enum: - - upgrading - - completed + action: + description: 'Action represents for the action of the ImagePrePullJob. + There are two possible action values: Success, Failure.' type: string - status: + currentVersion: + description: CurrentVersion represents for the current status of the + EdgeCore. + type: string + event: + description: 'Event represents for the event of the ImagePrePullJob. + There are six possible event values: Init, Check, BackUp, Upgrade, + TimeOut, Rollback.' + type: string + historicVersion: + description: HistoricVersion represents for the historic status of + the EdgeCore. + type: string + nodeStatus: description: Status contains upgrade Status for each edge node. items: - description: UpgradeStatus stores the status of Upgrade for each - edge node. + description: TaskStatus stores the status of Upgrade for each edge + node. properties: - history: - description: History is the last upgrade result of the edge - node. - properties: - fromVersion: - description: FromVersion is the version which the edge node - is upgraded from. - type: string - historyID: - description: HistoryID is to uniquely identify an Upgrade - Operation. - type: string - reason: - description: Reason is the error reason of Upgrade failure. - If the upgrade is successful, this reason is an empty - string. - type: string - result: - description: Result represents the result of upgrade. - enum: - - upgrade_success - - upgrade_failed_rollback_success - - upgrade_failed_rollback_failed - type: string - toVersion: - description: ToVersion is the version which the edge node - is upgraded to. - type: string - upgradeTime: - description: UpgradeTime is the time of this Upgrade. 
- type: string - type: object + action: + description: 'Action represents for the action of the ImagePrePullJob. + There are three possible action values: Success, Failure, + TimeOut.' + type: string + event: + description: 'Event represents for the event of the ImagePrePullJob. + There are three possible event values: Init, Check, Pull.' + type: string nodeName: description: NodeName is the name of edge node. type: string + reason: + description: Reason represents for the reason of the ImagePrePullJob. + type: string state: description: 'State represents for the upgrade state phase of - the edge node. There are three possible state values: "", - upgrading and completed.' - enum: - - upgrading - - completed + the edge node. There are several possible state values: "", + Upgrading, BackingUp, RollingBack and Checking.' + type: string + time: + description: Time represents for the running time of the ImagePrePullJob. type: string type: object type: array + reason: + description: Reason represents for the reason of the ImagePrePullJob. + type: string + state: + description: 'State represents for the state phase of the NodeUpgradeJob. + There are several possible state values: "", Upgrading, BackingUp, + RollingBack and Checking.' + type: string + time: + description: Time represents for the running time of the ImagePrePullJob. + type: string type: object type: object served: true diff --git a/manifests/charts/cloudcore/templates/configmap_cloudcore.yaml b/manifests/charts/cloudcore/templates/configmap_cloudcore.yaml index 2986cfd91..23ca97d38 100644 --- a/manifests/charts/cloudcore/templates/configmap_cloudcore.yaml +++ b/manifests/charts/cloudcore/templates/configmap_cloudcore.yaml @@ -53,7 +53,5 @@ data: iptablesManager: enable: {{ .Values.iptablesManager.enable }} mode: {{ .Values.iptablesManager.mode }} - nodeUpgradeJobController: - enable: {{ .Values.cloudCore.modules.nodeUpgradeJobController.enable }} - imagePrePullController: - enable: {{ .Values.cloudCore.modules.imagePrePullController.enable }} + taskManager: + enable: {{ .Values.cloudCore.modules.taskManager.enable }} diff --git a/manifests/charts/cloudcore/values.yaml b/manifests/charts/cloudcore/values.yaml index a09482c03..db61c8e14 100644 --- a/manifests/charts/cloudcore/values.yaml +++ b/manifests/charts/cloudcore/values.yaml @@ -57,9 +57,7 @@ cloudCore: enable: false router: enable: false - nodeUpgradeJobController: - enable: false - imagePrePullController: + taskManager: enable: false service: enable: true diff --git a/manifests/profiles/version.yaml b/manifests/profiles/version.yaml index 5b3deb7e1..d5a1bf5c2 100644 --- a/manifests/profiles/version.yaml +++ b/manifests/profiles/version.yaml @@ -51,9 +51,7 @@ cloudCore: enable: false router: enable: false - nodeUpgradeJobController: - enable: false - imagePrePullController: + taskManager: enable: false service: enable: false diff --git a/pkg/apis/common.go b/pkg/apis/common.go new file mode 100644 index 000000000..6f556a651 --- /dev/null +++ b/pkg/apis/common.go @@ -0,0 +1,3 @@ +package apis + +const ISO8601UTC = "2006-01-02T15:04:05Z" diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go index 66128ec49..9e58cd38a 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go @@ -98,24 +98,14 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig { UpdateDeviceStatusWorkers: constants.DefaultUpdateDeviceStatusWorkers, }, }, - 
NodeUpgradeJobController: &NodeUpgradeJobController{ + TaskManager: &TaskManager{ Enable: false, - Buffer: &NodeUpgradeJobControllerBuffer{ - UpdateNodeUpgradeJobStatus: constants.DefaultNodeUpgradeJobStatusBuffer, - NodeUpgradeJobEvent: constants.DefaultNodeUpgradeJobEventBuffer, + Buffer: &TaskManagerBuffer{ + TaskStatus: constants.DefaultNodeUpgradeJobStatusBuffer, + TaskEvent: constants.DefaultNodeUpgradeJobEventBuffer, }, - Load: &NodeUpgradeJobControllerLoad{ - NodeUpgradeJobWorkers: constants.DefaultNodeUpgradeJobWorkers, - }, - }, - ImagePrePullController: &ImagePrePullController{ - Enable: false, - Buffer: &ImagePrePullControllerBuffer{ - ImagePrePullJobStatus: constants.DefaultImagePrePullJobStatusBuffer, - ImagePrePullJobEvent: constants.DefaultImagePrePullJobEventBuffer, - }, - Load: &ImagePrePullControllerLoad{ - ImagePrePullJobWorkers: constants.DefaultImagePrePullJobWorkers, + Load: &TaskManagerLoad{ + TaskWorkers: constants.DefaultNodeUpgradeJobWorkers, }, }, SyncController: &SyncController{ diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go index 955960fe0..db46c094b 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go @@ -88,10 +88,8 @@ type Modules struct { EdgeController *EdgeController `json:"edgeController,omitempty"` // DeviceController indicates DeviceController module config DeviceController *DeviceController `json:"deviceController,omitempty"` - // NodeUpgradeJobController indicates NodeUpgradeJobController module config - NodeUpgradeJobController *NodeUpgradeJobController `json:"nodeUpgradeJobController,omitempty"` - // ImagePrePullController indicates ImagePrePullController module config - ImagePrePullController *ImagePrePullController `json:"imagePrePullController,omitempty"` + // TaskManager indicates TaskManager module config + TaskManager *TaskManager `json:"taskManager,omitempty"` // SyncController indicates SyncController module config SyncController *SyncController `json:"syncController,omitempty"` // DynamicController indicates DynamicController module config @@ -381,33 +379,33 @@ type DeviceControllerLoad struct { UpdateDeviceStatusWorkers int32 `json:"updateDeviceStatusWorkers,omitempty"` } -// NodeUpgradeJobController indicates the operations controller -type NodeUpgradeJobController struct { - // Enable indicates whether NodeUpgradeJobController is enabled, - // if set to false (for debugging etc.), skip checking other NodeUpgradeJobController configs. +// TaskManager indicates the operations controller +type TaskManager struct { + // Enable indicates whether TaskManager is enabled, + // if set to false (for debugging etc.), skip checking other TaskManager configs. 
// default false Enable bool `json:"enable"` // Buffer indicates Operation Controller buffer - Buffer *NodeUpgradeJobControllerBuffer `json:"buffer,omitempty"` + Buffer *TaskManagerBuffer `json:"buffer,omitempty"` // Load indicates Operation Controller Load - Load *NodeUpgradeJobControllerLoad `json:"load,omitempty"` + Load *TaskManagerLoad `json:"load,omitempty"` } -// NodeUpgradeJobControllerBuffer indicates NodeUpgradeJobController buffer -type NodeUpgradeJobControllerBuffer struct { - // UpdateNodeUpgradeJobStatus indicates the buffer of update NodeUpgradeJob status +// TaskManagerBuffer indicates TaskManager buffer +type TaskManagerBuffer struct { + // TaskStatus indicates the buffer of update NodeUpgradeJob status // default 1024 - UpdateNodeUpgradeJobStatus int32 `json:"updateNodeUpgradeJobStatus,omitempty"` - // NodeUpgradeJobEvent indicates the buffer of NodeUpgradeJob event + TaskStatus int32 `json:"taskStatus,omitempty"` + // TaskEvent indicates the buffer of NodeUpgradeJob event // default 1 - NodeUpgradeJobEvent int32 `json:"nodeUpgradeJobEvent,omitempty"` + TaskEvent int32 `json:"taskEvent,omitempty"` } -// NodeUpgradeJobControllerLoad indicates the NodeUpgradeJobController load -type NodeUpgradeJobControllerLoad struct { - // NodeUpgradeJobWorkers indicates the load of update NodeUpgradeJob workers +// TaskManagerLoad indicates the TaskManager load +type TaskManagerLoad struct { + // TaskWorkers indicates the load of update NodeUpgradeJob workers // default 1 - NodeUpgradeJobWorkers int32 `json:"nodeUpgradeJobWorkers,omitempty"` + TaskWorkers int32 `json:"taskWorkers,omitempty"` } // ImagePrePullController indicates the operations controller diff --git a/cloud/pkg/nodeupgradejobcontroller/config/config.go b/pkg/apis/fsm/v1alpha1/backup_task.go index b98f9ea89..1fd6c3983 100644 --- a/cloud/pkg/nodeupgradejobcontroller/config/config.go +++ b/pkg/apis/fsm/v1alpha1/backup_task.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The KubeEdge Authors. +Copyright 2023 The KubeEdge Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,25 +14,18 @@ See the License for the specific language governing permissions and limitations under the License. */ -package config +package v1alpha1 -import ( - "sync" - - "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" +const ( + BackingUpState State = "BackingUp" ) -var Config Configure -var once sync.Once +var BackupRule = map[string]State{ + "/Init/Success": TaskChecking, -type Configure struct { - v1alpha1.NodeUpgradeJobController -} + "Checking/Check/Success": BackingUpState, + "Checking/Check/Failure": TaskFailed, -func InitConfigure(dc *v1alpha1.NodeUpgradeJobController) { - once.Do(func() { - Config = Configure{ - NodeUpgradeJobController: *dc, - } - }) + "BackingUp/Backup/Success": TaskSuccessful, + "BackingUp/Backup/Failure": TaskFailed, } diff --git a/pkg/apis/fsm/v1alpha1/fsm.go b/pkg/apis/fsm/v1alpha1/fsm.go new file mode 100644 index 000000000..f93c6a870 --- /dev/null +++ b/pkg/apis/fsm/v1alpha1/fsm.go @@ -0,0 +1,45 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +type State string + +type Action string + +const ( + NodeAvailable State = "Available" + NodeUpgrading State = "Upgrading" + NodeRollingBack State = "RollingBack" + NodeConfigUpdating State = "ConfigUpdating" +) + +const ( + TaskInit State = "Init" + TaskChecking State = "Checking" + TaskSuccessful State = "Successful" + TaskFailed State = "Failed" + TaskPause State = "Pause" +) + +const ( + ActionSuccess Action = "Success" + ActionFailure Action = "Failure" +) + +const ( + EventTimeOut = "TimeOut" +) diff --git a/pkg/apis/fsm/v1alpha1/image_prepull_task.go b/pkg/apis/fsm/v1alpha1/image_prepull_task.go new file mode 100644 index 000000000..30b91be3a --- /dev/null +++ b/pkg/apis/fsm/v1alpha1/image_prepull_task.go @@ -0,0 +1,42 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +const ( + PullingState State = "Pulling" +) + +// CurrentState/Event/Action: NextState +var PrePullRule = map[string]State{ + "Init/Init/Success": TaskChecking, + "Init/Init/Failure": TaskFailed, + "Init/TimeOut/Failure": TaskFailed, + + "Checking/Check/Success": PullingState, + "Checking/Check/Failure": TaskFailed, + "Checking/TimeOut/Failure": TaskFailed, + + "Pulling/Pull/Success": TaskSuccessful, + "Pulling/Pull/Failure": TaskFailed, + "Pulling/TimeOut/Failure": TaskFailed, +} + +var PrePullStageSequence = map[State]State{ + "": TaskChecking, + TaskInit: TaskChecking, + TaskChecking: PullingState, +} diff --git a/pkg/apis/fsm/v1alpha1/rollback_task.go b/pkg/apis/fsm/v1alpha1/rollback_task.go new file mode 100644 index 000000000..f90d50583 --- /dev/null +++ b/pkg/apis/fsm/v1alpha1/rollback_task.go @@ -0,0 +1,31 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package v1alpha1 + +const ( + RollingBackState State = "RollingBack" +) + +var RollbackRule = map[string]State{ + "/Init/Success": TaskChecking, + + "Checking/Check/Success": RollingBackState, + "Checking/Check/Failure": TaskFailed, + + "RollingBack/Rollback/Failure": TaskFailed, + "RollingBack/Rollback/Success": TaskFailed, +} diff --git a/pkg/apis/fsm/v1alpha1/upgrade_task.go b/pkg/apis/fsm/v1alpha1/upgrade_task.go new file mode 100644 index 000000000..0f1102fb4 --- /dev/null +++ b/pkg/apis/fsm/v1alpha1/upgrade_task.go @@ -0,0 +1,61 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +const ( + UpgradingState State = "Upgrading" +) + +// CurrentState/Event/Action: NextState +var UpgradeRule = map[string]State{ + "Init/Init/Success": TaskChecking, + "Init/Init/Failure": TaskFailed, + "Init/TimeOut/Failure": TaskFailed, + "Init/Upgrade/Success": TaskSuccessful, + + "Checking/Check/Success": BackingUpState, + "Checking/Check/Failure": TaskFailed, + "Checking/TimeOut/Failure": TaskFailed, + + "BackingUp/Backup/Success": UpgradingState, + "BackingUp/Backup/Failure": TaskFailed, + "BackingUp/TimeOut/Failure": TaskFailed, + + "Upgrading/Upgrade/Success": TaskSuccessful, + "Upgrading/Upgrade/Failure": TaskFailed, + "Upgrading/TimeOut/Failure": TaskFailed, + + // TODO provide options for task failure, such as successful node upgrade rollback. + "RollingBack/Rollback/Failure": TaskFailed, + "RollingBack/TimeOut/Failure": TaskFailed, + "RollingBack/Rollback/Success": TaskFailed, + + "Upgrading/Rollback/Failure": TaskFailed, + "Upgrading/Rollback/Success": TaskFailed, + + //TODO delete in version 1.18 + "Init/Rollback/Failure": TaskFailed, + "Init/Rollback/Success": TaskFailed, +} + +var UpdateStageSequence = map[State]State{ + "": TaskChecking, + TaskInit: TaskChecking, + TaskChecking: BackingUpState, + BackingUpState: UpgradingState, + UpgradingState: RollingBackState, +} diff --git a/pkg/apis/operations/v1alpha1/imageprepull_types.go b/pkg/apis/operations/v1alpha1/imageprepull_types.go index 0ac53a975..7518c5d4a 100644 --- a/pkg/apis/operations/v1alpha1/imageprepull_types.go +++ b/pkg/apis/operations/v1alpha1/imageprepull_types.go @@ -18,6 +18,8 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" ) // +genclient @@ -84,42 +86,54 @@ type ImagePrePullTemplate struct { // +optional CheckItems []string `json:"checkItems,omitempty"` + // FailureTolerate specifies the task tolerance failure ratio. + // The default FailureTolerate value is 0.1. + // +optional + FailureTolerate string `json:"failureTolerate,omitempty"` + + // Concurrency specifies the maximum number of edge nodes that can pull images at the same time. + // The default Concurrency value is 1. + // +optional + Concurrency int32 `json:"concurrency,omitempty"` + + // TimeoutSeconds limits the duration of the node prepull job on each edgenode. + // Default to 300. 
+ // If set to 0, we'll use the default value 300. + // +optional + TimeoutSeconds *uint32 `json:"timeoutSeconds,omitempty"` + // ImageSecret specifies the secret for image pull if private registry used. // Use {namespace}/{secretName} in format. // +optional ImageSecret string `json:"imageSecrets,omitempty"` - // TimeoutSecondsOnEachNode limits the duration of the image prepull job on each edgenode. - // Default to 360. - // If set to 0, we'll use the default value 360. - // +optional - TimeoutSecondsOnEachNode *uint32 `json:"timeoutSecondsOnEachNode,omitempty"` - // RetryTimes specifies the retry times if image pull failed on each edgenode. // Default to 0 // +optional RetryTimes int32 `json:"retryTimes,omitempty"` } -// PrePullState describe the PrePullState of image prepull operation on edge nodes. -// +kubebuilder:validation:Enum=prepulling;successful;failed -type PrePullState string - -// Valid values of PrepullState -const ( - PrePullInitialValue PrePullState = "" - PrePulling PrePullState = "prepulling" - PrePullSuccessful PrePullState = "successful" - PrePullFailed PrePullState = "failed" -) - // ImagePrePullJobStatus stores the status of ImagePrePullJob. // contains images prepull status on multiple edge nodes. // +kubebuilder:validation:Type=object type ImagePrePullJobStatus struct { // State represents for the state phase of the ImagePrePullJob. - // There are four possible state values: "", prechecking, prepulling, successful, failed. - State PrePullState `json:"state,omitempty"` + // There are five possible state values: "", checking, pulling, successful, failed. + State api.State `json:"state,omitempty"` + + // Event represents for the event of the ImagePrePullJob. + // There are four possible event values: Init, Check, Pull, TimeOut. + Event string `json:"event,omitempty"` + + // Action represents for the action of the ImagePrePullJob. + // There are two possible action values: Success, Failure. + Action api.Action `json:"action,omitempty"` + + // Reason represents for the reason of the ImagePrePullJob. + Reason string `json:"reason,omitempty"` + + // Time represents for the running time of the ImagePrePullJob. + Time string `json:"time,omitempty"` // Status contains image prepull status for each edge node. Status []ImagePrePullStatus `json:"status,omitempty"` @@ -128,16 +142,8 @@ type ImagePrePullJobStatus struct { // ImagePrePullStatus stores image prepull status for each edge node. // +kubebuilder:validation:Type=object type ImagePrePullStatus struct { - // NodeName is the name of edge node. - NodeName string `json:"nodeName,omitempty"` - - // State represents for the state phase of the ImagePrepullJob on the edge node. - // There are five possible state values: "", prepulling, successful, failed. - State PrePullState `json:"state,omitempty"` - - // Reason represents the fail reason if images prepull failed on the edge node - Reason string `json:"reason,omitempty"` - + // TaskStatus represents the status for each node + *TaskStatus `json:"nodeStatus,omitempty"` // ImageStatus represents the prepull status for each image ImageStatus []ImageStatus `json:"imageStatus,omitempty"` } @@ -150,7 +156,7 @@ type ImageStatus struct { // State represents for the state phase of this image pull on the edge node // There are two possible state values: successful, failed. 
- State PrePullState `json:"state,omitempty"` + State api.State `json:"state,omitempty"` // Reason represents the fail reason if image pull failed // +optional diff --git a/pkg/apis/operations/v1alpha1/type.go b/pkg/apis/operations/v1alpha1/type.go index 1b45d5860..09b42c13f 100644 --- a/pkg/apis/operations/v1alpha1/type.go +++ b/pkg/apis/operations/v1alpha1/type.go @@ -18,6 +18,8 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" ) // +genclient @@ -58,10 +60,6 @@ type NodeUpgradeJobList struct { type NodeUpgradeJobSpec struct { // +Required: Version is the EdgeCore version to upgrade. Version string `json:"version,omitempty"` - // UpgradeTool is a request to decide use which upgrade tool. If it is empty, - // the upgrade job simply use default upgrade tool keadm to do upgrade operation. - // +optional - UpgradeTool string `json:"upgradeTool,omitempty"` // TimeoutSeconds limits the duration of the node upgrade job. // Default to 300. // If set to 0, we'll use the default value 300. @@ -91,67 +89,60 @@ type NodeUpgradeJobSpec struct { // The default Concurrency value is 1. // +optional Concurrency int32 `json:"concurrency,omitempty"` -} - -// UpgradeResult describe the result status of upgrade operation on edge nodes. -// +kubebuilder:validation:Enum=upgrade_success;upgrade_failed_rollback_success;upgrade_failed_rollback_failed -type UpgradeResult string -// upgrade operation status -const ( - UpgradeSuccess UpgradeResult = "upgrade_success" - UpgradeFailedRollbackSuccess UpgradeResult = "upgrade_failed_rollback_success" - UpgradeFailedRollbackFailed UpgradeResult = "upgrade_failed_rollback_failed" -) - -// UpgradeState describe the UpgradeState of upgrade operation on edge nodes. -// +kubebuilder:validation:Enum=upgrading;completed -type UpgradeState string + // CheckItems specifies the items need to be checked before the task is executed. + // The default CheckItems value is nil. + // +optional + CheckItems []string `json:"checkItems,omitempty"` -// Valid values of UpgradeState -const ( - InitialValue UpgradeState = "" - Upgrading UpgradeState = "upgrading" - Completed UpgradeState = "completed" -) + // FailureTolerate specifies the task tolerance failure ratio. + // The default FailureTolerate value is 0.1. + // +optional + FailureTolerate string `json:"failureTolerate,omitempty"` +} // NodeUpgradeJobStatus stores the status of NodeUpgradeJob. // contains multiple edge nodes upgrade status. // +kubebuilder:validation:Type=object type NodeUpgradeJobStatus struct { // State represents for the state phase of the NodeUpgradeJob. - // There are three possible state values: "", upgrading and completed. - State UpgradeState `json:"state,omitempty"` + // There are several possible state values: "", Upgrading, BackingUp, RollingBack and Checking. + State api.State `json:"state,omitempty"` + + // CurrentVersion represents for the current status of the EdgeCore. + CurrentVersion string `json:"currentVersion,omitempty"` + // HistoricVersion represents for the historic status of the EdgeCore. + HistoricVersion string `json:"historicVersion,omitempty"` + // Event represents for the event of the ImagePrePullJob. + // There are six possible event values: Init, Check, BackUp, Upgrade, TimeOut, Rollback. + Event string `json:"event,omitempty"` + // Action represents for the action of the ImagePrePullJob. + // There are two possible action values: Success, Failure. 
+ Action api.Action `json:"action,omitempty"` + // Reason represents for the reason of the ImagePrePullJob. + Reason string `json:"reason,omitempty"` + // Time represents for the running time of the ImagePrePullJob. + Time string `json:"time,omitempty"` // Status contains upgrade Status for each edge node. - Status []UpgradeStatus `json:"status,omitempty"` + Status []TaskStatus `json:"nodeStatus,omitempty"` } -// UpgradeStatus stores the status of Upgrade for each edge node. +// TaskStatus stores the status of Upgrade for each edge node. // +kubebuilder:validation:Type=object -type UpgradeStatus struct { +type TaskStatus struct { // NodeName is the name of edge node. NodeName string `json:"nodeName,omitempty"` // State represents for the upgrade state phase of the edge node. - // There are three possible state values: "", upgrading and completed. - State UpgradeState `json:"state,omitempty"` - // History is the last upgrade result of the edge node. - History History `json:"history,omitempty"` -} - -// History stores the information about upgrade history record. -// +kubebuilder:validation:Type=object -type History struct { - // HistoryID is to uniquely identify an Upgrade Operation. - HistoryID string `json:"historyID,omitempty"` - // FromVersion is the version which the edge node is upgraded from. - FromVersion string `json:"fromVersion,omitempty"` - // ToVersion is the version which the edge node is upgraded to. - ToVersion string `json:"toVersion,omitempty"` - // Result represents the result of upgrade. - Result UpgradeResult `json:"result,omitempty"` - // Reason is the error reason of Upgrade failure. - // If the upgrade is successful, this reason is an empty string. + // There are several possible state values: "", Upgrading, BackingUp, RollingBack and Checking. + State api.State `json:"state,omitempty"` + // Event represents for the event of the ImagePrePullJob. + // There are three possible event values: Init, Check, Pull. + Event string `json:"event,omitempty"` + // Action represents for the action of the ImagePrePullJob. + // There are three possible action values: Success, Failure, TimeOut. + Action api.Action `json:"action,omitempty"` + // Reason represents for the reason of the ImagePrePullJob. Reason string `json:"reason,omitempty"` - // UpgradeTime is the time of this Upgrade. - UpgradeTime string `json:"upgradeTime,omitempty"` + // Time represents for the running time of the ImagePrePullJob. + Time string `json:"time,omitempty"` } diff --git a/pkg/apis/operations/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/operations/v1alpha1/zz_generated.deepcopy.go index 8b38fd4ec..bf26dabe8 100644 --- a/pkg/apis/operations/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/operations/v1alpha1/zz_generated.deepcopy.go @@ -27,22 +27,6 @@ import ( ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *History) DeepCopyInto(out *History) { - *out = *in - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new History. -func (in *History) DeepCopy() *History { - if in == nil { - return nil - } - out := new(History) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *ImagePrePullJob) DeepCopyInto(out *ImagePrePullJob) { *out = *in out.TypeMeta = in.TypeMeta @@ -146,6 +130,11 @@ func (in *ImagePrePullJobStatus) DeepCopy() *ImagePrePullJobStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ImagePrePullStatus) DeepCopyInto(out *ImagePrePullStatus) { *out = *in + if in.TaskStatus != nil { + in, out := &in.TaskStatus, &out.TaskStatus + *out = new(TaskStatus) + **out = **in + } if in.ImageStatus != nil { in, out := &in.ImageStatus, &out.ImageStatus *out = make([]ImageStatus, len(*in)) @@ -187,8 +176,8 @@ func (in *ImagePrePullTemplate) DeepCopyInto(out *ImagePrePullTemplate) { *out = make([]string, len(*in)) copy(*out, *in) } - if in.TimeoutSecondsOnEachNode != nil { - in, out := &in.TimeoutSecondsOnEachNode, &out.TimeoutSecondsOnEachNode + if in.TimeoutSeconds != nil { + in, out := &in.TimeoutSeconds, &out.TimeoutSeconds *out = new(uint32) **out = **in } @@ -300,6 +289,11 @@ func (in *NodeUpgradeJobSpec) DeepCopyInto(out *NodeUpgradeJobSpec) { *out = new(v1.LabelSelector) (*in).DeepCopyInto(*out) } + if in.CheckItems != nil { + in, out := &in.CheckItems, &out.CheckItems + *out = make([]string, len(*in)) + copy(*out, *in) + } return } @@ -318,7 +312,7 @@ func (in *NodeUpgradeJobStatus) DeepCopyInto(out *NodeUpgradeJobStatus) { *out = *in if in.Status != nil { in, out := &in.Status, &out.Status - *out = make([]UpgradeStatus, len(*in)) + *out = make([]TaskStatus, len(*in)) copy(*out, *in) } return @@ -335,18 +329,17 @@ func (in *NodeUpgradeJobStatus) DeepCopy() *NodeUpgradeJobStatus { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *UpgradeStatus) DeepCopyInto(out *UpgradeStatus) { +func (in *TaskStatus) DeepCopyInto(out *TaskStatus) { *out = *in - out.History = in.History return } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpgradeStatus. -func (in *UpgradeStatus) DeepCopy() *UpgradeStatus { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskStatus. +func (in *TaskStatus) DeepCopy() *TaskStatus { if in == nil { return nil } - out := new(UpgradeStatus) + out := new(TaskStatus) in.DeepCopyInto(out) return out } diff --git a/pkg/util/fsm/fsm.go b/pkg/util/fsm/fsm.go new file mode 100644 index 000000000..03bbd21f0 --- /dev/null +++ b/pkg/util/fsm/fsm.go @@ -0,0 +1,139 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package fsm
+
+import (
+	"fmt"
+
+	"k8s.io/klog/v2"
+
+	api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+)
+
+type FSM struct {
+	id            string
+	nodeName      string
+	lastState     api.State
+	currentFunc   func(id, nodeName string) (api.State, error)
+	updateFunc    func(id, nodeName string, state api.State, event Event) error
+	guard         map[string]api.State
+	stageSequence map[api.State]api.State
+}
+
+func (F *FSM) NodeName(nodeName string) *FSM {
+	F.nodeName = nodeName
+	return F
+}
+
+type Event struct {
+	Type            string
+	Action          api.Action
+	Msg             string
+	ExternalMessage string
+}
+
+func (e Event) UniqueName() string {
+	return e.Type + "/" + string(e.Action)
+}
+
+func (F *FSM) ID(id string) *FSM {
+	F.id = id
+	return F
+}
+
+func (F *FSM) LastState(lastState api.State) {
+	F.lastState = lastState
+}
+
+func (F *FSM) CurrentFunc(currentFunc func(id, nodeName string) (api.State, error)) *FSM {
+	F.currentFunc = currentFunc
+	return F
+}
+
+func (F *FSM) UpdateFunc(updateFunc func(id, nodeName string, state api.State, event Event) error) *FSM {
+	F.updateFunc = updateFunc
+	return F
+}
+
+func (F *FSM) Guard(guard map[string]api.State) *FSM {
+	F.guard = guard
+	return F
+}
+
+func (F *FSM) StageSequence(stageSequence map[api.State]api.State) *FSM {
+	F.stageSequence = stageSequence
+	return F
+}
+
+func (F *FSM) CurrentState() (api.State, error) {
+	if F.currentFunc == nil {
+		return "", fmt.Errorf("currentFunc is nil")
+	}
+	return F.currentFunc(F.id, F.nodeName)
+}
+
+func (F *FSM) transitCheck(event Event) (api.State, api.State, error) {
+	currentState, err := F.CurrentState()
+	if err != nil {
+		return "", "", err
+	}
+	if F.guard == nil {
+		return "", "", fmt.Errorf("guard is nil")
+	}
+	nextState, ok := F.guard[string(currentState)+"/"+event.UniqueName()]
+	if !ok {
+		return "", "", fmt.Errorf("unsupported event: %s", string(currentState)+"/"+event.UniqueName())
+	}
+	return currentState, nextState, nil
+}
+
+func (F *FSM) AllowTransit(event Event) error {
+	_, _, err := F.transitCheck(event)
+	return err
+}
+
+func (F *FSM) Transit(event Event) error {
+	currentState, nextState, err := F.transitCheck(event)
+	if err != nil {
+		return err
+	}
+	if F.updateFunc == nil {
+		return fmt.Errorf("updateFunc is nil")
+	}
+	err = F.updateFunc(F.id, F.nodeName, nextState, event)
+	if err != nil {
+		return err
+	}
+	F.lastState = currentState
+	return nil
+}
+
+func TaskFinish(state api.State) bool {
+	return state == api.TaskFailed || state == api.TaskSuccessful
+}
+
+func (F *FSM) TaskStagCompleted(state api.State) bool {
+	currentState, err := F.CurrentState()
+	if err != nil {
+		klog.Errorf("get %s current state failed: %s", F.id, err.Error())
+		return false
+	}
+	if F.stageSequence[currentState] == state || TaskFinish(state) {
+		return true
+	}
+	return false
+}
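A minimal sketch of how the new FSM builder might be driven against the UpgradeRule guard added above. This is illustrative only: the in-memory state map, job ID, and node name are placeholders for the controller logic that lives elsewhere in this change, api.Action("Success") is written as a string conversion because the Action constant names do not appear in these hunks, and the comment about landing in Checking assumes api.TaskInit's string value matches the "Init" prefix used by the guard keys.

package main

import (
	"fmt"

	api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
	"github.com/kubeedge/kubeedge/pkg/util/fsm"
)

func main() {
	// Placeholder store; the real currentFunc/updateFunc would read and write
	// the task CR status through the cloud-side task manager.
	states := map[string]api.State{"upgrade-job-demo/edgenode1": api.TaskInit}

	machine := (&fsm.FSM{}).
		ID("upgrade-job-demo").
		NodeName("edgenode1").
		CurrentFunc(func(id, nodeName string) (api.State, error) {
			return states[id+"/"+nodeName], nil
		}).
		UpdateFunc(func(id, nodeName string, state api.State, event fsm.Event) error {
			states[id+"/"+nodeName] = state
			return nil
		}).
		Guard(api.UpgradeRule).
		StageSequence(api.UpdateStageSequence)

	// Guard keys follow "CurrentState/Event.Type/Event.Action", so this event
	// looks up "Init/Init/Success" and should move the task to Checking.
	event := fsm.Event{Type: "Init", Action: api.Action("Success"), Msg: "task accepted"}
	if err := machine.Transit(event); err != nil {
		fmt.Println("transition rejected:", err)
		return
	}
	fmt.Println("state is now:", states["upgrade-job-demo/edgenode1"])
}

Events whose composed key is absent from the guard map are rejected by transitCheck, which is what keeps out-of-order stages (for example a Rollback event arriving before Upgrading) from advancing the task.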

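The ImagePrePullStatus change above replaces the per-node NodeName/State/Reason fields with an embedded *TaskStatus carried under the nodeStatus key, so image pre-pull and node upgrade jobs now report per-node progress through the same structure. A small sketch of that wire shape, assuming the fsm State/Action types behave as plain string types; the field values here are purely illustrative and only the field names and JSON tags come from the types changed in this diff.

package main

import (
	"encoding/json"
	"fmt"

	fsmapi "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
	opsapi "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
)

func main() {
	perNode := opsapi.ImagePrePullStatus{
		TaskStatus: &opsapi.TaskStatus{
			NodeName: "edgenode1",
			State:    fsmapi.TaskSuccessful,
			Event:    "Pull",
			Action:   fsmapi.Action("Success"),
		},
		ImageStatus: []opsapi.ImageStatus{
			{State: fsmapi.TaskSuccessful},
		},
	}

	out, err := json.MarshalIndent(perNode, "", "  ")
	if err != nil {
		panic(err)
	}
	// Because the embedded TaskStatus carries its own "nodeStatus" tag,
	// encoding/json emits it as a nested object rather than promoting its
	// fields to the top level of the per-node entry.
	fmt.Println(string(out))
}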