summaryrefslogtreecommitdiff
path: root/edge
diff options
context:
space:
mode:
authorKubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com>2024-01-17 17:50:27 +0800
committerGitHub <noreply@github.com>2024-01-17 17:50:27 +0800
commitdadbeb0e72565822afca48cb3f66a17473d7d585 (patch)
tree253eded79262e2f0d83a3da66ae945c9985eec0f /edge
parentMerge pull request #5321 from luomengY/ci-cri (diff)
parentsupport edge nodes upgrade and image pre pull (diff)
downloadkubeedge-dadbeb0e72565822afca48cb3f66a17473d7d585.tar.gz
Merge pull request #5330 from ZhengXinwei-F/task-manager
Implement task manager to complete cloud edge task execution
Diffstat (limited to 'edge')
-rw-r--r--edge/pkg/common/util/util.go32
-rw-r--r--edge/pkg/edgehub/edgehub.go4
-rw-r--r--edge/pkg/edgehub/task/task_handler.go78
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/image_prepull.go184
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/node_backup.go106
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/node_rollback.go72
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/node_upgrade.go (renamed from edge/pkg/edgehub/upgrade/upgrade.go)169
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/pre_check.go158
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/task_executor.go93
-rw-r--r--edge/pkg/edgehub/upgrade/image_prepull.go188
-rw-r--r--edge/pkg/edgehub/upgrade/upgrade_test.go56
11 files changed, 819 insertions, 321 deletions
diff --git a/edge/pkg/common/util/util.go b/edge/pkg/common/util/util.go
new file mode 100644
index 000000000..ec978c191
--- /dev/null
+++ b/edge/pkg/common/util/util.go
@@ -0,0 +1,32 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+ "fmt"
+
+ beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
+ "github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/edge/pkg/common/modules"
+)
+
+func ReportTaskResult(taskType, taskID string, resp types.NodeTaskResponse) {
+ msg := model.NewMessage("").SetRoute(modules.EdgeHubModuleName, modules.HubGroup).
+ SetResourceOperation(fmt.Sprintf("task/%s/node/%s", taskID, resp.NodeName), taskType).FillBody(resp)
+ beehiveContext.Send(modules.EdgeHubModuleName, *msg)
+}
diff --git a/edge/pkg/edgehub/edgehub.go b/edge/pkg/edgehub/edgehub.go
index 2498d07f2..bf63f1f9e 100644
--- a/edge/pkg/edgehub/edgehub.go
+++ b/edge/pkg/edgehub/edgehub.go
@@ -13,8 +13,8 @@ import (
"github.com/kubeedge/kubeedge/edge/pkg/edgehub/certificate"
"github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients"
"github.com/kubeedge/kubeedge/edge/pkg/edgehub/config"
- // register Upgrade handler
- _ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/upgrade"
+ // register Task handler
+ _ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/task"
"github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2"
)
diff --git a/edge/pkg/edgehub/task/task_handler.go b/edge/pkg/edgehub/task/task_handler.go
new file mode 100644
index 000000000..ac42d64bb
--- /dev/null
+++ b/edge/pkg/edgehub/task/task_handler.go
@@ -0,0 +1,78 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package task
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+ commontypes "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options"
+ "github.com/kubeedge/kubeedge/edge/pkg/common/util"
+ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients"
+ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler"
+ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/task/taskexecutor"
+)
+
+func init() {
+ handler := &taskHandler{}
+ msghandler.RegisterHandler(handler)
+}
+
+type taskHandler struct{}
+
+func (th *taskHandler) Filter(message *model.Message) bool {
+ name := message.GetGroup()
+ return name == modules.TaskManagerModuleName
+}
+
+func (th *taskHandler) Process(message *model.Message, clientHub clients.Adapter) error {
+ taskReq := &commontypes.NodeTaskRequest{}
+ data, err := message.GetContentData()
+ if err != nil {
+ return fmt.Errorf("failed to get content data: %v", err)
+ }
+ err = json.Unmarshal(data, taskReq)
+ if err != nil {
+ return fmt.Errorf("unmarshal failed: %v", err)
+ }
+ executor, err := taskexecutor.GetExecutor(taskReq.Type)
+ if err != nil {
+ return err
+ }
+ event, err := executor.Do(*taskReq)
+ if err != nil {
+ return err
+ }
+
+ // use external tool like keadm
+ //Or the task needs to use the goroutine to control the message reply itself.
+ if event.Action == "" {
+ return nil
+ }
+
+ resp := commontypes.NodeTaskResponse{
+ NodeName: options.GetEdgeCoreConfig().Modules.Edged.HostnameOverride,
+ Event: event.Type,
+ Action: event.Action,
+ Reason: event.Msg,
+ }
+ util.ReportTaskResult(taskReq.Type, taskReq.TaskID, resp)
+ return nil
+}
diff --git a/edge/pkg/edgehub/task/taskexecutor/image_prepull.go b/edge/pkg/edgehub/task/taskexecutor/image_prepull.go
new file mode 100644
index 000000000..a2b19c8e7
--- /dev/null
+++ b/edge/pkg/edgehub/task/taskexecutor/image_prepull.go
@@ -0,0 +1,184 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package taskexecutor
+
+import (
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ v1 "k8s.io/api/core/v1"
+ runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/kubeedge/common/constants"
+ "github.com/kubeedge/kubeedge/common/types"
+ commontypes "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options"
+ edgeutil "github.com/kubeedge/kubeedge/edge/pkg/common/util"
+ metaclient "github.com/kubeedge/kubeedge/edge/pkg/metamanager/client"
+ "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+const (
+ TaskPrePull = "prepull"
+)
+
+type PrePull struct {
+ *BaseExecutor
+}
+
+func (p *PrePull) Name() string {
+ return p.name
+}
+
+func NewPrePullExecutor() Executor {
+ methods := map[string]func(types.NodeTaskRequest) fsm.Event{
+ string(api.TaskChecking): preCheck,
+ string(api.TaskInit): emptyInit,
+ "": emptyInit,
+ string(api.PullingState): pullImages,
+ }
+ return &PrePull{
+ BaseExecutor: NewBaseExecutor(TaskPrePull, methods),
+ }
+}
+
+func pullImages(taskReq types.NodeTaskRequest) fsm.Event {
+ event := fsm.Event{
+ Type: "Pull",
+ Action: api.ActionSuccess,
+ }
+
+ // get edgecore config
+ edgeCoreConfig := options.GetEdgeCoreConfig()
+ if edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint == "" {
+ edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint = edgeCoreConfig.Modules.Edged.RemoteRuntimeEndpoint
+ }
+
+ // parse message request
+ prePullReq, err := getImagePrePullJobRequest(taskReq)
+ if err != nil {
+ event.Msg = err.Error()
+ event.Action = api.ActionFailure
+ return event
+ }
+
+ // pull images
+ container, err := util.NewContainerRuntime(edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint, edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.CgroupDriver)
+ if err != nil {
+ event.Msg = err.Error()
+ event.Action = api.ActionFailure
+ return event
+ }
+
+ go func() {
+ errorStr, imageStatus := prePullImages(*prePullReq, container)
+ if errorStr != "" {
+ event.Action = api.ActionFailure
+ event.Msg = errorStr
+ }
+
+ data, err := json.Marshal(imageStatus)
+ if err != nil {
+ klog.Warningf("marshal imageStatus failed: %v", err)
+ }
+ resp := commontypes.NodeTaskResponse{
+ NodeName: edgeCoreConfig.Modules.Edged.HostnameOverride,
+ Event: event.Type,
+ Action: event.Action,
+ Reason: event.Msg,
+ ExternalMessage: string(data),
+ }
+ edgeutil.ReportTaskResult(taskReq.Type, taskReq.TaskID, resp)
+ }()
+ return fsm.Event{}
+}
+
+func getImagePrePullJobRequest(taskReq commontypes.NodeTaskRequest) (*commontypes.ImagePrePullJobRequest, error) {
+ var prePullReq commontypes.ImagePrePullJobRequest
+ data, err := json.Marshal(taskReq.Item)
+ if err != nil {
+ return nil, err
+ }
+ err = json.Unmarshal(data, &prePullReq)
+ if err != nil {
+ return nil, err
+ }
+ return &prePullReq, err
+}
+
+func prePullImages(prePullReq commontypes.ImagePrePullJobRequest, container util.ContainerRuntime) (string, []v1alpha1.ImageStatus) {
+ errorStr := ""
+ authConfig, err := makeAuthConfig(prePullReq.Secret)
+ if err != nil {
+ return errorStr, []v1alpha1.ImageStatus{}
+ }
+
+ var imageStatus []v1alpha1.ImageStatus
+ for _, image := range prePullReq.Images {
+ prePullStatus := v1alpha1.ImageStatus{
+ Image: image,
+ }
+ for i := 0; i < int(prePullReq.RetryTimes); i++ {
+ err = container.PullImage(image, authConfig, nil)
+ if err == nil {
+ break
+ }
+ }
+ if err != nil {
+ klog.Errorf("pull image %s failed, err: %v", image, err)
+ errorStr = fmt.Sprintf("pull image failed, err: %v", err)
+ prePullStatus.State = api.TaskFailed
+ prePullStatus.Reason = err.Error()
+ } else {
+ klog.Infof("pull image %s successfully!", image)
+ prePullStatus.State = api.TaskSuccessful
+ }
+ imageStatus = append(imageStatus, prePullStatus)
+ }
+
+ return errorStr, imageStatus
+}
+
+func makeAuthConfig(pullsecret string) (*runtimeapi.AuthConfig, error) {
+ if pullsecret == "" {
+ return nil, nil
+ }
+
+ secretSli := strings.Split(pullsecret, constants.ResourceSep)
+ if len(secretSli) != 2 {
+ return nil, fmt.Errorf("pull secret format is not correct")
+ }
+
+ client := metaclient.New()
+ secret, err := client.Secrets(secretSli[0]).Get(secretSli[1])
+ if err != nil {
+ return nil, fmt.Errorf("get secret %s failed, %v", secretSli[1], err)
+ }
+
+ var auth runtimeapi.AuthConfig
+ err = json.Unmarshal(secret.Data[v1.DockerConfigJsonKey], &auth)
+ if err != nil {
+ return nil, fmt.Errorf("unmarshal secret %s to auth file failed, %v", secretSli[1], err)
+ }
+
+ return &auth, nil
+}
diff --git a/edge/pkg/edgehub/task/taskexecutor/node_backup.go b/edge/pkg/edgehub/task/taskexecutor/node_backup.go
new file mode 100644
index 000000000..1d4583978
--- /dev/null
+++ b/edge/pkg/edgehub/task/taskexecutor/node_backup.go
@@ -0,0 +1,106 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package taskexecutor
+
+import (
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/kubeedge/common/constants"
+ commontypes "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options"
+ "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+ "github.com/kubeedge/kubeedge/pkg/version"
+)
+
+func backupNode(taskReq commontypes.NodeTaskRequest) (event fsm.Event) {
+ event = fsm.Event{
+ Type: "Backup",
+ Action: api.ActionSuccess,
+ }
+ var err error
+ defer func() {
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ }
+ }()
+ backupPath := filepath.Join(util.KubeEdgeBackupPath, version.Get().String())
+ err = backup(backupPath)
+ if err != nil {
+ cleanErr := os.Remove(backupPath)
+ if cleanErr != nil {
+ klog.Warningf("clean backup path failed: %s", err.Error())
+ }
+ return
+ }
+ return event
+}
+
+func backup(backupPath string) error {
+ config := options.GetEdgeCoreConfig()
+ klog.Infof("backup start, backup path: %s", backupPath)
+ if err := os.MkdirAll(backupPath, 0750); err != nil {
+ return fmt.Errorf("mkdirall failed: %v", err)
+ }
+
+ // backup edgecore.db: copy from origin path to backup path
+ if err := copy(config.DataBase.DataSource, filepath.Join(backupPath, "edgecore.db")); err != nil {
+ return fmt.Errorf("failed to backup db: %v", err)
+ }
+ // backup edgecore.yaml: copy from origin path to backup path
+ if err := copy(constants.DefaultConfigDir+"edgecore.yaml", filepath.Join(backupPath, "edgecore.yaml")); err != nil {
+ return fmt.Errorf("failed to back config: %v", err)
+ }
+ // backup edgecore: copy from origin path to backup path
+ if err := copy(filepath.Join(util.KubeEdgeUsrBinPath, util.KubeEdgeBinaryName), filepath.Join(backupPath, util.KubeEdgeBinaryName)); err != nil {
+ return fmt.Errorf("failed to backup edgecore: %v", err)
+ }
+ return nil
+}
+
+func copy(src, dst string) error {
+ sourceFileStat, err := os.Stat(src)
+ if err != nil {
+ return err
+ }
+
+ if !sourceFileStat.Mode().IsRegular() {
+ return fmt.Errorf("%s is not a regular file", src)
+ }
+
+ source, err := os.Open(src)
+ if err != nil {
+ return err
+ }
+ defer source.Close()
+
+ // copy file using src file mode
+ destination, err := os.OpenFile(dst, os.O_RDWR|os.O_CREATE|os.O_TRUNC, sourceFileStat.Mode())
+ if err != nil {
+ return err
+ }
+ defer destination.Close()
+ _, err = io.Copy(destination, source)
+ return err
+}
diff --git a/edge/pkg/edgehub/task/taskexecutor/node_rollback.go b/edge/pkg/edgehub/task/taskexecutor/node_rollback.go
new file mode 100644
index 000000000..85233dc45
--- /dev/null
+++ b/edge/pkg/edgehub/task/taskexecutor/node_rollback.go
@@ -0,0 +1,72 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package taskexecutor
+
+import (
+ "fmt"
+ "os/exec"
+
+ "k8s.io/klog/v2"
+
+ commontypes "github.com/kubeedge/kubeedge/common/types"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+ "github.com/kubeedge/kubeedge/pkg/version"
+)
+
+func rollbackNode(taskReq commontypes.NodeTaskRequest) (event fsm.Event) {
+ event = fsm.Event{
+ Type: "Rollback",
+ Action: api.ActionSuccess,
+ }
+ var err error
+ defer func() {
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ }
+ }()
+
+ var upgradeReq *commontypes.NodeUpgradeJobRequest
+ upgradeReq, err = getTaskRequest(taskReq)
+ if err != nil {
+ return
+ }
+
+ err = rollback(upgradeReq)
+ if err != nil {
+ return
+ }
+ return event
+}
+
+func rollback(upgradeReq *commontypes.NodeUpgradeJobRequest) error {
+ klog.Infof("Begin to run rollback command")
+ rollBackCmd := fmt.Sprintf("keadm rollback edge --name %s --history %s >> /tmp/keadm.log 2>&1",
+ upgradeReq.UpgradeID, version.Get())
+
+ // run upgrade cmd to upgrade edge node
+ // use nohup command to start a child progress
+ command := fmt.Sprintf("nohup %s &", rollBackCmd)
+ cmd := exec.Command("bash", "-c", command)
+ s, err := cmd.CombinedOutput()
+ if err != nil {
+ return fmt.Errorf("run rollback command %s failed: %v, %s", command, err, s)
+ }
+ klog.Infof("!!! Finish rollback ")
+ return nil
+}
diff --git a/edge/pkg/edgehub/upgrade/upgrade.go b/edge/pkg/edgehub/task/taskexecutor/node_upgrade.go
index e26cd50ca..8546ab96d 100644
--- a/edge/pkg/edgehub/upgrade/upgrade.go
+++ b/edge/pkg/edgehub/task/taskexecutor/node_upgrade.go
@@ -1,5 +1,5 @@
/*
-Copyright 2022 The KubeEdge Authors.
+Copyright 2023 The KubeEdge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -14,100 +14,137 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package upgrade
+package taskexecutor
import (
"encoding/json"
"fmt"
"os/exec"
"path/filepath"
- "strings"
- "sync"
"k8s.io/klog/v2"
- "github.com/kubeedge/beehive/pkg/core/model"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+ "github.com/kubeedge/kubeedge/common/types"
commontypes "github.com/kubeedge/kubeedge/common/types"
"github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options"
- "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients"
- "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler"
"github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
"github.com/kubeedge/kubeedge/pkg/version"
)
-func init() {
- handler := &upgradeHandler{}
- msghandler.RegisterHandler(handler)
+const (
+ TaskUpgrade = "upgrade"
+)
- // register upgrade tool: keadm
- // if not specify it, also use default upgrade tool: keadm
- RegisterUpgradeProvider("", &keadmUpgrade{})
- RegisterUpgradeProvider("keadm", &keadmUpgrade{})
+type Upgrade struct {
+ *BaseExecutor
}
-type upgradeHandler struct{}
-
-func (uh *upgradeHandler) Filter(message *model.Message) bool {
- name := message.GetGroup()
- return name == modules.NodeUpgradeJobControllerModuleGroup
+func (u *Upgrade) Name() string {
+ return u.name
}
-func (uh *upgradeHandler) Process(message *model.Message, clientHub clients.Adapter) error {
- upgradeReq := &commontypes.NodeUpgradeJobRequest{}
- data, err := message.GetContentData()
- if err != nil {
- return fmt.Errorf("failed to get content data: %v", err)
+func NewUpgradeExecutor() Executor {
+ methods := map[string]func(types.NodeTaskRequest) fsm.Event{
+ string(api.TaskChecking): preCheck,
+ string(api.TaskInit): initUpgrade,
+ "": initUpgrade,
+ string(api.BackingUpState): backupNode,
+ string(api.RollingBackState): rollbackNode,
+ string(api.UpgradingState): upgrade,
}
- err = json.Unmarshal(data, upgradeReq)
- if err != nil {
- return fmt.Errorf("unmarshal failed: %v", err)
+ return &Upgrade{
+ BaseExecutor: NewBaseExecutor(TaskUpgrade, methods),
}
+}
- err = validateUpgrade(upgradeReq)
- if err != nil {
- return fmt.Errorf("upgrade request is not valid: %v", err)
+func initUpgrade(taskReq types.NodeTaskRequest) (event fsm.Event) {
+ event = fsm.Event{
+ Type: "Init",
+ Action: api.ActionSuccess,
}
+ var err error
+ defer func() {
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ }
+ }()
- tool := strings.ToLower(upgradeReq.UpgradeTool)
- if _, ok := upgradeToolProviders[tool]; !ok {
- return fmt.Errorf("not supported upgrade tool type: %v", upgradeReq.UpgradeTool)
+ var upgradeReq *commontypes.NodeUpgradeJobRequest
+ upgradeReq, err = getTaskRequest(taskReq)
+ if err != nil {
+ return
}
- return upgradeToolProviders[tool].Upgrade(upgradeReq)
+ if upgradeReq.UpgradeID == "" {
+ err = fmt.Errorf("upgradeID cannot be empty")
+ return
+ }
+ if upgradeReq.Version == version.Get().String() {
+ return fsm.Event{
+ Type: "Upgrading",
+ Action: api.ActionSuccess,
+ }
+ }
+ err = prepareKeadm(upgradeReq)
+ if err != nil {
+ return
+ }
+ return event
}
-func validateUpgrade(upgrade *commontypes.NodeUpgradeJobRequest) error {
- if upgrade.UpgradeID == "" {
- return fmt.Errorf("upgradeID cannot be empty")
+func getTaskRequest(taskReq commontypes.NodeTaskRequest) (*commontypes.NodeUpgradeJobRequest, error) {
+ data, err := json.Marshal(taskReq.Item)
+ if err != nil {
+ return nil, err
}
- if upgrade.Version == version.Get().String() {
- return fmt.Errorf("edge node already on specific version, no need to upgrade")
+ var upgradeReq commontypes.NodeUpgradeJobRequest
+ err = json.Unmarshal(data, &upgradeReq)
+ if err != nil {
+ return nil, err
}
-
- return nil
+ return &upgradeReq, err
}
-type Provider interface {
- Upgrade(upgrade *commontypes.NodeUpgradeJobRequest) error
+func upgrade(taskReq types.NodeTaskRequest) (event fsm.Event) {
+ opts := options.GetEdgeCoreOptions()
+ event = fsm.Event{
+ Type: "Upgrade",
+ }
+ upgradeReq, err := getTaskRequest(taskReq)
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ return
+ }
+ err = keadmUpgrade(*upgradeReq, opts)
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ }
+ return
}
-var (
- upgradeToolProviders = make(map[string]Provider)
- mutex sync.Mutex
-)
+func keadmUpgrade(upgradeReq commontypes.NodeUpgradeJobRequest, opts *options.EdgeCoreOptions) error {
+ klog.Infof("Begin to run upgrade command")
+ upgradeCmd := fmt.Sprintf("keadm upgrade --upgradeID %s --historyID %s --fromVersion %s --toVersion %s --config %s --image %s > /tmp/keadm.log 2>&1",
+ upgradeReq.UpgradeID, upgradeReq.HistoryID, version.Get(), upgradeReq.Version, opts.ConfigFile, upgradeReq.Image)
-func RegisterUpgradeProvider(upgradeToolType string, provider Provider) {
- mutex.Lock()
- defer mutex.Unlock()
- upgradeToolProviders[upgradeToolType] = provider
+ // run upgrade cmd to upgrade edge node
+ // use nohup command to start a child progress
+ command := fmt.Sprintf("nohup %s &", upgradeCmd)
+ cmd := exec.Command("bash", "-c", command)
+ s, err := cmd.CombinedOutput()
+ if err != nil {
+ return fmt.Errorf("run upgrade command %s failed: %v, %s", command, err, s)
+ }
+ klog.Infof("!!! Finish upgrade from Version %s to %s ...", version.Get(), upgradeReq.Version)
+ return nil
}
-type keadmUpgrade struct{}
-
-func (*keadmUpgrade) Upgrade(upgradeReq *commontypes.NodeUpgradeJobRequest) error {
- // get edgecore start options and config
- opts := options.GetEdgeCoreOptions()
+func prepareKeadm(upgradeReq *commontypes.NodeUpgradeJobRequest) error {
config := options.GetEdgeCoreConfig()
// install the requested installer keadm from docker image
@@ -119,7 +156,6 @@ func (*keadmUpgrade) Upgrade(upgradeReq *commontypes.NodeUpgradeJobRequest) erro
if err != nil {
return fmt.Errorf("failed to new container runtime: %v", err)
}
-
image := upgradeReq.Image
// TODO: do some verification 1.sha256(pass in using CRD) 2.image signature verification
@@ -135,22 +171,5 @@ func (*keadmUpgrade) Upgrade(upgradeReq *commontypes.NodeUpgradeJobRequest) erro
if err != nil {
return fmt.Errorf("failed to cp file from image to host: %v", err)
}
-
- klog.Infof("Begin to run upgrade command")
- upgradeCmd := fmt.Sprintf("keadm upgrade --upgradeID %s --historyID %s --fromVersion %s --toVersion %s --config %s --image %s > /tmp/keadm.log 2>&1",
- upgradeReq.UpgradeID, upgradeReq.HistoryID, version.Get(), upgradeReq.Version, opts.ConfigFile, image)
-
- // run upgrade cmd to upgrade edge node
- // use nohup command to start a child progress
- command := fmt.Sprintf("nohup %s &", upgradeCmd)
- cmd := exec.Command("bash", "-c", command)
- s, err := cmd.CombinedOutput()
- if err != nil {
- klog.Errorf("run upgrade command %s failed: %v, %s", command, err, s)
- return fmt.Errorf("run upgrade command %s failed: %v, %s", command, err, s)
- }
-
- klog.Infof("!!! Begin to upgrade from Version %s to %s ...", version.Get(), upgradeReq.Version)
-
return nil
}
diff --git a/edge/pkg/edgehub/task/taskexecutor/pre_check.go b/edge/pkg/edgehub/task/taskexecutor/pre_check.go
new file mode 100644
index 000000000..da68a2077
--- /dev/null
+++ b/edge/pkg/edgehub/task/taskexecutor/pre_check.go
@@ -0,0 +1,158 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package taskexecutor
+
+import (
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "github.com/shirou/gopsutil/cpu"
+ "github.com/shirou/gopsutil/disk"
+ "github.com/shirou/gopsutil/v3/mem"
+
+ "github.com/kubeedge/kubeedge/common/types"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+const (
+ MaxCPUUsage float64 = 80
+ MaxMemUsage float64 = 80
+ MaxDiskUsage float64 = 80
+)
+
+func preCheck(taskReq types.NodeTaskRequest) fsm.Event {
+ event := fsm.Event{
+ Type: "Check",
+ Action: api.ActionSuccess,
+ Msg: "",
+ }
+
+ data, err := json.Marshal(taskReq.Item)
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ return event
+ }
+ var checkItems types.NodePreCheckRequest
+ err = json.Unmarshal(data, &checkItems)
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.Msg = err.Error()
+ return event
+ }
+
+ var failed bool
+ var checkResult = map[string]string{}
+ var checkFunc = map[string]func() error{
+ "cpu": checkCPU,
+ "mem": checkMem,
+ "disk": checkDisk,
+ }
+ for _, item := range checkItems.CheckItem {
+ f, ok := checkFunc[item]
+ if !ok {
+ checkResult[item] = "check item not support"
+ continue
+ }
+ err = f()
+ if err != nil {
+ failed = true
+ checkResult[item] = err.Error()
+ continue
+ }
+ checkResult[item] = "ok"
+ }
+ if !failed {
+ return event
+ }
+ event.Action = api.ActionFailure
+ result, err := json.Marshal(checkResult)
+ if err != nil {
+ event.Msg = err.Error()
+ return event
+ }
+ event.Msg = string(result)
+ return event
+}
+
+func checkCPU() error {
+ cpuUsage, err := cpu.Percent(100*time.Millisecond, false)
+ if err != nil {
+ return err
+ }
+ var usage float64
+ for _, percpu := range cpuUsage {
+ usage += percpu / float64(len(cpuUsage))
+ }
+ if usage > MaxCPUUsage {
+ return fmt.Errorf("current cpu usage is %f, which exceeds the maximum allowed usage %f", usage, MaxCPUUsage)
+ }
+ return nil
+}
+
+func checkMem() error {
+ memInfo, err := mem.VirtualMemory()
+ if err != nil {
+ return err
+ }
+ memUsedPercent := memInfo.UsedPercent
+ if memUsedPercent > MaxMemUsage {
+ return fmt.Errorf("current mem usage is %f, which exceeds the maximum allowed usage %f", memUsedPercent, MaxMemUsage)
+ }
+ return nil
+}
+
+func checkDisk() error {
+ partitions, err := disk.Partitions(true)
+ if err != nil {
+ return err
+ }
+ var failed bool
+ var diskUsages = map[string]string{}
+ for _, part := range partitions {
+ usage, err := disk.Usage(part.Mountpoint)
+ if err != nil {
+ failed = true
+ diskUsages[part.Device] = err.Error()
+ continue
+ }
+ if usage.UsedPercent > MaxDiskUsage {
+ failed = true
+ diskUsages[part.Device] = fmt.Sprintf("current disk usage is %f, which exceeds the maximum allowed usage %f", usage.UsedPercent, MaxMemUsage)
+ continue
+ }
+ diskUsages[part.Device] = fmt.Sprintf("%f", usage.UsedPercent)
+ }
+ if !failed {
+ return nil
+ }
+ result, err := json.Marshal(diskUsages)
+ if err != nil {
+ return err
+ }
+ return fmt.Errorf(string(result))
+}
+
+func normalInit(types.NodeTaskRequest) fsm.Event {
+ return fsm.Event{
+ Type: "Init",
+ Action: api.ActionSuccess,
+ Msg: "",
+ }
+}
diff --git a/edge/pkg/edgehub/task/taskexecutor/task_executor.go b/edge/pkg/edgehub/task/taskexecutor/task_executor.go
new file mode 100644
index 000000000..fd3be1f88
--- /dev/null
+++ b/edge/pkg/edgehub/task/taskexecutor/task_executor.go
@@ -0,0 +1,93 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package taskexecutor
+
+import (
+ "fmt"
+
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+func init() {
+ Register(TaskUpgrade, NewUpgradeExecutor())
+ Register(TaskPrePull, NewPrePullExecutor())
+}
+
+type Executor interface {
+ Name() string
+ Do(types.NodeTaskRequest) (fsm.Event, error)
+}
+
+type BaseExecutor struct {
+ name string
+ methods map[string]func(types.NodeTaskRequest) fsm.Event
+}
+
+func (be *BaseExecutor) Name() string {
+ return be.name
+}
+
+func NewBaseExecutor(name string, methods map[string]func(types.NodeTaskRequest) fsm.Event) *BaseExecutor {
+ return &BaseExecutor{
+ name: name,
+ methods: methods,
+ }
+}
+
+func (be *BaseExecutor) Do(taskReq types.NodeTaskRequest) (fsm.Event, error) {
+ method, ok := be.methods[taskReq.State]
+ if !ok {
+ err := fmt.Errorf("method %s in executor %s is not implemented", taskReq.State, taskReq.Type)
+ klog.Warning(err.Error())
+ return fsm.Event{}, err
+ }
+ return method(taskReq), nil
+}
+
+var (
+ executors = make(map[string]Executor)
+ CommonMethods = map[string]func(types.NodeTaskRequest) fsm.Event{
+ string(v1alpha1.TaskChecking): preCheck,
+ string(v1alpha1.TaskInit): normalInit,
+ }
+)
+
+func Register(name string, executor Executor) {
+ if _, ok := executors[name]; ok {
+ klog.Warning("executor %s exists ", name)
+ }
+ executors[name] = executor
+}
+
+func GetExecutor(name string) (Executor, error) {
+ executor, ok := executors[name]
+ if !ok {
+ return nil, fmt.Errorf("executor %s is not registered", name)
+ }
+ return executor, nil
+}
+
+func emptyInit(_ types.NodeTaskRequest) (event fsm.Event) {
+ return fsm.Event{
+ Type: "Init",
+ Action: v1alpha1.ActionSuccess,
+ }
+}
diff --git a/edge/pkg/edgehub/upgrade/image_prepull.go b/edge/pkg/edgehub/upgrade/image_prepull.go
deleted file mode 100644
index d02a4cdc9..000000000
--- a/edge/pkg/edgehub/upgrade/image_prepull.go
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
-Copyright 2023 The KubeEdge Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package upgrade
-
-import (
- "encoding/json"
- "fmt"
- "strings"
-
- v1 "k8s.io/api/core/v1"
- runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
- "k8s.io/klog/v2"
-
- beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
- "github.com/kubeedge/beehive/pkg/core/model"
- cloudmodules "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
- "github.com/kubeedge/kubeedge/common/constants"
- commontypes "github.com/kubeedge/kubeedge/common/types"
- "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options"
- "github.com/kubeedge/kubeedge/edge/pkg/common/modules"
- "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients"
- "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler"
- metaclient "github.com/kubeedge/kubeedge/edge/pkg/metamanager/client"
- "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util"
- "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
-)
-
-func init() {
- handler := &prepullHandler{}
- msghandler.RegisterHandler(handler)
-}
-
-type prepullHandler struct{}
-
-func (uh *prepullHandler) Filter(message *model.Message) bool {
- name := message.GetGroup()
- return name == cloudmodules.ImagePrePullControllerModuleGroup
-}
-
-func (uh *prepullHandler) Process(message *model.Message, clientHub clients.Adapter) error {
- // get edgecore config
- edgecoreconfig := options.GetEdgeCoreConfig()
- if edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint == "" {
- edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint = edgecoreconfig.Modules.Edged.RemoteRuntimeEndpoint
- }
- nodeName := edgecoreconfig.Modules.Edged.HostnameOverride
-
- var errmsg, jobName string
- defer func() {
- if errmsg != "" {
- sendResponseMessage(nodeName, jobName, errmsg, nil, v1alpha1.PrePullFailed)
- }
- }()
-
- jobName, err := parseJobName(message.GetResource())
- if err != nil {
- errmsg = fmt.Sprintf("failed to parse prepull resource, err: %v", err)
- return fmt.Errorf(errmsg)
- }
-
- // parse message request
- var prePullReq commontypes.ImagePrePullJobRequest
- data, err := message.GetContentData()
- if err != nil {
- errmsg = fmt.Sprintf("failed to get message content, err: %v", err)
- return fmt.Errorf(errmsg)
- }
- err = json.Unmarshal(data, &prePullReq)
- if err != nil {
- errmsg = fmt.Sprintf("failed to unmarshal message content, err: %v", err)
- return fmt.Errorf(errmsg)
- }
- if nodeName != prePullReq.NodeName {
- errmsg = fmt.Sprintf("request node name %s is not match with the edge node %s", prePullReq.NodeName, nodeName)
- return fmt.Errorf(errmsg)
- }
-
- // todo check items such as disk according to req.checkitem config
-
- // pull images
- container, err := util.NewContainerRuntime(edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint, edgecoreconfig.Modules.Edged.TailoredKubeletConfig.CgroupDriver)
- if err != nil {
- errmsg = fmt.Sprintf("failed to new container runtime: %v", err)
- return fmt.Errorf(errmsg)
- }
-
- go func() {
- pullMsg, resp, state := prePullImages(prePullReq, container)
- sendResponseMessage(nodeName, jobName, pullMsg, resp, state)
- }()
- return nil
-}
-
-func sendResponseMessage(nodeName, jobName, pullMsg string, imageStatus []v1alpha1.ImageStatus, state v1alpha1.PrePullState) {
- resp := commontypes.ImagePrePullJobResponse{
- NodeName: nodeName,
- State: state,
- Reason: pullMsg,
- ImageStatus: imageStatus,
- }
- msg := model.NewMessage("").SetRoute(modules.EdgeHubModuleName, modules.HubGroup).
- SetResourceOperation(fmt.Sprintf("node/%s/prepull/%s", nodeName, jobName), "prepull").FillBody(resp)
- beehiveContext.Send(modules.EdgeHubModuleName, *msg)
-}
-
-func prePullImages(prePullReq commontypes.ImagePrePullJobRequest, container util.ContainerRuntime) (string, []v1alpha1.ImageStatus, v1alpha1.PrePullState) {
- var errmsg string
- authConfig, err := makeAuthConfig(prePullReq.Secret)
- if err != nil {
- errmsg = fmt.Sprintf("failed to get prepull secret, err: %v", err)
- return errmsg, nil, v1alpha1.PrePullFailed
- }
-
- state := v1alpha1.PrePullSuccessful
- var imageStatus []v1alpha1.ImageStatus
- for _, image := range prePullReq.Images {
- prePullStatus := v1alpha1.ImageStatus{
- Image: image,
- }
- for i := 0; i < int(prePullReq.RetryTimes); i++ {
- err = container.PullImage(image, authConfig, nil)
- if err == nil {
- break
- }
- }
- if err != nil {
- klog.Errorf("pull image %s failed, err: %v", image, err)
- errmsg = "prepull images failed"
- state = v1alpha1.PrePullFailed
- prePullStatus.State = v1alpha1.PrePullFailed
- prePullStatus.Reason = err.Error()
- } else {
- klog.Infof("pull image %s successfully!", image)
- prePullStatus.State = v1alpha1.PrePullSuccessful
- }
- imageStatus = append(imageStatus, prePullStatus)
- }
-
- return errmsg, imageStatus, state
-}
-
-func parseJobName(resource string) (string, error) {
- var jobName string
- sli := strings.Split(resource, constants.ResourceSep)
- if len(sli) != 2 {
- return jobName, fmt.Errorf("the resource %s is not the standard type", resource)
- }
- return sli[1], nil
-}
-
-func makeAuthConfig(pullsecret string) (*runtimeapi.AuthConfig, error) {
- if pullsecret == "" {
- return nil, nil
- }
-
- secretSli := strings.Split(pullsecret, constants.ResourceSep)
- if len(secretSli) != 2 {
- return nil, fmt.Errorf("pull secret format is not correct")
- }
-
- client := metaclient.New()
- secret, err := client.Secrets(secretSli[0]).Get(secretSli[1])
- if err != nil {
- return nil, fmt.Errorf("get secret %s failed, %v", secretSli[1], err)
- }
-
- var auth runtimeapi.AuthConfig
- err = json.Unmarshal(secret.Data[v1.DockerConfigJsonKey], &auth)
- if err != nil {
- return nil, fmt.Errorf("unmarshal secret %s to auth file failed, %v", secretSli[1], err)
- }
-
- return &auth, nil
-}
diff --git a/edge/pkg/edgehub/upgrade/upgrade_test.go b/edge/pkg/edgehub/upgrade/upgrade_test.go
deleted file mode 100644
index 2660f9d79..000000000
--- a/edge/pkg/edgehub/upgrade/upgrade_test.go
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-Copyright 2022 The KubeEdge Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package upgrade
-
-import (
- "testing"
-
- "github.com/kubeedge/beehive/pkg/core/model"
-)
-
-func TestFilter(t *testing.T) {
- uh := &upgradeHandler{}
-
- tests := []struct {
- msg *model.Message
- want bool
- name string
- }{
- {
- msg: &model.Message{
- Router: model.MessageRoute{Group: "nodeupgradejobcontroller"},
- },
- want: true,
- name: "Node upgrade job controlled",
- },
- {
- msg: &model.Message{
- Router: model.MessageRoute{Group: "devicecontroller"},
- },
- want: false,
- name: "Device controller",
- },
- }
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- if got := uh.Filter(tt.msg); got != tt.want {
- t.Errorf("upgradeHandler.Filter() retuned unexpected result. got = %v, want = %v", got, tt.want)
- }
- })
- }
-}