summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com>2024-01-17 17:50:27 +0800
committerGitHub <noreply@github.com>2024-01-17 17:50:27 +0800
commitdadbeb0e72565822afca48cb3f66a17473d7d585 (patch)
tree253eded79262e2f0d83a3da66ae945c9985eec0f
parentMerge pull request #5321 from luomengY/ci-cri (diff)
parentsupport edge nodes upgrade and image pre pull (diff)
downloadkubeedge-dadbeb0e72565822afca48cb3f66a17473d7d585.tar.gz
Merge pull request #5330 from ZhengXinwei-F/task-manager
Implement task manager to complete cloud edge task execution
-rw-r--r--build/crd-samples/operations/imageprepulljob.yaml2
-rw-r--r--build/crd-samples/operations/nodeupgradejob.yaml11
-rw-r--r--build/crds/operations/operations_v1alpha1_imageprepulljob.yaml88
-rw-r--r--build/crds/operations/operations_v1alpha1_nodeupgradejob.yaml108
-rw-r--r--cloud/cmd/cloudcore/app/server.go6
-rw-r--r--cloud/pkg/cloudhub/dispatcher/message_dispatcher.go11
-rw-r--r--cloud/pkg/cloudhub/servers/httpserver/report_task_status.go80
-rw-r--r--cloud/pkg/cloudhub/servers/httpserver/server.go1
-rw-r--r--cloud/pkg/cloudhub/servers/httpserver/upgrade.go64
-rw-r--r--cloud/pkg/common/messagelayer/context.go12
-rw-r--r--cloud/pkg/common/modules/modules.go4
-rw-r--r--cloud/pkg/imageprepullcontroller/controller/downstream.go268
-rw-r--r--cloud/pkg/imageprepullcontroller/controller/util.go131
-rw-r--r--cloud/pkg/imageprepullcontroller/imageprepullcontroller.go91
-rw-r--r--cloud/pkg/imageprepullcontroller/manager/common.go62
-rw-r--r--cloud/pkg/imageprepullcontroller/manager/imageprepull.go52
-rw-r--r--cloud/pkg/nodeupgradejobcontroller/controller/downstream.go447
-rw-r--r--cloud/pkg/nodeupgradejobcontroller/controller/upstream.go244
-rw-r--r--cloud/pkg/nodeupgradejobcontroller/controller/util.go124
-rw-r--r--cloud/pkg/nodeupgradejobcontroller/nodeupgradejobcontroller.go91
-rw-r--r--cloud/pkg/taskmanager/config/config.go (renamed from cloud/pkg/imageprepullcontroller/config/config.go)6
-rw-r--r--cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_controller.go338
-rw-r--r--cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_task.go133
-rw-r--r--cloud/pkg/taskmanager/manager/downstream.go64
-rw-r--r--cloud/pkg/taskmanager/manager/executor.go433
-rw-r--r--cloud/pkg/taskmanager/manager/upstream.go (renamed from cloud/pkg/imageprepullcontroller/controller/upstream.go)122
-rw-r--r--cloud/pkg/taskmanager/nodeupgradecontroller/node_upgrade_controller.go386
-rw-r--r--cloud/pkg/taskmanager/nodeupgradecontroller/upgrade_task.go122
-rw-r--r--cloud/pkg/taskmanager/task_manager.go123
-rw-r--r--cloud/pkg/taskmanager/util/controller/controller.go169
-rw-r--r--cloud/pkg/taskmanager/util/manager/common.go (renamed from cloud/pkg/nodeupgradejobcontroller/manager/common.go)2
-rw-r--r--cloud/pkg/taskmanager/util/manager/task_cache.go (renamed from cloud/pkg/nodeupgradejobcontroller/manager/nodeupgrade.go)22
-rw-r--r--cloud/pkg/taskmanager/util/util.go181
-rw-r--r--cloud/pkg/taskmanager/util/util_test.go (renamed from cloud/pkg/nodeupgradejobcontroller/controller/util_test.go)96
-rw-r--r--common/constants/default.go3
-rw-r--r--common/types/types.go35
-rw-r--r--edge/pkg/common/util/util.go32
-rw-r--r--edge/pkg/edgehub/edgehub.go4
-rw-r--r--edge/pkg/edgehub/task/task_handler.go78
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/image_prepull.go184
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/node_backup.go106
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/node_rollback.go72
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/node_upgrade.go (renamed from edge/pkg/edgehub/upgrade/upgrade.go)169
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/pre_check.go158
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/task_executor.go93
-rw-r--r--edge/pkg/edgehub/upgrade/image_prepull.go188
-rw-r--r--edge/pkg/edgehub/upgrade/upgrade_test.go56
-rw-r--r--keadm/cmd/keadm/app/cmd/cmd_others.go2
-rw-r--r--keadm/cmd/keadm/app/cmd/edge/rollback.go113
-rw-r--r--keadm/cmd/keadm/app/cmd/edge/upgrade.go173
-rw-r--r--keadm/cmd/keadm/app/cmd/util/common.go63
-rw-r--r--manifests/charts/cloudcore/crds/operations_v1alpha1_imageprepulljob.yaml88
-rw-r--r--manifests/charts/cloudcore/crds/operations_v1alpha1_nodeupgradejob.yaml108
-rw-r--r--manifests/charts/cloudcore/templates/configmap_cloudcore.yaml6
-rw-r--r--manifests/charts/cloudcore/values.yaml4
-rw-r--r--manifests/profiles/version.yaml4
-rw-r--r--pkg/apis/common.go3
-rw-r--r--pkg/apis/componentconfig/cloudcore/v1alpha1/default.go22
-rw-r--r--pkg/apis/componentconfig/cloudcore/v1alpha1/types.go38
-rw-r--r--pkg/apis/fsm/v1alpha1/backup_task.go (renamed from cloud/pkg/nodeupgradejobcontroller/config/config.go)27
-rw-r--r--pkg/apis/fsm/v1alpha1/fsm.go45
-rw-r--r--pkg/apis/fsm/v1alpha1/image_prepull_task.go42
-rw-r--r--pkg/apis/fsm/v1alpha1/rollback_task.go31
-rw-r--r--pkg/apis/fsm/v1alpha1/upgrade_task.go61
-rw-r--r--pkg/apis/operations/v1alpha1/imageprepull_types.go68
-rw-r--r--pkg/apis/operations/v1alpha1/type.go93
-rw-r--r--pkg/apis/operations/v1alpha1/zz_generated.deepcopy.go41
-rw-r--r--pkg/util/fsm/fsm.go139
68 files changed, 3981 insertions, 2462 deletions
diff --git a/build/crd-samples/operations/imageprepulljob.yaml b/build/crd-samples/operations/imageprepulljob.yaml
index cebfbe27f..940a99975 100644
--- a/build/crd-samples/operations/imageprepulljob.yaml
+++ b/build/crd-samples/operations/imageprepulljob.yaml
@@ -11,4 +11,4 @@ spec:
nodes:
- edgenode1 # Need to replaced with your own node name
timeoutSecondsOnEachNode: 300
- retryTimesOnEachNode: 1 \ No newline at end of file
+ retryTimes: 1 \ No newline at end of file
diff --git a/build/crd-samples/operations/nodeupgradejob.yaml b/build/crd-samples/operations/nodeupgradejob.yaml
index 5b97287d7..2f5f730c8 100644
--- a/build/crd-samples/operations/nodeupgradejob.yaml
+++ b/build/crd-samples/operations/nodeupgradejob.yaml
@@ -5,10 +5,15 @@ metadata:
labels:
description: upgrade-label
spec:
- version: "v1.10.0"
- timeoutSeconds: 60
+ version: "v1.16.0"
+ checkItems:
+ - "cpu"
+ - "mem"
+ - "disk"
+ failureTolerate: "0.3"
+ concurrency: 2
+ timeoutSeconds: 180
labelSelector:
matchLabels:
"node-role.kubernetes.io/edge": ""
node-role.kubernetes.io/agent: ""
-
diff --git a/build/crds/operations/operations_v1alpha1_imageprepulljob.yaml b/build/crds/operations/operations_v1alpha1_imageprepulljob.yaml
index 903516d32..a044a1532 100644
--- a/build/crds/operations/operations_v1alpha1_imageprepulljob.yaml
+++ b/build/crds/operations/operations_v1alpha1_imageprepulljob.yaml
@@ -48,6 +48,16 @@ spec:
items:
type: string
type: array
+ concurrency:
+ description: Concurrency specifies the maximum number of edge
+ nodes that can pull images at the same time. The default Concurrency
+ value is 1.
+ format: int32
+ type: integer
+ failureTolerate:
+ description: FailureTolerate specifies the task tolerance failure
+ ratio. The default FailureTolerate value is 0.1.
+ type: string
imageSecrets:
description: ImageSecret specifies the secret for image pull if
private registry used. Use {namespace}/{secretName} in format.
@@ -119,10 +129,10 @@ spec:
failed on each edgenode. Default to 0
format: int32
type: integer
- timeoutSecondsOnEachNode:
- description: TimeoutSecondsOnEachNode limits the duration of the
- image prepull job on each edgenode. Default to 360. If set to
- 0, we'll use the default value 360.
+ timeoutSeconds:
+ description: TimeoutSeconds limits the duration of the node prepull
+ job on each edgenode. Default to 300. If set to 0, we'll use
+ the default value 300.
format: int32
type: integer
type: object
@@ -130,14 +140,21 @@ spec:
status:
description: Status represents the status of ImagePrePullJob.
properties:
+ action:
+ description: 'Action represents for the action of the ImagePrePullJob.
+ There are two possible action values: Success, Failure.'
+ type: string
+ event:
+ description: 'Event represents for the event of the ImagePrePullJob.
+ There are four possible event values: Init, Check, Pull, TimeOut.'
+ type: string
+ reason:
+ description: Reason represents for the reason of the ImagePrePullJob.
+ type: string
state:
description: 'State represents for the state phase of the ImagePrePullJob.
- There are four possible state values: "", prechecking, prepulling,
- successful, failed.'
- enum:
- - prepulling
- - successful
- - failed
+ There are five possible state values: "", checking, pulling, successful,
+ failed.'
type: string
status:
description: Status contains image prepull status for each edge node.
@@ -163,31 +180,42 @@ spec:
description: 'State represents for the state phase of
this image pull on the edge node There are two possible
state values: successful, failed.'
- enum:
- - prepulling
- - successful
- - failed
type: string
type: object
type: array
- nodeName:
- description: NodeName is the name of edge node.
- type: string
- reason:
- description: Reason represents the fail reason if images prepull
- failed on the edge node
- type: string
- state:
- description: 'State represents for the state phase of the ImagePrepullJob
- on the edge node. There are five possible state values: "",
- prepulling, successful, failed.'
- enum:
- - prepulling
- - successful
- - failed
- type: string
+ nodeStatus:
+ description: TaskStatus represents the status for each node
+ properties:
+ action:
+ description: 'Action represents for the action of the ImagePrePullJob.
+ There are three possible action values: Success, Failure,
+ TimeOut.'
+ type: string
+ event:
+ description: 'Event represents for the event of the ImagePrePullJob.
+ There are three possible event values: Init, Check, Pull.'
+ type: string
+ nodeName:
+ description: NodeName is the name of edge node.
+ type: string
+ reason:
+ description: Reason represents for the reason of the ImagePrePullJob.
+ type: string
+ state:
+ description: 'State represents for the upgrade state phase
+ of the edge node. There are several possible state values:
+ "", Upgrading, BackingUp, RollingBack and Checking.'
+ type: string
+ time:
+ description: Time represents for the running time of the
+ ImagePrePullJob.
+ type: string
+ type: object
type: object
type: array
+ time:
+ description: Time represents for the running time of the ImagePrePullJob.
+ type: string
type: object
required:
- spec
diff --git a/build/crds/operations/operations_v1alpha1_nodeupgradejob.yaml b/build/crds/operations/operations_v1alpha1_nodeupgradejob.yaml
index 130df65ac..958272424 100644
--- a/build/crds/operations/operations_v1alpha1_nodeupgradejob.yaml
+++ b/build/crds/operations/operations_v1alpha1_nodeupgradejob.yaml
@@ -36,12 +36,22 @@ spec:
spec:
description: Specification of the desired behavior of NodeUpgradeJob.
properties:
+ checkItems:
+ description: CheckItems specifies the items need to be checked before
+ the task is executed. The default CheckItems value is nil.
+ items:
+ type: string
+ type: array
concurrency:
description: Concurrency specifies the max number of edge nodes that
can be upgraded at the same time. The default Concurrency value
is 1.
format: int32
type: integer
+ failureTolerate:
+ description: FailureTolerate specifies the task tolerance failure
+ ratio. The default FailureTolerate value is 0.1.
+ type: string
image:
description: 'Image specifies a container image name, the image contains:
keadm and edgecore. keadm is used as upgradetool, to install the
@@ -111,75 +121,71 @@ spec:
job. Default to 300. If set to 0, we'll use the default value 300.
format: int32
type: integer
- upgradeTool:
- description: UpgradeTool is a request to decide use which upgrade
- tool. If it is empty, the upgrade job simply use default upgrade
- tool keadm to do upgrade operation.
- type: string
version:
type: string
type: object
status:
description: Most recently observed status of the NodeUpgradeJob.
properties:
- state:
- description: 'State represents for the state phase of the NodeUpgradeJob.
- There are three possible state values: "", upgrading and completed.'
- enum:
- - upgrading
- - completed
+ action:
+ description: 'Action represents for the action of the ImagePrePullJob.
+ There are two possible action values: Success, Failure.'
type: string
- status:
+ currentVersion:
+ description: CurrentVersion represents for the current status of the
+ EdgeCore.
+ type: string
+ event:
+ description: 'Event represents for the event of the ImagePrePullJob.
+ There are six possible event values: Init, Check, BackUp, Upgrade,
+ TimeOut, Rollback.'
+ type: string
+ historicVersion:
+ description: HistoricVersion represents for the historic status of
+ the EdgeCore.
+ type: string
+ nodeStatus:
description: Status contains upgrade Status for each edge node.
items:
- description: UpgradeStatus stores the status of Upgrade for each
- edge node.
+ description: TaskStatus stores the status of Upgrade for each edge
+ node.
properties:
- history:
- description: History is the last upgrade result of the edge
- node.
- properties:
- fromVersion:
- description: FromVersion is the version which the edge node
- is upgraded from.
- type: string
- historyID:
- description: HistoryID is to uniquely identify an Upgrade
- Operation.
- type: string
- reason:
- description: Reason is the error reason of Upgrade failure.
- If the upgrade is successful, this reason is an empty
- string.
- type: string
- result:
- description: Result represents the result of upgrade.
- enum:
- - upgrade_success
- - upgrade_failed_rollback_success
- - upgrade_failed_rollback_failed
- type: string
- toVersion:
- description: ToVersion is the version which the edge node
- is upgraded to.
- type: string
- upgradeTime:
- description: UpgradeTime is the time of this Upgrade.
- type: string
- type: object
+ action:
+ description: 'Action represents for the action of the ImagePrePullJob.
+ There are three possible action values: Success, Failure,
+ TimeOut.'
+ type: string
+ event:
+ description: 'Event represents for the event of the ImagePrePullJob.
+ There are three possible event values: Init, Check, Pull.'
+ type: string
nodeName:
description: NodeName is the name of edge node.
type: string
+ reason:
+ description: Reason represents for the reason of the ImagePrePullJob.
+ type: string
state:
description: 'State represents for the upgrade state phase of
- the edge node. There are three possible state values: "",
- upgrading and completed.'
- enum:
- - upgrading
- - completed
+ the edge node. There are several possible state values: "",
+ Upgrading, BackingUp, RollingBack and Checking.'
+ type: string
+ time:
+ description: Time represents for the running time of the ImagePrePullJob.
type: string
type: object
type: array
+ reason:
+ description: Reason represents for the reason of the ImagePrePullJob.
+ type: string
+ state:
+ description: 'State represents for the state phase of the NodeUpgradeJob.
+ There are several possible state values: "", Upgrading, BackingUp,
+ RollingBack and Checking.'
+ type: string
+ time:
+ description: Time represents for the running time of the ImagePrePullJob.
+ type: string
type: object
type: object
served: true
diff --git a/cloud/cmd/cloudcore/app/server.go b/cloud/cmd/cloudcore/app/server.go
index fd346cd72..c4a7a2388 100644
--- a/cloud/cmd/cloudcore/app/server.go
+++ b/cloud/cmd/cloudcore/app/server.go
@@ -48,11 +48,10 @@ import (
"github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/dynamiccontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller"
- "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller"
- "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/policycontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/router"
"github.com/kubeedge/kubeedge/cloud/pkg/synccontroller"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager"
"github.com/kubeedge/kubeedge/common/constants"
"github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1"
"github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1/validation"
@@ -159,8 +158,7 @@ func registerModules(c *v1alpha1.CloudCoreConfig) {
cloudhub.Register(c.Modules.CloudHub)
edgecontroller.Register(c.Modules.EdgeController)
devicecontroller.Register(c.Modules.DeviceController)
- imageprepullcontroller.Register(c.Modules.ImagePrePullController)
- nodeupgradejobcontroller.Register(c.Modules.NodeUpgradeJobController)
+ taskmanager.Register(c.Modules.TaskManager)
synccontroller.Register(c.Modules.SyncController)
cloudstream.Register(c.Modules.CloudStream, c.CommonConfig)
router.Register(c.Modules.Router)
diff --git a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go
index a236d3ebd..18382f13b 100644
--- a/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go
+++ b/cloud/pkg/cloudhub/dispatcher/message_dispatcher.go
@@ -33,8 +33,8 @@ import (
"github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/session"
"github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer"
"github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
- imageprepullcontroller "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/controller"
"github.com/kubeedge/kubeedge/cloud/pkg/synccontroller"
+ taskutil "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util"
commonconst "github.com/kubeedge/kubeedge/common/constants"
v2 "github.com/kubeedge/kubeedge/edge/pkg/metamanager/dao/v2"
"github.com/kubeedge/kubeedge/pkg/apis/reliablesyncs/v1alpha1"
@@ -184,8 +184,9 @@ func (md *messageDispatcher) DispatchUpstream(message *beehivemodel.Message, inf
message.Router.Resource = fmt.Sprintf("node/%s/%s", info.NodeID, message.Router.Resource)
beehivecontext.Send(modules.RouterModuleName, *message)
- case message.GetOperation() == imageprepullcontroller.ImagePrePull:
- beehivecontext.SendToGroup(modules.ImagePrePullControllerModuleGroup, *message)
+ case message.GetOperation() == taskutil.TaskPrePull ||
+ message.GetOperation() == taskutil.TaskUpgrade:
+ beehivecontext.SendToGroup(modules.TaskManagerModuleGroup, *message)
default:
err := md.PubToController(info, message)
@@ -451,7 +452,9 @@ func noAckRequired(msg *beehivemodel.Message) bool {
return true
case msg.GetGroup() == modules.UserGroup:
return true
- case msg.GetSource() == modules.NodeUpgradeJobControllerModuleName || msg.GetSource() == modules.ImagePrePullControllerModuleName:
+ case msg.GetSource() == modules.TaskManagerModuleName:
+ return true
+ case msg.GetSource() == modules.NodeUpgradeJobControllerModuleName:
return true
case msg.GetOperation() == beehivemodel.ResponseOperation:
content, ok := msg.Content.(string)
diff --git a/cloud/pkg/cloudhub/servers/httpserver/report_task_status.go b/cloud/pkg/cloudhub/servers/httpserver/report_task_status.go
new file mode 100644
index 000000000..784470502
--- /dev/null
+++ b/cloud/pkg/cloudhub/servers/httpserver/report_task_status.go
@@ -0,0 +1,80 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package httpserver
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+
+ "github.com/emicklei/go-restful"
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/klog/v2"
+
+ beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
+ beehiveModel "github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+ "github.com/kubeedge/kubeedge/common/types"
+)
+
+const (
+ millionByte = int64(3 * 1024 * 1024)
+)
+
+// reportTaskStatus report the status of task
+func reportTaskStatus(request *restful.Request, response *restful.Response) {
+ resp := types.NodeTaskResponse{}
+ taskID := request.PathParameter("taskID")
+ taskType := request.PathParameter("taskType")
+ nodeID := request.PathParameter("nodeID")
+
+ lr := &io.LimitedReader{
+ R: request.Request.Body,
+ N: millionByte + 1,
+ }
+ body, err := io.ReadAll(lr)
+ if err != nil {
+ err = response.WriteError(http.StatusBadRequest, fmt.Errorf("failed to get req body: %v", err))
+ if err != nil {
+ klog.Warning(err.Error())
+ }
+ return
+ }
+ if lr.N <= 0 {
+ err = response.WriteError(http.StatusBadRequest, errors.NewRequestEntityTooLargeError("the request body can only be up to 1MB in size"))
+ if err != nil {
+ klog.Warning(err.Error())
+ }
+ return
+ }
+ if err = json.Unmarshal(body, &resp); err != nil {
+ err = response.WriteError(http.StatusBadRequest, fmt.Errorf("failed to unmarshal task info: %v", err))
+ if err != nil {
+ klog.Warning(err.Error())
+ }
+ return
+ }
+
+ msg := beehiveModel.NewMessage("").SetRoute(modules.CloudHubModuleName, modules.CloudHubModuleGroup).
+ SetResourceOperation(fmt.Sprintf("task/%s/node/%s", taskID, nodeID), taskType).FillBody(resp)
+ beehiveContext.Send(modules.TaskManagerModuleName, *msg)
+
+ if _, err = response.Write([]byte("ok")); err != nil {
+ klog.Errorf("failed to send the task resp to edge , err: %v", err)
+ }
+}
diff --git a/cloud/pkg/cloudhub/servers/httpserver/server.go b/cloud/pkg/cloudhub/servers/httpserver/server.go
index 1ae061770..0b8e61433 100644
--- a/cloud/pkg/cloudhub/servers/httpserver/server.go
+++ b/cloud/pkg/cloudhub/servers/httpserver/server.go
@@ -47,6 +47,7 @@ func StartHTTPServer() {
ws.Route(ws.GET(constants.DefaultCertURL).To(edgeCoreClientCert))
ws.Route(ws.GET(constants.DefaultCAURL).To(getCA))
ws.Route(ws.POST(constants.DefaultNodeUpgradeURL).To(upgradeEdge))
+ ws.Route(ws.POST(constants.DefaultTaskStateReportURL).To(reportTaskStatus))
serverContainer.Add(ws)
addr := fmt.Sprintf("%s:%d", hubconfig.Config.HTTPS.Address, hubconfig.Config.HTTPS.Port)
diff --git a/cloud/pkg/cloudhub/servers/httpserver/upgrade.go b/cloud/pkg/cloudhub/servers/httpserver/upgrade.go
index c8b8eb65f..2dc343714 100644
--- a/cloud/pkg/cloudhub/servers/httpserver/upgrade.go
+++ b/cloud/pkg/cloudhub/servers/httpserver/upgrade.go
@@ -17,6 +17,7 @@ import (
"encoding/json"
"fmt"
"io"
+ "net/http"
"github.com/emicklei/go-restful"
"k8s.io/apimachinery/pkg/api/errors"
@@ -25,40 +26,73 @@ import (
beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
beehiveModel "github.com/kubeedge/beehive/pkg/core/model"
"github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
- "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/controller"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util"
commontypes "github.com/kubeedge/kubeedge/common/types"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+)
+
+const (
+ UpgradeSuccess = "upgrade_success"
+ UpgradeFailedRollbackSuccess = "upgrade_failed_rollback_success"
+ UpgradeFailedRollbackFailed = "upgrade_failed_rollback_failed"
)
// upgradeEdge upgrade the edgecore version
func upgradeEdge(request *restful.Request, response *restful.Response) {
resp := commontypes.NodeUpgradeJobResponse{}
- defer func() {
- if _, err := response.Write([]byte("ok")); err != nil {
- klog.Errorf("failed to send upgrade edge resp, err: %v", err)
- }
- }()
+ taskID := resp.UpgradeID
+ taskType := util.TaskUpgrade
+ nodeID := resp.NodeName
- limit := int64(3 * 1024 * 1024)
lr := &io.LimitedReader{
R: request.Request.Body,
- N: limit + 1,
+ N: millionByte + 1,
}
body, err := io.ReadAll(lr)
if err != nil {
- klog.Errorf("failed to get req body: %v", err)
+ err = response.WriteError(http.StatusBadRequest, fmt.Errorf("failed to get req body: %v", err))
+ if err != nil {
+ klog.Warning(err.Error())
+ }
return
}
if lr.N <= 0 {
- klog.Errorf("%v", errors.NewRequestEntityTooLargeError(fmt.Sprintf("limit is %d", limit)))
+ err = response.WriteError(http.StatusBadRequest, errors.NewRequestEntityTooLargeError("the request body can only be up to 1MB in size"))
+ if err != nil {
+ klog.Warning(err.Error())
+ }
return
}
- if err := json.Unmarshal(body, &resp); err != nil {
- klog.Errorf("failed to marshal upgrade info: %v", err)
+ if err = json.Unmarshal(body, &resp); err != nil {
+ err = response.WriteError(http.StatusBadRequest, fmt.Errorf("failed to marshal task info: %v", err))
+ if err != nil {
+ klog.Warning(err.Error())
+ }
return
}
+ newResp := commontypes.NodeTaskResponse{
+ NodeName: resp.NodeName,
+ Event: "Upgrade",
+ Action: api.ActionSuccess,
+ Reason: resp.Reason,
+ }
- msg := beehiveModel.NewMessage("").SetRoute(modules.CloudHubModuleName, modules.CloudHubModuleName).
- SetResourceOperation(fmt.Sprintf("%s/%s/node/%s", controller.NodeUpgrade, resp.UpgradeID, resp.NodeName), controller.NodeUpgrade).FillBody(resp)
- beehiveContext.Send(modules.NodeUpgradeJobControllerModuleName, *msg)
+ if resp.Status == UpgradeFailedRollbackSuccess {
+ newResp.Event = "Rollback"
+ newResp.Action = api.ActionFailure
+ }
+
+ if resp.Status == UpgradeFailedRollbackFailed {
+ newResp.Event = "Rollback"
+ newResp.Action = api.ActionSuccess
+ }
+
+ msg := beehiveModel.NewMessage("").SetRoute(modules.CloudHubModuleName, modules.CloudHubModuleGroup).
+ SetResourceOperation(fmt.Sprintf("task/%s/node/%s", taskID, nodeID), taskType).FillBody(newResp)
+ beehiveContext.Send(modules.TaskManagerModuleName, *msg)
+
+ if _, err = response.Write([]byte("ok")); err != nil {
+ klog.Errorf("failed to send the task resp to edge , err: %v", err)
+ }
}
diff --git a/cloud/pkg/common/messagelayer/context.go b/cloud/pkg/common/messagelayer/context.go
index 8a2e9d4bd..58bee37a7 100644
--- a/cloud/pkg/common/messagelayer/context.go
+++ b/cloud/pkg/common/messagelayer/context.go
@@ -96,18 +96,10 @@ func DynamicControllerMessageLayer() MessageLayer {
}
}
-func NodeUpgradeJobControllerMessageLayer() MessageLayer {
+func TaskManagerMessageLayer() MessageLayer {
return &ContextMessageLayer{
SendModuleName: modules.CloudHubModuleName,
- ReceiveModuleName: modules.NodeUpgradeJobControllerModuleName,
- ResponseModuleName: modules.CloudHubModuleName,
- }
-}
-
-func ImagePrePullControllerMessageLayer() MessageLayer {
- return &ContextMessageLayer{
- SendModuleName: modules.CloudHubModuleName,
- ReceiveModuleName: modules.ImagePrePullControllerModuleName,
+ ReceiveModuleName: modules.TaskManagerModuleName,
ResponseModuleName: modules.CloudHubModuleName,
}
}
diff --git a/cloud/pkg/common/modules/modules.go b/cloud/pkg/common/modules/modules.go
index f458aee5e..12fa9927e 100644
--- a/cloud/pkg/common/modules/modules.go
+++ b/cloud/pkg/common/modules/modules.go
@@ -16,8 +16,8 @@ const (
NodeUpgradeJobControllerModuleName = "nodeupgradejobcontroller"
NodeUpgradeJobControllerModuleGroup = "nodeupgradejobcontroller"
- ImagePrePullControllerModuleName = "imageprepullcontroller"
- ImagePrePullControllerModuleGroup = "imageprepullcontroller"
+ TaskManagerModuleName = "taskmanager"
+ TaskManagerModuleGroup = "taskmanager"
SyncControllerModuleName = "synccontroller"
SyncControllerModuleGroup = "synccontroller"
diff --git a/cloud/pkg/imageprepullcontroller/controller/downstream.go b/cloud/pkg/imageprepullcontroller/controller/downstream.go
deleted file mode 100644
index d82c58413..000000000
--- a/cloud/pkg/imageprepullcontroller/controller/downstream.go
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
-Copyright 2023 The KubeEdge Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package controller
-
-import (
- "fmt"
- "time"
-
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/apimachinery/pkg/watch"
- k8sinformer "k8s.io/client-go/informers"
- "k8s.io/client-go/kubernetes"
- "k8s.io/klog/v2"
-
- beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
- "github.com/kubeedge/beehive/pkg/core/model"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/util"
- "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/manager"
- commontypes "github.com/kubeedge/kubeedge/common/types"
- "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
- crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned"
- crdinformers "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions"
-)
-
-type DownstreamController struct {
- kubeClient kubernetes.Interface
- informer k8sinformer.SharedInformerFactory
- crdClient crdClientset.Interface
- messageLayer messagelayer.MessageLayer
-
- imagePrePullJobManager *manager.ImagePrePullJobManager
-}
-
-// Start DownstreamController
-func (dc *DownstreamController) Start() error {
- klog.Info("Start ImagePrePullJob Downstream Controller")
- go dc.syncImagePrePullJob()
- return nil
-}
-
-// syncImagePrePullJob is used to get events from informer
-func (dc *DownstreamController) syncImagePrePullJob() {
- for {
- select {
- case <-beehiveContext.Done():
- klog.Info("stop sync ImagePrePullJob")
- return
- case e := <-dc.imagePrePullJobManager.Events():
- imagePrePull, ok := e.Object.(*v1alpha1.ImagePrePullJob)
- if !ok {
- klog.Warningf("object type: %T unsupported", e.Object)
- continue
- }
- switch e.Type {
- case watch.Added:
- dc.imagePrePullJobAdded(imagePrePull)
- case watch.Deleted:
- dc.imagePrePullJobDeleted(imagePrePull)
- case watch.Modified:
- dc.imagePrePullJobUpdate(imagePrePull)
- default:
- klog.Warningf("ImagePrePullJob event type: %s unsupported", e.Type)
- }
- }
- }
-}
-
-// imagePrePullJobAdded is used to process addition of new ImagePrePullJob in apiserver
-func (dc *DownstreamController) imagePrePullJobAdded(imagePrePull *v1alpha1.ImagePrePullJob) {
- klog.V(4).Infof("add ImagePrePullJob: %v", imagePrePull)
- // store in cache map
- dc.imagePrePullJobManager.ImagePrePullMap.Store(imagePrePull.Name, imagePrePull)
-
- // If imagePrePullJob is not initial state, we don't need to send message\
- if imagePrePull.Status.State != v1alpha1.PrePullInitialValue {
- klog.Errorf("The ImagePrePullJob %s is already running or completed, don't send message again", imagePrePull.Name)
- return
- }
-
- // get node list that need prepull images
- var nodesToPrePullImage []string
- if len(imagePrePull.Spec.ImagePrePullTemplate.NodeNames) != 0 {
- for _, node := range imagePrePull.Spec.ImagePrePullTemplate.NodeNames {
- nodeInfo, err := dc.informer.Core().V1().Nodes().Lister().Get(node)
- if err != nil {
- klog.Errorf("Failed to get node(%s) info: %v", node, err)
- continue
- }
-
- if validateNode(nodeInfo) {
- nodesToPrePullImage = append(nodesToPrePullImage, nodeInfo.Name)
- }
- }
- } else if imagePrePull.Spec.ImagePrePullTemplate.LabelSelector != nil {
- selector, err := metav1.LabelSelectorAsSelector(imagePrePull.Spec.ImagePrePullTemplate.LabelSelector)
- if err != nil {
- klog.Errorf("LabelSelector(%s) is not valid: %v", imagePrePull.Spec.ImagePrePullTemplate.LabelSelector, err)
- return
- }
-
- nodes, err := dc.informer.Core().V1().Nodes().Lister().List(selector)
- if err != nil {
- klog.Errorf("Failed to get nodes with label %s: %v", selector.String(), err)
- return
- }
-
- for _, node := range nodes {
- if validateNode(node) {
- nodesToPrePullImage = append(nodesToPrePullImage, node.Name)
- }
- }
- }
-
- // deduplicate: remove duplicate nodes to avoid repeating prepull images on the same node
- nodesToPrePullImage = util.RemoveDuplicateElement(nodesToPrePullImage)
-
- klog.Infof("Filtered finished, images will be prepulled on below nodes\n%v\n", nodesToPrePullImage)
-
- go func() {
- for _, node := range nodesToPrePullImage {
- dc.processPrePull(node, imagePrePull)
- }
- }()
-}
-
-// imagePrePullJobDeleted is used to process deleted ImagePrePullJob in apiServer
-func (dc *DownstreamController) imagePrePullJobDeleted(imagePrePull *v1alpha1.ImagePrePullJob) {
- // delete drom cache map
- dc.imagePrePullJobManager.ImagePrePullMap.Delete(imagePrePull.Name)
-}
-
-// imagePrePullJobUpdate is used to process update of ImagePrePullJob in apiServer
-// Now we don't allow update spec, so we only update the cache map in imagePrePullJobUpdate func.
-func (dc *DownstreamController) imagePrePullJobUpdate(imagePrePull *v1alpha1.ImagePrePullJob) {
- _, ok := dc.imagePrePullJobManager.ImagePrePullMap.Load(imagePrePull.Name)
- // store in cache map
- dc.imagePrePullJobManager.ImagePrePullMap.Store(imagePrePull.Name, imagePrePull)
- if !ok {
- klog.Infof("ImagePrePull Job %s not exist, and store it into first", imagePrePull.Name)
- // If ImagePrePullJob not present in ImagePrePull map means it is not modified and added.
- dc.imagePrePullJobAdded(imagePrePull)
- }
-}
-
-// processPrePull process prepull job added and send it to edge nodes.
-func (dc *DownstreamController) processPrePull(node string, imagePrePull *v1alpha1.ImagePrePullJob) {
- klog.V(4).Infof("begin to send imagePrePull message to %s", node)
-
- imagePrePullTemplateInfo := imagePrePull.Spec.ImagePrePullTemplate
- imagePrePullRequest := commontypes.ImagePrePullJobRequest{
- Images: imagePrePullTemplateInfo.Images,
- NodeName: node,
- Secret: imagePrePullTemplateInfo.ImageSecret,
- RetryTimes: imagePrePullTemplateInfo.RetryTimes,
- CheckItems: imagePrePullTemplateInfo.CheckItems,
- }
-
- // handle timeout: could not receive image prepull msg feedback from edge node
- // send prepull timeout response message to upstream
- go dc.handleImagePrePullJobTimeoutOnEachNode(node, imagePrePull.Name, imagePrePullTemplateInfo.TimeoutSecondsOnEachNode)
-
- // send prepull msg to edge node
- msg := model.NewMessage("")
- resource := buildPrePullResource(imagePrePull.Name, node)
- msg.BuildRouter(modules.ImagePrePullControllerModuleName, modules.ImagePrePullControllerModuleGroup, resource, ImagePrePull).
- FillBody(imagePrePullRequest)
-
- err := dc.messageLayer.Send(*msg)
- if err != nil {
- klog.Errorf("Failed to send prepull message %s due to error %v", msg.GetID(), err)
- return
- }
-
- // update imagePrePullJob status to prepulling
- status := &v1alpha1.ImagePrePullStatus{
- NodeName: node,
- State: v1alpha1.PrePulling,
- }
- err = patchImagePrePullStatus(dc.crdClient, imagePrePull, status)
- if err != nil {
- klog.Errorf("Failed to mark imagePrePullJob prepulling status: %v", err)
- }
-}
-
-// handleImagePrePullJobTimeoutOnEachNode is used to handle the situation that cloud don't receive prepull result
-// from edge node within the timeout period.
-// If so, the ImagePrePullJobState will update to timeout
-func (dc *DownstreamController) handleImagePrePullJobTimeoutOnEachNode(node, jobName string, timeoutSecondsEachNode *uint32) {
- var timeout uint32 = 360
- if timeoutSecondsEachNode != nil && *timeoutSecondsEachNode != 0 {
- timeout = *timeoutSecondsEachNode
- }
-
- receiveFeedback := false
-
- _ = wait.Poll(10*time.Second, time.Duration(timeout)*time.Second, func() (bool, error) {
- cacheValue, ok := dc.imagePrePullJobManager.ImagePrePullMap.Load(jobName)
- if !ok {
- receiveFeedback = true
- klog.Errorf("ImagePrePullJob %s is not exist", jobName)
- return false, fmt.Errorf("imagePrePullJob %s is not exist", jobName)
- }
- imagePrePullValue := cacheValue.(*v1alpha1.ImagePrePullJob)
- for _, statusValue := range imagePrePullValue.Status.Status {
- if statusValue.NodeName == node && (statusValue.State == v1alpha1.PrePullSuccessful || statusValue.State == v1alpha1.PrePullFailed) {
- receiveFeedback = true
- return true, nil
- }
- break
- }
- return false, nil
- })
-
- if receiveFeedback {
- return
- }
- klog.Errorf("TIMEOUT to receive image prepull %s response from edge node %s", jobName, node)
-
- // construct timeout image prepull response and send it to upstream controller
- responseResource := buildPrePullResource(jobName, node)
- resp := commontypes.ImagePrePullJobResponse{
- NodeName: node,
- State: v1alpha1.PrePullFailed,
- Reason: "timeout to receive response from edge",
- }
-
- respMsg := model.NewMessage("").
- BuildRouter(modules.ImagePrePullControllerModuleName, modules.ImagePrePullControllerModuleGroup, responseResource, ImagePrePull).
- FillBody(resp)
- beehiveContext.Send(modules.ImagePrePullControllerModuleName, *respMsg)
-}
-
-// NewDownstreamController new downstream controller to process downstream imageprepull msg to edge nodes.
-func NewDownstreamController(crdInformerFactory crdinformers.SharedInformerFactory) (*DownstreamController, error) {
- imagePrePullJobManager, err := manager.NewImagePrePullJobManager(crdInformerFactory.Operations().V1alpha1().ImagePrePullJobs().Informer())
- if err != nil {
- klog.Warningf("Create ImagePrePullJob manager failed with error: %s", err)
- return nil, err
- }
-
- dc := &DownstreamController{
- kubeClient: client.GetKubeClient(),
- informer: informers.GetInformersManager().GetKubeInformerFactory(),
- crdClient: client.GetCRDClient(),
- imagePrePullJobManager: imagePrePullJobManager,
- messageLayer: messagelayer.ImagePrePullControllerMessageLayer(),
- }
- return dc, nil
-}
diff --git a/cloud/pkg/imageprepullcontroller/controller/util.go b/cloud/pkg/imageprepullcontroller/controller/util.go
deleted file mode 100644
index 592164833..000000000
--- a/cloud/pkg/imageprepullcontroller/controller/util.go
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
-Copyright 2023 The KubeEdge Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package controller
-
-import (
- "context"
- "encoding/json"
- "fmt"
- "strings"
-
- jsonpatch "github.com/evanphx/json-patch"
- v1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- apimachineryType "k8s.io/apimachinery/pkg/types"
- "k8s.io/klog/v2"
-
- "github.com/kubeedge/kubeedge/cloud/pkg/common/util"
- "github.com/kubeedge/kubeedge/common/constants"
- "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
- crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned"
-)
-
-const ImagePrePull = "prepull"
-
-func validateNode(node *v1.Node) bool {
- if !util.IsEdgeNode(node) {
- klog.Warningf("Node(%s) is not edge node", node.Name)
- return false
- }
-
- // if node is in NotReady state, cannot prepull images
- for _, condition := range node.Status.Conditions {
- if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue {
- klog.Warningf("Node(%s) is in NotReady state", node.Name)
- return false
- }
- }
-
- return true
-}
-
-// buildPrePullResource build prepull resource in msg send to edge node
-func buildPrePullResource(imagePrePullName, nodeName string) string {
- resource := fmt.Sprintf("%s/%s/%s/%s", "node", nodeName, ImagePrePull, imagePrePullName)
- return resource
-}
-
-func parsePrePullresource(resource string) (string, string, error) {
- var nodeName, jobName string
- sli := strings.Split(resource, constants.ResourceSep)
- if len(sli) != 4 {
- return nodeName, jobName, fmt.Errorf("the resource %s is not the standard type", resource)
- }
- return sli[1], sli[3], nil
-}
-
-func patchImagePrePullStatus(crdClient crdClientset.Interface, imagePrePull *v1alpha1.ImagePrePullJob, status *v1alpha1.ImagePrePullStatus) error {
- oldValue := imagePrePull.DeepCopy()
- newValue := updateNodeImagePrePullStatus(oldValue, status)
-
- var completeFlag int
- var failedFlag bool
- newValue.Status.State = v1alpha1.PrePulling
- for _, statusValue := range newValue.Status.Status {
- if statusValue.State == v1alpha1.PrePullFailed {
- failedFlag = true
- completeFlag++
- }
- if statusValue.State == v1alpha1.PrePullSuccessful {
- completeFlag++
- }
- }
- if completeFlag == len(newValue.Status.Status) {
- if failedFlag {
- newValue.Status.State = v1alpha1.PrePullFailed
- } else {
- newValue.Status.State = v1alpha1.PrePullSuccessful
- }
- }
-
- oldData, err := json.Marshal(oldValue)
- if err != nil {
- return fmt.Errorf("failed to marshal the old ImagePrePullJob(%s): %v", oldValue.Name, err)
- }
-
- newData, err := json.Marshal(newValue)
- if err != nil {
- return fmt.Errorf("failed to marshal the new ImagePrePullJob(%s): %v", newValue.Name, err)
- }
-
- patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
- if err != nil {
- return fmt.Errorf("failed to create a merge patch: %v", err)
- }
-
- _, err = crdClient.OperationsV1alpha1().ImagePrePullJobs().Patch(context.TODO(), newValue.Name, apimachineryType.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
- if err != nil {
- return fmt.Errorf("failed to patch update ImagePrePullJob status: %v", err)
- }
-
- return nil
-}
-
-func updateNodeImagePrePullStatus(imagePrePull *v1alpha1.ImagePrePullJob, status *v1alpha1.ImagePrePullStatus) *v1alpha1.ImagePrePullJob {
- // return value imageprepull cannot populate the input parameter old
- newValue := imagePrePull.DeepCopy()
-
- for index, nodeStatus := range newValue.Status.Status {
- if nodeStatus.NodeName == status.NodeName {
- newValue.Status.Status[index] = *status
- return newValue
- }
- }
-
- newValue.Status.Status = append(newValue.Status.Status, *status)
- return newValue
-}
diff --git a/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go b/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go
deleted file mode 100644
index 4f772d258..000000000
--- a/cloud/pkg/imageprepullcontroller/imageprepullcontroller.go
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
-Copyright 2023 The KubeEdge Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package imageprepullcontroller
-
-import (
- "time"
-
- "k8s.io/klog/v2"
-
- "github.com/kubeedge/beehive/pkg/core"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
- "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config"
- "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/controller"
- "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1"
-)
-
-// ImagePrePullController is controller for processing prepull images on edge nodes
-type ImagePrePullController struct {
- downstream *controller.DownstreamController
- upstream *controller.UpstreamController
- enable bool
-}
-
-var _ core.Module = (*ImagePrePullController)(nil)
-
-func newImagePrePullController(enable bool) *ImagePrePullController {
- if !enable {
- return &ImagePrePullController{enable: enable}
- }
- downstream, err := controller.NewDownstreamController(informers.GetInformersManager().GetKubeEdgeInformerFactory())
- if err != nil {
- klog.Exitf("New ImagePrePull Controller downstream failed with error: %s", err)
- }
- upstream, err := controller.NewUpstreamController(downstream)
- if err != nil {
- klog.Exitf("New ImagePrePull Controller upstream failed with error: %s", err)
- }
- return &ImagePrePullController{
- downstream: downstream,
- upstream: upstream,
- enable: enable,
- }
-}
-
-func Register(dc *v1alpha1.ImagePrePullController) {
- config.InitConfigure(dc)
- core.Register(newImagePrePullController(dc.Enable))
-}
-
-// Name of controller
-func (uc *ImagePrePullController) Name() string {
- return modules.ImagePrePullControllerModuleName
-}
-
-// Group of controller
-func (uc *ImagePrePullController) Group() string {
- return modules.ImagePrePullControllerModuleGroup
-}
-
-// Enable indicates whether enable this module
-func (uc *ImagePrePullController) Enable() bool {
- return uc.enable
-}
-
-// Start controller
-func (uc *ImagePrePullController) Start() {
- if err := uc.downstream.Start(); err != nil {
- klog.Exitf("start ImagePrePullJob controller downstream failed with error: %s", err)
- }
- // wait for downstream controller to start and load ImagePrePullJob
- // TODO think about sync
- time.Sleep(1 * time.Second)
- if err := uc.upstream.Start(); err != nil {
- klog.Exitf("start ImagePrePullJob controller upstream failed with error: %s", err)
- }
-}
diff --git a/cloud/pkg/imageprepullcontroller/manager/common.go b/cloud/pkg/imageprepullcontroller/manager/common.go
deleted file mode 100644
index 9f371875a..000000000
--- a/cloud/pkg/imageprepullcontroller/manager/common.go
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-Copyright 2023 The KubeEdge Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package manager
-
-import (
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/watch"
- "k8s.io/klog/v2"
-)
-
-// Manager define the interface of a Manager, ImagePrePullJob Manager implement it
-type Manager interface {
- Events() chan watch.Event
-}
-
-// CommonResourceEventHandler can be used by ImagePrePullJob Manager
-type CommonResourceEventHandler struct {
- events chan watch.Event
-}
-
-func (c *CommonResourceEventHandler) obj2Event(t watch.EventType, obj interface{}) {
- eventObj, ok := obj.(runtime.Object)
- if !ok {
- klog.Warningf("unknown type: %T, ignore", obj)
- return
- }
- c.events <- watch.Event{Type: t, Object: eventObj}
-}
-
-// OnAdd handle Add event
-func (c *CommonResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) {
- c.obj2Event(watch.Added, obj)
-}
-
-// OnUpdate handle Update event
-func (c *CommonResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
- c.obj2Event(watch.Modified, newObj)
-}
-
-// OnDelete handle Delete event
-func (c *CommonResourceEventHandler) OnDelete(obj interface{}) {
- c.obj2Event(watch.Deleted, obj)
-}
-
-// NewCommonResourceEventHandler create CommonResourceEventHandler used by ImagePrePullJob Manager
-func NewCommonResourceEventHandler(events chan watch.Event) *CommonResourceEventHandler {
- return &CommonResourceEventHandler{events: events}
-}
diff --git a/cloud/pkg/imageprepullcontroller/manager/imageprepull.go b/cloud/pkg/imageprepullcontroller/manager/imageprepull.go
deleted file mode 100644
index cdb2eaccc..000000000
--- a/cloud/pkg/imageprepullcontroller/manager/imageprepull.go
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
-Copyright 2023 The KubeEdge Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package manager
-
-import (
- "sync"
-
- "k8s.io/apimachinery/pkg/watch"
- "k8s.io/client-go/tools/cache"
-
- "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config"
-)
-
-// ImagePrePullJobManager is a manager watch ImagePrePullJob change event
-type ImagePrePullJobManager struct {
- // events from watch kubernetes api server
- events chan watch.Event
-
- // ImagePrePullMap, key is ImagePrePullJob.Name, value is *v1alpha1.ImagePrePullJob{}
- ImagePrePullMap sync.Map
-}
-
-// Events return a channel, can receive all ImagePrePullJob event
-func (dmm *ImagePrePullJobManager) Events() chan watch.Event {
- return dmm.events
-}
-
-// NewImagePrePullJobManager create ImagePrePullJobManager from config
-func NewImagePrePullJobManager(si cache.SharedIndexInformer) (*ImagePrePullJobManager, error) {
- events := make(chan watch.Event, config.Config.Buffer.ImagePrePullJobEvent)
- rh := NewCommonResourceEventHandler(events)
- _, err := si.AddEventHandler(rh)
- if err != nil {
- return nil, err
- }
-
- return &ImagePrePullJobManager{events: events}, nil
-}
diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go b/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go
deleted file mode 100644
index 6efee8012..000000000
--- a/cloud/pkg/nodeupgradejobcontroller/controller/downstream.go
+++ /dev/null
@@ -1,447 +0,0 @@
-/*
-Copyright 2022 The KubeEdge Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package controller
-
-import (
- "context"
- "encoding/json"
- "fmt"
- "time"
-
- "github.com/google/uuid"
- v1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- apimachineryType "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/apimachinery/pkg/watch"
- k8sinformer "k8s.io/client-go/informers"
- "k8s.io/client-go/kubernetes"
- "k8s.io/klog/v2"
-
- beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
- "github.com/kubeedge/beehive/pkg/core/model"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/util"
- "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/manager"
- "github.com/kubeedge/kubeedge/common/constants"
- commontypes "github.com/kubeedge/kubeedge/common/types"
- "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
- crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned"
- crdinformers "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions"
-)
-
-type DownstreamController struct {
- kubeClient kubernetes.Interface
- informer k8sinformer.SharedInformerFactory
- crdClient crdClientset.Interface
- messageLayer messagelayer.MessageLayer
-
- nodeUpgradeJobManager *manager.NodeUpgradeJobManager
-}
-
-// Start DownstreamController
-func (dc *DownstreamController) Start() error {
- klog.Info("Start NodeUpgradeJob Downstream Controller")
-
- go dc.syncNodeUpgradeJob()
-
- return nil
-}
-
-// syncNodeUpgradeJob is used to get events from informer
-func (dc *DownstreamController) syncNodeUpgradeJob() {
- for {
- select {
- case <-beehiveContext.Done():
- klog.Info("stop sync NodeUpgradeJob")
- return
- case e := <-dc.nodeUpgradeJobManager.Events():
- upgrade, ok := e.Object.(*v1alpha1.NodeUpgradeJob)
- if !ok {
- klog.Warningf("object type: %T unsupported", e.Object)
- continue
- }
- switch e.Type {
- case watch.Added:
- dc.nodeUpgradeJobAdded(upgrade)
- case watch.Deleted:
- dc.nodeUpgradeJobDeleted(upgrade)
- case watch.Modified:
- dc.nodeUpgradeJobUpdated(upgrade)
- default:
- klog.Warningf("NodeUpgradeJob event type: %s unsupported", e.Type)
- }
- }
- }
-}
-
-func buildUpgradeResource(upgradeID, nodeID string) string {
- resource := fmt.Sprintf("%s%s%s%s%s%s%s", NodeUpgrade, constants.ResourceSep, upgradeID, constants.ResourceSep, "node", constants.ResourceSep, nodeID)
- return resource
-}
-
-// nodeUpgradeJobAdded is used to process addition of new NodeUpgradeJob in apiserver
-func (dc *DownstreamController) nodeUpgradeJobAdded(upgrade *v1alpha1.NodeUpgradeJob) {
- klog.V(4).Infof("add NodeUpgradeJob: %v", upgrade)
- // store in cache map
- dc.nodeUpgradeJobManager.UpgradeMap.Store(upgrade.Name, upgrade)
-
- // If all or partial edge nodes upgrade is upgrading or completed, we don't need to send upgrade message
- if isCompleted(upgrade) {
- klog.Errorf("The nodeUpgradeJob is already running or completed, don't send upgrade message again")
- return
- }
-
- // get node list that need upgrading
- var nodesToUpgrade []string
- if len(upgrade.Spec.NodeNames) != 0 {
- for _, node := range upgrade.Spec.NodeNames {
- nodeInfo, err := dc.informer.Core().V1().Nodes().Lister().Get(node)
- if err != nil {
- klog.Errorf("Failed to get node(%s) info: %v", node, err)
- continue
- }
-
- if needUpgrade(nodeInfo, upgrade.Spec.Version) {
- nodesToUpgrade = append(nodesToUpgrade, nodeInfo.Name)
- }
- }
- } else if upgrade.Spec.LabelSelector != nil {
- selector, err := metav1.LabelSelectorAsSelector(upgrade.Spec.LabelSelector)
- if err != nil {
- klog.Errorf("LabelSelector(%s) is not valid: %v", upgrade.Spec.LabelSelector, err)
- return
- }
-
- nodes, err := dc.informer.Core().V1().Nodes().Lister().List(selector)
- if err != nil {
- klog.Errorf("Failed to get nodes with label %s: %v", selector.String(), err)
- return
- }
-
- for _, node := range nodes {
- if needUpgrade(node, upgrade.Spec.Version) {
- nodesToUpgrade = append(nodesToUpgrade, node.Name)
- }
- }
- }
-
- // deduplicate: remove duplicate nodes to avoid repeating upgrade to the same node
- nodesToUpgrade = util.RemoveDuplicateElement(nodesToUpgrade)
-
- klog.Infof("Filtered finished, the below nodes are to upgrade\n%v\n", nodesToUpgrade)
-
- // upgrade most `UpgradeJob.Spec.Concurrency` nodes once a time
- nodesChan := make(chan string, upgrade.Spec.Concurrency)
-
- // select nodes to do upgrade operation
- go dc.selectConcurrentNodes(nodesChan, nodesToUpgrade, upgrade)
-
- go func() {
- for node := range nodesChan {
- dc.processUpgrade(node, upgrade)
- }
- }()
-}
-
-// processUpgrade do the upgrade operation on node
-func (dc *DownstreamController) processUpgrade(node string, upgrade *v1alpha1.NodeUpgradeJob) {
- klog.V(4).Infof("begin to upgrade node %s", node)
- // if users specify Image, we'll use upgrade Version as its image tag, even though Image contains tag.
- // if not, we'll use default image: kubeedge/installation-package:${Version}
- var repo string
- var err error
- repo = "kubeedge/installation-package"
- if upgrade.Spec.Image != "" {
- repo, err = GetImageRepo(upgrade.Spec.Image)
- if err != nil {
- klog.Errorf("Image format is not right: %v", err)
- return
- }
- }
- imageTag := upgrade.Spec.Version
- image := fmt.Sprintf("%s:%s", repo, imageTag)
-
- // send upgrade msg to edge node
- msg := model.NewMessage("")
-
- resource := buildUpgradeResource(upgrade.Name, node)
-
- upgradeReq := commontypes.NodeUpgradeJobRequest{
- UpgradeID: upgrade.Name,
- HistoryID: uuid.New().String(),
- UpgradeTool: upgrade.Spec.UpgradeTool,
- Version: upgrade.Spec.Version,
- Image: image,
- }
-
- msg.BuildRouter(modules.NodeUpgradeJobControllerModuleName, modules.NodeUpgradeJobControllerModuleGroup, resource, NodeUpgrade).
- FillBody(upgradeReq)
-
- err = dc.messageLayer.Send(*msg)
- if err != nil {
- klog.Errorf("Failed to send upgrade message %v due to error %v", msg.GetID(), err)
- return
- }
-
- // process time out: could not receive upgrade feedback from edge
- // send upgrade timeout response message to upstream
- go dc.handleNodeUpgradeJobTimeout(node, upgrade.Name, upgrade.Spec.Version, upgradeReq.HistoryID, upgrade.Spec.TimeoutSeconds)
-
- // mark Upgrade state upgrading
- status := &v1alpha1.UpgradeStatus{
- NodeName: node,
- State: v1alpha1.Upgrading,
- History: v1alpha1.History{
- HistoryID: upgradeReq.HistoryID,
- UpgradeTime: time.Now().Format(ISO8601UTC),
- },
- }
- err = patchNodeUpgradeJobStatus(dc.crdClient, upgrade, status)
- if err != nil {
- // not return, continue to mark node unschedulable
- klog.Errorf("Failed to mark Upgrade upgrading status: %v", err)
- }
-
- // mark edge node unschedulable
- // the effect is like running cmd: kubectl drain <node-to-drain> --ignore-daemonsets
- unscheduleNode := v1.Node{}
- unscheduleNode.Spec.Unschedulable = true
-
- // add a upgrade label
- unscheduleNode.Labels = map[string]string{NodeUpgradeJobStatusKey: NodeUpgradeJobStatusValue}
- byteNode, err := json.Marshal(unscheduleNode)
- if err != nil {
- klog.Warningf("marshal data failed: %v", err)
- return
- }
-
- _, err = dc.kubeClient.CoreV1().Nodes().Patch(context.Background(), node, apimachineryType.StrategicMergePatchType, byteNode, metav1.PatchOptions{})
- if err != nil {
- klog.Errorf("failed to drain node %s: %v", node, err)
- return
- }
-}
-
-// selectConcurrentNodes select the nodes to do upgrade operation, and put it into channel nodesChan
-func (dc *DownstreamController) selectConcurrentNodes(nodesChan chan string, allNodes []string, upgrade *v1alpha1.NodeUpgradeJob) {
- // the default concurrency is 1
- // this means that we will upgrade nodes one by one
- // only when the last one node upgrade finished, we'll continue to upgrade the next one node
- concurrency := 1
- if upgrade.Spec.Concurrency != 0 {
- concurrency = int(upgrade.Spec.Concurrency)
- }
-
- timeout := int(*upgrade.Spec.TimeoutSeconds) * (len(allNodes) + 1) / concurrency
-
- err := wait.Poll(10*time.Second, time.Duration(timeout)*time.Second, func() (bool, error) {
- upgradeJob, err := dc.crdClient.OperationsV1alpha1().NodeUpgradeJobs().Get(context.TODO(), upgrade.Name, metav1.GetOptions{})
- if err != nil {
- return false, nil
- }
-
- // if all the nodes upgrade is completed, close channel to inform that the upgrade is finished
- if upgradeJob.Status.State == v1alpha1.Completed {
- klog.Infof("all the nodes upgrade status are completed")
- close(nodesChan)
- return true, nil
- }
-
- // calculate the number of nodes in upgrading operation
- upgradingNum := 0
- for _, status := range upgradeJob.Status.Status {
- if status.State == v1alpha1.Upgrading {
- upgradingNum++
- }
- }
-
- // ensure the max number of upgrading nodes is Concurrency
- if upgradingNum < concurrency {
- for i := 0; i < concurrency-upgradingNum; i++ {
- nodesChan <- allNodes[i]
- }
-
- allNodes = allNodes[:int(concurrency)-upgradingNum]
- }
-
- if len(allNodes) == 0 {
- // all the nodes are to upgrade
- klog.Infof("The number of nodes need to be upgraded reach 0")
- close(nodesChan)
- return true, nil
- }
- return false, nil
- })
-
- if err != nil {
- klog.Errorf("failed to select all the related nodes to do upgrade operation: %v", err)
- close(nodesChan)
- }
-}
-
-func needUpgrade(node *v1.Node, upgradeVersion string) bool {
- if filterVersion(node.Status.NodeInfo.KubeletVersion, upgradeVersion) {
- klog.Warningf("Node(%s) version(%s) already on the expected version %s.", node.Name, node.Status.NodeInfo.KubeletVersion, upgradeVersion)
- return false
- }
-
- // we only care about edge nodes, so just remove not edge nodes
- if !util.IsEdgeNode(node) {
- klog.Warningf("Node(%s) is not edge node", node.Name)
- return false
- }
-
- // if node is in Upgrading state, don't need upgrade
- if _, ok := node.Labels[NodeUpgradeJobStatusKey]; ok {
- klog.Warningf("Node(%s) is in upgrade state", node.Name)
- return false
- }
-
- // if node is in NotReady state, don't need upgrade
- for _, condition := range node.Status.Conditions {
- if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue {
- klog.Warningf("Node(%s) is in NotReady state", node.Name)
- return false
- }
- }
-
- return true
-}
-
-// handleNodeUpgradeJobTimeout is used to handle the situation that cloud don't receive upgrade result from edge node
-// within the timeout period
-func (dc *DownstreamController) handleNodeUpgradeJobTimeout(node string, upgradeID string, upgradeVersion string, historyID string, timeoutSeconds *uint32) {
- // by default, if we don't receive upgrade response in 300s, we think it's timeout
- // if we have specified the timeout in Upgrade, we'll use it as the timeout time
- var timeout uint32 = 300
- if timeoutSeconds != nil && *timeoutSeconds != 0 {
- timeout = *timeoutSeconds
- }
-
- receiveFeedback := false
-
- // check whether edgecore report to the cloud about the upgrade result
- // we don't care about function Poll return error
- // if we don't receive Upgrade response, Poll function also return error: timed out waiting for the condition
- // we only care about variable: receiveFeedback
- _ = wait.Poll(10*time.Second, time.Duration(timeout)*time.Second, func() (bool, error) {
- v, ok := dc.nodeUpgradeJobManager.UpgradeMap.Load(upgradeID)
- if !ok {
- // we think it's receiveFeedback to avoid construct timeout response by ourselves
- receiveFeedback = true
- klog.Errorf("NodeUpgradeJob %v not exist", upgradeID)
- return false, fmt.Errorf("nodeUpgrade %v not exist", upgradeID)
- }
- upgradeValue := v.(*v1alpha1.NodeUpgradeJob)
- for index := range upgradeValue.Status.Status {
- if upgradeValue.Status.Status[index].NodeName == node {
- // if HistoryID matches and state is v1alpha1.Completed
- // it means we've received the specified Upgrade Operation
- if upgradeValue.Status.Status[index].History.HistoryID == historyID &&
- upgradeValue.Status.Status[index].State == v1alpha1.Completed {
- receiveFeedback = true
- return true, nil
- }
- break
- }
- }
- return false, nil
- })
-
- if receiveFeedback {
- // if already receive edge upgrade feedback, do nothing
- return
- }
-
- klog.Errorf("NOT receive node(%s) upgrade(%s) feedback response", node, upgradeID)
-
- // construct timeout upgrade response
- // and send it to upgrade controller upstream
- upgradeResource := buildUpgradeResource(upgradeID, node)
- resp := commontypes.NodeUpgradeJobResponse{
- UpgradeID: upgradeID,
- HistoryID: historyID,
- NodeName: node,
- FromVersion: "",
- ToVersion: upgradeVersion,
- Status: string(v1alpha1.UpgradeFailedRollbackSuccess),
- Reason: "timeout to get upgrade response from edge, maybe error due to cloud or edge",
- }
-
- updateMsg := model.NewMessage("").
- BuildRouter(modules.NodeUpgradeJobControllerModuleName, modules.NodeUpgradeJobControllerModuleGroup, upgradeResource, NodeUpgrade).
- FillBody(resp)
-
- // send upgrade resp message to upgrade controller upstream directly
- // let upgrade controller upstream to update upgrade status
- beehiveContext.Send(modules.NodeUpgradeJobControllerModuleName, *updateMsg)
-}
-
-// nodeUpgradeJobDeleted is used to process deleted NodeUpgradeJob in apiserver
-func (dc *DownstreamController) nodeUpgradeJobDeleted(upgrade *v1alpha1.NodeUpgradeJob) {
- // just need to delete from cache map
- dc.nodeUpgradeJobManager.UpgradeMap.Delete(upgrade.Name)
-}
-
-// upgradeAdded is used to process update of new NodeUpgradeJob in apiserver
-func (dc *DownstreamController) nodeUpgradeJobUpdated(upgrade *v1alpha1.NodeUpgradeJob) {
- oldValue, ok := dc.nodeUpgradeJobManager.UpgradeMap.Load(upgrade.Name)
- // store in cache map
- dc.nodeUpgradeJobManager.UpgradeMap.Store(upgrade.Name, upgrade)
- if !ok {
- klog.Infof("Upgrade %s not exist, and store it first", upgrade.Name)
- // If Upgrade not present in Upgrade map means it is not modified and added.
- dc.nodeUpgradeJobAdded(upgrade)
- return
- }
-
- old := oldValue.(*v1alpha1.NodeUpgradeJob)
- if !isUpgradeUpdated(upgrade, old) {
- klog.V(4).Infof("Upgrade %s no need to update", upgrade.Name)
- return
- }
-
- dc.nodeUpgradeJobAdded(upgrade)
-}
-
-// isUpgradeUpdated checks Upgrade is actually updated or not
-func isUpgradeUpdated(new *v1alpha1.NodeUpgradeJob, old *v1alpha1.NodeUpgradeJob) bool {
- // now we don't allow update spec fields
- // so always return false to avoid sending Upgrade msg to edge again when status fields changed
- return false
-}
-
-func NewDownstreamController(crdInformerFactory crdinformers.SharedInformerFactory) (*DownstreamController, error) {
- nodeUpgradeJobManager, err := manager.NewNodeUpgradeJobManager(crdInformerFactory.Operations().V1alpha1().NodeUpgradeJobs().Informer())
- if err != nil {
- klog.Warningf("Create NodeUpgradeJob manager failed with error: %s", err)
- return nil, err
- }
-
- dc := &DownstreamController{
- kubeClient: client.GetKubeClient(),
- informer: informers.GetInformersManager().GetKubeInformerFactory(),
- crdClient: client.GetCRDClient(),
- nodeUpgradeJobManager: nodeUpgradeJobManager,
- messageLayer: messagelayer.NodeUpgradeJobControllerMessageLayer(),
- }
- return dc, nil
-}
diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/upstream.go b/cloud/pkg/nodeupgradejobcontroller/controller/upstream.go
deleted file mode 100644
index 6b3e53ffb..000000000
--- a/cloud/pkg/nodeupgradejobcontroller/controller/upstream.go
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
-Copyright 2022 The KubeEdge Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package controller
-
-import (
- "context"
- "encoding/json"
- "fmt"
- "strings"
-
- jsonpatch "github.com/evanphx/json-patch"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- apimachineryType "k8s.io/apimachinery/pkg/types"
- k8sinformer "k8s.io/client-go/informers"
- "k8s.io/client-go/kubernetes"
- "k8s.io/klog/v2"
-
- beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
- "github.com/kubeedge/beehive/pkg/core/model"
- keclient "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer"
- "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/config"
- "github.com/kubeedge/kubeedge/common/types"
- "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
- crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned"
-)
-
-// UpstreamController subscribe messages from edge and sync to k8s api server
-type UpstreamController struct {
- // downstream controller to update NodeUpgradeJob status in cache
- dc *DownstreamController
-
- kubeClient kubernetes.Interface
- informer k8sinformer.SharedInformerFactory
- crdClient crdClientset.Interface
- messageLayer messagelayer.MessageLayer
- // message channel
- nodeUpgradeJobStatusChan chan model.Message
-}
-
-// Start UpstreamController
-func (uc *UpstreamController) Start() error {
- klog.Info("Start NodeUpgradeJob Upstream Controller")
-
- uc.nodeUpgradeJobStatusChan = make(chan model.Message, config.Config.Buffer.UpdateNodeUpgradeJobStatus)
- go uc.dispatchMessage()
-
- for i := 0; i < int(config.Config.Load.NodeUpgradeJobWorkers); i++ {
- go uc.updateNodeUpgradeJobStatus()
- }
- return nil
-}
-
-// Start UpstreamController
-func (uc *UpstreamController) dispatchMessage() {
- for {
- select {
- case <-beehiveContext.Done():
- klog.Info("Stop dispatch NodeUpgradeJob upstream message")
- return
- default:
- }
-
- msg, err := uc.messageLayer.Receive()
- if err != nil {
- klog.Warningf("Receive message failed, %v", err)
- continue
- }
-
- klog.V(4).Infof("NodeUpgradeJob upstream controller receive msg %#v", msg)
-
- uc.nodeUpgradeJobStatusChan <- msg
- }
-}
-
-// updateNodeUpgradeJobStatus update NodeUpgradeJob status field
-func (uc *UpstreamController) updateNodeUpgradeJobStatus() {
- for {
- select {
- case <-beehiveContext.Done():
- klog.Info("Stop update NodeUpgradeJob status")
- return
- case msg := <-uc.nodeUpgradeJobStatusChan:
- klog.V(4).Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource())
-
- // get nodeID and upgradeID from Upgrade msg:
- nodeID := getNodeName(msg.GetResource())
- upgradeID := getUpgradeID(msg.GetResource())
-
- oldValue, ok := uc.dc.nodeUpgradeJobManager.UpgradeMap.Load(upgradeID)
- if !ok {
- klog.Errorf("NodeUpgradeJob %s not exist", upgradeID)
- continue
- }
-
- upgrade, ok := oldValue.(*v1alpha1.NodeUpgradeJob)
- if !ok {
- klog.Errorf("NodeUpgradeJob info %T is not valid", oldValue)
- continue
- }
-
- data, err := msg.GetContentData()
- if err != nil {
- klog.Errorf("failed to get node upgrade content data: %v", err)
- continue
- }
- resp := &types.NodeUpgradeJobResponse{}
- err = json.Unmarshal(data, resp)
- if err != nil {
- klog.Errorf("Failed to unmarshal node upgrade response: %v", err)
- continue
- }
-
- status := &v1alpha1.UpgradeStatus{
- NodeName: nodeID,
- State: v1alpha1.Completed,
- History: v1alpha1.History{
- HistoryID: resp.HistoryID,
- FromVersion: resp.FromVersion,
- ToVersion: resp.ToVersion,
- Result: v1alpha1.UpgradeResult(resp.Status),
- Reason: resp.Reason,
- },
- }
- err = patchNodeUpgradeJobStatus(uc.crdClient, upgrade, status)
- if err != nil {
- klog.Errorf("Failed to mark NodeUpgradeJob status to completed: %v", err)
- }
-
- // The below are to mark edge node schedulable
- // And to keep a successful record in node annotation only when upgrade is successful
- // like: nodeupgradejob.operations.kubeedge.io/history: "v1.9.0->v1.10.0->v1.11.1"
- nodeInfo, err := uc.informer.Core().V1().Nodes().Lister().Get(nodeID)
- if err != nil {
- klog.Errorf("Failed to get node info: %v", err)
- continue
- }
-
- // mark edge node schedulable
- // the effect is like running cmd: kubectl uncordon <node-to-drain>
- if nodeInfo.Labels != nil {
- if value, ok := nodeInfo.Labels[NodeUpgradeJobStatusKey]; ok {
- if value == NodeUpgradeJobStatusValue {
- nodeInfo.Spec.Unschedulable = false
- delete(nodeInfo.Labels, NodeUpgradeJobStatusKey)
- }
- }
- }
- // record upgrade logs in node annotation
- if v1alpha1.UpgradeResult(resp.Status) == v1alpha1.UpgradeSuccess {
- if nodeInfo.Annotations == nil {
- nodeInfo.Annotations = make(map[string]string)
- }
- nodeInfo.Annotations[NodeUpgradeHistoryKey] = mergeAnnotationUpgradeHistory(nodeInfo.Annotations[NodeUpgradeHistoryKey], resp.FromVersion, resp.ToVersion)
- }
- _, err = uc.kubeClient.CoreV1().Nodes().Update(context.Background(), nodeInfo, metav1.UpdateOptions{})
- if err != nil {
- // just log, and continue to process the next step
- klog.Errorf("Failed to mark node schedulable and add upgrade record: %v", err)
- }
- }
- }
-}
-
-// patchNodeUpgradeJobStatus call patch api to patch update NodeUpgradeJob status
-func patchNodeUpgradeJobStatus(crdClient crdClientset.Interface, upgrade *v1alpha1.NodeUpgradeJob, status *v1alpha1.UpgradeStatus) error {
- oldValue := upgrade.DeepCopy()
-
- newValue := UpdateNodeUpgradeJobStatus(oldValue, status)
-
- // after mark each node upgrade state, we also need to judge whether all edge node upgrade is completed
- // if all edge node is in completed state, we should set the total state to completed
- var completed int
- for _, v := range newValue.Status.Status {
- if v.State == v1alpha1.Completed {
- completed++
- }
- }
- if completed == len(newValue.Status.Status) {
- newValue.Status.State = v1alpha1.Completed
- } else {
- newValue.Status.State = v1alpha1.Upgrading
- }
-
- oldData, err := json.Marshal(oldValue)
- if err != nil {
- return fmt.Errorf("failed to marshal the old NodeUpgradeJob(%s): %v", oldValue.Name, err)
- }
-
- newData, err := json.Marshal(newValue)
- if err != nil {
- return fmt.Errorf("failed to marshal the new NodeUpgradeJob(%s): %v", newValue.Name, err)
- }
-
- patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
- if err != nil {
- return fmt.Errorf("failed to create a merge patch: %v", err)
- }
-
- _, err = crdClient.OperationsV1alpha1().NodeUpgradeJobs().Patch(context.TODO(), newValue.Name, apimachineryType.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
- if err != nil {
- return fmt.Errorf("failed to patch update NodeUpgradeJob status: %v", err)
- }
-
- return nil
-}
-
-func getNodeName(resource string) string {
- // upgrade/${UpgradeID}/node/${NodeID}
- s := strings.Split(resource, "/")
- return s[3]
-}
-func getUpgradeID(resource string) string {
- // upgrade/${UpgradeID}/node/${NodeID}
- s := strings.Split(resource, "/")
- return s[1]
-}
-
-// NewUpstreamController create UpstreamController from config
-func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) {
- uc := &UpstreamController{
- kubeClient: keclient.GetKubeClient(),
- informer: informers.GetInformersManager().GetKubeInformerFactory(),
- crdClient: keclient.GetCRDClient(),
- messageLayer: messagelayer.NodeUpgradeJobControllerMessageLayer(),
- dc: dc,
- }
- return uc, nil
-}
diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/util.go b/cloud/pkg/nodeupgradejobcontroller/controller/util.go
deleted file mode 100644
index 8b80c4261..000000000
--- a/cloud/pkg/nodeupgradejobcontroller/controller/util.go
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
-Copyright 2022 The KubeEdge Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package controller
-
-import (
- "fmt"
- "strings"
- "time"
-
- "github.com/distribution/distribution/v3/reference"
-
- "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
-)
-
-const (
- NodeUpgradeJobStatusKey = "nodeupgradejob.operations.kubeedge.io/status"
- NodeUpgradeJobStatusValue = ""
- NodeUpgradeHistoryKey = "nodeupgradejob.operations.kubeedge.io/history"
-)
-
-const (
- NodeUpgrade = "upgrade"
-
- ISO8601UTC = "2006-01-02T15:04:05Z"
-)
-
-// filterVersion returns true only if the edge node version already on the upgrade req
-// version is like: v1.22.6-kubeedge-v1.10.0-beta.0.185+95378fb019912a, expected is like v1.10.0
-func filterVersion(version string, expected string) bool {
- // if not correct version format, also return true
- index := strings.Index(version, "-kubeedge-")
- if index == -1 {
- return false
- }
-
- length := len("-kubeedge-")
-
- // filter nodes that already in the required version
- return version[index+length:] == expected
-}
-
-// isCompleted returns true only if some/all edge upgrade is upgrading or completed
-func isCompleted(upgrade *v1alpha1.NodeUpgradeJob) bool {
- // all edge node upgrade is upgrading or completed
- if upgrade.Status.State != v1alpha1.InitialValue {
- return true
- }
-
- // partial edge node upgrade is upgrading or completed
- for _, status := range upgrade.Status.Status {
- if status.State != v1alpha1.InitialValue {
- return true
- }
- }
-
- return false
-}
-
-// UpdateNodeUpgradeJobStatus updates the status
-// return the updated result
-func UpdateNodeUpgradeJobStatus(old *v1alpha1.NodeUpgradeJob, status *v1alpha1.UpgradeStatus) *v1alpha1.NodeUpgradeJob {
- // return value upgrade cannot populate the input parameter old
- upgrade := old.DeepCopy()
-
- for index := range upgrade.Status.Status {
- // If Node's Upgrade info exist, just overwrite
- if upgrade.Status.Status[index].NodeName == status.NodeName {
- // The input status no upgradeTime, we need set it with old value
- status.History.UpgradeTime = upgrade.Status.Status[index].History.UpgradeTime
- upgrade.Status.Status[index] = *status
- return upgrade
- }
- }
-
- // if Node's Upgrade info not exist, just append
- if status.History.UpgradeTime == "" {
- // If upgrade time is blank, set to the current time
- status.History.UpgradeTime = time.Now().Format(ISO8601UTC)
- }
- upgrade.Status.Status = append(upgrade.Status.Status, *status)
-
- return upgrade
-}
-
-// mergeAnnotationUpgradeHistory constructs the new history based on the origin history
-// and we'll only keep 3 records
-func mergeAnnotationUpgradeHistory(origin, fromVersion, toVersion string) string {
- newHistory := fmt.Sprintf("%s->%s", fromVersion, toVersion)
- if origin == "" {
- return newHistory
- }
-
- sets := strings.Split(origin, ";")
- if len(sets) > 2 {
- sets = sets[1:]
- }
-
- sets = append(sets, newHistory)
- return strings.Join(sets, ";")
-}
-
-// GetImageRepo gets repo from a container image
-func GetImageRepo(image string) (string, error) {
- named, err := reference.ParseNormalizedNamed(image)
- if err != nil {
- return "", fmt.Errorf("failed to parse image name: %v", err)
- }
-
- return named.Name(), nil
-}
diff --git a/cloud/pkg/nodeupgradejobcontroller/nodeupgradejobcontroller.go b/cloud/pkg/nodeupgradejobcontroller/nodeupgradejobcontroller.go
deleted file mode 100644
index d9c4d8e4c..000000000
--- a/cloud/pkg/nodeupgradejobcontroller/nodeupgradejobcontroller.go
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
-Copyright 2022 The KubeEdge Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package nodeupgradejobcontroller
-
-import (
- "time"
-
- "k8s.io/klog/v2"
-
- "github.com/kubeedge/beehive/pkg/core"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
- "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/config"
- "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/controller"
- "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1"
-)
-
-// NodeUpgradeJobController is controller for processing upgrading edge node from cloud
-type NodeUpgradeJobController struct {
- downstream *controller.DownstreamController
- upstream *controller.UpstreamController
- enable bool
-}
-
-var _ core.Module = (*NodeUpgradeJobController)(nil)
-
-func newNodeUpgradeJobController(enable bool) *NodeUpgradeJobController {
- if !enable {
- return &NodeUpgradeJobController{enable: enable}
- }
- downstream, err := controller.NewDownstreamController(informers.GetInformersManager().GetKubeEdgeInformerFactory())
- if err != nil {
- klog.Exitf("New NodeUpgradeJob Controller downstream failed with error: %s", err)
- }
- upstream, err := controller.NewUpstreamController(downstream)
- if err != nil {
- klog.Exitf("New NodeUpgradeJob Controller upstream failed with error: %s", err)
- }
- return &NodeUpgradeJobController{
- downstream: downstream,
- upstream: upstream,
- enable: enable,
- }
-}
-
-func Register(dc *v1alpha1.NodeUpgradeJobController) {
- config.InitConfigure(dc)
- core.Register(newNodeUpgradeJobController(dc.Enable))
-}
-
-// Name of controller
-func (uc *NodeUpgradeJobController) Name() string {
- return modules.NodeUpgradeJobControllerModuleName
-}
-
-// Group of controller
-func (uc *NodeUpgradeJobController) Group() string {
- return modules.NodeUpgradeJobControllerModuleGroup
-}
-
-// Enable indicates whether enable this module
-func (uc *NodeUpgradeJobController) Enable() bool {
- return uc.enable
-}
-
-// Start controller
-func (uc *NodeUpgradeJobController) Start() {
- if err := uc.downstream.Start(); err != nil {
- klog.Exitf("start NodeUpgradeJob controller downstream failed with error: %s", err)
- }
- // wait for downstream controller to start and load NodeUpgradeJob
- // TODO think about sync
- time.Sleep(1 * time.Second)
- if err := uc.upstream.Start(); err != nil {
- klog.Exitf("start NodeUpgradeJob controller upstream failed with error: %s", err)
- }
-}
diff --git a/cloud/pkg/imageprepullcontroller/config/config.go b/cloud/pkg/taskmanager/config/config.go
index 88572a842..1940f14f8 100644
--- a/cloud/pkg/imageprepullcontroller/config/config.go
+++ b/cloud/pkg/taskmanager/config/config.go
@@ -26,13 +26,13 @@ var Config Configure
var once sync.Once
type Configure struct {
- v1alpha1.ImagePrePullController
+ v1alpha1.TaskManager
}
-func InitConfigure(dc *v1alpha1.ImagePrePullController) {
+func InitConfigure(tm *v1alpha1.TaskManager) {
once.Do(func() {
Config = Configure{
- ImagePrePullController: *dc,
+ TaskManager: *tm,
}
})
}
diff --git a/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_controller.go b/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_controller.go
new file mode 100644
index 000000000..31dc525aa
--- /dev/null
+++ b/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_controller.go
@@ -0,0 +1,338 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package imageprepullcontroller
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "os"
+ "strconv"
+ "sync"
+ "time"
+
+ jsonpatch "github.com/evanphx/json-patch"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ apimachineryType "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/klog/v2"
+
+ beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
+ keclient "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/controller"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/manager"
+ commontypes "github.com/kubeedge/kubeedge/common/types"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+ crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+type ImagePrePullController struct {
+ sync.Mutex
+ *controller.BaseController
+}
+
+var cache *manager.TaskCache
+
+func NewImagePrePullController(messageChan chan util.TaskMessage) (*ImagePrePullController, error) {
+ var err error
+ cache, err = manager.NewTaskCache(
+ informers.GetInformersManager().GetKubeEdgeInformerFactory().Operations().V1alpha1().ImagePrePullJobs().Informer())
+ if err != nil {
+ klog.Warningf("Create image pre pull controller failed with error: %s", err)
+ return nil, err
+ }
+ return &ImagePrePullController{
+ BaseController: &controller.BaseController{
+ Informer: informers.GetInformersManager().GetKubeInformerFactory(),
+ TaskManager: cache,
+ MessageChan: messageChan,
+ CrdClient: client.GetCRDClient(),
+ KubeClient: keclient.GetKubeClient(),
+ },
+ }, nil
+}
+
+func (ndc *ImagePrePullController) ReportNodeStatus(taskID, nodeID string, event fsm.Event) (api.State, error) {
+ nodeFSM := NewImagePrePullNodeFSM(taskID, nodeID)
+ err := nodeFSM.AllowTransit(event)
+ if err != nil {
+ return "", err
+ }
+ state, err := nodeFSM.CurrentState()
+ if err != nil {
+ return "", err
+ }
+ ndc.Lock()
+ defer ndc.Unlock()
+ err = nodeFSM.Transit(event)
+ if err != nil {
+ return "", err
+ }
+ checkStatusChanged(nodeFSM, state)
+ state, err = nodeFSM.CurrentState()
+ if err != nil {
+ return "", err
+ }
+ return state, nil
+}
+
+func checkStatusChanged(nodeFSM *fsm.FSM, state api.State) {
+ err := wait.Poll(100*time.Millisecond, time.Second, func() (bool, error) {
+ nowState, err := nodeFSM.CurrentState()
+ if err != nil {
+ return false, nil
+ }
+ if nowState == state {
+ return false, nil
+ }
+ return true, err
+ })
+ if err != nil {
+ klog.V(4).Infof("check status changed failed: %s", err.Error())
+ }
+}
+
+func (ndc *ImagePrePullController) ReportTaskStatus(taskID string, event fsm.Event) (api.State, error) {
+ taskFSM := NewImagePrePullTaskFSM(taskID)
+ state, err := taskFSM.CurrentState()
+ if err != nil {
+ return "", err
+ }
+ err = taskFSM.AllowTransit(event)
+ if err != nil {
+ return "", err
+ }
+ err = taskFSM.Transit(event)
+ if err != nil {
+ return "", err
+ }
+ checkStatusChanged(taskFSM, state)
+ return taskFSM.CurrentState()
+}
+
+func (ndc *ImagePrePullController) StageCompleted(taskID string, state api.State) bool {
+ taskFSM := NewImagePrePullTaskFSM(taskID)
+ return taskFSM.TaskStagCompleted(state)
+}
+
+func (ndc *ImagePrePullController) GetNodeStatus(name string) ([]v1alpha1.TaskStatus, error) {
+ imagePrePull, err := ndc.CrdClient.OperationsV1alpha1().ImagePrePullJobs().Get(context.TODO(), name, metav1.GetOptions{})
+ if err != nil {
+ return nil, err
+ }
+ statusList := make([]v1alpha1.TaskStatus, len(imagePrePull.Status.Status))
+ for i, status := range imagePrePull.Status.Status {
+ if status.TaskStatus == nil {
+ statusList[i] = v1alpha1.TaskStatus{}
+ continue
+ }
+ statusList[i] = *status.TaskStatus
+ }
+ return statusList, nil
+}
+
+func (ndc *ImagePrePullController) UpdateNodeStatus(name string, nodeStatus []v1alpha1.TaskStatus) error {
+ imagePrePull, err := ndc.CrdClient.OperationsV1alpha1().ImagePrePullJobs().Get(context.TODO(), name, metav1.GetOptions{})
+ if err != nil {
+ return err
+ }
+ status := imagePrePull.Status
+ statusList := make([]v1alpha1.ImagePrePullStatus, len(nodeStatus))
+ for i := 0; i < len(nodeStatus); i++ {
+ statusList[i].TaskStatus = &nodeStatus[i]
+ }
+ status.Status = statusList
+ err = patchStatus(imagePrePull, status, ndc.CrdClient)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func patchStatus(imagePrePullJob *v1alpha1.ImagePrePullJob, status v1alpha1.ImagePrePullJobStatus, crdClient crdClientset.Interface) error {
+ oldData, err := json.Marshal(imagePrePullJob)
+ if err != nil {
+ return fmt.Errorf("failed to marshal the old ImagePrePullJob(%s): %v", imagePrePullJob.Name, err)
+ }
+ imagePrePullJob.Status = status
+ newData, err := json.Marshal(imagePrePullJob)
+ if err != nil {
+ return fmt.Errorf("failed to marshal the new ImagePrePullJob(%s): %v", imagePrePullJob.Name, err)
+ }
+
+ patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
+ if err != nil {
+ return fmt.Errorf("failed to create a merge patch: %v", err)
+ }
+
+ result, err := crdClient.OperationsV1alpha1().ImagePrePullJobs().Patch(context.TODO(), imagePrePullJob.Name, apimachineryType.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+ if err != nil {
+ return fmt.Errorf("failed to patch update ImagePrePullJob status: %v", err)
+ }
+ klog.V(4).Info("patch update task status result: ", result)
+ return nil
+}
+
+func (ndc *ImagePrePullController) Start() error {
+ go ndc.startSync()
+ return nil
+}
+
+func (ndc *ImagePrePullController) startSync() {
+ imagePrePullList, err := ndc.CrdClient.OperationsV1alpha1().ImagePrePullJobs().List(context.TODO(), metav1.ListOptions{})
+ if err != nil {
+ klog.Errorf(err.Error())
+ os.Exit(2)
+ }
+ for _, imagePrePull := range imagePrePullList.Items {
+ if fsm.TaskFinish(imagePrePull.Status.State) {
+ continue
+ }
+ ndc.imagePrePullJobAdded(&imagePrePull)
+ }
+ for {
+ select {
+ case <-beehiveContext.Done():
+ klog.Info("stop sync ImagePrePullJob")
+ return
+ case e := <-ndc.TaskManager.Events():
+ prePull, ok := e.Object.(*v1alpha1.ImagePrePullJob)
+ if !ok {
+ klog.Warningf("object type: %T unsupported", e.Object)
+ continue
+ }
+ switch e.Type {
+ case watch.Added:
+ ndc.imagePrePullJobAdded(prePull)
+ case watch.Deleted:
+ ndc.imagePrePullJobDeleted(prePull)
+ case watch.Modified:
+ ndc.imagePrePullJobUpdated(prePull)
+ default:
+ klog.Warningf("ImagePrePullJob event type: %s unsupported", e.Type)
+ }
+ }
+ }
+}
+
+// imagePrePullJobAdded is used to process addition of new ImagePrePullJob in apiserver
+func (ndc *ImagePrePullController) imagePrePullJobAdded(imagePrePull *v1alpha1.ImagePrePullJob) {
+ klog.V(4).Infof("add ImagePrePullJob: %v", imagePrePull)
+ // store in cache map
+ ndc.TaskManager.CacheMap.Store(imagePrePull.Name, imagePrePull)
+
+ // If all or partial edge nodes image pull is pulling or completed, we don't need to send pull message
+ if fsm.TaskFinish(imagePrePull.Status.State) {
+ klog.Warning("The ImagePrePullJob is completed, don't send pull message again")
+ return
+ }
+
+ ndc.processPrePull(imagePrePull)
+}
+
+// processPrePull do the pre pull operation on node
+func (ndc *ImagePrePullController) processPrePull(imagePrePull *v1alpha1.ImagePrePullJob) {
+ imagePrePullTemplateInfo := imagePrePull.Spec.ImagePrePullTemplate
+ imagePrePullRequest := commontypes.ImagePrePullJobRequest{
+ Images: imagePrePullTemplateInfo.Images,
+ Secret: imagePrePullTemplateInfo.ImageSecret,
+ RetryTimes: imagePrePullTemplateInfo.RetryTimes,
+ CheckItems: imagePrePullTemplateInfo.CheckItems,
+ }
+ tolerate, err := strconv.ParseFloat(imagePrePull.Spec.ImagePrePullTemplate.FailureTolerate, 64)
+ if err != nil {
+ klog.Errorf("convert FailureTolerate to float64 failed: %v", err)
+ tolerate = 0.1
+ }
+
+ concurrency := imagePrePull.Spec.ImagePrePullTemplate.Concurrency
+ if concurrency <= 0 {
+ concurrency = 1
+ }
+ klog.V(4).Infof("deal task message: %v", imagePrePull)
+ ndc.MessageChan <- util.TaskMessage{
+ Type: util.TaskPrePull,
+ CheckItem: imagePrePull.Spec.ImagePrePullTemplate.CheckItems,
+ Name: imagePrePull.Name,
+ TimeOutSeconds: imagePrePull.Spec.ImagePrePullTemplate.TimeoutSeconds,
+ Concurrency: concurrency,
+ FailureTolerate: tolerate,
+ NodeNames: imagePrePull.Spec.ImagePrePullTemplate.NodeNames,
+ LabelSelector: imagePrePull.Spec.ImagePrePullTemplate.LabelSelector,
+ Status: v1alpha1.TaskStatus{},
+ Msg: imagePrePullRequest,
+ }
+}
+
+// imagePrePullJobDeleted is used to process deleted ImagePrePullJob in apiserver
+func (ndc *ImagePrePullController) imagePrePullJobDeleted(imagePrePull *v1alpha1.ImagePrePullJob) {
+ // just need to delete from cache map
+ ndc.TaskManager.CacheMap.Delete(imagePrePull.Name)
+ klog.Errorf("image pre pull job %s delete", imagePrePull.Name)
+ ndc.MessageChan <- util.TaskMessage{
+ Type: util.TaskPrePull,
+ Name: imagePrePull.Name,
+ ShutDown: true,
+ }
+}
+
+// imagePrePullJobUpdated is used to process update of new ImagePrePullJob in apiserver
+func (ndc *ImagePrePullController) imagePrePullJobUpdated(pullJob *v1alpha1.ImagePrePullJob) {
+ oldValue, ok := ndc.TaskManager.CacheMap.Load(pullJob.Name)
+ old := oldValue.(*v1alpha1.ImagePrePullJob)
+ if !ok {
+ klog.Infof("Update %s not exist, and store it first", pullJob.Name)
+ // If PrePull not present in PrePull map means it is not modified and added.
+ ndc.imagePrePullJobAdded(pullJob)
+ return
+ }
+
+ // store in cache map
+ ndc.TaskManager.CacheMap.Store(pullJob.Name, pullJob)
+
+ node := checkUpdateNode(old, pullJob)
+ if node == nil {
+ klog.Info("none node update")
+ return
+ }
+
+ ndc.MessageChan <- util.TaskMessage{
+ Type: util.TaskPrePull,
+ Name: pullJob.Name,
+ Status: *node,
+ }
+}
+
+func checkUpdateNode(old, new *v1alpha1.ImagePrePullJob) *v1alpha1.TaskStatus {
+ if len(old.Status.Status) == 0 {
+ return nil
+ }
+ for i, updateNode := range new.Status.Status {
+ oldNode := old.Status.Status[i]
+ if !util.NodeUpdated(*oldNode.TaskStatus, *updateNode.TaskStatus) {
+ continue
+ }
+ return updateNode.TaskStatus
+ }
+ return nil
+}
diff --git a/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_task.go b/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_task.go
new file mode 100644
index 000000000..3778a0ed8
--- /dev/null
+++ b/cloud/pkg/taskmanager/imageprepullcontroller/image_prepull_task.go
@@ -0,0 +1,133 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package imageprepullcontroller
+
+import (
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util"
+ fsmapi "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ v1alpha12 "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+func currentPrePullNodeState(id, nodeName string) (v1alpha12.State, error) {
+ v, ok := cache.CacheMap.Load(id)
+ if !ok {
+ return "", fmt.Errorf("can not find task %s", id)
+ }
+ task := v.(*v1alpha1.ImagePrePullJob)
+ var state v1alpha12.State
+ for _, status := range task.Status.Status {
+ if status.NodeName == nodeName {
+ state = status.State
+ break
+ }
+ }
+ if state == "" {
+ state = v1alpha12.TaskInit
+ }
+ return state, nil
+}
+
+// updatePrePullNodeState is the FSM update hook for a single node: it records
+// the node's new state (plus per-image results carried in the event) into a
+// copy of the cached ImagePrePullJob and patches the CRD status.
+func updatePrePullNodeState(id, nodeName string, state v1alpha12.State, event fsm.Event) error {
+	v, ok := cache.CacheMap.Load(id)
+	if !ok {
+		return fmt.Errorf("can not find task %s", id)
+	}
+	task := v.(*v1alpha1.ImagePrePullJob)
+	// work on deep copies so the cached object is not mutated before the patch lands
+	newTask := task.DeepCopy()
+	status := newTask.Status.DeepCopy()
+	for i, nodeStatus := range status.Status {
+		if nodeStatus.NodeName == nodeName {
+			var imagesStatus []v1alpha1.ImageStatus
+			// ExternalMessage carries the per-image pull results as JSON;
+			// decode failures are tolerated and leave ImageStatus empty
+			err := json.Unmarshal([]byte(event.ExternalMessage), &imagesStatus)
+			if err != nil {
+				klog.Warningf("Failed to unmarshal images status: %v", err)
+			}
+			status.Status[i] = v1alpha1.ImagePrePullStatus{
+				TaskStatus: &v1alpha1.TaskStatus{
+					NodeName: nodeName,
+					State:    state,
+					Event:    event.Type,
+					Action:   event.Action,
+					Time:     time.Now().Format(util.ISO8601UTC),
+					Reason:   event.Msg,
+				},
+				ImageStatus: imagesStatus,
+			}
+			break
+		}
+	}
+	err := patchStatus(newTask, *status, client.GetCRDClient())
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// NewImagePrePullNodeFSM builds the per-node state machine that tracks the
+// image pre-pull progress of a single edge node within task taskName.
+func NewImagePrePullNodeFSM(taskName, nodeName string) *fsm.FSM {
+	// use a distinct local name so the imported fsm package is not shadowed
+	machine := &fsm.FSM{}
+	return machine.NodeName(nodeName).ID(taskName).Guard(fsmapi.PrePullRule).StageSequence(fsmapi.PrePullStageSequence).CurrentFunc(currentPrePullNodeState).UpdateFunc(updatePrePullNodeState)
+}
+
+// NewImagePrePullTaskFSM builds the task-level state machine for the named
+// image pre-pull job.
+func NewImagePrePullTaskFSM(taskName string) *fsm.FSM {
+	// use a distinct local name so the imported fsm package is not shadowed
+	machine := &fsm.FSM{}
+	return machine.ID(taskName).Guard(fsmapi.PrePullRule).StageSequence(fsmapi.PrePullStageSequence).CurrentFunc(currentPrePullTaskState).UpdateFunc(updateUpgradeTaskState)
+}
+
+// currentPrePullTaskState returns the task-level state of the cached
+// ImagePrePullJob, defaulting to TaskInit when none is recorded yet.
+func currentPrePullTaskState(id, _ string) (v1alpha12.State, error) {
+	v, ok := cache.CacheMap.Load(id)
+	if !ok {
+		return "", fmt.Errorf("can not find task %s", id)
+	}
+	if state := v.(*v1alpha1.ImagePrePullJob).Status.State; state != "" {
+		return state, nil
+	}
+	return v1alpha12.TaskInit, nil
+}
+
+// updateUpgradeTaskState is the FSM update hook for the whole task: it writes
+// the new task-level state and event metadata into a copy of the cached job
+// and patches the CRD status.
+func updateUpgradeTaskState(id, _ string, state v1alpha12.State, event fsm.Event) error {
+	v, ok := cache.CacheMap.Load(id)
+	if !ok {
+		return fmt.Errorf("can not find task %s", id)
+	}
+	// deep-copy so the cached object stays untouched until the patch succeeds
+	newTask := v.(*v1alpha1.ImagePrePullJob).DeepCopy()
+	status := newTask.Status.DeepCopy()
+	status.State = state
+	status.Event = event.Type
+	status.Action = event.Action
+	status.Reason = event.Msg
+	status.Time = time.Now().Format(util.ISO8601UTC)
+	return patchStatus(newTask, *status, client.GetCRDClient())
+}
diff --git a/cloud/pkg/taskmanager/manager/downstream.go b/cloud/pkg/taskmanager/manager/downstream.go
new file mode 100644
index 000000000..9e8e47263
--- /dev/null
+++ b/cloud/pkg/taskmanager/manager/downstream.go
@@ -0,0 +1,64 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package manager
+
+import (
+ "k8s.io/klog/v2"
+
+ beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
+ "github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer"
+)
+
+// DownstreamController forwards task messages produced by the task manager to
+// the edge via the message layer.
+type DownstreamController struct {
+	downStreamChan chan model.Message            // messages queued for the edge
+	messageLayer   messagelayer.MessageLayer     // transport toward edge nodes
+}
+
+// Start launches the downstream sync goroutine of the TaskManager.
+func (dc *DownstreamController) Start() error {
+	klog.Info("Start TaskManager Downstream Controller")
+	go dc.syncTask()
+	return nil
+}
+
+// syncTask forwards task messages from the downstream channel to the edge via
+// the message layer until the beehive context is cancelled.
+func (dc *DownstreamController) syncTask() {
+	for {
+		select {
+		case <-beehiveContext.Done():
+			klog.Info("stop sync tasks")
+			return
+		case msg := <-dc.downStreamChan:
+			err := dc.messageLayer.Send(msg)
+			if err != nil {
+				klog.Errorf("Failed to send upgrade message %v due to error %v", msg.GetID(), err)
+				// a single send failure must not terminate the whole loop;
+				// keep draining the channel and try subsequent messages
+				continue
+			}
+		}
+	}
+}
+
+// NewDownstreamController wires a DownstreamController to the given channel
+// and the task-manager message layer.
+func NewDownstreamController(messageChan chan model.Message) (*DownstreamController, error) {
+	controller := &DownstreamController{
+		downStreamChan: messageChan,
+		messageLayer:   messagelayer.TaskManagerMessageLayer(),
+	}
+	return controller, nil
+}
diff --git a/cloud/pkg/taskmanager/manager/executor.go b/cloud/pkg/taskmanager/manager/executor.go
new file mode 100644
index 000000000..d3a7aec45
--- /dev/null
+++ b/cloud/pkg/taskmanager/manager/executor.go
@@ -0,0 +1,433 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package manager
+
+import (
+ "fmt"
+ "reflect"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/google/uuid"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/klog/v2"
+
+ beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
+ "github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/nodeupgradecontroller"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/controller"
+ "github.com/kubeedge/kubeedge/common/constants"
+ commontypes "github.com/kubeedge/kubeedge/common/types"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+// TimeOutSecond is the default per-node job timeout (seconds) applied when a
+// task does not specify TimeOutSeconds.
+const TimeOutSecond = 300
+
+// Executor runs one task (e.g. upgrade or image pre-pull) across its target
+// nodes, dispatching at most workers.number nodes concurrently and aborting
+// once the failed-node count reaches maxFailedNodes.
+type Executor struct {
+	task           util.TaskMessage            // the task being executed
+	statusChan     chan *v1alpha1.TaskStatus   // node status updates from upstream
+	nodes          []v1alpha1.TaskStatus       // last known status per target node
+	controller     controller.Controller       // task-type-specific behavior
+	maxFailedNodes float64                     // failure budget: len(nodes) * FailureTolerate
+	failedNodes    map[string]bool             // node names that reached TaskFailed
+	workers        workers                     // bounded pool of in-flight node jobs
+}
+
+// NewExecutorMachine builds the package-level ExecutorMachine singleton bound
+// to the controller message channel and the downstream sender channel.
+func NewExecutorMachine(messageChan chan util.TaskMessage, downStreamChan chan model.Message) (*ExecutorMachine, error) {
+	em := &ExecutorMachine{
+		kubeClient:     client.GetKubeClient(),
+		executors:      make(map[string]*Executor),
+		messageChan:    messageChan,
+		downStreamChan: downStreamChan,
+	}
+	executorMachine = em
+	return em, nil
+}
+
+// GetExecutorMachine returns the package-level ExecutorMachine singleton
+// (nil until NewExecutorMachine has been called).
+func GetExecutorMachine() *ExecutorMachine {
+	return executorMachine
+}
+
+// Start launches the ExecutorMachine's message-dispatch goroutine.
+func (em *ExecutorMachine) Start() error {
+	klog.Info("Start ExecutorMachine")
+	go em.syncTask()
+	return nil
+}
+
+// syncTask consumes task messages from the controllers: shutdown messages
+// remove the task's executor, everything else is dispatched to it.
+func (em *ExecutorMachine) syncTask() {
+	for {
+		select {
+		case <-beehiveContext.Done():
+			klog.Info("stop sync tasks")
+			return
+		case msg := <-em.messageChan:
+			if msg.ShutDown {
+				// executor removal is a normal lifecycle event, not an error
+				klog.Infof("delete executor %s ", msg.Name)
+				DeleteExecutor(msg)
+				break
+			}
+			err := GetExecutor(msg).HandleMessage(msg.Status)
+			if err != nil {
+				klog.Errorf("Failed to handel %s message due to error %s", msg.Type, err.Error())
+				break
+			}
+		}
+	}
+}
+
+// ExecutorMachine owns the set of live task executors and the channels that
+// connect the controllers (messageChan) to the downstream sender
+// (downStreamChan). The embedded mutex guards the executors map.
+type ExecutorMachine struct {
+	kubeClient     kubernetes.Interface   // k8s client (kept for executor use)
+	executors      map[string]*Executor   // "<type>::<name>" -> live executor
+	messageChan    chan util.TaskMessage  // task messages from controllers
+	downStreamChan chan model.Message     // messages bound for the edge
+	sync.Mutex
+}
+
+// executorMachine is the singleton created by NewExecutorMachine.
+var executorMachine *ExecutorMachine
+
+// GetExecutor returns the executor registered for this task message, creating
+// one on first use. It returns nil when initialization fails.
+func GetExecutor(msg util.TaskMessage) *Executor {
+	executorMachine.Lock()
+	e, ok := executorMachine.executors[fmt.Sprintf("%s::%s", msg.Type, msg.Name)]
+	executorMachine.Unlock()
+	if ok && e != nil {
+		return e
+	}
+	// NOTE(review): the lock is dropped before initExecutor runs, so two
+	// concurrent callers could both initialize an executor for the same task.
+	// Confirm this is only ever called from the single syncTask goroutine.
+	e, err := initExecutor(msg)
+	if err != nil {
+		klog.Errorf("executor init failed, error: %s", err.Error())
+		return nil
+	}
+	return e
+}
+
+// DeleteExecutor drops the executor registered for this task message, if any.
+func DeleteExecutor(msg util.TaskMessage) {
+	key := fmt.Sprintf("%s::%s", msg.Type, msg.Name)
+	executorMachine.Lock()
+	delete(executorMachine.executors, key)
+	executorMachine.Unlock()
+}
+
+// HandleMessage queues a node task status onto the executor's status channel.
+// Calling it on a nil executor (e.g. after a failed init) returns an error.
+func (e *Executor) HandleMessage(status v1alpha1.TaskStatus) error {
+	if e == nil {
+		return fmt.Errorf("executor is nil")
+	}
+	e.statusChan <- &status
+	return nil
+}
+
+// initMessage builds the downstream message that tells node.NodeName to run
+// the current stage of the task. Checking-stage messages carry only the
+// pre-check item list instead of the full task payload.
+func (e *Executor) initMessage(node v1alpha1.TaskStatus) *model.Message {
+	// delete it in 1.18
+	// legacy compatibility: upgrades targeting pre-v1.16 edges use the old format
+	if e.task.Type == util.TaskUpgrade {
+		msg := e.initHistoryMessage(node)
+		if msg != nil {
+			klog.Warningf("send history message to node")
+			return msg
+		}
+	}
+
+	msg := model.NewMessage("")
+	resource := buildTaskResource(e.task.Type, e.task.Name, node.NodeName)
+
+	taskReq := commontypes.NodeTaskRequest{
+		TaskID: e.task.Name,
+		Type:   e.task.Type,
+		State:  string(node.State),
+	}
+	taskReq.Item = e.task.Msg
+	if node.State == api.TaskChecking {
+		// the checking stage only needs the configured check items
+		taskReq.Item = commontypes.NodePreCheckRequest{
+			CheckItem: e.task.CheckItem,
+		}
+	}
+	msg.BuildRouter(modules.TaskManagerModuleName, modules.TaskManagerModuleGroup, resource, e.task.Type).
+		FillBody(taskReq)
+	return msg
+}
+
+// initHistoryMessage builds the legacy upgrade message for edges older than
+// v1.16.0; it returns nil when the node is new enough (or on any error) so the
+// caller falls back to the current message format.
+func (e *Executor) initHistoryMessage(node v1alpha1.TaskStatus) *model.Message {
+	resource := buildUpgradeResource(e.task.Name, node.NodeName)
+	// use comma-ok assertions: a bad payload must not panic the executor
+	req, ok := e.task.Msg.(commontypes.NodeUpgradeJobRequest)
+	if !ok {
+		klog.Errorf("convert message %T to commontypes.NodeUpgradeJobRequest failed", e.task.Msg)
+		return nil
+	}
+	upgradeController, ok := e.controller.(*nodeupgradecontroller.NodeUpgradeController)
+	if !ok {
+		klog.Errorf("controller %T is not a NodeUpgradeController", e.controller)
+		return nil
+	}
+	edgeVersion, err := upgradeController.GetNodeVersion(node.NodeName)
+	if err != nil {
+		klog.Errorf("get node version failed: %s", err.Error())
+		return nil
+	}
+	less, err := util.VersionLess(edgeVersion, "v1.16.0")
+	if err != nil {
+		klog.Errorf("version less failed: %s", err.Error())
+		return nil
+	}
+	if !less {
+		return nil
+	}
+	klog.Warningf("edge version is %s, is less than version %s", edgeVersion, "v1.16.0")
+	upgradeReq := commontypes.NodeUpgradeJobRequest{
+		UpgradeID:   e.task.Name,
+		HistoryID:   uuid.New().String(),
+		UpgradeTool: "keadm",
+		Version:     req.Version,
+		Image:       req.Image,
+	}
+	msg := model.NewMessage("")
+	msg.BuildRouter(modules.NodeUpgradeJobControllerModuleName, modules.NodeUpgradeJobControllerModuleGroup, resource, util.TaskUpgrade).
+		FillBody(upgradeReq)
+	return msg
+}
+
+// initExecutor creates the Executor for a task message, seeding its node list
+// from the CRD status (or from node validation when the status is empty),
+// starts its run loop, and registers it in the executor machine.
+func initExecutor(message util.TaskMessage) (*Executor, error) {
+	// distinct local name: do not shadow the imported controller package
+	taskController, err := controller.GetController(message.Type)
+	if err != nil {
+		return nil, err
+	}
+	nodeStatus, err := taskController.GetNodeStatus(message.Name)
+	if err != nil {
+		return nil, err
+	}
+	if len(nodeStatus) == 0 {
+		nodeList := taskController.ValidateNode(message)
+		if len(nodeList) == 0 {
+			return nil, fmt.Errorf("no node need to be upgrade")
+		}
+		nodeStatus = make([]v1alpha1.TaskStatus, len(nodeList))
+		for i, node := range nodeList {
+			nodeStatus[i] = v1alpha1.TaskStatus{NodeName: node.Name}
+		}
+		err = taskController.UpdateNodeStatus(message.Name, nodeStatus)
+		if err != nil {
+			return nil, err
+		}
+	}
+	e := &Executor{
+		task:           message,
+		statusChan:     make(chan *v1alpha1.TaskStatus, 10),
+		nodes:          nodeStatus,
+		controller:     taskController,
+		maxFailedNodes: float64(len(nodeStatus)) * (message.FailureTolerate),
+		failedNodes:    map[string]bool{},
+		workers: workers{
+			number:       int(message.Concurrency),
+			jobs:         make(map[string]int),
+			shuttingDown: false,
+			Mutex:        sync.Mutex{},
+		},
+	}
+	go e.start()
+	// the executors map is shared with GetExecutor/DeleteExecutor, which lock
+	// the machine mutex — an unguarded write here would be a data race
+	executorMachine.Lock()
+	executorMachine.executors[fmt.Sprintf("%s::%s", message.Type, message.Name)] = e
+	executorMachine.Unlock()
+	return e, nil
+}
+
+// start is the executor's run loop: it seeds the first batch of node jobs,
+// then for every node status update ends the finished job, accounts failures,
+// advances the whole task to its next stage once all nodes completed the
+// current one, and schedules further nodes up to the concurrency limit.
+func (e *Executor) start() {
+	// index points one past the last node handed to the worker pool
+	index, err := e.initWorker(0)
+	if err != nil {
+		klog.Errorf(err.Error())
+		return
+	}
+	for {
+		select {
+		case <-beehiveContext.Done():
+			klog.Info("stop sync tasks")
+			return
+		case status := <-e.statusChan:
+			// ignore empty placeholder statuses
+			if reflect.DeepEqual(*status, v1alpha1.TaskStatus{}) {
+				break
+			}
+			// only react once the node finished its current stage
+			if !e.controller.StageCompleted(e.task.Name, status.State) {
+				break
+			}
+			var endNode int
+			endNode, err = e.workers.endJob(status.NodeName)
+			if err != nil {
+				klog.Errorf(err.Error())
+				break
+			}
+
+			e.nodes[endNode] = *status
+			err = e.dealFailedNode(*status)
+			if err != nil {
+				// failure budget exhausted; stop scheduling new jobs
+				klog.Warning(err.Error())
+				break
+			}
+
+			if index >= len(e.nodes) {
+				// all nodes dispatched: wait for in-flight jobs to drain
+				if len(e.workers.jobs) != 0 {
+					break
+				}
+				var state api.State
+				state, err = e.completedTaskStage()
+				if err != nil {
+					klog.Errorf(err.Error())
+					break
+				}
+				if fsm.TaskFinish(state) {
+					DeleteExecutor(e.task)
+					klog.Infof("task %s is finish", e.task.Name)
+					return
+				}
+
+				// next stage
+				index = 0
+			}
+
+			index, err = e.initWorker(index)
+			if err != nil {
+				klog.Errorf(err.Error())
+			}
+		}
+	}
+}
+
+// dealFailedNode records a node failure and, once the failure budget is
+// exceeded, stops the worker pool and reports the task as failed. It returns
+// an error only when the tolerance threshold has been crossed.
+func (e *Executor) dealFailedNode(node v1alpha1.TaskStatus) error {
+	if node.State == api.TaskFailed {
+		e.failedNodes[node.NodeName] = true
+	}
+	if float64(len(e.failedNodes)) < e.maxFailedNodes {
+		return nil
+	}
+	// flip the flag and read the in-flight count under the workers mutex,
+	// since addJob/endJob touch the same state concurrently
+	e.workers.Lock()
+	e.workers.shuttingDown = true
+	running := len(e.workers.jobs)
+	e.workers.Unlock()
+	if running > 0 {
+		klog.Warningf("wait for all workers(%d/%d) for task %s to finish running ", running, e.workers.number, e.task.Name)
+		return nil
+	}
+
+	errMsg := fmt.Sprintf("the number of failed nodes is %d/%d, which exceeds the failure tolerance threshold.", len(e.failedNodes), len(e.nodes))
+	_, err := e.controller.ReportTaskStatus(e.task.Name, fsm.Event{
+		Type:   node.Event,
+		Action: api.ActionFailure,
+		Msg:    errMsg,
+	})
+	if err != nil {
+		return fmt.Errorf("%s, report status failed, %s", errMsg, err.Error())
+	}
+	// constant format string: a '%' inside errMsg must not be re-interpreted
+	return fmt.Errorf("%s", errMsg)
+}
+
+func (e *Executor) completedTaskStage() (api.State, error) {
+ var event = e.nodes[0].Event
+ for _, node := range e.nodes {
+ if node.State != api.TaskFailed {
+ event = node.Event
+ break
+ }
+ }
+ state, err := e.controller.ReportTaskStatus(e.task.Name, fsm.Event{
+ Type: event,
+ Action: api.ActionSuccess,
+ })
+ if err != nil {
+ return "", err
+ }
+ return state, nil
+}
+
+// initWorker schedules nodes starting at index until the worker pool is full,
+// skipping (and failure-accounting) nodes that already completed the current
+// stage. It returns the index of the first node not yet scheduled.
+func (e *Executor) initWorker(index int) (int, error) {
+	for ; index < len(e.nodes); index++ {
+		node := e.nodes[index]
+		if e.controller.StageCompleted(e.task.Name, node.State) {
+			// node already finished this stage (possibly failed): count it and move on
+			err := e.dealFailedNode(node)
+			if err != nil {
+				return 0, err
+			}
+			continue
+		}
+		err := e.workers.addJob(node, index, e)
+		if err != nil {
+			// pool full or shutting down; the caller resumes from this index
+			klog.V(4).Info(err.Error())
+			break
+		}
+	}
+	return index, nil
+}
+
+// workers tracks the currently running per-node jobs of an executor, bounded
+// by number; shuttingDown prevents new jobs from being scheduled.
+type workers struct {
+	number int            // maximum concurrent jobs
+	jobs   map[string]int // node name -> index into Executor.nodes
+	sync.Mutex
+	shuttingDown bool // set when the failure budget is exhausted
+}
+
+// addJob registers node (at index into e.nodes) as a running job and sends
+// the stage message downstream. It fails when the pool is shutting down or
+// already at capacity.
+func (w *workers) addJob(node v1alpha1.TaskStatus, index int, e *Executor) error {
+	// check shuttingDown and capacity under the same lock as the map insert,
+	// so check-then-act is atomic with respect to endJob/dealFailedNode
+	w.Lock()
+	if w.shuttingDown {
+		w.Unlock()
+		return fmt.Errorf("workers is stopped")
+	}
+	if running := len(w.jobs); running >= w.number {
+		w.Unlock()
+		return fmt.Errorf("workers are all running, %v/%v", running, w.number)
+	}
+	w.jobs[node.NodeName] = index
+	w.Unlock()
+	// build and send outside the lock: the channel send may block
+	msg := e.initMessage(node)
+	go e.handelTimeOutJob(index)
+	executorMachine.downStreamChan <- *msg
+	return nil
+}
+
+// handelTimeOutJob watches a dispatched node job and reports an EventTimeOut
+// failure if its state does not change (or finish) within the task timeout.
+func (e *Executor) handelTimeOutJob(index int) {
+	lastState := e.nodes[index].State
+	// NOTE(review): assumes task.TimeOutSeconds is non-nil — confirm every task
+	// message populates it, otherwise this dereference panics
+	timeoutSecond := *e.task.TimeOutSeconds
+	if timeoutSecond == 0 {
+		timeoutSecond = TimeOutSecond
+	}
+	// NOTE(review): e.nodes[index] is written by the start goroutine without
+	// synchronization; these polling reads race with that write
+	err := wait.Poll(1*time.Second, time.Duration(timeoutSecond)*time.Second, func() (bool, error) {
+		if lastState != e.nodes[index].State || fsm.TaskFinish(e.nodes[index].State) {
+			return true, nil
+		}
+		klog.V(4).Infof("node %s stage is not completed", e.nodes[index].NodeName)
+		return false, nil
+	})
+	if err != nil {
+		// poll timed out: mark this node's stage as failed
+		_, err = e.controller.ReportNodeStatus(e.task.Name, e.nodes[index].NodeName, fsm.Event{
+			Type:   api.EventTimeOut,
+			Action: api.ActionFailure,
+			Msg:    fmt.Sprintf("node task %s execution timeout, %s", lastState, err.Error()),
+		})
+		if err != nil {
+			klog.Warningf(err.Error())
+		}
+	}
+}
+
+// endJob removes job from the running set and returns the node index it was
+// registered with.
+func (w *workers) endJob(job string) (int, error) {
+	// look up under the lock: the jobs map is written concurrently by addJob
+	w.Lock()
+	defer w.Unlock()
+	index, ok := w.jobs[job]
+	if !ok {
+		return index, fmt.Errorf("end job %s error, job not exist", job)
+	}
+	delete(w.jobs, job)
+	return index, nil
+}
+
+// buildTaskResource assembles the message resource for a node-scoped task:
+// <task>/<taskID>/node/<nodeID>.
+func buildTaskResource(task, taskID, nodeID string) string {
+	return strings.Join([]string{task, taskID, "node", nodeID}, constants.ResourceSep)
+}
+
+// buildUpgradeResource assembles the legacy upgrade message resource:
+// upgrade/<upgradeID>/node/<nodeID>.
+func buildUpgradeResource(upgradeID, nodeID string) string {
+	return strings.Join([]string{util.TaskUpgrade, upgradeID, "node", nodeID}, constants.ResourceSep)
+}
diff --git a/cloud/pkg/imageprepullcontroller/controller/upstream.go b/cloud/pkg/taskmanager/manager/upstream.go
index 98fd28bea..b94370cf3 100644
--- a/cloud/pkg/imageprepullcontroller/controller/upstream.go
+++ b/cloud/pkg/taskmanager/manager/upstream.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package controller
+package manager
import (
"encoding/json"
@@ -28,15 +28,17 @@ import (
keclient "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
"github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
"github.com/kubeedge/kubeedge/cloud/pkg/common/messagelayer"
- "github.com/kubeedge/kubeedge/cloud/pkg/imageprepullcontroller/config"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/config"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/controller"
"github.com/kubeedge/kubeedge/common/types"
- "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
)
// UpstreamController subscribe messages from edge and sync to k8s api server
type UpstreamController struct {
- // downstream controller to update ImagePrePullJob status in cache
+ // downstream controller to update NodeUpgradeJob status in cache
dc *DownstreamController
kubeClient kubernetes.Interface
@@ -44,103 +46,99 @@ type UpstreamController struct {
crdClient crdClientset.Interface
messageLayer messagelayer.MessageLayer
// message channel
- imagePrePullJobStatusChan chan model.Message
+ taskStatusChan chan model.Message
}
-// Start imageprepull UpstreamController
+// Start UpstreamController
func (uc *UpstreamController) Start() error {
- klog.Info("Start ImagePrePullJob Upstream Controller")
+ klog.Info("Start Task Upstream Controller")
- uc.imagePrePullJobStatusChan = make(chan model.Message, config.Config.Buffer.ImagePrePullJobStatus)
+ uc.taskStatusChan = make(chan model.Message, config.Config.Buffer.TaskStatus)
go uc.dispatchMessage()
- for i := 0; i < int(config.Config.Load.ImagePrePullJobWorkers); i++ {
- go uc.updateImagePrePullJobStatus()
+ for i := 0; i < int(config.Config.Load.TaskWorkers); i++ {
+ go uc.updateTaskStatus()
}
return nil
}
-// updateImagePrePullJobStatus update imagePrePullJob status
-func (uc *UpstreamController) updateImagePrePullJobStatus() {
+// Start UpstreamController
+func (uc *UpstreamController) dispatchMessage() {
+ for {
+ select {
+ case <-beehiveContext.Done():
+ klog.Info("Stop dispatch task upstream message")
+ return
+ default:
+ }
+
+ msg, err := uc.messageLayer.Receive()
+ if err != nil {
+ klog.Warningf("Receive message failed, %v", err)
+ continue
+ }
+
+ klog.V(4).Infof("task upstream controller receive msg %#v", msg)
+
+ uc.taskStatusChan <- msg
+ }
+}
+
+// updateTaskStatus update NodeUpgradeJob status field
+func (uc *UpstreamController) updateTaskStatus() {
for {
select {
case <-beehiveContext.Done():
- klog.Info("Stop update ImagePrePullJob status")
+ klog.Info("Stop update NodeUpgradeJob status")
return
- case msg := <-uc.imagePrePullJobStatusChan:
+ case msg := <-uc.taskStatusChan:
klog.V(4).Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource())
- nodeName, jobName, err := parsePrePullresource(msg.GetResource())
- if err != nil {
- klog.Errorf("message resource %s is not supported", msg.GetResource())
- continue
- }
- oldValue, ok := uc.dc.imagePrePullJobManager.ImagePrePullMap.Load(jobName)
- if !ok {
- klog.Errorf("ImagePrePullJob %s not exist", jobName)
- continue
- }
+ // get nodeID and upgradeID from Upgrade msg:
+ nodeID := util.GetNodeName(msg.GetResource())
+ taskID := util.GetTaskID(msg.GetResource())
- imagePrePull, ok := oldValue.(*v1alpha1.ImagePrePullJob)
- if !ok {
- klog.Errorf("ImagePrePullJob info %T is not valid", oldValue)
+ data, err := msg.GetContentData()
+ if err != nil {
+ klog.Errorf("failed to get node upgrade content data: %v", err)
continue
}
- data, err := msg.GetContentData()
+ c, err := controller.GetController(msg.GetOperation())
if err != nil {
- klog.Errorf("failed to get image prepull content from response msg, err: %v", err)
+ klog.Errorf("Failed to get controller: %v", err)
continue
}
- resp := &types.ImagePrePullJobResponse{}
- err = json.Unmarshal(data, resp)
+
+ resp := types.NodeTaskResponse{}
+ err = json.Unmarshal(data, &resp)
if err != nil {
- klog.Errorf("Failed to unmarshal image prepull response: %v", err)
+ klog.Errorf("Failed to unmarshal node upgrade response: %v", err)
continue
}
-
- status := &v1alpha1.ImagePrePullStatus{
- NodeName: nodeName,
- State: resp.State,
- Reason: resp.Reason,
- ImageStatus: resp.ImageStatus,
+ event := fsm.Event{
+ Type: resp.Event,
+ Action: resp.Action,
+ Msg: resp.Reason,
+ ExternalMessage: resp.ExternalMessage,
}
- err = patchImagePrePullStatus(uc.crdClient, imagePrePull, status)
+
+ _, err = c.ReportNodeStatus(taskID, nodeID, event)
if err != nil {
- klog.Errorf("Failed to patch ImagePrePullJob status, err: %v", err)
+ klog.Errorf("Failed to report status: %v", err)
+ continue
}
}
}
}
-// dispatchMessage receive the message from edge and write into imagePrePullJobStatusChan
-func (uc *UpstreamController) dispatchMessage() {
- for {
- select {
- case <-beehiveContext.Done():
- klog.Info("Stop dispatch ImagePrePullJob upstream message")
- return
- default:
- }
-
- msg, err := uc.messageLayer.Receive()
- if err != nil {
- klog.Warningf("Receive message failed, %v", err)
- continue
- }
-
- klog.V(4).Infof("ImagePrePullJob upstream controller receive msg %#v", msg)
- uc.imagePrePullJobStatusChan <- msg
- }
-}
-
// NewUpstreamController create UpstreamController from config
func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) {
uc := &UpstreamController{
kubeClient: keclient.GetKubeClient(),
informer: informers.GetInformersManager().GetKubeInformerFactory(),
crdClient: keclient.GetCRDClient(),
- messageLayer: messagelayer.ImagePrePullControllerMessageLayer(),
+ messageLayer: messagelayer.TaskManagerMessageLayer(),
dc: dc,
}
return uc, nil
diff --git a/cloud/pkg/taskmanager/nodeupgradecontroller/node_upgrade_controller.go b/cloud/pkg/taskmanager/nodeupgradecontroller/node_upgrade_controller.go
new file mode 100644
index 000000000..649b95cbb
--- /dev/null
+++ b/cloud/pkg/taskmanager/nodeupgradecontroller/node_upgrade_controller.go
@@ -0,0 +1,386 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nodeupgradecontroller
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ jsonpatch "github.com/evanphx/json-patch"
+ "github.com/google/uuid"
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ apimachineryType "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/klog/v2"
+
+ beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
+ keclient "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/controller"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/manager"
+ commontypes "github.com/kubeedge/kubeedge/common/types"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+ crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+// NodeUpgrade is the name under which this controller is registered.
+const NodeUpgrade = "NodeUpgradeController"
+
+// NodeUpgradeController reconciles NodeUpgradeJob CRDs and drives edge node
+// upgrades through the task FSM; the mutex serializes FSM transitions.
+type NodeUpgradeController struct {
+	sync.Mutex
+	*controller.BaseController
+}
+
+// cache is the informer-backed NodeUpgradeJob cache shared with the FSM helpers.
+var cache *manager.TaskCache
+
+func NewNodeUpgradeController(messageChan chan util.TaskMessage) (*NodeUpgradeController, error) {
+ var err error
+ cache, err = manager.NewTaskCache(
+ informers.GetInformersManager().GetKubeEdgeInformerFactory().Operations().V1alpha1().NodeUpgradeJobs().Informer())
+ if err != nil {
+ klog.Warningf("Create NodeUpgradeJob manager failed with error: %s", err)
+ return nil, err
+ }
+ return &NodeUpgradeController{
+ BaseController: &controller.BaseController{
+ Informer: informers.GetInformersManager().GetKubeInformerFactory(),
+ TaskManager: cache,
+ MessageChan: messageChan,
+ CrdClient: client.GetCRDClient(),
+ KubeClient: keclient.GetKubeClient(),
+ },
+ }, nil
+}
+
+// ReportNodeStatus applies an FSM event to the node-level state machine of
+// (taskID, nodeID): it validates the transition, performs it under the
+// controller mutex, waits for the persisted state to change, and returns the
+// resulting state.
+func (ndc *NodeUpgradeController) ReportNodeStatus(taskID, nodeID string, event fsm.Event) (api.State, error) {
+	nodeFSM := NewUpgradeNodeFSM(taskID, nodeID)
+	err := nodeFSM.AllowTransit(event)
+	if err != nil {
+		return "", err
+	}
+	// remember the pre-transition state so checkStatusChanged can detect movement
+	state, err := nodeFSM.CurrentState()
+	if err != nil {
+		return "", err
+	}
+	// serialize concurrent transitions for this controller
+	ndc.Lock()
+	defer ndc.Unlock()
+	err = nodeFSM.Transit(event)
+	if err != nil {
+		return "", err
+	}
+	checkStatusChanged(nodeFSM, state)
+	state, err = nodeFSM.CurrentState()
+	if err != nil {
+		return "", err
+	}
+	return state, nil
+}
+
+// checkStatusChanged polls (100ms interval, up to 1s) until the FSM's current
+// state differs from the given pre-transition state, absorbing the delay of
+// the asynchronous status patch. A timeout is only logged, not propagated.
+func checkStatusChanged(nodeFSM *fsm.FSM, state api.State) {
+	err := wait.Poll(100*time.Millisecond, time.Second, func() (bool, error) {
+		nowState, err := nodeFSM.CurrentState()
+		if err != nil {
+			// transient read error: keep polling
+			return false, nil
+		}
+		if nowState == state {
+			return false, nil
+		}
+		return true, err
+	})
+	if err != nil {
+		klog.V(4).Infof("check status changed failed: %s", err.Error())
+	}
+}
+
+// ReportTaskStatus applies an FSM event to the task-level state machine of
+// taskID and returns the state after the transition is observed.
+// NOTE(review): unlike ReportNodeStatus this does not take ndc's mutex —
+// confirm task-level transitions cannot race with node-level ones.
+func (ndc *NodeUpgradeController) ReportTaskStatus(taskID string, event fsm.Event) (api.State, error) {
+	taskFSM := NewUpgradeTaskFSM(taskID)
+	state, err := taskFSM.CurrentState()
+	if err != nil {
+		return "", err
+	}
+	err = taskFSM.AllowTransit(event)
+	if err != nil {
+		return "", err
+	}
+	err = taskFSM.Transit(event)
+	if err != nil {
+		return "", err
+	}
+	checkStatusChanged(taskFSM, state)
+	return taskFSM.CurrentState()
+}
+
+// ValidateNode narrows the base controller's candidate nodes down to those
+// whose version actually requires the requested upgrade.
+func (ndc *NodeUpgradeController) ValidateNode(taskMessage util.TaskMessage) []v1.Node {
+	candidates := ndc.BaseController.ValidateNode(taskMessage)
+	req, ok := taskMessage.Msg.(commontypes.NodeUpgradeJobRequest)
+	if !ok {
+		klog.Errorf("convert message to commontypes.NodeUpgradeJobRequest failed")
+		return nil
+	}
+	var result []v1.Node
+	for _, candidate := range candidates {
+		if needUpgrade(candidate, req.Version) {
+			result = append(result, candidate)
+		}
+	}
+	return result
+}
+
+// StageCompleted reports whether state finishes the current stage of the task.
+func (ndc *NodeUpgradeController) StageCompleted(taskID string, state api.State) bool {
+	return NewUpgradeTaskFSM(taskID).TaskStagCompleted(state)
+}
+
+// GetNodeStatus fetches the per-node status list of the named NodeUpgradeJob.
+func (ndc *NodeUpgradeController) GetNodeStatus(name string) ([]v1alpha1.TaskStatus, error) {
+	job, err := ndc.CrdClient.OperationsV1alpha1().NodeUpgradeJobs().Get(context.TODO(), name, metav1.GetOptions{})
+	if err != nil {
+		return nil, err
+	}
+	return job.Status.Status, nil
+}
+
+// GetNodeVersion extracts the third "-"-separated component of the node's
+// kubelet version string (the KubeEdge version suffix reported by edge nodes).
+func (ndc *NodeUpgradeController) GetNodeVersion(name string) (string, error) {
+	node, err := ndc.Informer.Core().V1().Nodes().Lister().Get(name)
+	if err != nil {
+		return "", err
+	}
+	strs := strings.Split(node.Status.NodeInfo.KubeletVersion, "-")
+	// guard the index: a plain kubelet version (e.g. "v1.28.2") has no suffix
+	// and would otherwise panic with index out of range
+	if len(strs) < 3 {
+		return "", fmt.Errorf("kubelet version %s does not contain a KubeEdge version", node.Status.NodeInfo.KubeletVersion)
+	}
+	return strs[2], nil
+}
+
+// UpdateNodeStatus replaces the per-node status list of the named
+// NodeUpgradeJob and patches it back to the apiserver.
+func (ndc *NodeUpgradeController) UpdateNodeStatus(name string, nodeStatus []v1alpha1.TaskStatus) error {
+	nodeUpgrade, err := ndc.CrdClient.OperationsV1alpha1().NodeUpgradeJobs().Get(context.TODO(), name, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+	status := nodeUpgrade.Status
+	status.Status = nodeStatus
+	return patchStatus(nodeUpgrade, status, ndc.CrdClient)
+}
+
+// patchStatus replaces the NodeUpgradeJob's status with the given one by
+// computing a JSON merge patch and applying it to the status subresource.
+func patchStatus(nodeUpgrade *v1alpha1.NodeUpgradeJob, status v1alpha1.NodeUpgradeJobStatus, crdClient crdClientset.Interface) error {
+	// snapshot the object before mutating it so the merge patch is minimal
+	oldData, err := json.Marshal(nodeUpgrade)
+	if err != nil {
+		return fmt.Errorf("failed to marshal the old NodeUpgradeJob(%s): %v", nodeUpgrade.Name, err)
+	}
+	nodeUpgrade.Status = status
+	newData, err := json.Marshal(nodeUpgrade)
+	if err != nil {
+		return fmt.Errorf("failed to marshal the new NodeUpgradeJob(%s): %v", nodeUpgrade.Name, err)
+	}
+
+	patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
+	if err != nil {
+		return fmt.Errorf("failed to create a merge patch: %v", err)
+	}
+
+	result, err := crdClient.OperationsV1alpha1().NodeUpgradeJobs().Patch(context.TODO(), nodeUpgrade.Name, apimachineryType.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+	if err != nil {
+		return fmt.Errorf("failed to patch update NodeUpgradeJob status: %v", err)
+	}
+	klog.V(4).Info("patch upgrade task status result: ", result)
+	return nil
+}
+
+// Start launches the NodeUpgradeJob sync loop in the background.
+func (ndc *NodeUpgradeController) Start() error {
+	go ndc.startSync()
+	return nil
+}
+
+// startSync replays unfinished NodeUpgradeJobs on startup and then processes
+// informer watch events until the beehive context is cancelled.
+func (ndc *NodeUpgradeController) startSync() {
+	nodeUpgradeList, err := ndc.CrdClient.OperationsV1alpha1().NodeUpgradeJobs().List(context.TODO(), metav1.ListOptions{})
+	if err != nil {
+		klog.Errorf(err.Error())
+		os.Exit(2)
+	}
+	for _, nodeUpgrade := range nodeUpgradeList.Items {
+		if fsm.TaskFinish(nodeUpgrade.Status.State) {
+			continue
+		}
+		// copy the loop variable: nodeUpgradeJobAdded stores the pointer in
+		// the cache map, and &nodeUpgrade would alias the same variable on
+		// every iteration (all cache entries would point at the last item)
+		nodeUpgrade := nodeUpgrade
+		ndc.nodeUpgradeJobAdded(&nodeUpgrade)
+	}
+	for {
+		select {
+		case <-beehiveContext.Done():
+			klog.Info("stop sync NodeUpgradeJob")
+			return
+		case e := <-ndc.TaskManager.Events():
+			upgrade, ok := e.Object.(*v1alpha1.NodeUpgradeJob)
+			if !ok {
+				klog.Warningf("object type: %T unsupported", e.Object)
+				continue
+			}
+			switch e.Type {
+			case watch.Added:
+				ndc.nodeUpgradeJobAdded(upgrade)
+			case watch.Deleted:
+				ndc.nodeUpgradeJobDeleted(upgrade)
+			case watch.Modified:
+				ndc.nodeUpgradeJobUpdated(upgrade)
+			default:
+				klog.Warningf("NodeUpgradeJob event type: %s unsupported", e.Type)
+			}
+		}
+	}
+}
+
+// nodeUpgradeJobAdded caches a newly observed NodeUpgradeJob and kicks off the
+// upgrade unless the job already reached a terminal state.
+func (ndc *NodeUpgradeController) nodeUpgradeJobAdded(upgrade *v1alpha1.NodeUpgradeJob) {
+	klog.V(4).Infof("add NodeUpgradeJob: %v", upgrade)
+	ndc.TaskManager.CacheMap.Store(upgrade.Name, upgrade)
+
+	// If all or partial edge nodes upgrade is upgrading or completed, we don't need to send upgrade message
+	if fsm.TaskFinish(upgrade.Status.State) {
+		klog.Warning("The nodeUpgradeJob is completed, don't send upgrade message again")
+		return
+	}
+
+	ndc.processUpgrade(upgrade)
+}
+
+// processUpgrade do the upgrade operation on node
+func (ndc *NodeUpgradeController) processUpgrade(upgrade *v1alpha1.NodeUpgradeJob) {
+ // if users specify Image, we'll use upgrade Version as its image tag, even though Image contains tag.
+ // if not, we'll use default image: kubeedge/installation-package:${Version}
+ var repo string
+ var err error
+ repo = "kubeedge/installation-package"
+ if upgrade.Spec.Image != "" {
+ repo, err = util.GetImageRepo(upgrade.Spec.Image)
+ if err != nil {
+ klog.Errorf("Image format is not right: %v", err)
+ return
+ }
+ }
+ imageTag := upgrade.Spec.Version
+ image := fmt.Sprintf("%s:%s", repo, imageTag)
+
+ upgradeReq := commontypes.NodeUpgradeJobRequest{
+ UpgradeID: upgrade.Name,
+ HistoryID: uuid.New().String(),
+ Version: upgrade.Spec.Version,
+ Image: image,
+ }
+
+ tolerate, err := strconv.ParseFloat(upgrade.Spec.FailureTolerate, 64)
+ if err != nil {
+ klog.Errorf("convert FailureTolerate to float64 failed: %v", err)
+ tolerate = 0.1
+ }
+
+ concurrency := upgrade.Spec.Concurrency
+ if concurrency <= 0 {
+ concurrency = 1
+ }
+ klog.V(4).Infof("deal task message: %v", upgrade)
+ ndc.MessageChan <- util.TaskMessage{
+ Type: util.TaskUpgrade,
+ CheckItem: upgrade.Spec.CheckItems,
+ Name: upgrade.Name,
+ TimeOutSeconds: upgrade.Spec.TimeoutSeconds,
+ Concurrency: concurrency,
+ FailureTolerate: tolerate,
+ NodeNames: upgrade.Spec.NodeNames,
+ LabelSelector: upgrade.Spec.LabelSelector,
+ Status: v1alpha1.TaskStatus{},
+ Msg: upgradeReq,
+ }
+}
+
+// needUpgrade reports whether the node should receive the upgrade: it must
+// not already run the expected version and must not be mid-upgrade.
+func needUpgrade(node v1.Node, upgradeVersion string) bool {
+	kubeletVersion := node.Status.NodeInfo.KubeletVersion
+	if util.FilterVersion(kubeletVersion, upgradeVersion) {
+		klog.Warningf("Node(%s) version(%s) already on the expected version %s.", node.Name, kubeletVersion, upgradeVersion)
+		return false
+	}
+
+	// The status label marks a node that is currently upgrading.
+	_, upgrading := node.Labels[util.NodeUpgradeJobStatusKey]
+	if upgrading {
+		klog.Warningf("Node(%s) is in upgrade state", node.Name)
+		return false
+	}
+
+	return true
+}
+
+// nodeUpgradeJobDeleted is used to process deleted NodeUpgradeJob in apiserver:
+// it drops the cached entry and tells the executor to shut the task down.
+func (ndc *NodeUpgradeController) nodeUpgradeJobDeleted(upgrade *v1alpha1.NodeUpgradeJob) {
+	// just need to delete from cache map
+	ndc.TaskManager.CacheMap.Delete(upgrade.Name)
+	// deletion is a normal lifecycle event, not an error — log at Info level
+	klog.Infof("upgrade job %s delete", upgrade.Name)
+	ndc.MessageChan <- util.TaskMessage{
+		Type:     util.TaskUpgrade,
+		Name:     upgrade.Name,
+		ShutDown: true,
+	}
+}
+
+// nodeUpgradeJobUpdated is used to process update of a NodeUpgradeJob in
+// apiserver: it refreshes the cache and forwards the first changed node
+// status to the executor.
+func (ndc *NodeUpgradeController) nodeUpgradeJobUpdated(upgrade *v1alpha1.NodeUpgradeJob) {
+	oldValue, ok := ndc.TaskManager.CacheMap.Load(upgrade.Name)
+	if !ok {
+		klog.Infof("Upgrade %s not exist, and store it first", upgrade.Name)
+		// If Upgrade not present in Upgrade map means it is not modified and added.
+		ndc.nodeUpgradeJobAdded(upgrade)
+		return
+	}
+	// Assert only after the existence check: asserting the nil interface
+	// returned on a cache miss to a concrete pointer type would panic.
+	old := oldValue.(*v1alpha1.NodeUpgradeJob)
+
+	// store in cache map
+	ndc.TaskManager.CacheMap.Store(upgrade.Name, upgrade)
+
+	node := checkUpdateNode(old, upgrade)
+	if node == nil {
+		klog.Info("none node update")
+		return
+	}
+
+	ndc.MessageChan <- util.TaskMessage{
+		Type:   util.TaskUpgrade,
+		Name:   upgrade.Name,
+		Status: *node,
+	}
+}
+
+// checkUpdateNode compares the per-node status lists of the old and updated
+// NodeUpgradeJob and returns the first node status that changed, or nil when
+// nothing changed. The parameter is named "updated" rather than "new" to
+// avoid shadowing the builtin.
+func checkUpdateNode(old, updated *v1alpha1.NodeUpgradeJob) *v1alpha1.TaskStatus {
+	if len(old.Status.Status) == 0 {
+		return nil
+	}
+	for i := range updated.Status.Status {
+		updateNode := updated.Status.Status[i]
+		// A node with no counterpart in the old list is newly reported and
+		// counts as updated; indexing old blindly would panic out of range.
+		if i >= len(old.Status.Status) {
+			return &updateNode
+		}
+		if !util.NodeUpdated(old.Status.Status[i], updateNode) {
+			continue
+		}
+		return &updateNode
+	}
+	return nil
+}
diff --git a/cloud/pkg/taskmanager/nodeupgradecontroller/upgrade_task.go b/cloud/pkg/taskmanager/nodeupgradecontroller/upgrade_task.go
new file mode 100644
index 000000000..6f28eb9ae
--- /dev/null
+++ b/cloud/pkg/taskmanager/nodeupgradecontroller/upgrade_task.go
@@ -0,0 +1,122 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nodeupgradecontroller
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util"
+ fsmapi "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ v1alpha12 "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+// currentUpgradeNodeState returns the recorded state for nodeName within the
+// cached task identified by id. A missing or empty status entry maps to
+// TaskInit.
+func currentUpgradeNodeState(id, nodeName string) (v1alpha12.State, error) {
+	v, ok := cache.CacheMap.Load(id)
+	if !ok {
+		return "", fmt.Errorf("can not find task %s", id)
+	}
+	// NOTE(review): assumes upgrade task IDs only ever cache *NodeUpgradeJob;
+	// the assertion panics on any other type — confirm against cache writers.
+	task := v.(*v1alpha1.NodeUpgradeJob)
+	var state v1alpha12.State
+	for _, status := range task.Status.Status {
+		if status.NodeName == nodeName {
+			state = status.State
+			break
+		}
+	}
+	if state == "" {
+		state = v1alpha12.TaskInit
+	}
+	return state, nil
+}
+
+// updateUpgradeNodeState rewrites nodeName's status entry in the cached task
+// identified by id and patches the resulting status to the API server.
+func updateUpgradeNodeState(id, nodeName string, state v1alpha12.State, event fsm.Event) error {
+	v, ok := cache.CacheMap.Load(id)
+	if !ok {
+		return fmt.Errorf("can not find task %s", id)
+	}
+	// Work on deep copies so the cached object is never mutated in place.
+	newTask := v.(*v1alpha1.NodeUpgradeJob).DeepCopy()
+	status := newTask.Status.DeepCopy()
+	for i := range status.Status {
+		if status.Status[i].NodeName != nodeName {
+			continue
+		}
+		status.Status[i] = v1alpha1.TaskStatus{
+			NodeName: nodeName,
+			State:    state,
+			Event:    event.Type,
+			Action:   event.Action,
+			Time:     time.Now().Format(util.ISO8601UTC),
+			Reason:   event.Msg,
+		}
+		break
+	}
+	return patchStatus(newTask, *status, client.GetCRDClient())
+}
+
+// NewUpgradeNodeFSM builds the finite-state machine that tracks a single
+// node's progress within the named upgrade task.
+func NewUpgradeNodeFSM(taskName, nodeName string) *fsm.FSM {
+	// "machine" avoids shadowing the imported fsm package name
+	machine := &fsm.FSM{}
+	return machine.NodeName(nodeName).
+		ID(taskName).
+		Guard(fsmapi.UpgradeRule).
+		StageSequence(fsmapi.UpdateStageSequence).
+		CurrentFunc(currentUpgradeNodeState).
+		UpdateFunc(updateUpgradeNodeState)
+}
+
+// NewUpgradeTaskFSM builds the finite-state machine that tracks the overall
+// state of the named upgrade task.
+func NewUpgradeTaskFSM(taskName string) *fsm.FSM {
+	// "machine" avoids shadowing the imported fsm package name
+	machine := &fsm.FSM{}
+	return machine.ID(taskName).
+		Guard(fsmapi.UpgradeRule).
+		StageSequence(fsmapi.UpdateStageSequence).
+		CurrentFunc(currentUpgradeTaskState).
+		UpdateFunc(updateUpgradeTaskState)
+}
+
+// currentUpgradeTaskState returns the task-level state of the cached task,
+// defaulting to TaskInit when none has been recorded yet.
+func currentUpgradeTaskState(id, _ string) (v1alpha12.State, error) {
+	v, ok := cache.CacheMap.Load(id)
+	if !ok {
+		return "", fmt.Errorf("can not find task %s", id)
+	}
+	if state := v.(*v1alpha1.NodeUpgradeJob).Status.State; state != "" {
+		return state, nil
+	}
+	return v1alpha12.TaskInit, nil
+}
+
+// updateUpgradeTaskState overwrites the task-level status of the cached task
+// with the given state and event, then patches it to the API server.
+func updateUpgradeTaskState(id, _ string, state v1alpha12.State, event fsm.Event) error {
+	v, ok := cache.CacheMap.Load(id)
+	if !ok {
+		return fmt.Errorf("can not find task %s", id)
+	}
+	// Deep copies keep the cached object untouched until the patch succeeds.
+	newTask := v.(*v1alpha1.NodeUpgradeJob).DeepCopy()
+	status := newTask.Status.DeepCopy()
+	status.Event = event.Type
+	status.Action = event.Action
+	status.Reason = event.Msg
+	status.State = state
+	status.Time = time.Now().Format(util.ISO8601UTC)
+	return patchStatus(newTask, *status, client.GetCRDClient())
+}
diff --git a/cloud/pkg/taskmanager/task_manager.go b/cloud/pkg/taskmanager/task_manager.go
new file mode 100644
index 000000000..425ecff64
--- /dev/null
+++ b/cloud/pkg/taskmanager/task_manager.go
@@ -0,0 +1,123 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package taskmanager
+
+import (
+ "time"
+
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/beehive/pkg/core"
+ "github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/config"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/imageprepullcontroller"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/manager"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/nodeupgradecontroller"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/controller"
+ "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1"
+)
+
+// TaskManager is the beehive module that wires together the downstream and
+// upstream controllers and the executor machine for cloud-edge tasks.
+type TaskManager struct {
+	downstream      *manager.DownstreamController
+	executorMachine *manager.ExecutorMachine
+	upstream        *manager.UpstreamController
+	// enable mirrors the module's configured Enable flag
+	enable bool
+}
+
+var _ core.Module = (*TaskManager)(nil)
+
+// newTaskManager wires up the downstream/upstream controllers, the executor
+// machine, and the per-task-type controllers. When the module is disabled it
+// returns a stub whose Enable() reports false. Any construction failure is
+// fatal (klog.Exitf).
+func newTaskManager(enable bool) *TaskManager {
+	if !enable {
+		return &TaskManager{enable: enable}
+	}
+	taskMessage := make(chan util.TaskMessage, 10)
+	downStreamMessage := make(chan model.Message, 10)
+	downstream, err := manager.NewDownstreamController(downStreamMessage)
+	if err != nil {
+		klog.Exitf("New task manager downstream failed with error: %s", err)
+	}
+	upstream, err := manager.NewUpstreamController(downstream)
+	if err != nil {
+		klog.Exitf("New task manager upstream failed with error: %s", err)
+	}
+	executorMachine, err := manager.NewExecutorMachine(taskMessage, downStreamMessage)
+	if err != nil {
+		klog.Exitf("New executor machine failed with error: %s", err)
+	}
+
+	upgradeNodeController, err := nodeupgradecontroller.NewNodeUpgradeController(taskMessage)
+	if err != nil {
+		klog.Exitf("New upgrade node controller failed with error: %s", err)
+	}
+
+	imagePrePullController, err := imageprepullcontroller.NewImagePrePullController(taskMessage)
+	if err != nil {
+		// message fixed: this branch is about the image prepull controller,
+		// not the upgrade node controller (copy-paste error)
+		klog.Exitf("New image prepull controller failed with error: %s", err)
+	}
+	controller.Register(util.TaskUpgrade, upgradeNodeController)
+	controller.Register(util.TaskPrePull, imagePrePullController)
+
+	return &TaskManager{
+		downstream:      downstream,
+		executorMachine: executorMachine,
+		upstream:        upstream,
+		enable:          enable,
+	}
+}
+
+// Register saves the task manager configuration and registers the module
+// with the beehive core. (Stale commented-out registration removed.)
+func Register(dc *v1alpha1.TaskManager) {
+	config.InitConfigure(dc)
+	core.Register(newTaskManager(dc.Enable))
+}
+
+// Name returns the module name of the task manager (core.Module interface).
+func (uc *TaskManager) Name() string {
+	return modules.TaskManagerModuleName
+}
+
+// Group returns the module group of the task manager (core.Module interface).
+func (uc *TaskManager) Group() string {
+	return modules.TaskManagerModuleGroup
+}
+
+// Enable indicates whether this module is enabled in the configuration.
+func (uc *TaskManager) Enable() bool {
+	return uc.enable
+}
+
+// Start boots the task manager pipeline: downstream first (it loads cached
+// jobs), then upstream, then the executor machine, and finally every
+// registered task controller. Any failure terminates the process.
+func (uc *TaskManager) Start() {
+	if err := uc.downstream.Start(); err != nil {
+		klog.Exitf("start task manager downstream failed with error: %s", err)
+	}
+	// wait for downstream controller to start and load NodeUpgradeJob
+	// TODO think about sync
+	// NOTE(review): a fixed 1s sleep is a heuristic, not a guarantee that the
+	// downstream load finished — replace with explicit synchronization.
+	time.Sleep(1 * time.Second)
+	if err := uc.upstream.Start(); err != nil {
+		klog.Exitf("start task manager upstream failed with error: %s", err)
+	}
+	if err := uc.executorMachine.Start(); err != nil {
+		klog.Exitf("start task manager executorMachine failed with error: %s", err)
+	}
+
+	if err := controller.StartAllController(); err != nil {
+		klog.Exitf("start controller failed with error: %s", err)
+	}
+}
diff --git a/cloud/pkg/taskmanager/util/controller/controller.go b/cloud/pkg/taskmanager/util/controller/controller.go
new file mode 100644
index 000000000..78259d3ab
--- /dev/null
+++ b/cloud/pkg/taskmanager/util/controller/controller.go
@@ -0,0 +1,169 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "fmt"
+
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ k8sinformer "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/util/manager"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+ crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+// Controller is the contract implemented by each task-type controller
+// (upgrade, image prepull, ...): it validates target nodes, tracks per-node
+// and task-level status, and reports state transitions from the edge.
+type Controller interface {
+	Name() string
+	Start() error
+	// ReportNodeStatus handles an FSM event for one node of a task
+	// (taskID, nodeID) and returns the resulting state.
+	ReportNodeStatus(string, string, fsm.Event) (api.State, error)
+	// ReportTaskStatus handles an FSM event at task level.
+	ReportTaskStatus(string, fsm.Event) (api.State, error)
+	// ValidateNode filters the nodes targeted by a task message.
+	ValidateNode(util.TaskMessage) []v1.Node
+	GetNodeStatus(string) ([]v1alpha1.TaskStatus, error)
+	UpdateNodeStatus(string, []v1alpha1.TaskStatus) error
+	// StageCompleted reports whether state completes the current stage of taskID.
+	StageCompleted(taskID string, state api.State) bool
+}
+
+// BaseController carries the shared dependencies and default (stub) method
+// implementations that concrete task controllers embed.
+type BaseController struct {
+	// name of the controller / task type
+	name string
+	// Informer provides cached access to core resources (used for node lookups)
+	Informer k8sinformer.SharedInformerFactory
+	// TaskManager caches task CRDs and exposes their watch events
+	TaskManager *manager.TaskCache
+	// MessageChan feeds task messages to the executor machine
+	MessageChan chan util.TaskMessage
+	KubeClient  kubernetes.Interface
+	CrdClient   crdClientset.Interface
+}
+
+// Name returns the registered name (task type) of this controller.
+func (bc *BaseController) Name() string {
+	return bc.name
+}
+
+// Start is a stub; concrete controllers must override it, otherwise
+// StartAllController fails with this error.
+func (bc *BaseController) Start() error {
+	return fmt.Errorf("controller not implemented")
+}
+
+// StageCompleted is the default implementation: no stage is ever considered
+// completed unless a concrete controller overrides this method.
+func (bc *BaseController) StageCompleted(taskID string, state api.State) bool {
+	return false
+}
+
+// ValidateNode resolves the nodes targeted by the task message and keeps only
+// those that are edge nodes and currently Ready.
+func (bc *BaseController) ValidateNode(taskMessage util.TaskMessage) []v1.Node {
+	nodes, err := bc.getNodeList(taskMessage.NodeNames, taskMessage.LabelSelector)
+	if err != nil {
+		klog.Warningf("get node list error: %s", err.Error())
+		return nil
+	}
+	var validateNodes []v1.Node
+	for _, node := range nodes {
+		if !util.IsEdgeNode(node) {
+			klog.Warningf("Node(%s) is not edge node", node.Name)
+			continue
+		}
+		if !isNodeReady(node) {
+			continue
+		}
+		validateNodes = append(validateNodes, *node)
+	}
+	return validateNodes
+}
+
+// GetNodeStatus is a stub; concrete controllers provide the real lookup.
+func (bc *BaseController) GetNodeStatus(name string) ([]v1alpha1.TaskStatus, error) {
+	return nil, fmt.Errorf("function GetNodeStatus need to be init")
+}
+
+// UpdateNodeStatus is a stub; concrete controllers provide the real update.
+func (bc *BaseController) UpdateNodeStatus(name string, status []v1alpha1.TaskStatus) error {
+	return fmt.Errorf("function UpdateNodeStatus need to be init")
+}
+
+// isNodeReady reports whether the node's Ready condition is not explicitly
+// non-true; nodes without a Ready condition are treated as ready.
+func isNodeReady(node *v1.Node) bool {
+	for _, cond := range node.Status.Conditions {
+		if cond.Type != v1.NodeReady {
+			continue
+		}
+		if cond.Status != v1.ConditionTrue {
+			klog.Warningf("Node(%s) is in NotReady state", node.Name)
+			return false
+		}
+	}
+	return true
+}
+
+// ReportNodeStatus is a stub; concrete controllers handle node-level events.
+func (bc *BaseController) ReportNodeStatus(taskID string, nodeID string, event fsm.Event) (api.State, error) {
+	return "", fmt.Errorf("function ReportNodeStatus need to be init")
+}
+
+// ReportTaskStatus is a stub; concrete controllers handle task-level events.
+func (bc *BaseController) ReportTaskStatus(taskID string, event fsm.Event) (api.State, error) {
+	return "", fmt.Errorf("function ReportTaskStatus need to be init")
+}
+
+var (
+ controllers = map[string]Controller{}
+)
+
+// Register records the controller under the given task-type name. A duplicate
+// registration overwrites the previous entry (with a warning).
+func Register(name string, controller Controller) {
+	if _, ok := controllers[name]; ok {
+		// Warningf, not Warning: the message contains a %s verb
+		klog.Warningf("controller %s exists", name)
+	}
+	controllers[name] = controller
+}
+
+// StartAllController starts every registered controller and stops at the
+// first failure, wrapping the failing controller's name into the error.
+func StartAllController() error {
+	for name, c := range controllers {
+		if err := c.Start(); err != nil {
+			return fmt.Errorf("start %s controller failed: %s", name, err.Error())
+		}
+	}
+	return nil
+}
+
+// GetController returns the controller registered under name, or an error
+// when no such registration exists.
+func GetController(name string) (Controller, error) {
+	if c, ok := controllers[name]; ok {
+		return c, nil
+	}
+	return nil, fmt.Errorf("controller %s is not registered", name)
+}
+
+// getNodeList resolves the task's target nodes from the informer cache.
+// Explicit node names take precedence over the label selector; when both are
+// absent it returns (nil, nil), i.e. no targets.
+func (bc *BaseController) getNodeList(nodeNames []string, labelSelector *metav1.LabelSelector) ([]*v1.Node, error) {
+	var nodesToUpgrade []*v1.Node
+
+	if len(nodeNames) != 0 {
+		// any missing named node aborts the whole resolution
+		for _, name := range nodeNames {
+			node, err := bc.Informer.Core().V1().Nodes().Lister().Get(name)
+			if err != nil {
+				return nil, fmt.Errorf("failed to get node with name %s: %v", name, err)
+			}
+			nodesToUpgrade = append(nodesToUpgrade, node)
+		}
+	} else if labelSelector != nil {
+		selector, err := metav1.LabelSelectorAsSelector(labelSelector)
+		if err != nil {
+			return nil, fmt.Errorf("labelSelector(%s) is not valid: %v", labelSelector, err)
+		}
+
+		nodes, err := bc.Informer.Core().V1().Nodes().Lister().List(selector)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get nodes with label %s: %v", selector.String(), err)
+		}
+		nodesToUpgrade = nodes
+	}
+
+	return nodesToUpgrade, nil
+}
diff --git a/cloud/pkg/nodeupgradejobcontroller/manager/common.go b/cloud/pkg/taskmanager/util/manager/common.go
index 6e8008abf..59200daa3 100644
--- a/cloud/pkg/nodeupgradejobcontroller/manager/common.go
+++ b/cloud/pkg/taskmanager/util/manager/common.go
@@ -1,5 +1,5 @@
/*
-Copyright 2022 The KubeEdge Authors.
+Copyright 2023 The KubeEdge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
diff --git a/cloud/pkg/nodeupgradejobcontroller/manager/nodeupgrade.go b/cloud/pkg/taskmanager/util/manager/task_cache.go
index 7cdb78e1a..9614c7732 100644
--- a/cloud/pkg/nodeupgradejobcontroller/manager/nodeupgrade.go
+++ b/cloud/pkg/taskmanager/util/manager/task_cache.go
@@ -1,5 +1,5 @@
/*
-Copyright 2022 The KubeEdge Authors.
+Copyright 2023 The KubeEdge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -22,31 +22,31 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
- "github.com/kubeedge/kubeedge/cloud/pkg/nodeupgradejobcontroller/config"
+ "github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/config"
)
-// NodeUpgradeJobManager is a manager watch NodeUpgradeJob change event
-type NodeUpgradeJobManager struct {
+// TaskCache is a manager that watches CRD change events
+type TaskCache struct {
// events from watch kubernetes api server
events chan watch.Event
- // UpgradeMap, key is NodeUpgradeJob.Name, value is *v1alpha1.NodeUpgradeJob{}
- UpgradeMap sync.Map
+	// CacheMap, key is the task name, value is the cached task object (e.g. *v1alpha1.NodeUpgradeJob)
+ CacheMap sync.Map
}
// Events return a channel, can receive all NodeUpgradeJob event
-func (dmm *NodeUpgradeJobManager) Events() chan watch.Event {
+func (dmm *TaskCache) Events() chan watch.Event {
return dmm.events
}
-// NewNodeUpgradeJobManager create NodeUpgradeJobManager from config
-func NewNodeUpgradeJobManager(si cache.SharedIndexInformer) (*NodeUpgradeJobManager, error) {
- events := make(chan watch.Event, config.Config.Buffer.NodeUpgradeJobEvent)
+// NewTaskCache create TaskCache from config
+func NewTaskCache(si cache.SharedIndexInformer) (*TaskCache, error) {
+ events := make(chan watch.Event, config.Config.Buffer.TaskEvent)
rh := NewCommonResourceEventHandler(events)
_, err := si.AddEventHandler(rh)
if err != nil {
return nil, err
}
- return &NodeUpgradeJobManager{events: events}, nil
+ return &TaskCache{events: events}, nil
}
diff --git a/cloud/pkg/taskmanager/util/util.go b/cloud/pkg/taskmanager/util/util.go
new file mode 100644
index 000000000..bf3aea571
--- /dev/null
+++ b/cloud/pkg/taskmanager/util/util.go
@@ -0,0 +1,181 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/distribution/distribution/v3/reference"
+ metav1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1"
+ versionutil "k8s.io/apimachinery/pkg/util/version"
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/kubeedge/common/constants"
+ "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+)
+
+const (
+ NodeUpgradeJobStatusKey = "nodeupgradejob.operations.kubeedge.io/status"
+ NodeUpgradeJobStatusValue = ""
+ NodeUpgradeHistoryKey = "nodeupgradejob.operations.kubeedge.io/history"
+)
+
+const (
+ TaskUpgrade = "upgrade"
+ TaskRollback = "rollback"
+ TaskBackup = "backup"
+ TaskPrePull = "prepull"
+
+ ISO8601UTC = "2006-01-02T15:04:05Z"
+)
+
+// TaskMessage is the internal unit of work handed from the task-type
+// controllers to the executor machine.
+type TaskMessage struct {
+	// Type is the task type, e.g. TaskUpgrade or TaskPrePull
+	Type string
+	// Name is the task (CRD object) name
+	Name string
+	// TimeOutSeconds is the per-task timeout; nil means unset — TODO confirm default
+	TimeOutSeconds *uint32
+	// ShutDown asks the executor to stop the named task
+	ShutDown bool
+	CheckItem []string
+	// Concurrency is the number of nodes processed in parallel (>= 1)
+	Concurrency int32
+	// FailureTolerate is the tolerated node-failure ratio — presumably 0..1; confirm against executor
+	FailureTolerate float64
+	// NodeNames / LabelSelector select the target nodes (names take precedence, see getNodeList)
+	NodeNames []string
+	LabelSelector *v1.LabelSelector
+	// Status carries a single changed node status on update events
+	Status v1alpha1.TaskStatus
+	// Msg is the task-specific request payload sent to the edge
+	Msg interface{}
+}
+
+// FilterVersion returns true only if the edge node version already on the upgrade req
+// version is like: v1.22.6-kubeedge-v1.10.0-beta.0.185+95378fb019912a, expected is like v1.10.0
+// Note: malformed versions and comparison failures also return true, i.e.
+// such nodes are filtered out of the upgrade.
+func FilterVersion(version string, expected string) bool {
+	// if not correct version format, also return true
+	strs := strings.Split(version, "-")
+	if len(strs) < 3 {
+		klog.Warningf("version format should be {k8s version}-kubeedge-{edgecore version}, but got : %s", version)
+		return true
+	}
+
+	// filter nodes that already in the required version
+	less, err := VersionLess(strs[2], expected)
+	if err != nil {
+		// on parse failure, treat as not-less so the node is filtered
+		klog.Warningf("version filter failed: %s", err.Error())
+		less = false
+	}
+	return !less
+}
+
+// IsEdgeNode checks whether a node is an Edge Node:
+// only a node carrying the edge role label with the expected value qualifies.
+func IsEdgeNode(node *metav1.Node) bool {
+	// a lookup on a nil Labels map safely yields ok == false
+	role, ok := node.Labels[constants.EdgeNodeRoleKey]
+	return ok && role == constants.EdgeNodeRoleValue
+}
+
+// RemoveDuplicateElement returns s with duplicates removed, preserving the
+// order of first occurrence.
+func RemoveDuplicateElement(s []string) []string {
+	seen := make(map[string]struct{}, len(s))
+	result := make([]string, 0, len(s))
+
+	for _, item := range s {
+		if _, dup := seen[item]; dup {
+			continue
+		}
+		seen[item] = struct{}{}
+		result = append(result, item)
+	}
+
+	return result
+}
+
+// MergeAnnotationUpgradeHistory appends a "from->to" record to the
+// semicolon-separated history string, keeping at most 3 records.
+func MergeAnnotationUpgradeHistory(origin, fromVersion, toVersion string) string {
+	record := fromVersion + "->" + toVersion
+	if origin == "" {
+		return record
+	}
+
+	entries := strings.Split(origin, ";")
+	if len(entries) > 2 {
+		// drop the oldest record so the merged result holds at most 3
+		entries = entries[1:]
+	}
+
+	return strings.Join(append(entries, record), ";")
+}
+
+// GetImageRepo gets repo from a container image reference, e.g.
+// "kubeedge/installation-package:v1.15" -> "docker.io/kubeedge/installation-package".
+func GetImageRepo(image string) (string, error) {
+	named, err := reference.ParseNormalizedNamed(image)
+	if err != nil {
+		return "", fmt.Errorf("failed to parse image name: %v", err)
+	}
+
+	// Name() is the normalized repository without the tag/digest
+	return named.Name(), nil
+}
+
+// GetNodeName extracts ${NodeID} from a resource of the form
+// task/${TaskID}/node/${NodeID}. It returns "" for malformed resources
+// instead of panicking on an out-of-range index.
+func GetNodeName(resource string) string {
+	s := strings.Split(resource, "/")
+	if len(s) < 4 {
+		klog.Warningf("resource %s is not in the form task/{taskID}/node/{nodeID}", resource)
+		return ""
+	}
+	return s[3]
+}
+// GetTaskID extracts ${TaskID} from a resource of the form
+// task/${TaskID}/node/${NodeID}. It returns "" for malformed resources
+// instead of panicking on an out-of-range index.
+func GetTaskID(resource string) string {
+	s := strings.Split(resource, "/")
+	if len(s) < 2 {
+		klog.Warningf("resource %s is not in the form task/{taskID}/node/{nodeID}", resource)
+		return ""
+	}
+	return s[1]
+}
+
+// VersionLess reports whether version1 is semantically lower than version2,
+// comparing major, then minor, then patch.
+func VersionLess(version1, version2 string) (bool, error) {
+	ver1, err := versionutil.ParseGeneric(version1)
+	if err != nil {
+		return false, fmt.Errorf("version1 error: %v", err)
+	}
+	ver2, err := versionutil.ParseGeneric(version2)
+	if err != nil {
+		return false, fmt.Errorf("version2 error: %v", err)
+	}
+	switch {
+	case ver1.Major() != ver2.Major():
+		return ver1.Major() < ver2.Major(), nil
+	case ver1.Minor() != ver2.Minor():
+		return ver1.Minor() < ver2.Minor(), nil
+	default:
+		return ver1.Patch() < ver2.Patch(), nil
+	}
+}
+
+// NodeUpdated reports whether the status entry for the same node has moved to
+// a different, non-empty state. The second parameter is named "current"
+// rather than "new" to avoid shadowing the builtin new().
+func NodeUpdated(old, current v1alpha1.TaskStatus) bool {
+	if old.NodeName != current.NodeName {
+		klog.V(4).Infof("old node %s and new node %s is not same", old.NodeName, current.NodeName)
+		return false
+	}
+	if old.State == current.State || current.State == "" {
+		klog.V(4).Infof("node %s state is not change", old.NodeName)
+		return false
+	}
+	return true
+}
diff --git a/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go b/cloud/pkg/taskmanager/util/util_test.go
index 499da62fa..9a13e2295 100644
--- a/cloud/pkg/nodeupgradejobcontroller/controller/util_test.go
+++ b/cloud/pkg/taskmanager/util/util_test.go
@@ -1,5 +1,5 @@
/*
-Copyright 2022 The KubeEdge Authors.
+Copyright 2023 The KubeEdge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -14,13 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package controller
+package util
import (
"reflect"
"testing"
-
- "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
)
func TestFilterVersion(t *testing.T) {
@@ -40,13 +38,13 @@ func TestFilterVersion(t *testing.T) {
name: "not match expected version",
version: "v1.22.6-kubeedge-v1.10.0-beta.0.194+77ea462f402efb",
expected: "v1.10.0",
- expectResult: false,
+ expectResult: true,
},
{
name: "no right format version",
version: "v1.22.6",
expected: "v1.10.0",
- expectResult: false,
+ expectResult: true,
},
{
name: "match expected version",
@@ -58,7 +56,7 @@ func TestFilterVersion(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- result := filterVersion(test.version, test.expected)
+ result := FilterVersion(test.version, test.expected)
if result != test.expectResult {
t.Errorf("Got = %v, Want = %v", result, test.expectResult)
}
@@ -66,84 +64,34 @@ func TestFilterVersion(t *testing.T) {
}
}
-func TestUpdateUpgradeStatus(t *testing.T) {
- upgrade := v1alpha1.NodeUpgradeJob{
- Status: v1alpha1.NodeUpgradeJobStatus{
- Status: []v1alpha1.UpgradeStatus{
- {
- NodeName: "edge-node",
- State: v1alpha1.Completed,
- History: v1alpha1.History{
- Reason: "the first upgrade",
- UpgradeTime: "2023-09-22T17:33:00Z",
- },
- },
- },
- },
- }
- upgrade2 := upgrade.DeepCopy()
- upgrade2.Status.Status[0].History.Reason = "the second upgrade"
-
- upgrade3 := upgrade.DeepCopy()
- upgrade3.Status.Status = append(upgrade3.Status.Status, v1alpha1.UpgradeStatus{
- NodeName: "edge-node2",
- State: v1alpha1.Completed,
- History: v1alpha1.History{
- Reason: "the first upgrade",
- UpgradeTime: "2023-09-22T17:35:00Z",
- },
- })
-
+func TestRemoveDuplicateElement(t *testing.T) {
tests := []struct {
name string
- upgrade *v1alpha1.NodeUpgradeJob
- status *v1alpha1.UpgradeStatus
- expected *v1alpha1.NodeUpgradeJob
+ input []string
+ expected []string
}{
{
- name: "case1: first add one node",
- upgrade: &v1alpha1.NodeUpgradeJob{},
- status: &v1alpha1.UpgradeStatus{
- NodeName: "edge-node",
- State: v1alpha1.Completed,
- History: v1alpha1.History{
- Reason: "the first upgrade",
- UpgradeTime: "2023-09-22T17:33:00Z",
- },
- },
- expected: upgrade.DeepCopy(),
+ name: "case 1",
+ input: []string{"a", "b", "c"},
+ expected: []string{"a", "b", "c"},
},
{
- name: "case2: add to one NOT exist node record",
- upgrade: upgrade.DeepCopy(),
- status: &v1alpha1.UpgradeStatus{
- NodeName: "edge-node2",
- State: v1alpha1.Completed,
- History: v1alpha1.History{
- Reason: "the first upgrade",
- UpgradeTime: "2023-09-22T17:35:00Z",
- },
- },
- expected: upgrade3,
+ name: "case 2",
+ input: []string{"a", "a", "b", "c", "b", "a", "a"},
+ expected: []string{"a", "b", "c"},
},
{
- name: "case3: add to one exist node record",
- upgrade: upgrade.DeepCopy(),
- status: &v1alpha1.UpgradeStatus{
- NodeName: "edge-node",
- State: v1alpha1.Completed,
- History: v1alpha1.History{
- Reason: "the second upgrade",
- },
- },
- expected: upgrade2,
+ name: "case 3",
+ input: []string{},
+ expected: []string{},
},
}
+
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- newValue := UpdateNodeUpgradeJobStatus(test.upgrade, test.status)
- if !reflect.DeepEqual(newValue, test.expected) {
- t.Errorf("Got = %v, Want = %v", newValue, test.expected)
+ result := RemoveDuplicateElement(test.input)
+ if !reflect.DeepEqual(result, test.expected) {
+ t.Errorf("Got = %v, Want = %v", result, test.expected)
}
})
}
@@ -182,7 +130,7 @@ func TestMergeAnnotationUpgradeHistory(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- result := mergeAnnotationUpgradeHistory(test.origin, test.fromVersion, test.toVersion)
+ result := MergeAnnotationUpgradeHistory(test.origin, test.fromVersion, test.toVersion)
if result != test.expected {
t.Errorf("Got = %v, Want = %v", result, test.expected)
}
diff --git a/common/constants/default.go b/common/constants/default.go
index fab460eee..6b1e5c9da 100644
--- a/common/constants/default.go
+++ b/common/constants/default.go
@@ -28,6 +28,7 @@ const (
DefaultCAURL = "/ca.crt"
DefaultCertURL = "/edge.crt"
DefaultNodeUpgradeURL = "/nodeupgrade"
+ DefaultTaskStateReportURL = "/task/{taskType}/name/{taskID}/node/{nodeID}/status"
DefaultServiceAccountIssuer = "https://kubernetes.default.svc.cluster.local"
// Edged
@@ -112,7 +113,7 @@ const (
DefaultDeviceModelEventBuffer = 1
DefaultUpdateDeviceStatusWorkers = 1
- // NodeUpgradeJobController
+ // TaskManager
DefaultNodeUpgradeJobStatusBuffer = 1024
DefaultNodeUpgradeJobEventBuffer = 1
DefaultNodeUpgradeJobWorkers = 1
diff --git a/common/types/types.go b/common/types/types.go
index e03667a1b..dfe14dad1 100644
--- a/common/types/types.go
+++ b/common/types/types.go
@@ -6,6 +6,7 @@ import (
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
"github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
)
@@ -50,6 +51,38 @@ type NodeUpgradeJobResponse struct {
Reason string
}
+// NodePreCheckRequest is pre-check msg coming from cloud to edge
+type NodePreCheckRequest struct {
+ CheckItem []string
+}
+
+// NodeTaskRequest is a task instruction sent from the cloud to an edge node.
+type NodeTaskRequest struct {
+	// TaskID names the task this request belongs to
+	TaskID string
+	// Type is the task type — presumably "upgrade"/"prepull"; confirm against senders
+	Type string
+	// State is the task state driving the edge-side action
+	State string
+	// Item is the task-specific payload; its concrete type depends on Type
+	Item interface{}
+}
+
+// NodeTaskResponse is used to report node task status from the edge to the
+// cloud (see edge/pkg/common/util.ReportTaskResult).
+type NodeTaskResponse struct {
+	// NodeName is the name of edge node.
+	NodeName string
+	// State represents for the upgrade state phase of the edge node.
+	// There are several possible state values: "", Upgrading, BackingUp, RollingBack and Checking.
+	State api.State
+	// Event represents for the event of the node task
+	// (original comment listed Init, Check, Pull for ImagePrePullJob).
+	Event string
+	// Action represents for the action taken on the event
+	// (e.g. Success, Failure, TimeOut).
+	Action api.Action
+	// Reason represents for the reason recorded with the action.
+	Reason string
+	// Time represents for the running time of the node task.
+	Time string
+
+	// ExternalMessage carries additional free-form content — producers not
+	// shown here; confirm against task handlers.
+	ExternalMessage string
+}
+
// ObjectResp is the object that api-server response
type ObjectResp struct {
Object metaV1.Object
@@ -68,7 +101,7 @@ type ImagePrePullJobRequest struct {
// ImagePrePullJobResponse is used to report status msg to cloudhub https service from each node
type ImagePrePullJobResponse struct {
NodeName string
- State v1alpha1.PrePullState
+ State string
Reason string
ImageStatus []v1alpha1.ImageStatus
}
diff --git a/edge/pkg/common/util/util.go b/edge/pkg/common/util/util.go
new file mode 100644
index 000000000..ec978c191
--- /dev/null
+++ b/edge/pkg/common/util/util.go
@@ -0,0 +1,32 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+ "fmt"
+
+ beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
+ "github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/edge/pkg/common/modules"
+)
+
+func ReportTaskResult(taskType, taskID string, resp types.NodeTaskResponse) {
+ msg := model.NewMessage("").SetRoute(modules.EdgeHubModuleName, modules.HubGroup).
+ SetResourceOperation(fmt.Sprintf("task/%s/node/%s", taskID, resp.NodeName), taskType).FillBody(resp)
+ beehiveContext.Send(modules.EdgeHubModuleName, *msg)
+}
diff --git a/edge/pkg/edgehub/edgehub.go b/edge/pkg/edgehub/edgehub.go
index 2498d07f2..bf63f1f9e 100644
--- a/edge/pkg/edgehub/edgehub.go
+++ b/edge/pkg/edgehub/edgehub.go
@@ -13,8 +13,8 @@ import (
"github.com/kubeedge/kubeedge/edge/pkg/edgehub/certificate"
"github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients"
"github.com/kubeedge/kubeedge/edge/pkg/edgehub/config"
- // register Upgrade handler
- _ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/upgrade"
+ // register Task handler
+ _ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/task"
"github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2"
)
diff --git a/edge/pkg/edgehub/task/task_handler.go b/edge/pkg/edgehub/task/task_handler.go
new file mode 100644
index 000000000..ac42d64bb
--- /dev/null
+++ b/edge/pkg/edgehub/task/task_handler.go
@@ -0,0 +1,78 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package task
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+ commontypes "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options"
+ "github.com/kubeedge/kubeedge/edge/pkg/common/util"
+ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients"
+ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler"
+ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/task/taskexecutor"
+)
+
+func init() {
+ handler := &taskHandler{}
+ msghandler.RegisterHandler(handler)
+}
+
+type taskHandler struct{}
+
+func (th *taskHandler) Filter(message *model.Message) bool {
+ name := message.GetGroup()
+ return name == modules.TaskManagerModuleName
+}
+
+func (th *taskHandler) Process(message *model.Message, clientHub clients.Adapter) error {
+ taskReq := &commontypes.NodeTaskRequest{}
+ data, err := message.GetContentData()
+ if err != nil {
+ return fmt.Errorf("failed to get content data: %v", err)
+ }
+ err = json.Unmarshal(data, taskReq)
+ if err != nil {
+ return fmt.Errorf("unmarshal failed: %v", err)
+ }
+ executor, err := taskexecutor.GetExecutor(taskReq.Type)
+ if err != nil {
+ return err
+ }
+ event, err := executor.Do(*taskReq)
+ if err != nil {
+ return err
+ }
+
+ // use external tool like keadm
+ //Or the task needs to use the goroutine to control the message reply itself.
+ if event.Action == "" {
+ return nil
+ }
+
+ resp := commontypes.NodeTaskResponse{
+ NodeName: options.GetEdgeCoreConfig().Modules.Edged.HostnameOverride,
+ Event: event.Type,
+ Action: event.Action,
+ Reason: event.Msg,
+ }
+ util.ReportTaskResult(taskReq.Type, taskReq.TaskID, resp)
+ return nil
+}
diff --git a/edge/pkg/edgehub/task/taskexecutor/image_prepull.go b/edge/pkg/edgehub/task/taskexecutor/image_prepull.go
new file mode 100644
index 000000000..a2b19c8e7
--- /dev/null
+++ b/edge/pkg/edgehub/task/taskexecutor/image_prepull.go
@@ -0,0 +1,184 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package taskexecutor
+
+import (
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ v1 "k8s.io/api/core/v1"
+ runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/kubeedge/common/constants"
+ "github.com/kubeedge/kubeedge/common/types"
+ commontypes "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options"
+ edgeutil "github.com/kubeedge/kubeedge/edge/pkg/common/util"
+ metaclient "github.com/kubeedge/kubeedge/edge/pkg/metamanager/client"
+ "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+const (
+ TaskPrePull = "prepull"
+)
+
+type PrePull struct {
+ *BaseExecutor
+}
+
+func (p *PrePull) Name() string {
+ return p.name
+}
+
+func NewPrePullExecutor() Executor {
+ methods := map[string]func(types.NodeTaskRequest) fsm.Event{
+ string(api.TaskChecking): preCheck,
+ string(api.TaskInit): emptyInit,
+ "": emptyInit,
+ string(api.PullingState): pullImages,
+ }
+ return &PrePull{
+ BaseExecutor: NewBaseExecutor(TaskPrePull, methods),
+ }
+}
+
+func pullImages(taskReq types.NodeTaskRequest) fsm.Event {
+ event := fsm.Event{
+ Type: "Pull",
+ Action: api.ActionSuccess,
+ }
+
+ // get edgecore config
+ edgeCoreConfig := options.GetEdgeCoreConfig()
+ if edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint == "" {
+ edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint = edgeCoreConfig.Modules.Edged.RemoteRuntimeEndpoint
+ }
+
+ // parse message request
+ prePullReq, err := getImagePrePullJobRequest(taskReq)
+ if err != nil {
+ event.Msg = err.Error()
+ event.Action = api.ActionFailure
+ return event
+ }
+
+ // pull images
+ container, err := util.NewContainerRuntime(edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint, edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.CgroupDriver)
+ if err != nil {
+ event.Msg = err.Error()
+ event.Action = api.ActionFailure
+ return event
+ }
+
+ go func() {
+ errorStr, imageStatus := prePullImages(*prePullReq, container)
+ if errorStr != "" {
+ event.Action = api.ActionFailure
+ event.Msg = errorStr
+ }
+
+ data, err := json.Marshal(imageStatus)
+ if err != nil {
+ klog.Warningf("marshal imageStatus failed: %v", err)
+ }
+ resp := commontypes.NodeTaskResponse{
+ NodeName: edgeCoreConfig.Modules.Edged.HostnameOverride,
+ Event: event.Type,
+ Action: event.Action,
+ Reason: event.Msg,
+ ExternalMessage: string(data),
+ }
+ edgeutil.ReportTaskResult(taskReq.Type, taskReq.TaskID, resp)
+ }()
+ return fsm.Event{}
+}
+
+func getImagePrePullJobRequest(taskReq commontypes.NodeTaskRequest) (*commontypes.ImagePrePullJobRequest, error) {
+ var prePullReq commontypes.ImagePrePullJobRequest
+ data, err := json.Marshal(taskReq.Item)
+ if err != nil {
+ return nil, err
+ }
+ err = json.Unmarshal(data, &prePullReq)
+ if err != nil {
+ return nil, err
+ }
+ return &prePullReq, err
+}
+
+func prePullImages(prePullReq commontypes.ImagePrePullJobRequest, container util.ContainerRuntime) (string, []v1alpha1.ImageStatus) {
+ errorStr := ""
+ authConfig, err := makeAuthConfig(prePullReq.Secret)
+ if err != nil {
+ return errorStr, []v1alpha1.ImageStatus{}
+ }
+
+ var imageStatus []v1alpha1.ImageStatus
+ for _, image := range prePullReq.Images {
+ prePullStatus := v1alpha1.ImageStatus{
+ Image: image,
+ }
+ for i := 0; i < int(prePullReq.RetryTimes); i++ {
+ err = container.PullImage(image, authConfig, nil)
+ if err == nil {
+ break
+ }
+ }
+ if err != nil {
+ klog.Errorf("pull image %s failed, err: %v", image, err)
+ errorStr = fmt.Sprintf("pull image failed, err: %v", err)
+ prePullStatus.State = api.TaskFailed
+ prePullStatus.Reason = err.Error()
+ } else {
+ klog.Infof("pull image %s successfully!", image)
+ prePullStatus.State = api.TaskSuccessful
+ }
+ imageStatus = append(imageStatus, prePullStatus)
+ }
+
+ return errorStr, imageStatus
+}
+
+func makeAuthConfig(pullsecret string) (*runtimeapi.AuthConfig, error) {
+ if pullsecret == "" {
+ return nil, nil
+ }
+
+ secretSli := strings.Split(pullsecret, constants.ResourceSep)
+ if len(secretSli) != 2 {
+ return nil, fmt.Errorf("pull secret format is not correct")
+ }
+
+ client := metaclient.New()
+ secret, err := client.Secrets(secretSli[0]).Get(secretSli[1])
+ if err != nil {
+ return nil, fmt.Errorf("get secret %s failed, %v", secretSli[1], err)
+ }
+
+ var auth runtimeapi.AuthConfig
+ err = json.Unmarshal(secret.Data[v1.DockerConfigJsonKey], &auth)
+ if err != nil {
+ return nil, fmt.Errorf("unmarshal secret %s to auth file failed, %v", secretSli[1], err)
+ }
+
+ return &auth, nil
+}
diff --git a/edge/pkg/edgehub/task/taskexecutor/node_backup.go b/edge/pkg/edgehub/task/taskexecutor/node_backup.go
new file mode 100644
index 000000000..1d4583978
--- /dev/null
+++ b/edge/pkg/edgehub/task/taskexecutor/node_backup.go
@@ -0,0 +1,106 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package taskexecutor
+
+import (
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/kubeedge/common/constants"
+ commontypes "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options"
+ "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+ "github.com/kubeedge/kubeedge/pkg/version"
+)
+
+func backupNode(taskReq commontypes.NodeTaskRequest) (event fsm.Event) {
+ event = fsm.Event{
+ Type: "Backup",
+ Action: api.ActionSuccess,
+ }
+ var err error
+ defer func() {
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ }
+ }()
+ backupPath := filepath.Join(util.KubeEdgeBackupPath, version.Get().String())
+ err = backup(backupPath)
+ if err != nil {
+ cleanErr := os.Remove(backupPath)
+ if cleanErr != nil {
+ klog.Warningf("clean backup path failed: %s", err.Error())
+ }
+ return
+ }
+ return event
+}
+
+func backup(backupPath string) error {
+ config := options.GetEdgeCoreConfig()
+ klog.Infof("backup start, backup path: %s", backupPath)
+ if err := os.MkdirAll(backupPath, 0750); err != nil {
+ return fmt.Errorf("mkdirall failed: %v", err)
+ }
+
+ // backup edgecore.db: copy from origin path to backup path
+ if err := copy(config.DataBase.DataSource, filepath.Join(backupPath, "edgecore.db")); err != nil {
+ return fmt.Errorf("failed to backup db: %v", err)
+ }
+ // backup edgecore.yaml: copy from origin path to backup path
+ if err := copy(constants.DefaultConfigDir+"edgecore.yaml", filepath.Join(backupPath, "edgecore.yaml")); err != nil {
+ return fmt.Errorf("failed to back config: %v", err)
+ }
+ // backup edgecore: copy from origin path to backup path
+ if err := copy(filepath.Join(util.KubeEdgeUsrBinPath, util.KubeEdgeBinaryName), filepath.Join(backupPath, util.KubeEdgeBinaryName)); err != nil {
+ return fmt.Errorf("failed to backup edgecore: %v", err)
+ }
+ return nil
+}
+
+func copy(src, dst string) error {
+ sourceFileStat, err := os.Stat(src)
+ if err != nil {
+ return err
+ }
+
+ if !sourceFileStat.Mode().IsRegular() {
+ return fmt.Errorf("%s is not a regular file", src)
+ }
+
+ source, err := os.Open(src)
+ if err != nil {
+ return err
+ }
+ defer source.Close()
+
+ // copy file using src file mode
+ destination, err := os.OpenFile(dst, os.O_RDWR|os.O_CREATE|os.O_TRUNC, sourceFileStat.Mode())
+ if err != nil {
+ return err
+ }
+ defer destination.Close()
+ _, err = io.Copy(destination, source)
+ return err
+}
diff --git a/edge/pkg/edgehub/task/taskexecutor/node_rollback.go b/edge/pkg/edgehub/task/taskexecutor/node_rollback.go
new file mode 100644
index 000000000..85233dc45
--- /dev/null
+++ b/edge/pkg/edgehub/task/taskexecutor/node_rollback.go
@@ -0,0 +1,72 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package taskexecutor
+
+import (
+ "fmt"
+ "os/exec"
+
+ "k8s.io/klog/v2"
+
+ commontypes "github.com/kubeedge/kubeedge/common/types"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+ "github.com/kubeedge/kubeedge/pkg/version"
+)
+
+func rollbackNode(taskReq commontypes.NodeTaskRequest) (event fsm.Event) {
+ event = fsm.Event{
+ Type: "Rollback",
+ Action: api.ActionSuccess,
+ }
+ var err error
+ defer func() {
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ }
+ }()
+
+ var upgradeReq *commontypes.NodeUpgradeJobRequest
+ upgradeReq, err = getTaskRequest(taskReq)
+ if err != nil {
+ return
+ }
+
+ err = rollback(upgradeReq)
+ if err != nil {
+ return
+ }
+ return event
+}
+
+func rollback(upgradeReq *commontypes.NodeUpgradeJobRequest) error {
+ klog.Infof("Begin to run rollback command")
+ rollBackCmd := fmt.Sprintf("keadm rollback edge --name %s --history %s >> /tmp/keadm.log 2>&1",
+ upgradeReq.UpgradeID, version.Get())
+
+ // run upgrade cmd to upgrade edge node
+ // use nohup command to start a child progress
+ command := fmt.Sprintf("nohup %s &", rollBackCmd)
+ cmd := exec.Command("bash", "-c", command)
+ s, err := cmd.CombinedOutput()
+ if err != nil {
+ return fmt.Errorf("run rollback command %s failed: %v, %s", command, err, s)
+ }
+ klog.Infof("!!! Finish rollback ")
+ return nil
+}
diff --git a/edge/pkg/edgehub/upgrade/upgrade.go b/edge/pkg/edgehub/task/taskexecutor/node_upgrade.go
index e26cd50ca..8546ab96d 100644
--- a/edge/pkg/edgehub/upgrade/upgrade.go
+++ b/edge/pkg/edgehub/task/taskexecutor/node_upgrade.go
@@ -1,5 +1,5 @@
/*
-Copyright 2022 The KubeEdge Authors.
+Copyright 2023 The KubeEdge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -14,100 +14,137 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package upgrade
+package taskexecutor
import (
"encoding/json"
"fmt"
"os/exec"
"path/filepath"
- "strings"
- "sync"
"k8s.io/klog/v2"
- "github.com/kubeedge/beehive/pkg/core/model"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+ "github.com/kubeedge/kubeedge/common/types"
commontypes "github.com/kubeedge/kubeedge/common/types"
"github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options"
- "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients"
- "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler"
"github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
"github.com/kubeedge/kubeedge/pkg/version"
)
-func init() {
- handler := &upgradeHandler{}
- msghandler.RegisterHandler(handler)
+const (
+ TaskUpgrade = "upgrade"
+)
- // register upgrade tool: keadm
- // if not specify it, also use default upgrade tool: keadm
- RegisterUpgradeProvider("", &keadmUpgrade{})
- RegisterUpgradeProvider("keadm", &keadmUpgrade{})
+type Upgrade struct {
+ *BaseExecutor
}
-type upgradeHandler struct{}
-
-func (uh *upgradeHandler) Filter(message *model.Message) bool {
- name := message.GetGroup()
- return name == modules.NodeUpgradeJobControllerModuleGroup
+func (u *Upgrade) Name() string {
+ return u.name
}
-func (uh *upgradeHandler) Process(message *model.Message, clientHub clients.Adapter) error {
- upgradeReq := &commontypes.NodeUpgradeJobRequest{}
- data, err := message.GetContentData()
- if err != nil {
- return fmt.Errorf("failed to get content data: %v", err)
+func NewUpgradeExecutor() Executor {
+ methods := map[string]func(types.NodeTaskRequest) fsm.Event{
+ string(api.TaskChecking): preCheck,
+ string(api.TaskInit): initUpgrade,
+ "": initUpgrade,
+ string(api.BackingUpState): backupNode,
+ string(api.RollingBackState): rollbackNode,
+ string(api.UpgradingState): upgrade,
}
- err = json.Unmarshal(data, upgradeReq)
- if err != nil {
- return fmt.Errorf("unmarshal failed: %v", err)
+ return &Upgrade{
+ BaseExecutor: NewBaseExecutor(TaskUpgrade, methods),
}
+}
- err = validateUpgrade(upgradeReq)
- if err != nil {
- return fmt.Errorf("upgrade request is not valid: %v", err)
+func initUpgrade(taskReq types.NodeTaskRequest) (event fsm.Event) {
+ event = fsm.Event{
+ Type: "Init",
+ Action: api.ActionSuccess,
}
+ var err error
+ defer func() {
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ }
+ }()
- tool := strings.ToLower(upgradeReq.UpgradeTool)
- if _, ok := upgradeToolProviders[tool]; !ok {
- return fmt.Errorf("not supported upgrade tool type: %v", upgradeReq.UpgradeTool)
+ var upgradeReq *commontypes.NodeUpgradeJobRequest
+ upgradeReq, err = getTaskRequest(taskReq)
+ if err != nil {
+ return
}
- return upgradeToolProviders[tool].Upgrade(upgradeReq)
+ if upgradeReq.UpgradeID == "" {
+ err = fmt.Errorf("upgradeID cannot be empty")
+ return
+ }
+ if upgradeReq.Version == version.Get().String() {
+ return fsm.Event{
+ Type: "Upgrading",
+ Action: api.ActionSuccess,
+ }
+ }
+ err = prepareKeadm(upgradeReq)
+ if err != nil {
+ return
+ }
+ return event
}
-func validateUpgrade(upgrade *commontypes.NodeUpgradeJobRequest) error {
- if upgrade.UpgradeID == "" {
- return fmt.Errorf("upgradeID cannot be empty")
+func getTaskRequest(taskReq commontypes.NodeTaskRequest) (*commontypes.NodeUpgradeJobRequest, error) {
+ data, err := json.Marshal(taskReq.Item)
+ if err != nil {
+ return nil, err
}
- if upgrade.Version == version.Get().String() {
- return fmt.Errorf("edge node already on specific version, no need to upgrade")
+ var upgradeReq commontypes.NodeUpgradeJobRequest
+ err = json.Unmarshal(data, &upgradeReq)
+ if err != nil {
+ return nil, err
}
-
- return nil
+ return &upgradeReq, err
}
-type Provider interface {
- Upgrade(upgrade *commontypes.NodeUpgradeJobRequest) error
+func upgrade(taskReq types.NodeTaskRequest) (event fsm.Event) {
+ opts := options.GetEdgeCoreOptions()
+ event = fsm.Event{
+ Type: "Upgrade",
+ }
+ upgradeReq, err := getTaskRequest(taskReq)
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ return
+ }
+ err = keadmUpgrade(*upgradeReq, opts)
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ }
+ return
}
-var (
- upgradeToolProviders = make(map[string]Provider)
- mutex sync.Mutex
-)
+func keadmUpgrade(upgradeReq commontypes.NodeUpgradeJobRequest, opts *options.EdgeCoreOptions) error {
+ klog.Infof("Begin to run upgrade command")
+ upgradeCmd := fmt.Sprintf("keadm upgrade --upgradeID %s --historyID %s --fromVersion %s --toVersion %s --config %s --image %s > /tmp/keadm.log 2>&1",
+ upgradeReq.UpgradeID, upgradeReq.HistoryID, version.Get(), upgradeReq.Version, opts.ConfigFile, upgradeReq.Image)
-func RegisterUpgradeProvider(upgradeToolType string, provider Provider) {
- mutex.Lock()
- defer mutex.Unlock()
- upgradeToolProviders[upgradeToolType] = provider
+ // run upgrade cmd to upgrade edge node
+ // use nohup command to start a child progress
+ command := fmt.Sprintf("nohup %s &", upgradeCmd)
+ cmd := exec.Command("bash", "-c", command)
+ s, err := cmd.CombinedOutput()
+ if err != nil {
+ return fmt.Errorf("run upgrade command %s failed: %v, %s", command, err, s)
+ }
+ klog.Infof("!!! Finish upgrade from Version %s to %s ...", version.Get(), upgradeReq.Version)
+ return nil
}
-type keadmUpgrade struct{}
-
-func (*keadmUpgrade) Upgrade(upgradeReq *commontypes.NodeUpgradeJobRequest) error {
- // get edgecore start options and config
- opts := options.GetEdgeCoreOptions()
+func prepareKeadm(upgradeReq *commontypes.NodeUpgradeJobRequest) error {
config := options.GetEdgeCoreConfig()
// install the requested installer keadm from docker image
@@ -119,7 +156,6 @@ func (*keadmUpgrade) Upgrade(upgradeReq *commontypes.NodeUpgradeJobRequest) erro
if err != nil {
return fmt.Errorf("failed to new container runtime: %v", err)
}
-
image := upgradeReq.Image
// TODO: do some verification 1.sha256(pass in using CRD) 2.image signature verification
@@ -135,22 +171,5 @@ func (*keadmUpgrade) Upgrade(upgradeReq *commontypes.NodeUpgradeJobRequest) erro
if err != nil {
return fmt.Errorf("failed to cp file from image to host: %v", err)
}
-
- klog.Infof("Begin to run upgrade command")
- upgradeCmd := fmt.Sprintf("keadm upgrade --upgradeID %s --historyID %s --fromVersion %s --toVersion %s --config %s --image %s > /tmp/keadm.log 2>&1",
- upgradeReq.UpgradeID, upgradeReq.HistoryID, version.Get(), upgradeReq.Version, opts.ConfigFile, image)
-
- // run upgrade cmd to upgrade edge node
- // use nohup command to start a child progress
- command := fmt.Sprintf("nohup %s &", upgradeCmd)
- cmd := exec.Command("bash", "-c", command)
- s, err := cmd.CombinedOutput()
- if err != nil {
- klog.Errorf("run upgrade command %s failed: %v, %s", command, err, s)
- return fmt.Errorf("run upgrade command %s failed: %v, %s", command, err, s)
- }
-
- klog.Infof("!!! Begin to upgrade from Version %s to %s ...", version.Get(), upgradeReq.Version)
-
return nil
}
diff --git a/edge/pkg/edgehub/task/taskexecutor/pre_check.go b/edge/pkg/edgehub/task/taskexecutor/pre_check.go
new file mode 100644
index 000000000..da68a2077
--- /dev/null
+++ b/edge/pkg/edgehub/task/taskexecutor/pre_check.go
@@ -0,0 +1,158 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package taskexecutor
+
+import (
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "github.com/shirou/gopsutil/cpu"
+ "github.com/shirou/gopsutil/disk"
+ "github.com/shirou/gopsutil/v3/mem"
+
+ "github.com/kubeedge/kubeedge/common/types"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+const (
+ MaxCPUUsage float64 = 80
+ MaxMemUsage float64 = 80
+ MaxDiskUsage float64 = 80
+)
+
+func preCheck(taskReq types.NodeTaskRequest) fsm.Event {
+ event := fsm.Event{
+ Type: "Check",
+ Action: api.ActionSuccess,
+ Msg: "",
+ }
+
+ data, err := json.Marshal(taskReq.Item)
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ return event
+ }
+ var checkItems types.NodePreCheckRequest
+ err = json.Unmarshal(data, &checkItems)
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ return event
+ }
+
+ var failed bool
+ var checkResult = map[string]string{}
+ var checkFunc = map[string]func() error{
+ "cpu": checkCPU,
+ "mem": checkMem,
+ "disk": checkDisk,
+ }
+ for _, item := range checkItems.CheckItem {
+ f, ok := checkFunc[item]
+ if !ok {
+ checkResult[item] = "check item not support"
+ continue
+ }
+ err = f()
+ if err != nil {
+ failed = true
+ checkResult[item] = err.Error()
+ continue
+ }
+ checkResult[item] = "ok"
+ }
+ if !failed {
+ return event
+ }
+ event.Action = api.ActionFailure
+ result, err := json.Marshal(checkResult)
+ if err != nil {
+ event.Msg = err.Error()
+ return event
+ }
+ event.Msg = string(result)
+ return event
+}
+
+func checkCPU() error {
+ cpuUsage, err := cpu.Percent(100*time.Millisecond, false)
+ if err != nil {
+ return err
+ }
+ var usage float64
+ for _, percpu := range cpuUsage {
+ usage += percpu / float64(len(cpuUsage))
+ }
+ if usage > MaxCPUUsage {
+ return fmt.Errorf("current cpu usage is %f, which exceeds the maximum allowed usage %f", usage, MaxCPUUsage)
+ }
+ return nil
+}
+
+func checkMem() error {
+ memInfo, err := mem.VirtualMemory()
+ if err != nil {
+ return err
+ }
+ memUsedPercent := memInfo.UsedPercent
+ if memUsedPercent > MaxMemUsage {
+ return fmt.Errorf("current mem usage is %f, which exceeds the maximum allowed usage %f", memUsedPercent, MaxMemUsage)
+ }
+ return nil
+}
+
+func checkDisk() error {
+ partitions, err := disk.Partitions(true)
+ if err != nil {
+ return err
+ }
+ var failed bool
+ var diskUsages = map[string]string{}
+ for _, part := range partitions {
+ usage, err := disk.Usage(part.Mountpoint)
+ if err != nil {
+ failed = true
+ diskUsages[part.Device] = err.Error()
+ continue
+ }
+ if usage.UsedPercent > MaxDiskUsage {
+ failed = true
+ diskUsages[part.Device] = fmt.Sprintf("current disk usage is %f, which exceeds the maximum allowed usage %f", usage.UsedPercent, MaxMemUsage)
+ continue
+ }
+ diskUsages[part.Device] = fmt.Sprintf("%f", usage.UsedPercent)
+ }
+ if !failed {
+ return nil
+ }
+ result, err := json.Marshal(diskUsages)
+ if err != nil {
+ return err
+ }
+ return fmt.Errorf(string(result))
+}
+
+func normalInit(types.NodeTaskRequest) fsm.Event {
+ return fsm.Event{
+ Type: "Init",
+ Action: api.ActionSuccess,
+ Msg: "",
+ }
+}
diff --git a/edge/pkg/edgehub/task/taskexecutor/task_executor.go b/edge/pkg/edgehub/task/taskexecutor/task_executor.go
new file mode 100644
index 000000000..fd3be1f88
--- /dev/null
+++ b/edge/pkg/edgehub/task/taskexecutor/task_executor.go
@@ -0,0 +1,93 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package taskexecutor
+
+import (
+ "fmt"
+
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+func init() {
+ Register(TaskUpgrade, NewUpgradeExecutor())
+ Register(TaskPrePull, NewPrePullExecutor())
+}
+
+type Executor interface {
+ Name() string
+ Do(types.NodeTaskRequest) (fsm.Event, error)
+}
+
+type BaseExecutor struct {
+ name string
+ methods map[string]func(types.NodeTaskRequest) fsm.Event
+}
+
+func (be *BaseExecutor) Name() string {
+ return be.name
+}
+
+func NewBaseExecutor(name string, methods map[string]func(types.NodeTaskRequest) fsm.Event) *BaseExecutor {
+ return &BaseExecutor{
+ name: name,
+ methods: methods,
+ }
+}
+
+func (be *BaseExecutor) Do(taskReq types.NodeTaskRequest) (fsm.Event, error) {
+ method, ok := be.methods[taskReq.State]
+ if !ok {
+ err := fmt.Errorf("method %s in executor %s is not implemented", taskReq.State, taskReq.Type)
+ klog.Warning(err.Error())
+ return fsm.Event{}, err
+ }
+ return method(taskReq), nil
+}
+
+var (
+ executors = make(map[string]Executor)
+ CommonMethods = map[string]func(types.NodeTaskRequest) fsm.Event{
+ string(v1alpha1.TaskChecking): preCheck,
+ string(v1alpha1.TaskInit): normalInit,
+ }
+)
+
+func Register(name string, executor Executor) {
+ if _, ok := executors[name]; ok {
+ klog.Warning("executor %s exists ", name)
+ }
+ executors[name] = executor
+}
+
+func GetExecutor(name string) (Executor, error) {
+ executor, ok := executors[name]
+ if !ok {
+ return nil, fmt.Errorf("executor %s is not registered", name)
+ }
+ return executor, nil
+}
+
+func emptyInit(_ types.NodeTaskRequest) (event fsm.Event) {
+ return fsm.Event{
+ Type: "Init",
+ Action: v1alpha1.ActionSuccess,
+ }
+}
diff --git a/edge/pkg/edgehub/upgrade/image_prepull.go b/edge/pkg/edgehub/upgrade/image_prepull.go
deleted file mode 100644
index d02a4cdc9..000000000
--- a/edge/pkg/edgehub/upgrade/image_prepull.go
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
-Copyright 2023 The KubeEdge Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package upgrade
-
-import (
- "encoding/json"
- "fmt"
- "strings"
-
- v1 "k8s.io/api/core/v1"
- runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
- "k8s.io/klog/v2"
-
- beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
- "github.com/kubeedge/beehive/pkg/core/model"
- cloudmodules "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
- "github.com/kubeedge/kubeedge/common/constants"
- commontypes "github.com/kubeedge/kubeedge/common/types"
- "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options"
- "github.com/kubeedge/kubeedge/edge/pkg/common/modules"
- "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients"
- "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler"
- metaclient "github.com/kubeedge/kubeedge/edge/pkg/metamanager/client"
- "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util"
- "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
-)
-
-func init() {
- handler := &prepullHandler{}
- msghandler.RegisterHandler(handler)
-}
-
-type prepullHandler struct{}
-
-func (uh *prepullHandler) Filter(message *model.Message) bool {
- name := message.GetGroup()
- return name == cloudmodules.ImagePrePullControllerModuleGroup
-}
-
-func (uh *prepullHandler) Process(message *model.Message, clientHub clients.Adapter) error {
- // get edgecore config
- edgecoreconfig := options.GetEdgeCoreConfig()
- if edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint == "" {
- edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint = edgecoreconfig.Modules.Edged.RemoteRuntimeEndpoint
- }
- nodeName := edgecoreconfig.Modules.Edged.HostnameOverride
-
- var errmsg, jobName string
- defer func() {
- if errmsg != "" {
- sendResponseMessage(nodeName, jobName, errmsg, nil, v1alpha1.PrePullFailed)
- }
- }()
-
- jobName, err := parseJobName(message.GetResource())
- if err != nil {
- errmsg = fmt.Sprintf("failed to parse prepull resource, err: %v", err)
- return fmt.Errorf(errmsg)
- }
-
- // parse message request
- var prePullReq commontypes.ImagePrePullJobRequest
- data, err := message.GetContentData()
- if err != nil {
- errmsg = fmt.Sprintf("failed to get message content, err: %v", err)
- return fmt.Errorf(errmsg)
- }
- err = json.Unmarshal(data, &prePullReq)
- if err != nil {
- errmsg = fmt.Sprintf("failed to unmarshal message content, err: %v", err)
- return fmt.Errorf(errmsg)
- }
- if nodeName != prePullReq.NodeName {
- errmsg = fmt.Sprintf("request node name %s is not match with the edge node %s", prePullReq.NodeName, nodeName)
- return fmt.Errorf(errmsg)
- }
-
- // todo check items such as disk according to req.checkitem config
-
- // pull images
- container, err := util.NewContainerRuntime(edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint, edgecoreconfig.Modules.Edged.TailoredKubeletConfig.CgroupDriver)
- if err != nil {
- errmsg = fmt.Sprintf("failed to new container runtime: %v", err)
- return fmt.Errorf(errmsg)
- }
-
- go func() {
- pullMsg, resp, state := prePullImages(prePullReq, container)
- sendResponseMessage(nodeName, jobName, pullMsg, resp, state)
- }()
- return nil
-}
-
-func sendResponseMessage(nodeName, jobName, pullMsg string, imageStatus []v1alpha1.ImageStatus, state v1alpha1.PrePullState) {
- resp := commontypes.ImagePrePullJobResponse{
- NodeName: nodeName,
- State: state,
- Reason: pullMsg,
- ImageStatus: imageStatus,
- }
- msg := model.NewMessage("").SetRoute(modules.EdgeHubModuleName, modules.HubGroup).
- SetResourceOperation(fmt.Sprintf("node/%s/prepull/%s", nodeName, jobName), "prepull").FillBody(resp)
- beehiveContext.Send(modules.EdgeHubModuleName, *msg)
-}
-
-func prePullImages(prePullReq commontypes.ImagePrePullJobRequest, container util.ContainerRuntime) (string, []v1alpha1.ImageStatus, v1alpha1.PrePullState) {
- var errmsg string
- authConfig, err := makeAuthConfig(prePullReq.Secret)
- if err != nil {
- errmsg = fmt.Sprintf("failed to get prepull secret, err: %v", err)
- return errmsg, nil, v1alpha1.PrePullFailed
- }
-
- state := v1alpha1.PrePullSuccessful
- var imageStatus []v1alpha1.ImageStatus
- for _, image := range prePullReq.Images {
- prePullStatus := v1alpha1.ImageStatus{
- Image: image,
- }
- for i := 0; i < int(prePullReq.RetryTimes); i++ {
- err = container.PullImage(image, authConfig, nil)
- if err == nil {
- break
- }
- }
- if err != nil {
- klog.Errorf("pull image %s failed, err: %v", image, err)
- errmsg = "prepull images failed"
- state = v1alpha1.PrePullFailed
- prePullStatus.State = v1alpha1.PrePullFailed
- prePullStatus.Reason = err.Error()
- } else {
- klog.Infof("pull image %s successfully!", image)
- prePullStatus.State = v1alpha1.PrePullSuccessful
- }
- imageStatus = append(imageStatus, prePullStatus)
- }
-
- return errmsg, imageStatus, state
-}
-
-func parseJobName(resource string) (string, error) {
- var jobName string
- sli := strings.Split(resource, constants.ResourceSep)
- if len(sli) != 2 {
- return jobName, fmt.Errorf("the resource %s is not the standard type", resource)
- }
- return sli[1], nil
-}
-
-func makeAuthConfig(pullsecret string) (*runtimeapi.AuthConfig, error) {
- if pullsecret == "" {
- return nil, nil
- }
-
- secretSli := strings.Split(pullsecret, constants.ResourceSep)
- if len(secretSli) != 2 {
- return nil, fmt.Errorf("pull secret format is not correct")
- }
-
- client := metaclient.New()
- secret, err := client.Secrets(secretSli[0]).Get(secretSli[1])
- if err != nil {
- return nil, fmt.Errorf("get secret %s failed, %v", secretSli[1], err)
- }
-
- var auth runtimeapi.AuthConfig
- err = json.Unmarshal(secret.Data[v1.DockerConfigJsonKey], &auth)
- if err != nil {
- return nil, fmt.Errorf("unmarshal secret %s to auth file failed, %v", secretSli[1], err)
- }
-
- return &auth, nil
-}
diff --git a/edge/pkg/edgehub/upgrade/upgrade_test.go b/edge/pkg/edgehub/upgrade/upgrade_test.go
deleted file mode 100644
index 2660f9d79..000000000
--- a/edge/pkg/edgehub/upgrade/upgrade_test.go
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-Copyright 2022 The KubeEdge Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package upgrade
-
-import (
- "testing"
-
- "github.com/kubeedge/beehive/pkg/core/model"
-)
-
-func TestFilter(t *testing.T) {
- uh := &upgradeHandler{}
-
- tests := []struct {
- msg *model.Message
- want bool
- name string
- }{
- {
- msg: &model.Message{
- Router: model.MessageRoute{Group: "nodeupgradejobcontroller"},
- },
- want: true,
- name: "Node upgrade job controlled",
- },
- {
- msg: &model.Message{
- Router: model.MessageRoute{Group: "devicecontroller"},
- },
- want: false,
- name: "Device controller",
- },
- }
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- if got := uh.Filter(tt.msg); got != tt.want {
- t.Errorf("upgradeHandler.Filter() retuned unexpected result. got = %v, want = %v", got, tt.want)
- }
- })
- }
-}
diff --git a/keadm/cmd/keadm/app/cmd/cmd_others.go b/keadm/cmd/keadm/app/cmd/cmd_others.go
index 71c699d52..56c394608 100644
--- a/keadm/cmd/keadm/app/cmd/cmd_others.go
+++ b/keadm/cmd/keadm/app/cmd/cmd_others.go
@@ -89,5 +89,7 @@ func NewKubeedgeCommand() *cobra.Command {
cmds.AddCommand(beta.NewBeta())
cmds.AddCommand(edge.NewEdgeUpgrade())
+ cmds.AddCommand(edge.NewEdgeRollback())
+
return cmds
}
diff --git a/keadm/cmd/keadm/app/cmd/edge/rollback.go b/keadm/cmd/keadm/app/cmd/edge/rollback.go
new file mode 100644
index 000000000..cacd279a3
--- /dev/null
+++ b/keadm/cmd/keadm/app/cmd/edge/rollback.go
@@ -0,0 +1,113 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package edge
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/spf13/cobra"
+ "k8s.io/klog/v2"
+ "sigs.k8s.io/yaml"
+
+ "github.com/kubeedge/kubeedge/common/constants"
+ "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util"
+ "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+ "github.com/kubeedge/kubeedge/pkg/version"
+)
+
+// NewEdgeRollback returns KubeEdge edge rollback command.
+func NewEdgeRollback() *cobra.Command {
+	rollbackOptions := newRollbackOptions()
+
+	cmd := &cobra.Command{
+		Use:   "rollback",
+		Short: "rollback edge component. Rollback the edge node to the desired version.",
+		Long:  "Rollback edge component. Rollback the edge node to the desired version.",
+		RunE: func(cmd *cobra.Command, args []string) error {
+			// rollback edge core
+			return rollbackEdgeCore(rollbackOptions)
+		},
+	}
+
+	addRollbackFlags(cmd, rollbackOptions)
+	return cmd
+}
+
+// newRollbackOptions returns a RollbackOptions struct pre-filled with defaults for the rollback command flags.
+func newRollbackOptions() *RollbackOptions {
+	opts := &RollbackOptions{}
+	opts.HistoryVersion = version.Get().String()
+	opts.Config = constants.DefaultConfigDir + "edgecore.yaml"
+
+	return opts
+}
+
+// rollbackEdgeCore rolls the edge node back to ro.HistoryVersion and reports
+// the result (success or failure) to cloudhub via a deferred task report.
+func rollbackEdgeCore(ro *RollbackOptions) error {
+	// get EdgeCore configuration from edgecore.yaml config file
+	data, err := os.ReadFile(ro.Config)
+	if err != nil {
+		return fmt.Errorf("failed to read config file %s: %v", ro.Config, err)
+	}
+
+	configure := &v1alpha2.EdgeCoreConfig{}
+	err = yaml.Unmarshal(data, configure)
+	if err != nil {
+		return fmt.Errorf("failed to unmarshal config file %s: %v", ro.Config, err)
+	}
+	event := &fsm.Event{
+		Type:   "RollBack",
+		Action: api.ActionSuccess,
+	}
+	defer func() {
+		// report rollback result to cloudhub
+		if err = util.ReportTaskResult(configure, ro.TaskType, ro.TaskName, *event); err != nil {
+			klog.Warningf("failed to report rollback result to cloud: %v", err)
+		}
+	}()
+
+	rbErr := rollback(ro.HistoryVersion, configure.DataBase.DataSource, ro.Config)
+	if rbErr != nil {
+		event.Action = api.ActionFailure
+		event.Msg = fmt.Sprintf("rollback error: %v", rbErr)
+	}
+	return rbErr
+}
+
+type RollbackOptions struct {
+ HistoryVersion string
+ TaskType string
+ TaskName string
+ Config string
+}
+
+func addRollbackFlags(cmd *cobra.Command, rollbackOptions *RollbackOptions) {
+ cmd.Flags().StringVar(&rollbackOptions.HistoryVersion, "history", rollbackOptions.HistoryVersion,
+ "Use this key to specify the origin version before upgrade")
+
+ cmd.Flags().StringVar(&rollbackOptions.Config, "config", rollbackOptions.Config,
+ "Use this key to specify the path to the edgecore configuration file.")
+
+ cmd.Flags().StringVar(&rollbackOptions.TaskType, "type", "rollback",
+ "Use this key to specify the task type for reporting status.")
+
+ cmd.Flags().StringVar(&rollbackOptions.TaskName, "name", rollbackOptions.TaskName,
+ "Use this key to specify the task name for reporting status.")
+}
diff --git a/keadm/cmd/keadm/app/cmd/edge/upgrade.go b/keadm/cmd/keadm/app/cmd/edge/upgrade.go
index 001323679..2d5f934d8 100644
--- a/keadm/cmd/keadm/app/cmd/edge/upgrade.go
+++ b/keadm/cmd/keadm/app/cmd/edge/upgrade.go
@@ -17,28 +17,21 @@ limitations under the License.
package edge
import (
- "bytes"
- "crypto/tls"
- "crypto/x509"
- "encoding/json"
"fmt"
"io"
- "net"
- "net/http"
"os"
"path/filepath"
- "time"
"github.com/spf13/cobra"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"
"github.com/kubeedge/kubeedge/common/constants"
- commontypes "github.com/kubeedge/kubeedge/common/types"
"github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/common"
"github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util"
"github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2"
- upgradev1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
)
var (
@@ -93,91 +86,96 @@ func (up *UpgradeOptions) upgrade() error {
HistoryID: up.HistoryID,
FromVersion: up.FromVersion,
ToVersion: up.ToVersion,
+ TaskType: up.TaskType,
Image: up.Image,
+ DisableBackup: up.DisableBackup,
ConfigFilePath: up.Config,
EdgeCoreConfig: configure,
}
+ event := &fsm.Event{
+ Type: "Upgrade",
+ Action: api.ActionSuccess,
+ }
defer func() {
// report upgrade result to cloudhub
- if err := upgrade.reportUpgradeResult(); err != nil {
+ if err = util.ReportTaskResult(configure, upgrade.TaskType, upgrade.UpgradeID, *event); err != nil {
klog.Errorf("failed to report upgrade result to cloud: %v", err)
}
// cleanup idempotency record
- if err := os.Remove(idempotencyRecord); err != nil {
+ if err = os.Remove(idempotencyRecord); err != nil {
klog.Errorf("failed to remove idempotency_record file(%s): %v", idempotencyRecord, err)
}
}()
// only allow upgrade when last upgrade finished
if util.FileExists(idempotencyRecord) {
- upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeFailedRollbackSuccess))
- upgrade.UpdateFailureReason("last upgrade not finished, not allowed upgrade again")
+ event.Action = api.ActionFailure
+ event.Msg = "last upgrade not finished, not allowed upgrade again"
return fmt.Errorf("last upgrade not finished, not allowed upgrade again")
}
// create idempotency_record file
if err := os.MkdirAll(filepath.Dir(idempotencyRecord), 0750); err != nil {
- upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeFailedRollbackSuccess))
reason := fmt.Sprintf("failed to create idempotency_record dir: %v", err)
- upgrade.UpdateFailureReason(reason)
+ event.Action = api.ActionFailure
+ event.Msg = reason
return fmt.Errorf(reason)
}
if _, err := os.Create(idempotencyRecord); err != nil {
- upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeFailedRollbackSuccess))
reason := fmt.Sprintf("failed to create idempotency_record file: %v", err)
- upgrade.UpdateFailureReason(reason)
+ event.Action = api.ActionFailure
+ event.Msg = reason
return fmt.Errorf(reason)
}
// run script to do upgrade operation
err = upgrade.PreProcess()
if err != nil {
- upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeFailedRollbackSuccess))
- upgrade.UpdateFailureReason(fmt.Sprintf("upgrade error: %v", err))
+ event.Action = api.ActionFailure
+ event.Msg = fmt.Sprintf("upgrade pre process failed: %v", err)
return fmt.Errorf("upgrade pre process failed: %v", err)
}
err = upgrade.Process()
if err != nil {
+ event.Type = "Rollback"
rbErr := upgrade.Rollback()
if rbErr != nil {
- upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeFailedRollbackFailed))
- upgrade.UpdateFailureReason(fmt.Sprintf("upgrade error: %v, rollback error: %v", err, rbErr))
+ event.Action = api.ActionFailure
+ event.Msg = rbErr.Error()
} else {
- upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeFailedRollbackSuccess))
- upgrade.UpdateFailureReason(fmt.Sprintf("upgrade error: %v", err))
+ event.Msg = err.Error()
}
return fmt.Errorf("upgrade process failed: %v", err)
}
- upgrade.UpdateStatus(string(upgradev1alpha1.UpgradeSuccess))
-
return nil
}
func (up *Upgrade) PreProcess() error {
- klog.Infof("upgrade preprocess start")
- backupPath := filepath.Join(util.KubeEdgeBackupPath, up.FromVersion)
- if err := os.MkdirAll(backupPath, 0750); err != nil {
- return fmt.Errorf("mkdirall failed: %v", err)
- }
+ // download the request version edgecore
+ klog.Infof("Begin to download version %s edgecore", up.ToVersion)
+ if !up.DisableBackup {
+ backupPath := filepath.Join(util.KubeEdgeBackupPath, up.FromVersion)
+ if err := os.MkdirAll(backupPath, 0750); err != nil {
+ return fmt.Errorf("mkdirall failed: %v", err)
+ }
- // backup edgecore.db: copy from origin path to backup path
- if err := copy(up.EdgeCoreConfig.DataBase.DataSource, filepath.Join(backupPath, "edgecore.db")); err != nil {
- return fmt.Errorf("failed to backup db: %v", err)
- }
- // backup edgecore.yaml: copy from origin path to backup path
- if err := copy(up.ConfigFilePath, filepath.Join(backupPath, "edgecore.yaml")); err != nil {
- return fmt.Errorf("failed to back config: %v", err)
- }
- // backup edgecore: copy from origin path to backup path
- if err := copy(filepath.Join(util.KubeEdgeUsrBinPath, util.KubeEdgeBinaryName), filepath.Join(backupPath, util.KubeEdgeBinaryName)); err != nil {
- return fmt.Errorf("failed to backup edgecore: %v", err)
+ // backup edgecore.db: copy from origin path to backup path
+ if err := copy(up.EdgeCoreConfig.DataBase.DataSource, filepath.Join(backupPath, "edgecore.db")); err != nil {
+ return fmt.Errorf("failed to backup db: %v", err)
+ }
+ // backup edgecore.yaml: copy from origin path to backup path
+ if err := copy(up.ConfigFilePath, filepath.Join(backupPath, "edgecore.yaml")); err != nil {
+ return fmt.Errorf("failed to back config: %v", err)
+ }
+ // backup edgecore: copy from origin path to backup path
+ if err := copy(filepath.Join(util.KubeEdgeUsrBinPath, util.KubeEdgeBinaryName), filepath.Join(backupPath, util.KubeEdgeBinaryName)); err != nil {
+ return fmt.Errorf("failed to backup edgecore: %v", err)
+ }
}
- // download the request version edgecore
- klog.Infof("Begin to download version %s edgecore", up.ToVersion)
upgradePath := filepath.Join(util.KubeEdgeUpgradePath, up.ToVersion)
if up.EdgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint == "" {
up.EdgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint = up.EdgeCoreConfig.Modules.Edged.RemoteRuntimeEndpoint
@@ -266,6 +264,10 @@ func (up *Upgrade) Process() error {
}
func (up *Upgrade) Rollback() error {
+ return rollback(up.FromVersion, up.EdgeCoreConfig.DataBase.DataSource, up.ConfigFilePath)
+}
+
+func rollback(HistoryVersion, dataSource, configFilePath string) error {
klog.Infof("upgrade rollback process start")
// stop edgecore
@@ -277,12 +279,12 @@ func (up *Upgrade) Rollback() error {
// rollback origin config/db/binary
// backup edgecore.db: copy from backup path to origin path
- backupPath := filepath.Join(util.KubeEdgeBackupPath, up.FromVersion)
- if err := copy(filepath.Join(backupPath, "edgecore.db"), up.EdgeCoreConfig.DataBase.DataSource); err != nil {
+ backupPath := filepath.Join(util.KubeEdgeBackupPath, HistoryVersion)
+ if err := copy(filepath.Join(backupPath, "edgecore.db"), dataSource); err != nil {
return fmt.Errorf("failed to rollback db: %v", err)
}
// backup edgecore.yaml: copy from backup path to origin path
- if err := copy(filepath.Join(backupPath, "edgecore.yaml"), up.ConfigFilePath); err != nil {
+ if err := copy(filepath.Join(backupPath, "edgecore.yaml"), configFilePath); err != nil {
return fmt.Errorf("failed to back config: %v", err)
}
// backup edgecore: copy from backup path to origin path
@@ -292,7 +294,7 @@ func (up *Upgrade) Rollback() error {
// generate edgecore.service
if util.HasSystemd() {
- err = common.GenerateServiceFile(util.KubeEdgeBinaryName, fmt.Sprintf("%s --config %s", filepath.Join(util.KubeEdgeUsrBinPath, util.KubeEdgeBinaryName), up.ConfigFilePath), false)
+ err = common.GenerateServiceFile(util.KubeEdgeBinaryName, fmt.Sprintf("%s --config %s", filepath.Join(util.KubeEdgeUsrBinPath, util.KubeEdgeBinaryName), configFilePath), false)
if err != nil {
return fmt.Errorf("failed to create edgecore.service file: %v", err)
}
@@ -303,7 +305,6 @@ func (up *Upgrade) Rollback() error {
if err != nil {
return fmt.Errorf("failed to start origin edgecore: %v", err)
}
-
return nil
}
@@ -315,67 +316,15 @@ func (up *Upgrade) UpdateFailureReason(reason string) {
up.Reason = reason
}
-func (up *Upgrade) reportUpgradeResult() error {
- resp := &commontypes.NodeUpgradeJobResponse{
- UpgradeID: up.UpgradeID,
- HistoryID: up.HistoryID,
- NodeName: up.EdgeCoreConfig.Modules.Edged.HostnameOverride,
- FromVersion: up.FromVersion,
- ToVersion: up.ToVersion,
- Status: up.Status,
- Reason: up.Reason,
- }
-
- var caCrt []byte
- caCertPath := up.EdgeCoreConfig.Modules.EdgeHub.TLSCAFile
- caCrt, err := os.ReadFile(caCertPath)
- if err != nil {
- return fmt.Errorf("failed to read ca: %v", err)
- }
-
- rootCAs := x509.NewCertPool()
- rootCAs.AppendCertsFromPEM(caCrt)
-
- certFile := up.EdgeCoreConfig.Modules.EdgeHub.TLSCertFile
- keyFile := up.EdgeCoreConfig.Modules.EdgeHub.TLSPrivateKeyFile
- cliCrt, err := tls.LoadX509KeyPair(certFile, keyFile)
-
- transport := &http.Transport{
- DialContext: (&net.Dialer{
- Timeout: 30 * time.Second,
- KeepAlive: 30 * time.Second,
- }).DialContext,
- // use TLS configuration
- TLSClientConfig: &tls.Config{
- RootCAs: rootCAs,
- InsecureSkipVerify: false,
- Certificates: []tls.Certificate{cliCrt},
- },
- }
-
- client := &http.Client{Transport: transport, Timeout: 30 * time.Second}
-
- respData, err := json.Marshal(resp)
- if err != nil {
- return fmt.Errorf("marshal failed: %v", err)
- }
- url := up.EdgeCoreConfig.Modules.EdgeHub.HTTPServer + constants.DefaultNodeUpgradeURL
- result, err := client.Post(url, "application/json", bytes.NewReader(respData))
- if err != nil {
- return fmt.Errorf("post http request failed: %v", err)
- }
- defer result.Body.Close()
-
- return nil
-}
-
type UpgradeOptions struct {
- UpgradeID string
- HistoryID string
- FromVersion string
- ToVersion string
- Config string
- Image string
+ UpgradeID string
+ HistoryID string
+ FromVersion string
+ ToVersion string
+ Config string
+ Image string
+ DisableBackup bool
+ TaskType string
}
type Upgrade struct {
@@ -384,7 +333,9 @@ type Upgrade struct {
FromVersion string
ToVersion string
Image string
+ DisableBackup bool
ConfigFilePath string
+ TaskType string
EdgeCoreConfig *v1alpha2.EdgeCoreConfig
Status string
@@ -409,4 +360,10 @@ func addUpgradeFlags(cmd *cobra.Command, upgradeOptions *UpgradeOptions) {
cmd.Flags().StringVar(&upgradeOptions.Image, "image", upgradeOptions.Image,
"Use this key to specify installation image to download.")
+
+ cmd.Flags().StringVar(&upgradeOptions.TaskType, "type", "upgrade",
+ "Use this key to specify the task type for reporting status.")
+
+ cmd.Flags().BoolVar(&upgradeOptions.DisableBackup, "disable-backup", upgradeOptions.DisableBackup,
+ "Use this key to specify the backup enable for upgrade.")
}
diff --git a/keadm/cmd/keadm/app/cmd/util/common.go b/keadm/cmd/keadm/app/cmd/util/common.go
index 4cbc2c243..a5065484e 100644
--- a/keadm/cmd/keadm/app/cmd/util/common.go
+++ b/keadm/cmd/keadm/app/cmd/util/common.go
@@ -18,16 +18,22 @@ package util
import (
"archive/tar"
+ "bytes"
"compress/gzip"
"crypto/sha512"
+ "crypto/tls"
+ "crypto/x509"
+ "encoding/json"
"fmt"
"io"
+ "net"
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
+ "time"
"github.com/blang/semver"
"github.com/spf13/pflag"
@@ -39,8 +45,11 @@ import (
"k8s.io/klog/v2"
"github.com/kubeedge/kubeedge/common/constants"
+ commontypes "github.com/kubeedge/kubeedge/common/types"
types "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/common"
+ "github.com/kubeedge/kubeedge/pkg/apis"
"github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
pkgversion "github.com/kubeedge/kubeedge/pkg/version"
)
@@ -639,3 +648,57 @@ func downloadServiceFile(componentType types.ComponentType, version semver.Versi
}
return nil
}
+
+func ReportTaskResult(config *v1alpha2.EdgeCoreConfig, taskType, taskID string, event fsm.Event) error {
+ resp := &commontypes.NodeTaskResponse{
+ NodeName: config.Modules.Edged.HostnameOverride,
+ Event: event.Type,
+ Action: event.Action,
+ Time: time.Now().Format(apis.ISO8601UTC),
+ Reason: event.Msg,
+ }
+ edgeHub := config.Modules.EdgeHub
+ var caCrt []byte
+ caCertPath := edgeHub.TLSCAFile
+ caCrt, err := os.ReadFile(caCertPath)
+ if err != nil {
+ return fmt.Errorf("failed to read ca: %v", err)
+ }
+
+ rootCAs := x509.NewCertPool()
+ rootCAs.AppendCertsFromPEM(caCrt)
+
+ certFile := edgeHub.TLSCertFile
+ keyFile := edgeHub.TLSPrivateKeyFile
+ cliCrt, err := tls.LoadX509KeyPair(certFile, keyFile)
+
+ transport := &http.Transport{
+ DialContext: (&net.Dialer{
+ Timeout: 30 * time.Second,
+ KeepAlive: 30 * time.Second,
+ }).DialContext,
+ // use TLS configuration
+ TLSClientConfig: &tls.Config{
+ RootCAs: rootCAs,
+ InsecureSkipVerify: false,
+ Certificates: []tls.Certificate{cliCrt},
+ },
+ }
+
+ client := &http.Client{Transport: transport, Timeout: 30 * time.Second}
+
+ respData, err := json.Marshal(resp)
+ if err != nil {
+ return fmt.Errorf("marshal failed: %v", err)
+ }
+ url := edgeHub.HTTPServer + fmt.Sprintf("/task/%s/name/%s/node/%s/status", taskType, taskID, config.Modules.Edged.HostnameOverride)
+ result, err := client.Post(url, "application/json", bytes.NewReader(respData))
+
+ if err != nil {
+ return fmt.Errorf("post http request failed: %v", err)
+ }
+ klog.Error("report result ", result)
+ defer result.Body.Close()
+
+ return nil
+}
diff --git a/manifests/charts/cloudcore/crds/operations_v1alpha1_imageprepulljob.yaml b/manifests/charts/cloudcore/crds/operations_v1alpha1_imageprepulljob.yaml
index 903516d32..a044a1532 100644
--- a/manifests/charts/cloudcore/crds/operations_v1alpha1_imageprepulljob.yaml
+++ b/manifests/charts/cloudcore/crds/operations_v1alpha1_imageprepulljob.yaml
@@ -48,6 +48,16 @@ spec:
items:
type: string
type: array
+ concurrency:
+ description: Concurrency specifies the maximum number of edge
+ nodes that can pull images at the same time. The default Concurrency
+ value is 1.
+ format: int32
+ type: integer
+ failureTolerate:
+ description: FailureTolerate specifies the task tolerance failure
+ ratio. The default FailureTolerate value is 0.1.
+ type: string
imageSecrets:
description: ImageSecret specifies the secret for image pull if
private registry used. Use {namespace}/{secretName} in format.
@@ -119,10 +129,10 @@ spec:
failed on each edgenode. Default to 0
format: int32
type: integer
- timeoutSecondsOnEachNode:
- description: TimeoutSecondsOnEachNode limits the duration of the
- image prepull job on each edgenode. Default to 360. If set to
- 0, we'll use the default value 360.
+ timeoutSeconds:
+ description: TimeoutSeconds limits the duration of the node prepull
+ job on each edgenode. Default to 300. If set to 0, we'll use
+ the default value 300.
format: int32
type: integer
type: object
@@ -130,14 +140,21 @@ spec:
status:
description: Status represents the status of ImagePrePullJob.
properties:
+ action:
+ description: 'Action represents for the action of the ImagePrePullJob.
+ There are two possible action values: Success, Failure.'
+ type: string
+ event:
+ description: 'Event represents for the event of the ImagePrePullJob.
+ There are four possible event values: Init, Check, Pull, TimeOut.'
+ type: string
+ reason:
+ description: Reason represents for the reason of the ImagePrePullJob.
+ type: string
state:
description: 'State represents for the state phase of the ImagePrePullJob.
- There are four possible state values: "", prechecking, prepulling,
- successful, failed.'
- enum:
- - prepulling
- - successful
- - failed
+ There are five possible state values: "", checking, pulling, successful,
+ failed.'
type: string
status:
description: Status contains image prepull status for each edge node.
@@ -163,31 +180,42 @@ spec:
description: 'State represents for the state phase of
this image pull on the edge node There are two possible
state values: successful, failed.'
- enum:
- - prepulling
- - successful
- - failed
type: string
type: object
type: array
- nodeName:
- description: NodeName is the name of edge node.
- type: string
- reason:
- description: Reason represents the fail reason if images prepull
- failed on the edge node
- type: string
- state:
- description: 'State represents for the state phase of the ImagePrepullJob
- on the edge node. There are five possible state values: "",
- prepulling, successful, failed.'
- enum:
- - prepulling
- - successful
- - failed
- type: string
+ nodeStatus:
+ description: TaskStatus represents the status for each node
+ properties:
+ action:
+ description: 'Action represents for the action of the ImagePrePullJob.
+ There are three possible action values: Success, Failure,
+ TimeOut.'
+ type: string
+ event:
+ description: 'Event represents for the event of the ImagePrePullJob.
+ There are three possible event values: Init, Check, Pull.'
+ type: string
+ nodeName:
+ description: NodeName is the name of edge node.
+ type: string
+ reason:
+ description: Reason represents for the reason of the ImagePrePullJob.
+ type: string
+ state:
+                          description: 'State represents for the state phase
+                            of the image prepull on the edge node. There are several
+                            possible state values: "", Checking, Pulling, Successful, Failed.'
+ type: string
+ time:
+ description: Time represents for the running time of the
+ ImagePrePullJob.
+ type: string
+ type: object
type: object
type: array
+ time:
+ description: Time represents for the running time of the ImagePrePullJob.
+ type: string
type: object
required:
- spec
diff --git a/manifests/charts/cloudcore/crds/operations_v1alpha1_nodeupgradejob.yaml b/manifests/charts/cloudcore/crds/operations_v1alpha1_nodeupgradejob.yaml
index 130df65ac..958272424 100644
--- a/manifests/charts/cloudcore/crds/operations_v1alpha1_nodeupgradejob.yaml
+++ b/manifests/charts/cloudcore/crds/operations_v1alpha1_nodeupgradejob.yaml
@@ -36,12 +36,22 @@ spec:
spec:
description: Specification of the desired behavior of NodeUpgradeJob.
properties:
+ checkItems:
+ description: CheckItems specifies the items need to be checked before
+ the task is executed. The default CheckItems value is nil.
+ items:
+ type: string
+ type: array
concurrency:
description: Concurrency specifies the max number of edge nodes that
can be upgraded at the same time. The default Concurrency value
is 1.
format: int32
type: integer
+ failureTolerate:
+ description: FailureTolerate specifies the task tolerance failure
+ ratio. The default FailureTolerate value is 0.1.
+ type: string
image:
description: 'Image specifies a container image name, the image contains:
keadm and edgecore. keadm is used as upgradetool, to install the
@@ -111,75 +121,71 @@ spec:
job. Default to 300. If set to 0, we'll use the default value 300.
format: int32
type: integer
- upgradeTool:
- description: UpgradeTool is a request to decide use which upgrade
- tool. If it is empty, the upgrade job simply use default upgrade
- tool keadm to do upgrade operation.
- type: string
version:
type: string
type: object
status:
description: Most recently observed status of the NodeUpgradeJob.
properties:
- state:
- description: 'State represents for the state phase of the NodeUpgradeJob.
- There are three possible state values: "", upgrading and completed.'
- enum:
- - upgrading
- - completed
+ action:
+ description: 'Action represents for the action of the ImagePrePullJob.
+ There are two possible action values: Success, Failure.'
type: string
- status:
+ currentVersion:
+ description: CurrentVersion represents for the current status of the
+ EdgeCore.
+ type: string
+ event:
+ description: 'Event represents for the event of the ImagePrePullJob.
+ There are six possible event values: Init, Check, BackUp, Upgrade,
+ TimeOut, Rollback.'
+ type: string
+ historicVersion:
+ description: HistoricVersion represents for the historic status of
+ the EdgeCore.
+ type: string
+ nodeStatus:
description: Status contains upgrade Status for each edge node.
items:
- description: UpgradeStatus stores the status of Upgrade for each
- edge node.
+ description: TaskStatus stores the status of Upgrade for each edge
+ node.
properties:
- history:
- description: History is the last upgrade result of the edge
- node.
- properties:
- fromVersion:
- description: FromVersion is the version which the edge node
- is upgraded from.
- type: string
- historyID:
- description: HistoryID is to uniquely identify an Upgrade
- Operation.
- type: string
- reason:
- description: Reason is the error reason of Upgrade failure.
- If the upgrade is successful, this reason is an empty
- string.
- type: string
- result:
- description: Result represents the result of upgrade.
- enum:
- - upgrade_success
- - upgrade_failed_rollback_success
- - upgrade_failed_rollback_failed
- type: string
- toVersion:
- description: ToVersion is the version which the edge node
- is upgraded to.
- type: string
- upgradeTime:
- description: UpgradeTime is the time of this Upgrade.
- type: string
- type: object
+ action:
+ description: 'Action represents for the action of the ImagePrePullJob.
+ There are three possible action values: Success, Failure,
+ TimeOut.'
+ type: string
+ event:
+ description: 'Event represents for the event of the ImagePrePullJob.
+ There are three possible event values: Init, Check, Pull.'
+ type: string
nodeName:
description: NodeName is the name of edge node.
type: string
+ reason:
+ description: Reason represents for the reason of the ImagePrePullJob.
+ type: string
state:
description: 'State represents for the upgrade state phase of
- the edge node. There are three possible state values: "",
- upgrading and completed.'
- enum:
- - upgrading
- - completed
+ the edge node. There are several possible state values: "",
+ Upgrading, BackingUp, RollingBack and Checking.'
+ type: string
+ time:
+ description: Time represents for the running time of the ImagePrePullJob.
type: string
type: object
type: array
+ reason:
+ description: Reason represents for the reason of the ImagePrePullJob.
+ type: string
+ state:
+ description: 'State represents for the state phase of the NodeUpgradeJob.
+ There are several possible state values: "", Upgrading, BackingUp,
+ RollingBack and Checking.'
+ type: string
+ time:
+ description: Time represents for the running time of the ImagePrePullJob.
+ type: string
type: object
type: object
served: true
diff --git a/manifests/charts/cloudcore/templates/configmap_cloudcore.yaml b/manifests/charts/cloudcore/templates/configmap_cloudcore.yaml
index 2986cfd91..23ca97d38 100644
--- a/manifests/charts/cloudcore/templates/configmap_cloudcore.yaml
+++ b/manifests/charts/cloudcore/templates/configmap_cloudcore.yaml
@@ -53,7 +53,5 @@ data:
iptablesManager:
enable: {{ .Values.iptablesManager.enable }}
mode: {{ .Values.iptablesManager.mode }}
- nodeUpgradeJobController:
- enable: {{ .Values.cloudCore.modules.nodeUpgradeJobController.enable }}
- imagePrePullController:
- enable: {{ .Values.cloudCore.modules.imagePrePullController.enable }}
+ taskManager:
+ enable: {{ .Values.cloudCore.modules.taskManager.enable }}
diff --git a/manifests/charts/cloudcore/values.yaml b/manifests/charts/cloudcore/values.yaml
index a09482c03..db61c8e14 100644
--- a/manifests/charts/cloudcore/values.yaml
+++ b/manifests/charts/cloudcore/values.yaml
@@ -57,9 +57,7 @@ cloudCore:
enable: false
router:
enable: false
- nodeUpgradeJobController:
- enable: false
- imagePrePullController:
+ taskManager:
enable: false
service:
enable: true
diff --git a/manifests/profiles/version.yaml b/manifests/profiles/version.yaml
index 5b3deb7e1..d5a1bf5c2 100644
--- a/manifests/profiles/version.yaml
+++ b/manifests/profiles/version.yaml
@@ -51,9 +51,7 @@ cloudCore:
enable: false
router:
enable: false
- nodeUpgradeJobController:
- enable: false
- imagePrePullController:
+ taskManager:
enable: false
service:
enable: false
diff --git a/pkg/apis/common.go b/pkg/apis/common.go
new file mode 100644
index 000000000..6f556a651
--- /dev/null
+++ b/pkg/apis/common.go
@@ -0,0 +1,3 @@
+package apis
+
+const ISO8601UTC = "2006-01-02T15:04:05Z"
diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go
index 66128ec49..9e58cd38a 100644
--- a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go
+++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go
@@ -98,24 +98,14 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig {
UpdateDeviceStatusWorkers: constants.DefaultUpdateDeviceStatusWorkers,
},
},
- NodeUpgradeJobController: &NodeUpgradeJobController{
+ TaskManager: &TaskManager{
Enable: false,
- Buffer: &NodeUpgradeJobControllerBuffer{
- UpdateNodeUpgradeJobStatus: constants.DefaultNodeUpgradeJobStatusBuffer,
- NodeUpgradeJobEvent: constants.DefaultNodeUpgradeJobEventBuffer,
+ Buffer: &TaskManagerBuffer{
+ TaskStatus: constants.DefaultNodeUpgradeJobStatusBuffer,
+ TaskEvent: constants.DefaultNodeUpgradeJobEventBuffer,
},
- Load: &NodeUpgradeJobControllerLoad{
- NodeUpgradeJobWorkers: constants.DefaultNodeUpgradeJobWorkers,
- },
- },
- ImagePrePullController: &ImagePrePullController{
- Enable: false,
- Buffer: &ImagePrePullControllerBuffer{
- ImagePrePullJobStatus: constants.DefaultImagePrePullJobStatusBuffer,
- ImagePrePullJobEvent: constants.DefaultImagePrePullJobEventBuffer,
- },
- Load: &ImagePrePullControllerLoad{
- ImagePrePullJobWorkers: constants.DefaultImagePrePullJobWorkers,
+ Load: &TaskManagerLoad{
+ TaskWorkers: constants.DefaultNodeUpgradeJobWorkers,
},
},
SyncController: &SyncController{
diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go
index 955960fe0..db46c094b 100644
--- a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go
+++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go
@@ -88,10 +88,8 @@ type Modules struct {
EdgeController *EdgeController `json:"edgeController,omitempty"`
// DeviceController indicates DeviceController module config
DeviceController *DeviceController `json:"deviceController,omitempty"`
- // NodeUpgradeJobController indicates NodeUpgradeJobController module config
- NodeUpgradeJobController *NodeUpgradeJobController `json:"nodeUpgradeJobController,omitempty"`
- // ImagePrePullController indicates ImagePrePullController module config
- ImagePrePullController *ImagePrePullController `json:"imagePrePullController,omitempty"`
+ // TaskManager indicates TaskManager module config
+ TaskManager *TaskManager `json:"taskManager,omitempty"`
// SyncController indicates SyncController module config
SyncController *SyncController `json:"syncController,omitempty"`
// DynamicController indicates DynamicController module config
@@ -381,33 +379,33 @@ type DeviceControllerLoad struct {
UpdateDeviceStatusWorkers int32 `json:"updateDeviceStatusWorkers,omitempty"`
}
-// NodeUpgradeJobController indicates the operations controller
-type NodeUpgradeJobController struct {
- // Enable indicates whether NodeUpgradeJobController is enabled,
- // if set to false (for debugging etc.), skip checking other NodeUpgradeJobController configs.
+// TaskManager indicates the operations controller
+type TaskManager struct {
+ // Enable indicates whether TaskManager is enabled,
+ // if set to false (for debugging etc.), skip checking other TaskManager configs.
// default false
Enable bool `json:"enable"`
// Buffer indicates Operation Controller buffer
- Buffer *NodeUpgradeJobControllerBuffer `json:"buffer,omitempty"`
+ Buffer *TaskManagerBuffer `json:"buffer,omitempty"`
// Load indicates Operation Controller Load
- Load *NodeUpgradeJobControllerLoad `json:"load,omitempty"`
+ Load *TaskManagerLoad `json:"load,omitempty"`
}
-// NodeUpgradeJobControllerBuffer indicates NodeUpgradeJobController buffer
-type NodeUpgradeJobControllerBuffer struct {
- // UpdateNodeUpgradeJobStatus indicates the buffer of update NodeUpgradeJob status
+// TaskManagerBuffer indicates TaskManager buffer
+type TaskManagerBuffer struct {
+ // TaskStatus indicates the buffer of update NodeUpgradeJob status
// default 1024
- UpdateNodeUpgradeJobStatus int32 `json:"updateNodeUpgradeJobStatus,omitempty"`
- // NodeUpgradeJobEvent indicates the buffer of NodeUpgradeJob event
+ TaskStatus int32 `json:"taskStatus,omitempty"`
+ // TaskEvent indicates the buffer of NodeUpgradeJob event
// default 1
- NodeUpgradeJobEvent int32 `json:"nodeUpgradeJobEvent,omitempty"`
+ TaskEvent int32 `json:"taskEvent,omitempty"`
}
-// NodeUpgradeJobControllerLoad indicates the NodeUpgradeJobController load
-type NodeUpgradeJobControllerLoad struct {
- // NodeUpgradeJobWorkers indicates the load of update NodeUpgradeJob workers
+// TaskManagerLoad indicates the TaskManager load
+type TaskManagerLoad struct {
+ // TaskWorkers indicates the load of update NodeUpgradeJob workers
// default 1
- NodeUpgradeJobWorkers int32 `json:"nodeUpgradeJobWorkers,omitempty"`
+ TaskWorkers int32 `json:"taskWorkers,omitempty"`
}
// ImagePrePullController indicates the operations controller
diff --git a/cloud/pkg/nodeupgradejobcontroller/config/config.go b/pkg/apis/fsm/v1alpha1/backup_task.go
index b98f9ea89..1fd6c3983 100644
--- a/cloud/pkg/nodeupgradejobcontroller/config/config.go
+++ b/pkg/apis/fsm/v1alpha1/backup_task.go
@@ -1,5 +1,5 @@
/*
-Copyright 2022 The KubeEdge Authors.
+Copyright 2023 The KubeEdge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -14,25 +14,18 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package config
+package v1alpha1
-import (
- "sync"
-
- "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1"
+const (
+ BackingUpState State = "BackingUp"
)
-var Config Configure
-var once sync.Once
+var BackupRule = map[string]State{
+ "/Init/Success": TaskChecking,
-type Configure struct {
- v1alpha1.NodeUpgradeJobController
-}
+ "Checking/Check/Success": BackingUpState,
+ "Checking/Check/Failure": TaskFailed,
-func InitConfigure(dc *v1alpha1.NodeUpgradeJobController) {
- once.Do(func() {
- Config = Configure{
- NodeUpgradeJobController: *dc,
- }
- })
+ "BackingUp/Backup/Success": TaskSuccessful,
+ "BackingUp/Backup/Failure": TaskFailed,
}
diff --git a/pkg/apis/fsm/v1alpha1/fsm.go b/pkg/apis/fsm/v1alpha1/fsm.go
new file mode 100644
index 000000000..f93c6a870
--- /dev/null
+++ b/pkg/apis/fsm/v1alpha1/fsm.go
@@ -0,0 +1,45 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+type State string
+
+type Action string
+
+const (
+ NodeAvailable State = "Available"
+ NodeUpgrading State = "Upgrading"
+ NodeRollingBack State = "RollingBack"
+ NodeConfigUpdating State = "ConfigUpdating"
+)
+
+const (
+ TaskInit State = "Init"
+ TaskChecking State = "Checking"
+ TaskSuccessful State = "Successful"
+ TaskFailed State = "Failed"
+ TaskPause State = "Pause"
+)
+
+const (
+ ActionSuccess Action = "Success"
+ ActionFailure Action = "Failure"
+)
+
+const (
+ EventTimeOut = "TimeOut"
+)
diff --git a/pkg/apis/fsm/v1alpha1/image_prepull_task.go b/pkg/apis/fsm/v1alpha1/image_prepull_task.go
new file mode 100644
index 000000000..30b91be3a
--- /dev/null
+++ b/pkg/apis/fsm/v1alpha1/image_prepull_task.go
@@ -0,0 +1,42 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+const (
+ PullingState State = "Pulling"
+)
+
+// CurrentState/Event/Action: NextState
+var PrePullRule = map[string]State{
+ "Init/Init/Success": TaskChecking,
+ "Init/Init/Failure": TaskFailed,
+ "Init/TimeOut/Failure": TaskFailed,
+
+ "Checking/Check/Success": PullingState,
+ "Checking/Check/Failure": TaskFailed,
+ "Checking/TimeOut/Failure": TaskFailed,
+
+ "Pulling/Pull/Success": TaskSuccessful,
+ "Pulling/Pull/Failure": TaskFailed,
+ "Pulling/TimeOut/Failure": TaskFailed,
+}
+
+var PrePullStageSequence = map[State]State{
+ "": TaskChecking,
+ TaskInit: TaskChecking,
+ TaskChecking: PullingState,
+}
diff --git a/pkg/apis/fsm/v1alpha1/rollback_task.go b/pkg/apis/fsm/v1alpha1/rollback_task.go
new file mode 100644
index 000000000..f90d50583
--- /dev/null
+++ b/pkg/apis/fsm/v1alpha1/rollback_task.go
@@ -0,0 +1,31 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+const (
+ RollingBackState State = "RollingBack"
+)
+
+var RollbackRule = map[string]State{
+ "/Init/Success": TaskChecking,
+
+ "Checking/Check/Success": RollingBackState,
+ "Checking/Check/Failure": TaskFailed,
+
+ "RollingBack/Rollback/Failure": TaskFailed,
+ "RollingBack/Rollback/Success": TaskFailed,
+}
diff --git a/pkg/apis/fsm/v1alpha1/upgrade_task.go b/pkg/apis/fsm/v1alpha1/upgrade_task.go
new file mode 100644
index 000000000..0f1102fb4
--- /dev/null
+++ b/pkg/apis/fsm/v1alpha1/upgrade_task.go
@@ -0,0 +1,61 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+const (
+ UpgradingState State = "Upgrading"
+)
+
+// CurrentState/Event/Action: NextState
+var UpgradeRule = map[string]State{
+ "Init/Init/Success": TaskChecking,
+ "Init/Init/Failure": TaskFailed,
+ "Init/TimeOut/Failure": TaskFailed,
+ "Init/Upgrade/Success": TaskSuccessful,
+
+ "Checking/Check/Success": BackingUpState,
+ "Checking/Check/Failure": TaskFailed,
+ "Checking/TimeOut/Failure": TaskFailed,
+
+ "BackingUp/Backup/Success": UpgradingState,
+ "BackingUp/Backup/Failure": TaskFailed,
+ "BackingUp/TimeOut/Failure": TaskFailed,
+
+ "Upgrading/Upgrade/Success": TaskSuccessful,
+ "Upgrading/Upgrade/Failure": TaskFailed,
+ "Upgrading/TimeOut/Failure": TaskFailed,
+
+	// TODO: provide options for handling task failure; currently even a successful rollback leaves the task Failed.
+ "RollingBack/Rollback/Failure": TaskFailed,
+ "RollingBack/TimeOut/Failure": TaskFailed,
+ "RollingBack/Rollback/Success": TaskFailed,
+
+ "Upgrading/Rollback/Failure": TaskFailed,
+ "Upgrading/Rollback/Success": TaskFailed,
+
+ //TODO delete in version 1.18
+ "Init/Rollback/Failure": TaskFailed,
+ "Init/Rollback/Success": TaskFailed,
+}
+
+var UpdateStageSequence = map[State]State{
+ "": TaskChecking,
+ TaskInit: TaskChecking,
+ TaskChecking: BackingUpState,
+ BackingUpState: UpgradingState,
+ UpgradingState: RollingBackState,
+}
diff --git a/pkg/apis/operations/v1alpha1/imageprepull_types.go b/pkg/apis/operations/v1alpha1/imageprepull_types.go
index 0ac53a975..7518c5d4a 100644
--- a/pkg/apis/operations/v1alpha1/imageprepull_types.go
+++ b/pkg/apis/operations/v1alpha1/imageprepull_types.go
@@ -18,6 +18,8 @@ package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
)
// +genclient
@@ -84,42 +86,54 @@ type ImagePrePullTemplate struct {
// +optional
CheckItems []string `json:"checkItems,omitempty"`
+ // FailureTolerate specifies the task tolerance failure ratio.
+ // The default FailureTolerate value is 0.1.
+ // +optional
+ FailureTolerate string `json:"failureTolerate,omitempty"`
+
+ // Concurrency specifies the maximum number of edge nodes that can pull images at the same time.
+ // The default Concurrency value is 1.
+ // +optional
+ Concurrency int32 `json:"concurrency,omitempty"`
+
+ // TimeoutSeconds limits the duration of the node prepull job on each edgenode.
+ // Default to 300.
+ // If set to 0, we'll use the default value 300.
+ // +optional
+ TimeoutSeconds *uint32 `json:"timeoutSeconds,omitempty"`
+
// ImageSecret specifies the secret for image pull if private registry used.
// Use {namespace}/{secretName} in format.
// +optional
ImageSecret string `json:"imageSecrets,omitempty"`
- // TimeoutSecondsOnEachNode limits the duration of the image prepull job on each edgenode.
- // Default to 360.
- // If set to 0, we'll use the default value 360.
- // +optional
- TimeoutSecondsOnEachNode *uint32 `json:"timeoutSecondsOnEachNode,omitempty"`
-
// RetryTimes specifies the retry times if image pull failed on each edgenode.
// Default to 0
// +optional
RetryTimes int32 `json:"retryTimes,omitempty"`
}
-// PrePullState describe the PrePullState of image prepull operation on edge nodes.
-// +kubebuilder:validation:Enum=prepulling;successful;failed
-type PrePullState string
-
-// Valid values of PrepullState
-const (
- PrePullInitialValue PrePullState = ""
- PrePulling PrePullState = "prepulling"
- PrePullSuccessful PrePullState = "successful"
- PrePullFailed PrePullState = "failed"
-)
-
// ImagePrePullJobStatus stores the status of ImagePrePullJob.
// contains images prepull status on multiple edge nodes.
// +kubebuilder:validation:Type=object
type ImagePrePullJobStatus struct {
// State represents for the state phase of the ImagePrePullJob.
- // There are four possible state values: "", prechecking, prepulling, successful, failed.
- State PrePullState `json:"state,omitempty"`
+	// There are several possible state values: "", Init, Checking, Pulling, Successful, Failed.
+ State api.State `json:"state,omitempty"`
+
+	// Event represents the event of the ImagePrePullJob.
+	// There are four possible event values: Init, Check, Pull, TimeOut.
+	Event string `json:"event,omitempty"`
+
+	// Action represents the action of the ImagePrePullJob.
+	// There are two possible action values: Success, Failure.
+	Action api.Action `json:"action,omitempty"`
+
+	// Reason represents the reason of the ImagePrePullJob.
+	Reason string `json:"reason,omitempty"`
+
+	// Time represents the running time of the ImagePrePullJob.
+	Time string `json:"time,omitempty"`
// Status contains image prepull status for each edge node.
Status []ImagePrePullStatus `json:"status,omitempty"`
@@ -128,16 +142,8 @@ type ImagePrePullJobStatus struct {
// ImagePrePullStatus stores image prepull status for each edge node.
// +kubebuilder:validation:Type=object
type ImagePrePullStatus struct {
- // NodeName is the name of edge node.
- NodeName string `json:"nodeName,omitempty"`
-
- // State represents for the state phase of the ImagePrepullJob on the edge node.
- // There are five possible state values: "", prepulling, successful, failed.
- State PrePullState `json:"state,omitempty"`
-
- // Reason represents the fail reason if images prepull failed on the edge node
- Reason string `json:"reason,omitempty"`
-
+ // TaskStatus represents the status for each node
+ *TaskStatus `json:"nodeStatus,omitempty"`
// ImageStatus represents the prepull status for each image
ImageStatus []ImageStatus `json:"imageStatus,omitempty"`
}
@@ -150,7 +156,7 @@ type ImageStatus struct {
// State represents for the state phase of this image pull on the edge node
// There are two possible state values: successful, failed.
- State PrePullState `json:"state,omitempty"`
+ State api.State `json:"state,omitempty"`
// Reason represents the fail reason if image pull failed
// +optional
diff --git a/pkg/apis/operations/v1alpha1/type.go b/pkg/apis/operations/v1alpha1/type.go
index 1b45d5860..09b42c13f 100644
--- a/pkg/apis/operations/v1alpha1/type.go
+++ b/pkg/apis/operations/v1alpha1/type.go
@@ -18,6 +18,8 @@ package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
)
// +genclient
@@ -58,10 +60,6 @@ type NodeUpgradeJobList struct {
type NodeUpgradeJobSpec struct {
// +Required: Version is the EdgeCore version to upgrade.
Version string `json:"version,omitempty"`
- // UpgradeTool is a request to decide use which upgrade tool. If it is empty,
- // the upgrade job simply use default upgrade tool keadm to do upgrade operation.
- // +optional
- UpgradeTool string `json:"upgradeTool,omitempty"`
// TimeoutSeconds limits the duration of the node upgrade job.
// Default to 300.
// If set to 0, we'll use the default value 300.
@@ -91,67 +89,60 @@ type NodeUpgradeJobSpec struct {
// The default Concurrency value is 1.
// +optional
Concurrency int32 `json:"concurrency,omitempty"`
-}
-
-// UpgradeResult describe the result status of upgrade operation on edge nodes.
-// +kubebuilder:validation:Enum=upgrade_success;upgrade_failed_rollback_success;upgrade_failed_rollback_failed
-type UpgradeResult string
-// upgrade operation status
-const (
- UpgradeSuccess UpgradeResult = "upgrade_success"
- UpgradeFailedRollbackSuccess UpgradeResult = "upgrade_failed_rollback_success"
- UpgradeFailedRollbackFailed UpgradeResult = "upgrade_failed_rollback_failed"
-)
-
-// UpgradeState describe the UpgradeState of upgrade operation on edge nodes.
-// +kubebuilder:validation:Enum=upgrading;completed
-type UpgradeState string
+ // CheckItems specifies the items need to be checked before the task is executed.
+ // The default CheckItems value is nil.
+ // +optional
+ CheckItems []string `json:"checkItems,omitempty"`
-// Valid values of UpgradeState
-const (
- InitialValue UpgradeState = ""
- Upgrading UpgradeState = "upgrading"
- Completed UpgradeState = "completed"
-)
+ // FailureTolerate specifies the task tolerance failure ratio.
+ // The default FailureTolerate value is 0.1.
+ // +optional
+ FailureTolerate string `json:"failureTolerate,omitempty"`
+}
// NodeUpgradeJobStatus stores the status of NodeUpgradeJob.
// contains multiple edge nodes upgrade status.
// +kubebuilder:validation:Type=object
type NodeUpgradeJobStatus struct {
// State represents for the state phase of the NodeUpgradeJob.
- // There are three possible state values: "", upgrading and completed.
- State UpgradeState `json:"state,omitempty"`
+ // There are several possible state values: "", Upgrading, BackingUp, RollingBack and Checking.
+ State api.State `json:"state,omitempty"`
+
+	// CurrentVersion represents the current version of the EdgeCore.
+	CurrentVersion string `json:"currentVersion,omitempty"`
+	// HistoricVersion represents the historic version of the EdgeCore.
+	HistoricVersion string `json:"historicVersion,omitempty"`
+	// Event represents the event of the NodeUpgradeJob.
+	// There are six possible event values: Init, Check, BackUp, Upgrade, TimeOut, Rollback.
+	Event string `json:"event,omitempty"`
+	// Action represents the action of the NodeUpgradeJob.
+	// There are two possible action values: Success, Failure.
+	Action api.Action `json:"action,omitempty"`
+	// Reason represents the reason of the NodeUpgradeJob.
+	Reason string `json:"reason,omitempty"`
+	// Time represents the running time of the NodeUpgradeJob.
+	Time string `json:"time,omitempty"`
// Status contains upgrade Status for each edge node.
- Status []UpgradeStatus `json:"status,omitempty"`
+ Status []TaskStatus `json:"nodeStatus,omitempty"`
}
-// UpgradeStatus stores the status of Upgrade for each edge node.
+// TaskStatus stores the status of Upgrade for each edge node.
// +kubebuilder:validation:Type=object
-type UpgradeStatus struct {
+type TaskStatus struct {
// NodeName is the name of edge node.
NodeName string `json:"nodeName,omitempty"`
// State represents for the upgrade state phase of the edge node.
- // There are three possible state values: "", upgrading and completed.
- State UpgradeState `json:"state,omitempty"`
- // History is the last upgrade result of the edge node.
- History History `json:"history,omitempty"`
-}
-
-// History stores the information about upgrade history record.
-// +kubebuilder:validation:Type=object
-type History struct {
- // HistoryID is to uniquely identify an Upgrade Operation.
- HistoryID string `json:"historyID,omitempty"`
- // FromVersion is the version which the edge node is upgraded from.
- FromVersion string `json:"fromVersion,omitempty"`
- // ToVersion is the version which the edge node is upgraded to.
- ToVersion string `json:"toVersion,omitempty"`
- // Result represents the result of upgrade.
- Result UpgradeResult `json:"result,omitempty"`
- // Reason is the error reason of Upgrade failure.
- // If the upgrade is successful, this reason is an empty string.
+ // There are several possible state values: "", Upgrading, BackingUp, RollingBack and Checking.
+ State api.State `json:"state,omitempty"`
+	// Event represents the event of the task on the edge node,
+	// e.g. Init, Check, Pull, BackUp, Upgrade, Rollback or TimeOut.
+	Event string `json:"event,omitempty"`
+	// Action represents the action of the task on the edge node.
+	// There are two possible action values: Success, Failure.
+	Action api.Action `json:"action,omitempty"`
+	// Reason represents the reason of the task on the edge node.
Reason string `json:"reason,omitempty"`
- // UpgradeTime is the time of this Upgrade.
- UpgradeTime string `json:"upgradeTime,omitempty"`
+	// Time represents the running time of the task on the edge node.
+ Time string `json:"time,omitempty"`
}
diff --git a/pkg/apis/operations/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/operations/v1alpha1/zz_generated.deepcopy.go
index 8b38fd4ec..bf26dabe8 100644
--- a/pkg/apis/operations/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/operations/v1alpha1/zz_generated.deepcopy.go
@@ -27,22 +27,6 @@ import (
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *History) DeepCopyInto(out *History) {
- *out = *in
- return
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new History.
-func (in *History) DeepCopy() *History {
- if in == nil {
- return nil
- }
- out := new(History)
- in.DeepCopyInto(out)
- return out
-}
-
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ImagePrePullJob) DeepCopyInto(out *ImagePrePullJob) {
*out = *in
out.TypeMeta = in.TypeMeta
@@ -146,6 +130,11 @@ func (in *ImagePrePullJobStatus) DeepCopy() *ImagePrePullJobStatus {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ImagePrePullStatus) DeepCopyInto(out *ImagePrePullStatus) {
*out = *in
+ if in.TaskStatus != nil {
+ in, out := &in.TaskStatus, &out.TaskStatus
+ *out = new(TaskStatus)
+ **out = **in
+ }
if in.ImageStatus != nil {
in, out := &in.ImageStatus, &out.ImageStatus
*out = make([]ImageStatus, len(*in))
@@ -187,8 +176,8 @@ func (in *ImagePrePullTemplate) DeepCopyInto(out *ImagePrePullTemplate) {
*out = make([]string, len(*in))
copy(*out, *in)
}
- if in.TimeoutSecondsOnEachNode != nil {
- in, out := &in.TimeoutSecondsOnEachNode, &out.TimeoutSecondsOnEachNode
+ if in.TimeoutSeconds != nil {
+ in, out := &in.TimeoutSeconds, &out.TimeoutSeconds
*out = new(uint32)
**out = **in
}
@@ -300,6 +289,11 @@ func (in *NodeUpgradeJobSpec) DeepCopyInto(out *NodeUpgradeJobSpec) {
*out = new(v1.LabelSelector)
(*in).DeepCopyInto(*out)
}
+ if in.CheckItems != nil {
+ in, out := &in.CheckItems, &out.CheckItems
+ *out = make([]string, len(*in))
+ copy(*out, *in)
+ }
return
}
@@ -318,7 +312,7 @@ func (in *NodeUpgradeJobStatus) DeepCopyInto(out *NodeUpgradeJobStatus) {
*out = *in
if in.Status != nil {
in, out := &in.Status, &out.Status
- *out = make([]UpgradeStatus, len(*in))
+ *out = make([]TaskStatus, len(*in))
copy(*out, *in)
}
return
@@ -335,18 +329,17 @@ func (in *NodeUpgradeJobStatus) DeepCopy() *NodeUpgradeJobStatus {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *UpgradeStatus) DeepCopyInto(out *UpgradeStatus) {
+func (in *TaskStatus) DeepCopyInto(out *TaskStatus) {
*out = *in
- out.History = in.History
return
}
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpgradeStatus.
-func (in *UpgradeStatus) DeepCopy() *UpgradeStatus {
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskStatus.
+func (in *TaskStatus) DeepCopy() *TaskStatus {
if in == nil {
return nil
}
- out := new(UpgradeStatus)
+ out := new(TaskStatus)
in.DeepCopyInto(out)
return out
}
diff --git a/pkg/util/fsm/fsm.go b/pkg/util/fsm/fsm.go
new file mode 100644
index 000000000..03bbd21f0
--- /dev/null
+++ b/pkg/util/fsm/fsm.go
@@ -0,0 +1,139 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package fsm
+
+import (
+ "fmt"
+
+ "k8s.io/klog/v2"
+
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+)
+
// FSM is a lightweight finite state machine for cloud-edge task execution.
// It holds no state table of its own: the current state is read through
// currentFunc and new states are persisted through updateFunc, both keyed
// by the task id and node name. Configure instances with the builder-style
// setters (ID, NodeName, CurrentFunc, UpdateFunc, Guard, StageSequence).
type FSM struct {
	id        string    // task identifier passed to currentFunc/updateFunc
	nodeName  string    // node identifier passed to currentFunc/updateFunc
	lastState api.State // state the machine was in before the latest Transit
	// currentFunc fetches the current state for (id, nodeName).
	currentFunc func(id, nodeName string) (api.State, error)
	// updateFunc persists the next state for (id, nodeName) after an event.
	updateFunc func(id, nodeName string, state api.State, event Event) error
	// guard maps "<currentState>/<eventType>/<action>" to the permitted next state.
	guard map[string]api.State
	// stageSequence maps a state to the state that begins the following stage.
	stageSequence map[api.State]api.State
}
+
+func (F *FSM) NodeName(nodeName string) *FSM {
+ F.nodeName = nodeName
+ return F
+}
+
// Event describes an occurrence that may trigger a state transition.
// Type and Action together form the event's identity (see UniqueName),
// which is matched against the FSM guard table.
type Event struct {
	Type   string     // event category; first half of UniqueName
	Action api.Action // action taken; second half of UniqueName
	// Msg carries a detail message — presumably human-readable status text;
	// TODO(review): confirm against callers.
	Msg string
	// ExternalMessage carries an additional message — semantics not visible
	// here; verify against the task controllers that populate it.
	ExternalMessage string
}
+
+func (e Event) UniqueName() string {
+ return e.Type + "/" + string(e.Action)
+}
+
+func (F *FSM) ID(id string) *FSM {
+ F.id = id
+ return F
+}
+
+func (F *FSM) LastState(lastState api.State) {
+ F.lastState = lastState
+}
+
+func (F *FSM) CurrentFunc(currentFunc func(id, nodeName string) (api.State, error)) *FSM {
+ F.currentFunc = currentFunc
+ return F
+}
+
+func (F *FSM) UpdateFunc(updateFunc func(id, nodeName string, state api.State, event Event) error) *FSM {
+ F.updateFunc = updateFunc
+ return F
+}
+
+func (F *FSM) Guard(guard map[string]api.State) *FSM {
+ F.guard = guard
+ return F
+}
+
+func (F *FSM) StageSequence(stageSequence map[api.State]api.State) *FSM {
+ F.stageSequence = stageSequence
+ return F
+}
+
+func (F *FSM) CurrentState() (api.State, error) {
+ if F.currentFunc == nil {
+ return "", fmt.Errorf("currentFunc is nil")
+ }
+ return F.currentFunc(F.id, F.nodeName)
+}
+
+func (F *FSM) transitCheck(event Event) (api.State, api.State, error) {
+ currentState, err := F.CurrentState()
+ if err != nil {
+ return "", "", err
+ }
+ if F.guard == nil {
+ return "", "", fmt.Errorf("guard is nil ")
+ }
+ nextState, ok := F.guard[string(currentState)+"/"+event.UniqueName()]
+ if !ok {
+ return "", "", fmt.Errorf(string(currentState)+"/"+event.UniqueName(), " unsupported event")
+ }
+ return currentState, nextState, nil
+}
+
+func (F *FSM) AllowTransit(event Event) error {
+ _, _, err := F.transitCheck(event)
+ return err
+}
+
+func (F *FSM) Transit(event Event) error {
+ currentState, nextState, err := F.transitCheck(event)
+ if err != nil {
+ return err
+ }
+ if F.updateFunc == nil {
+ return fmt.Errorf("updateFunc is nil")
+ }
+ err = F.updateFunc(F.id, F.nodeName, nextState, event)
+ if err != nil {
+ return err
+ }
+ F.lastState = currentState
+ return nil
+}
+
+func TaskFinish(state api.State) bool {
+ return state == api.TaskFailed || state == api.TaskSuccessful
+}
+
+func (F *FSM) TaskStagCompleted(state api.State) bool {
+ currentState, err := F.CurrentState()
+ if err != nil {
+ klog.Error("get %s current state failed: %s", F.id, err.Error())
+ return false
+ }
+ if F.stageSequence[currentState] == state || TaskFinish(state) {
+ return true
+ }
+ return false
+}