diff options
| author | KubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com> | 2024-01-17 17:50:27 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-01-17 17:50:27 +0800 |
| commit | dadbeb0e72565822afca48cb3f66a17473d7d585 (patch) | |
| tree | 253eded79262e2f0d83a3da66ae945c9985eec0f /edge | |
| parent | Merge pull request #5321 from luomengY/ci-cri (diff) | |
| parent | support edge nodes upgrade and image pre pull (diff) | |
| download | kubeedge-dadbeb0e72565822afca48cb3f66a17473d7d585.tar.gz | |
Merge pull request #5330 from ZhengXinwei-F/task-manager
Implement task manager to complete cloud edge task execution
Diffstat (limited to 'edge')
| -rw-r--r-- | edge/pkg/common/util/util.go | 32 | ||||
| -rw-r--r-- | edge/pkg/edgehub/edgehub.go | 4 | ||||
| -rw-r--r-- | edge/pkg/edgehub/task/task_handler.go | 78 | ||||
| -rw-r--r-- | edge/pkg/edgehub/task/taskexecutor/image_prepull.go | 184 | ||||
| -rw-r--r-- | edge/pkg/edgehub/task/taskexecutor/node_backup.go | 106 | ||||
| -rw-r--r-- | edge/pkg/edgehub/task/taskexecutor/node_rollback.go | 72 | ||||
| -rw-r--r-- | edge/pkg/edgehub/task/taskexecutor/node_upgrade.go (renamed from edge/pkg/edgehub/upgrade/upgrade.go) | 169 | ||||
| -rw-r--r-- | edge/pkg/edgehub/task/taskexecutor/pre_check.go | 158 | ||||
| -rw-r--r-- | edge/pkg/edgehub/task/taskexecutor/task_executor.go | 93 | ||||
| -rw-r--r-- | edge/pkg/edgehub/upgrade/image_prepull.go | 188 | ||||
| -rw-r--r-- | edge/pkg/edgehub/upgrade/upgrade_test.go | 56 |
11 files changed, 819 insertions, 321 deletions
diff --git a/edge/pkg/common/util/util.go b/edge/pkg/common/util/util.go new file mode 100644 index 000000000..ec978c191 --- /dev/null +++ b/edge/pkg/common/util/util.go @@ -0,0 +1,32 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + + beehiveContext "github.com/kubeedge/beehive/pkg/core/context" + "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/edge/pkg/common/modules" +) + +func ReportTaskResult(taskType, taskID string, resp types.NodeTaskResponse) { + msg := model.NewMessage("").SetRoute(modules.EdgeHubModuleName, modules.HubGroup). + SetResourceOperation(fmt.Sprintf("task/%s/node/%s", taskID, resp.NodeName), taskType).FillBody(resp) + beehiveContext.Send(modules.EdgeHubModuleName, *msg) +} diff --git a/edge/pkg/edgehub/edgehub.go b/edge/pkg/edgehub/edgehub.go index 2498d07f2..bf63f1f9e 100644 --- a/edge/pkg/edgehub/edgehub.go +++ b/edge/pkg/edgehub/edgehub.go @@ -13,8 +13,8 @@ import ( "github.com/kubeedge/kubeedge/edge/pkg/edgehub/certificate" "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients" "github.com/kubeedge/kubeedge/edge/pkg/edgehub/config" - // register Upgrade handler - _ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/upgrade" + // register Task handler + _ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/task" "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2" ) diff --git a/edge/pkg/edgehub/task/task_handler.go b/edge/pkg/edgehub/task/task_handler.go new file mode 100644 index 000000000..ac42d64bb --- /dev/null +++ b/edge/pkg/edgehub/task/task_handler.go @@ -0,0 +1,78 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package task + +import ( + "encoding/json" + "fmt" + + "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + commontypes "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options" + "github.com/kubeedge/kubeedge/edge/pkg/common/util" + "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients" + "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler" + "github.com/kubeedge/kubeedge/edge/pkg/edgehub/task/taskexecutor" +) + +func init() { + handler := &taskHandler{} + msghandler.RegisterHandler(handler) +} + +type taskHandler struct{} + +func (th *taskHandler) Filter(message *model.Message) bool { + name := message.GetGroup() + return name == modules.TaskManagerModuleName +} + +func (th *taskHandler) Process(message *model.Message, clientHub clients.Adapter) error { + taskReq := &commontypes.NodeTaskRequest{} + data, err := message.GetContentData() + if err != nil { + return fmt.Errorf("failed to get content data: %v", err) + } + err = json.Unmarshal(data, taskReq) + if err != nil { + return fmt.Errorf("unmarshal failed: %v", err) + } + executor, err := taskexecutor.GetExecutor(taskReq.Type) + if err != nil { + return err + } + event, err := executor.Do(*taskReq) + if err != nil { + return err + } + + // use external tool like keadm + //Or the task needs to use the goroutine to control the message reply itself. + if event.Action == "" { + return nil + } + + resp := commontypes.NodeTaskResponse{ + NodeName: options.GetEdgeCoreConfig().Modules.Edged.HostnameOverride, + Event: event.Type, + Action: event.Action, + Reason: event.Msg, + } + util.ReportTaskResult(taskReq.Type, taskReq.TaskID, resp) + return nil +} diff --git a/edge/pkg/edgehub/task/taskexecutor/image_prepull.go b/edge/pkg/edgehub/task/taskexecutor/image_prepull.go new file mode 100644 index 000000000..a2b19c8e7 --- /dev/null +++ b/edge/pkg/edgehub/task/taskexecutor/image_prepull.go @@ -0,0 +1,184 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package taskexecutor + +import ( + "encoding/json" + "fmt" + "strings" + + v1 "k8s.io/api/core/v1" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/common/constants" + "github.com/kubeedge/kubeedge/common/types" + commontypes "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options" + edgeutil "github.com/kubeedge/kubeedge/edge/pkg/common/util" + metaclient "github.com/kubeedge/kubeedge/edge/pkg/metamanager/client" + "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +const ( + TaskPrePull = "prepull" +) + +type PrePull struct { + *BaseExecutor +} + +func (p *PrePull) Name() string { + return p.name +} + +func NewPrePullExecutor() Executor { + methods := map[string]func(types.NodeTaskRequest) fsm.Event{ + string(api.TaskChecking): preCheck, + string(api.TaskInit): emptyInit, + "": emptyInit, + string(api.PullingState): pullImages, + } + return &PrePull{ + BaseExecutor: NewBaseExecutor(TaskPrePull, methods), + } +} + +func pullImages(taskReq types.NodeTaskRequest) fsm.Event { + event := fsm.Event{ + Type: "Pull", + Action: api.ActionSuccess, + } + + // get edgecore config + edgeCoreConfig := options.GetEdgeCoreConfig() + if edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint == "" { + edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint = edgeCoreConfig.Modules.Edged.RemoteRuntimeEndpoint + } + + // parse message request + prePullReq, err := getImagePrePullJobRequest(taskReq) + if err != nil { + event.Msg = err.Error() + event.Action = api.ActionFailure + return event + } + + // pull images + container, err := util.NewContainerRuntime(edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint, edgeCoreConfig.Modules.Edged.TailoredKubeletConfig.CgroupDriver) + if err != nil { + event.Msg = err.Error() + event.Action = api.ActionFailure + return event + } + + go func() { + errorStr, imageStatus := prePullImages(*prePullReq, container) + if errorStr != "" { + event.Action = api.ActionFailure + event.Msg = errorStr + } + + data, err := json.Marshal(imageStatus) + if err != nil { + klog.Warningf("marshal imageStatus failed: %v", err) + } + resp := commontypes.NodeTaskResponse{ + NodeName: edgeCoreConfig.Modules.Edged.HostnameOverride, + Event: event.Type, + Action: event.Action, + Reason: event.Msg, + ExternalMessage: string(data), + } + edgeutil.ReportTaskResult(taskReq.Type, taskReq.TaskID, resp) + }() + return fsm.Event{} +} + +func getImagePrePullJobRequest(taskReq commontypes.NodeTaskRequest) (*commontypes.ImagePrePullJobRequest, error) { + var prePullReq commontypes.ImagePrePullJobRequest + data, err := json.Marshal(taskReq.Item) + if err != nil { + return nil, err + } + err = json.Unmarshal(data, &prePullReq) + if err != nil { + return nil, err + } + return &prePullReq, err +} + +func prePullImages(prePullReq commontypes.ImagePrePullJobRequest, container util.ContainerRuntime) (string, []v1alpha1.ImageStatus) { + errorStr := "" + authConfig, err := makeAuthConfig(prePullReq.Secret) + if err != nil { + return errorStr, []v1alpha1.ImageStatus{} + } + + var imageStatus []v1alpha1.ImageStatus + for _, image := range prePullReq.Images { + prePullStatus := v1alpha1.ImageStatus{ + Image: image, + } + for i := 0; i < int(prePullReq.RetryTimes); i++ { + err = container.PullImage(image, authConfig, nil) + if err == nil { + break + } + } + if err != nil { + klog.Errorf("pull image %s failed, err: %v", image, err) + errorStr = fmt.Sprintf("pull image failed, err: %v", err) + prePullStatus.State = api.TaskFailed + prePullStatus.Reason = err.Error() + } else { + klog.Infof("pull image %s successfully!", image) + prePullStatus.State = api.TaskSuccessful + } + imageStatus = append(imageStatus, prePullStatus) + } + + return errorStr, imageStatus +} + +func makeAuthConfig(pullsecret string) (*runtimeapi.AuthConfig, error) { + if pullsecret == "" { + return nil, nil + } + + secretSli := strings.Split(pullsecret, constants.ResourceSep) + if len(secretSli) != 2 { + return nil, fmt.Errorf("pull secret format is not correct") + } + + client := metaclient.New() + secret, err := client.Secrets(secretSli[0]).Get(secretSli[1]) + if err != nil { + return nil, fmt.Errorf("get secret %s failed, %v", secretSli[1], err) + } + + var auth runtimeapi.AuthConfig + err = json.Unmarshal(secret.Data[v1.DockerConfigJsonKey], &auth) + if err != nil { + return nil, fmt.Errorf("unmarshal secret %s to auth file failed, %v", secretSli[1], err) + } + + return &auth, nil +} diff --git a/edge/pkg/edgehub/task/taskexecutor/node_backup.go b/edge/pkg/edgehub/task/taskexecutor/node_backup.go new file mode 100644 index 000000000..1d4583978 --- /dev/null +++ b/edge/pkg/edgehub/task/taskexecutor/node_backup.go @@ -0,0 +1,106 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package taskexecutor + +import ( + "fmt" + "io" + "os" + "path/filepath" + + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/common/constants" + commontypes "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options" + "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" + "github.com/kubeedge/kubeedge/pkg/version" +) + +func backupNode(taskReq commontypes.NodeTaskRequest) (event fsm.Event) { + event = fsm.Event{ + Type: "Backup", + Action: api.ActionSuccess, + } + var err error + defer func() { + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + } + }() + backupPath := filepath.Join(util.KubeEdgeBackupPath, version.Get().String()) + err = backup(backupPath) + if err != nil { + cleanErr := os.Remove(backupPath) + if cleanErr != nil { + klog.Warningf("clean backup path failed: %s", err.Error()) + } + return + } + return event +} + +func backup(backupPath string) error { + config := options.GetEdgeCoreConfig() + klog.Infof("backup start, backup path: %s", backupPath) + if err := os.MkdirAll(backupPath, 0750); err != nil { + return fmt.Errorf("mkdirall failed: %v", err) + } + + // backup edgecore.db: copy from origin path to backup path + if err := copy(config.DataBase.DataSource, filepath.Join(backupPath, "edgecore.db")); err != nil { + return fmt.Errorf("failed to backup db: %v", err) + } + // backup edgecore.yaml: copy from origin path to backup path + if err := copy(constants.DefaultConfigDir+"edgecore.yaml", filepath.Join(backupPath, "edgecore.yaml")); err != nil { + return fmt.Errorf("failed to back config: %v", err) + } + // backup edgecore: copy from origin path to backup path + if err := copy(filepath.Join(util.KubeEdgeUsrBinPath, util.KubeEdgeBinaryName), filepath.Join(backupPath, util.KubeEdgeBinaryName)); err != nil { + return fmt.Errorf("failed to backup edgecore: %v", err) + } + return nil +} + +func copy(src, dst string) error { + sourceFileStat, err := os.Stat(src) + if err != nil { + return err + } + + if !sourceFileStat.Mode().IsRegular() { + return fmt.Errorf("%s is not a regular file", src) + } + + source, err := os.Open(src) + if err != nil { + return err + } + defer source.Close() + + // copy file using src file mode + destination, err := os.OpenFile(dst, os.O_RDWR|os.O_CREATE|os.O_TRUNC, sourceFileStat.Mode()) + if err != nil { + return err + } + defer destination.Close() + _, err = io.Copy(destination, source) + return err +} diff --git a/edge/pkg/edgehub/task/taskexecutor/node_rollback.go b/edge/pkg/edgehub/task/taskexecutor/node_rollback.go new file mode 100644 index 000000000..85233dc45 --- /dev/null +++ b/edge/pkg/edgehub/task/taskexecutor/node_rollback.go @@ -0,0 +1,72 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package taskexecutor + +import ( + "fmt" + "os/exec" + + "k8s.io/klog/v2" + + commontypes "github.com/kubeedge/kubeedge/common/types" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" + "github.com/kubeedge/kubeedge/pkg/version" +) + +func rollbackNode(taskReq commontypes.NodeTaskRequest) (event fsm.Event) { + event = fsm.Event{ + Type: "Rollback", + Action: api.ActionSuccess, + } + var err error + defer func() { + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + } + }() + + var upgradeReq *commontypes.NodeUpgradeJobRequest + upgradeReq, err = getTaskRequest(taskReq) + if err != nil { + return + } + + err = rollback(upgradeReq) + if err != nil { + return + } + return event +} + +func rollback(upgradeReq *commontypes.NodeUpgradeJobRequest) error { + klog.Infof("Begin to run rollback command") + rollBackCmd := fmt.Sprintf("keadm rollback edge --name %s --history %s >> /tmp/keadm.log 2>&1", + upgradeReq.UpgradeID, version.Get()) + + // run upgrade cmd to upgrade edge node + // use nohup command to start a child progress + command := fmt.Sprintf("nohup %s &", rollBackCmd) + cmd := exec.Command("bash", "-c", command) + s, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("run rollback command %s failed: %v, %s", command, err, s) + } + klog.Infof("!!! Finish rollback ") + return nil +} diff --git a/edge/pkg/edgehub/upgrade/upgrade.go b/edge/pkg/edgehub/task/taskexecutor/node_upgrade.go index e26cd50ca..8546ab96d 100644 --- a/edge/pkg/edgehub/upgrade/upgrade.go +++ b/edge/pkg/edgehub/task/taskexecutor/node_upgrade.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The KubeEdge Authors. +Copyright 2023 The KubeEdge Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,100 +14,137 @@ See the License for the specific language governing permissions and limitations under the License. */ -package upgrade +package taskexecutor import ( "encoding/json" "fmt" "os/exec" "path/filepath" - "strings" - "sync" "k8s.io/klog/v2" - "github.com/kubeedge/beehive/pkg/core/model" - "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + "github.com/kubeedge/kubeedge/common/types" commontypes "github.com/kubeedge/kubeedge/common/types" "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options" - "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients" - "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler" "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" "github.com/kubeedge/kubeedge/pkg/version" ) -func init() { - handler := &upgradeHandler{} - msghandler.RegisterHandler(handler) +const ( + TaskUpgrade = "upgrade" +) - // register upgrade tool: keadm - // if not specify it, also use default upgrade tool: keadm - RegisterUpgradeProvider("", &keadmUpgrade{}) - RegisterUpgradeProvider("keadm", &keadmUpgrade{}) +type Upgrade struct { + *BaseExecutor } -type upgradeHandler struct{} - -func (uh *upgradeHandler) Filter(message *model.Message) bool { - name := message.GetGroup() - return name == modules.NodeUpgradeJobControllerModuleGroup +func (u *Upgrade) Name() string { + return u.name } -func (uh *upgradeHandler) Process(message *model.Message, clientHub clients.Adapter) error { - upgradeReq := &commontypes.NodeUpgradeJobRequest{} - data, err := message.GetContentData() - if err != nil { - return fmt.Errorf("failed to get content data: %v", err) +func NewUpgradeExecutor() Executor { + methods := map[string]func(types.NodeTaskRequest) fsm.Event{ + string(api.TaskChecking): preCheck, + string(api.TaskInit): initUpgrade, + "": initUpgrade, + string(api.BackingUpState): backupNode, + string(api.RollingBackState): rollbackNode, + string(api.UpgradingState): upgrade, } - err = json.Unmarshal(data, upgradeReq) - if err != nil { - return fmt.Errorf("unmarshal failed: %v", err) + return &Upgrade{ + BaseExecutor: NewBaseExecutor(TaskUpgrade, methods), } +} - err = validateUpgrade(upgradeReq) - if err != nil { - return fmt.Errorf("upgrade request is not valid: %v", err) +func initUpgrade(taskReq types.NodeTaskRequest) (event fsm.Event) { + event = fsm.Event{ + Type: "Init", + Action: api.ActionSuccess, } + var err error + defer func() { + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + } + }() - tool := strings.ToLower(upgradeReq.UpgradeTool) - if _, ok := upgradeToolProviders[tool]; !ok { - return fmt.Errorf("not supported upgrade tool type: %v", upgradeReq.UpgradeTool) + var upgradeReq *commontypes.NodeUpgradeJobRequest + upgradeReq, err = getTaskRequest(taskReq) + if err != nil { + return } - return upgradeToolProviders[tool].Upgrade(upgradeReq) + if upgradeReq.UpgradeID == "" { + err = fmt.Errorf("upgradeID cannot be empty") + return + } + if upgradeReq.Version == version.Get().String() { + return fsm.Event{ + Type: "Upgrading", + Action: api.ActionSuccess, + } + } + err = prepareKeadm(upgradeReq) + if err != nil { + return + } + return event } -func validateUpgrade(upgrade *commontypes.NodeUpgradeJobRequest) error { - if upgrade.UpgradeID == "" { - return fmt.Errorf("upgradeID cannot be empty") +func getTaskRequest(taskReq commontypes.NodeTaskRequest) (*commontypes.NodeUpgradeJobRequest, error) { + data, err := json.Marshal(taskReq.Item) + if err != nil { + return nil, err } - if upgrade.Version == version.Get().String() { - return fmt.Errorf("edge node already on specific version, no need to upgrade") + var upgradeReq commontypes.NodeUpgradeJobRequest + err = json.Unmarshal(data, &upgradeReq) + if err != nil { + return nil, err } - - return nil + return &upgradeReq, err } -type Provider interface { - Upgrade(upgrade *commontypes.NodeUpgradeJobRequest) error +func upgrade(taskReq types.NodeTaskRequest) (event fsm.Event) { + opts := options.GetEdgeCoreOptions() + event = fsm.Event{ + Type: "Upgrade", + } + upgradeReq, err := getTaskRequest(taskReq) + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + return + } + err = keadmUpgrade(*upgradeReq, opts) + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + } + return } -var ( - upgradeToolProviders = make(map[string]Provider) - mutex sync.Mutex -) +func keadmUpgrade(upgradeReq commontypes.NodeUpgradeJobRequest, opts *options.EdgeCoreOptions) error { + klog.Infof("Begin to run upgrade command") + upgradeCmd := fmt.Sprintf("keadm upgrade --upgradeID %s --historyID %s --fromVersion %s --toVersion %s --config %s --image %s > /tmp/keadm.log 2>&1", + upgradeReq.UpgradeID, upgradeReq.HistoryID, version.Get(), upgradeReq.Version, opts.ConfigFile, upgradeReq.Image) -func RegisterUpgradeProvider(upgradeToolType string, provider Provider) { - mutex.Lock() - defer mutex.Unlock() - upgradeToolProviders[upgradeToolType] = provider + // run upgrade cmd to upgrade edge node + // use nohup command to start a child progress + command := fmt.Sprintf("nohup %s &", upgradeCmd) + cmd := exec.Command("bash", "-c", command) + s, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("run upgrade command %s failed: %v, %s", command, err, s) + } + klog.Infof("!!! Finish upgrade from Version %s to %s ...", version.Get(), upgradeReq.Version) + return nil } -type keadmUpgrade struct{} - -func (*keadmUpgrade) Upgrade(upgradeReq *commontypes.NodeUpgradeJobRequest) error { - // get edgecore start options and config - opts := options.GetEdgeCoreOptions() +func prepareKeadm(upgradeReq *commontypes.NodeUpgradeJobRequest) error { config := options.GetEdgeCoreConfig() // install the requested installer keadm from docker image @@ -119,7 +156,6 @@ func (*keadmUpgrade) Upgrade(upgradeReq *commontypes.NodeUpgradeJobRequest) erro if err != nil { return fmt.Errorf("failed to new container runtime: %v", err) } - image := upgradeReq.Image // TODO: do some verification 1.sha256(pass in using CRD) 2.image signature verification @@ -135,22 +171,5 @@ func (*keadmUpgrade) Upgrade(upgradeReq *commontypes.NodeUpgradeJobRequest) erro if err != nil { return fmt.Errorf("failed to cp file from image to host: %v", err) } - - klog.Infof("Begin to run upgrade command") - upgradeCmd := fmt.Sprintf("keadm upgrade --upgradeID %s --historyID %s --fromVersion %s --toVersion %s --config %s --image %s > /tmp/keadm.log 2>&1", - upgradeReq.UpgradeID, upgradeReq.HistoryID, version.Get(), upgradeReq.Version, opts.ConfigFile, image) - - // run upgrade cmd to upgrade edge node - // use nohup command to start a child progress - command := fmt.Sprintf("nohup %s &", upgradeCmd) - cmd := exec.Command("bash", "-c", command) - s, err := cmd.CombinedOutput() - if err != nil { - klog.Errorf("run upgrade command %s failed: %v, %s", command, err, s) - return fmt.Errorf("run upgrade command %s failed: %v, %s", command, err, s) - } - - klog.Infof("!!! Begin to upgrade from Version %s to %s ...", version.Get(), upgradeReq.Version) - return nil } diff --git a/edge/pkg/edgehub/task/taskexecutor/pre_check.go b/edge/pkg/edgehub/task/taskexecutor/pre_check.go new file mode 100644 index 000000000..da68a2077 --- /dev/null +++ b/edge/pkg/edgehub/task/taskexecutor/pre_check.go @@ -0,0 +1,158 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package taskexecutor + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/disk" + "github.com/shirou/gopsutil/v3/mem" + + "github.com/kubeedge/kubeedge/common/types" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +const ( + MaxCPUUsage float64 = 80 + MaxMemUsage float64 = 80 + MaxDiskUsage float64 = 80 +) + +func preCheck(taskReq types.NodeTaskRequest) fsm.Event { + event := fsm.Event{ + Type: "Check", + Action: api.ActionSuccess, + Msg: "", + } + + data, err := json.Marshal(taskReq.Item) + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + return event + } + var checkItems types.NodePreCheckRequest + err = json.Unmarshal(data, &checkItems) + if err != nil { + event.Action = api.ActionFailure + event.Msg = err.Error() + return event + } + + var failed bool + var checkResult = map[string]string{} + var checkFunc = map[string]func() error{ + "cpu": checkCPU, + "mem": checkMem, + "disk": checkDisk, + } + for _, item := range checkItems.CheckItem { + f, ok := checkFunc[item] + if !ok { + checkResult[item] = "check item not support" + continue + } + err = f() + if err != nil { + failed = true + checkResult[item] = err.Error() + continue + } + checkResult[item] = "ok" + } + if !failed { + return event + } + event.Action = api.ActionFailure + result, err := json.Marshal(checkResult) + if err != nil { + event.Msg = err.Error() + return event + } + event.Msg = string(result) + return event +} + +func checkCPU() error { + cpuUsage, err := cpu.Percent(100*time.Millisecond, false) + if err != nil { + return err + } + var usage float64 + for _, percpu := range cpuUsage { + usage += percpu / float64(len(cpuUsage)) + } + if usage > MaxCPUUsage { + return fmt.Errorf("current cpu usage is %f, which exceeds the maximum allowed usage %f", usage, MaxCPUUsage) + } + return nil +} + +func checkMem() error { + memInfo, err := mem.VirtualMemory() + if err != nil { + return err + } + memUsedPercent := memInfo.UsedPercent + if memUsedPercent > MaxMemUsage { + return fmt.Errorf("current mem usage is %f, which exceeds the maximum allowed usage %f", memUsedPercent, MaxMemUsage) + } + return nil +} + +func checkDisk() error { + partitions, err := disk.Partitions(true) + if err != nil { + return err + } + var failed bool + var diskUsages = map[string]string{} + for _, part := range partitions { + usage, err := disk.Usage(part.Mountpoint) + if err != nil { + failed = true + diskUsages[part.Device] = err.Error() + continue + } + if usage.UsedPercent > MaxDiskUsage { + failed = true + diskUsages[part.Device] = fmt.Sprintf("current disk usage is %f, which exceeds the maximum allowed usage %f", usage.UsedPercent, MaxMemUsage) + continue + } + diskUsages[part.Device] = fmt.Sprintf("%f", usage.UsedPercent) + } + if !failed { + return nil + } + result, err := json.Marshal(diskUsages) + if err != nil { + return err + } + return fmt.Errorf(string(result)) +} + +func normalInit(types.NodeTaskRequest) fsm.Event { + return fsm.Event{ + Type: "Init", + Action: api.ActionSuccess, + Msg: "", + } +} diff --git a/edge/pkg/edgehub/task/taskexecutor/task_executor.go b/edge/pkg/edgehub/task/taskexecutor/task_executor.go new file mode 100644 index 000000000..fd3be1f88 --- /dev/null +++ b/edge/pkg/edgehub/task/taskexecutor/task_executor.go @@ -0,0 +1,93 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package taskexecutor + +import ( + "fmt" + + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +func init() { + Register(TaskUpgrade, NewUpgradeExecutor()) + Register(TaskPrePull, NewPrePullExecutor()) +} + +type Executor interface { + Name() string + Do(types.NodeTaskRequest) (fsm.Event, error) +} + +type BaseExecutor struct { + name string + methods map[string]func(types.NodeTaskRequest) fsm.Event +} + +func (be *BaseExecutor) Name() string { + return be.name +} + +func NewBaseExecutor(name string, methods map[string]func(types.NodeTaskRequest) fsm.Event) *BaseExecutor { + return &BaseExecutor{ + name: name, + methods: methods, + } +} + +func (be *BaseExecutor) Do(taskReq types.NodeTaskRequest) (fsm.Event, error) { + method, ok := be.methods[taskReq.State] + if !ok { + err := fmt.Errorf("method %s in executor %s is not implemented", taskReq.State, taskReq.Type) + klog.Warning(err.Error()) + return fsm.Event{}, err + } + return method(taskReq), nil +} + +var ( + executors = make(map[string]Executor) + CommonMethods = map[string]func(types.NodeTaskRequest) fsm.Event{ + string(v1alpha1.TaskChecking): preCheck, + string(v1alpha1.TaskInit): normalInit, + } +) + +func Register(name string, executor Executor) { + if _, ok := executors[name]; ok { + klog.Warning("executor %s exists ", name) + } + executors[name] = executor +} + +func GetExecutor(name string) (Executor, error) { + executor, ok := executors[name] + if !ok { + return nil, fmt.Errorf("executor %s is not registered", name) + } + return executor, nil +} + +func emptyInit(_ types.NodeTaskRequest) (event fsm.Event) { + return fsm.Event{ + Type: "Init", + Action: v1alpha1.ActionSuccess, + } +} diff --git a/edge/pkg/edgehub/upgrade/image_prepull.go b/edge/pkg/edgehub/upgrade/image_prepull.go deleted file mode 100644 index d02a4cdc9..000000000 --- a/edge/pkg/edgehub/upgrade/image_prepull.go +++ /dev/null @@ -1,188 +0,0 @@ -/* -Copyright 2023 The KubeEdge Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package upgrade - -import ( - "encoding/json" - "fmt" - "strings" - - v1 "k8s.io/api/core/v1" - runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" - "k8s.io/klog/v2" - - beehiveContext "github.com/kubeedge/beehive/pkg/core/context" - "github.com/kubeedge/beehive/pkg/core/model" - cloudmodules "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" - "github.com/kubeedge/kubeedge/common/constants" - commontypes "github.com/kubeedge/kubeedge/common/types" - "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options" - "github.com/kubeedge/kubeedge/edge/pkg/common/modules" - "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients" - "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler" - metaclient "github.com/kubeedge/kubeedge/edge/pkg/metamanager/client" - "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util" - "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" -) - -func init() { - handler := &prepullHandler{} - msghandler.RegisterHandler(handler) -} - -type prepullHandler struct{} - -func (uh *prepullHandler) Filter(message *model.Message) bool { - name := message.GetGroup() - return name == cloudmodules.ImagePrePullControllerModuleGroup -} - -func (uh *prepullHandler) Process(message *model.Message, clientHub clients.Adapter) error { - // get edgecore config - edgecoreconfig := options.GetEdgeCoreConfig() - if edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint == "" { - edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint = edgecoreconfig.Modules.Edged.RemoteRuntimeEndpoint - } - nodeName := edgecoreconfig.Modules.Edged.HostnameOverride - - var errmsg, jobName string - defer func() { - if errmsg != "" { - sendResponseMessage(nodeName, jobName, errmsg, nil, v1alpha1.PrePullFailed) - } - }() - - jobName, err := parseJobName(message.GetResource()) - if err != nil { - errmsg = fmt.Sprintf("failed to parse prepull resource, err: %v", err) - return fmt.Errorf(errmsg) - } - - // parse message request - var prePullReq commontypes.ImagePrePullJobRequest - data, err := message.GetContentData() - if err != nil { - errmsg = fmt.Sprintf("failed to get message content, err: %v", err) - return fmt.Errorf(errmsg) - } - err = json.Unmarshal(data, &prePullReq) - if err != nil { - errmsg = fmt.Sprintf("failed to unmarshal message content, err: %v", err) - return fmt.Errorf(errmsg) - } - if nodeName != prePullReq.NodeName { - errmsg = fmt.Sprintf("request node name %s is not match with the edge node %s", prePullReq.NodeName, nodeName) - return fmt.Errorf(errmsg) - } - - // todo check items such as disk according to req.checkitem config - - // pull images - container, err := util.NewContainerRuntime(edgecoreconfig.Modules.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint, edgecoreconfig.Modules.Edged.TailoredKubeletConfig.CgroupDriver) - if err != nil { - errmsg = fmt.Sprintf("failed to new container runtime: %v", err) - return fmt.Errorf(errmsg) - } - - go func() { - pullMsg, resp, state := prePullImages(prePullReq, container) - sendResponseMessage(nodeName, jobName, pullMsg, resp, state) - }() - return nil -} - -func sendResponseMessage(nodeName, jobName, pullMsg string, imageStatus []v1alpha1.ImageStatus, state v1alpha1.PrePullState) { - resp := commontypes.ImagePrePullJobResponse{ - NodeName: nodeName, - State: state, - Reason: pullMsg, - ImageStatus: imageStatus, - } - msg := model.NewMessage("").SetRoute(modules.EdgeHubModuleName, modules.HubGroup). - SetResourceOperation(fmt.Sprintf("node/%s/prepull/%s", nodeName, jobName), "prepull").FillBody(resp) - beehiveContext.Send(modules.EdgeHubModuleName, *msg) -} - -func prePullImages(prePullReq commontypes.ImagePrePullJobRequest, container util.ContainerRuntime) (string, []v1alpha1.ImageStatus, v1alpha1.PrePullState) { - var errmsg string - authConfig, err := makeAuthConfig(prePullReq.Secret) - if err != nil { - errmsg = fmt.Sprintf("failed to get prepull secret, err: %v", err) - return errmsg, nil, v1alpha1.PrePullFailed - } - - state := v1alpha1.PrePullSuccessful - var imageStatus []v1alpha1.ImageStatus - for _, image := range prePullReq.Images { - prePullStatus := v1alpha1.ImageStatus{ - Image: image, - } - for i := 0; i < int(prePullReq.RetryTimes); i++ { - err = container.PullImage(image, authConfig, nil) - if err == nil { - break - } - } - if err != nil { - klog.Errorf("pull image %s failed, err: %v", image, err) - errmsg = "prepull images failed" - state = v1alpha1.PrePullFailed - prePullStatus.State = v1alpha1.PrePullFailed - prePullStatus.Reason = err.Error() - } else { - klog.Infof("pull image %s successfully!", image) - prePullStatus.State = v1alpha1.PrePullSuccessful - } - imageStatus = append(imageStatus, prePullStatus) - } - - return errmsg, imageStatus, state -} - -func parseJobName(resource string) (string, error) { - var jobName string - sli := strings.Split(resource, constants.ResourceSep) - if len(sli) != 2 { - return jobName, fmt.Errorf("the resource %s is not the standard type", resource) - } - return sli[1], nil -} - -func makeAuthConfig(pullsecret string) (*runtimeapi.AuthConfig, error) { - if pullsecret == "" { - return nil, nil - } - - secretSli := strings.Split(pullsecret, constants.ResourceSep) - if len(secretSli) != 2 { - return nil, fmt.Errorf("pull secret format is not correct") - } - - client := metaclient.New() - secret, err := client.Secrets(secretSli[0]).Get(secretSli[1]) - if err != nil { - return nil, fmt.Errorf("get secret %s failed, %v", secretSli[1], err) - } - - var auth runtimeapi.AuthConfig - err = json.Unmarshal(secret.Data[v1.DockerConfigJsonKey], &auth) - if err != nil { - return nil, fmt.Errorf("unmarshal secret %s to auth file failed, %v", secretSli[1], err) - } - - return &auth, nil -} diff --git a/edge/pkg/edgehub/upgrade/upgrade_test.go b/edge/pkg/edgehub/upgrade/upgrade_test.go deleted file mode 100644 index 2660f9d79..000000000 --- a/edge/pkg/edgehub/upgrade/upgrade_test.go +++ /dev/null @@ -1,56 +0,0 @@ -/* -Copyright 2022 The KubeEdge Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package upgrade - -import ( - "testing" - - "github.com/kubeedge/beehive/pkg/core/model" -) - -func TestFilter(t *testing.T) { - uh := &upgradeHandler{} - - tests := []struct { - msg *model.Message - want bool - name string - }{ - { - msg: &model.Message{ - Router: model.MessageRoute{Group: "nodeupgradejobcontroller"}, - }, - want: true, - name: "Node upgrade job controlled", - }, - { - msg: &model.Message{ - Router: model.MessageRoute{Group: "devicecontroller"}, - }, - want: false, - name: "Device controller", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := uh.Filter(tt.msg); got != tt.want { - t.Errorf("upgradeHandler.Filter() retuned unexpected result. got = %v, want = %v", got, tt.want) - } - }) - } -} |
