diff options
| author | zhengxinwei <zhengxinwei@huawei.com> | 2024-01-04 15:27:04 +0800 |
|---|---|---|
| committer | zhengxinwei-f <zhengxinwei@huawei.com> | 2024-01-16 09:35:44 +0800 |
| commit | b1c1c6a8e5442fff2b4d932060648e009f639df1 (patch) | |
| tree | 575f35f9559894e995060f15b029ff73f2d23282 /edge/pkg | |
| parent | notify the task manager of the task status (diff) | |
| download | kubeedge-b1c1c6a8e5442fff2b4d932060648e009f639df1.tar.gz | |
Implement task manager to complete cloud edge task execution
Signed-off-by: zhengxinwei-f <zhengxinwei@huawei.com>
Diffstat (limited to 'edge/pkg')
| -rw-r--r-- | edge/pkg/common/util/util.go | 75 | ||||
| -rw-r--r-- | edge/pkg/edgehub/edgehub.go | 4 | ||||
| -rw-r--r-- | edge/pkg/edgehub/task/task_handler.go | 57 | ||||
| -rw-r--r-- | edge/pkg/edgehub/task/taskexecutor/pre_check.go | 142 | ||||
| -rw-r--r-- | edge/pkg/edgehub/task/taskexecutor/task_executor.go | 69 |
5 files changed, 345 insertions, 2 deletions
diff --git a/edge/pkg/common/util/util.go b/edge/pkg/common/util/util.go new file mode 100644 index 000000000..9bf697c00 --- /dev/null +++ b/edge/pkg/common/util/util.go @@ -0,0 +1,75 @@ +package util + +import ( + "bytes" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "net" + "net/http" + "os" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2" + "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +const ISO8601UTC = "2006-01-02T15:04:05Z" + +func ReportUpgradeResult(config *v1alpha2.EdgeCoreConfig, taskType, taskID string, event fsm.Event) error { + resp := &v1alpha1.TaskStatus{ + NodeName: config.Modules.Edged.HostnameOverride, + Event: event.Type, + Action: event.Action, + Time: time.Now().Format(ISO8601UTC), + Reason: event.ErrorMsg, + } + edgeHub := config.Modules.EdgeHub + var caCrt []byte + caCertPath := edgeHub.TLSCAFile + caCrt, err := os.ReadFile(caCertPath) + if err != nil { + return fmt.Errorf("failed to read ca: %v", err) + } + + rootCAs := x509.NewCertPool() + rootCAs.AppendCertsFromPEM(caCrt) + + certFile := edgeHub.TLSCertFile + keyFile := edgeHub.TLSPrivateKeyFile + cliCrt, err := tls.LoadX509KeyPair(certFile, keyFile) + + transport := &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + // use TLS configuration + TLSClientConfig: &tls.Config{ + RootCAs: rootCAs, + InsecureSkipVerify: false, + Certificates: []tls.Certificate{cliCrt}, + }, + } + + client := &http.Client{Transport: transport, Timeout: 30 * time.Second} + + respData, err := json.Marshal(resp) + if err != nil { + return fmt.Errorf("marshal failed: %v", err) + } + url := edgeHub.HTTPServer + fmt.Sprintf("/task/%s/name/%s/node/%s/status", taskType, taskID, config.Modules.Edged.HostnameOverride) + result, err := client.Post(url, "application/json", bytes.NewReader(respData)) + + if err != nil { + return fmt.Errorf("post http request failed: %v", err) + } + klog.Error("report result ", result) + defer result.Body.Close() + + return nil +} diff --git a/edge/pkg/edgehub/edgehub.go b/edge/pkg/edgehub/edgehub.go index 2498d07f2..bf63f1f9e 100644 --- a/edge/pkg/edgehub/edgehub.go +++ b/edge/pkg/edgehub/edgehub.go @@ -13,8 +13,8 @@ import ( "github.com/kubeedge/kubeedge/edge/pkg/edgehub/certificate" "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients" "github.com/kubeedge/kubeedge/edge/pkg/edgehub/config" - // register Upgrade handler - _ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/upgrade" + // register Task handler + _ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/task" "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2" ) diff --git a/edge/pkg/edgehub/task/task_handler.go b/edge/pkg/edgehub/task/task_handler.go new file mode 100644 index 000000000..bd1588234 --- /dev/null +++ b/edge/pkg/edgehub/task/task_handler.go @@ -0,0 +1,57 @@ +package task + +import ( + "encoding/json" + "fmt" + + "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" + commontypes "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options" + "github.com/kubeedge/kubeedge/edge/pkg/common/util" + "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients" + "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler" + "github.com/kubeedge/kubeedge/edge/pkg/edgehub/task/taskexecutor" +) + +func init() { + handler := &taskHandler{} + msghandler.RegisterHandler(handler) +} + +type taskHandler struct{} + +func (th *taskHandler) Filter(message *model.Message) bool { + name := message.GetGroup() + return name == modules.TaskManagerModuleName +} + +func (th *taskHandler) Process(message *model.Message, clientHub clients.Adapter) error { + taskReq := &commontypes.NodeTaskRequest{} + data, err := message.GetContentData() + if err != nil { + return fmt.Errorf("failed to get content data: %v", err) + } + err = json.Unmarshal(data, taskReq) + if err != nil { + return fmt.Errorf("unmarshal failed: %v", err) + } + executor, err := taskexecutor.GetExecutor(taskReq.Type) + if err != nil { + return err + } + event, err := executor.Do(*taskReq) + if err != nil { + return err + } + + // use external tool like keadm + if event.Action == "" { + return nil + } + err = util.ReportUpgradeResult(options.GetEdgeCoreConfig(), taskReq.Type, taskReq.TaskID, event) + if err != nil { + return err + } + return nil +} diff --git a/edge/pkg/edgehub/task/taskexecutor/pre_check.go b/edge/pkg/edgehub/task/taskexecutor/pre_check.go new file mode 100644 index 000000000..9a96c5a70 --- /dev/null +++ b/edge/pkg/edgehub/task/taskexecutor/pre_check.go @@ -0,0 +1,142 @@ +package taskexecutor + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/disk" + "github.com/shirou/gopsutil/v3/mem" + + "github.com/kubeedge/kubeedge/common/types" + api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +const ( + MaxCPUUsage float64 = 80 + MaxMemUsage float64 = 80 + MaxDiskUsage float64 = 80 +) + +func preCheck(taskReq types.NodeTaskRequest) fsm.Event { + event := fsm.Event{ + Type: "Check", + Action: api.ActionSuccess, + ErrorMsg: "", + } + + data, err := json.Marshal(taskReq.Item) + if err != nil { + event.Action = api.ActionFailure + event.ErrorMsg = err.Error() + return event + } + var checkItems types.NodePreCheckRequest + err = json.Unmarshal(data, &checkItems) + if err != nil { + event.Action = api.ActionFailure + event.ErrorMsg = err.Error() + return event + } + + var failed bool + var checkResult = map[string]string{} + var checkFunc = map[string]func() error{ + "cpu": checkCPU, + "mem": checkMem, + "disk": checkDisk, + } + for _, item := range checkItems.CheckItem { + f, ok := checkFunc[item] + if !ok { + checkResult[item] = "check item not support" + continue + } + err = f() + if err != nil { + failed = true + checkResult[item] = err.Error() + continue + } + checkResult[item] = "ok" + } + if !failed { + return event + } + event.Action = api.ActionFailure + result, err := json.Marshal(checkResult) + if err != nil { + event.ErrorMsg = err.Error() + return event + } + event.ErrorMsg = string(result) + return event +} + +func checkCPU() error { + cpuUsage, err := cpu.Percent(100*time.Millisecond, false) + if err != nil { + return err + } + var usage float64 + for _, percpu := range cpuUsage { + usage += percpu / float64(len(cpuUsage)) + } + if usage > MaxCPUUsage { + return fmt.Errorf("current cpu usage is %f, which exceeds the maximum allowed usage %f", usage, MaxCPUUsage) + } + return nil +} + +func checkMem() error { + memInfo, err := mem.VirtualMemory() + if err != nil { + return err + } + memUsedPercent := memInfo.UsedPercent + if memUsedPercent > MaxMemUsage { + return fmt.Errorf("current mem usage is %f, which exceeds the maximum allowed usage %f", memUsedPercent, MaxMemUsage) + } + return nil +} + +func checkDisk() error { + partitions, err := disk.Partitions(true) + if err != nil { + return err + } + var failed bool + var diskUsages = map[string]string{} + for _, part := range partitions { + usage, err := disk.Usage(part.Mountpoint) + if err != nil { + failed = true + diskUsages[part.Device] = err.Error() + continue + } + if usage.UsedPercent > MaxDiskUsage { + failed = true + diskUsages[part.Device] = fmt.Sprintf("current disk usage is %f, which exceeds the maximum allowed usage %f", usage.UsedPercent, MaxMemUsage) + continue + } + diskUsages[part.Device] = fmt.Sprintf("%f", usage.UsedPercent) + } + if !failed { + return nil + } + result, err := json.Marshal(diskUsages) + if err != nil { + return err + } + return fmt.Errorf(string(result)) +} + +func normalInit(types.NodeTaskRequest) fsm.Event { + return fsm.Event{ + Type: "Init", + Action: api.ActionSuccess, + ErrorMsg: "", + } +} diff --git a/edge/pkg/edgehub/task/taskexecutor/task_executor.go b/edge/pkg/edgehub/task/taskexecutor/task_executor.go new file mode 100644 index 000000000..bf88fdc37 --- /dev/null +++ b/edge/pkg/edgehub/task/taskexecutor/task_executor.go @@ -0,0 +1,69 @@ +package taskexecutor + +import ( + "fmt" + + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/common/types" + "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1" + "github.com/kubeedge/kubeedge/pkg/util/fsm" +) + +func init() { + Register(TaskUpgrade, NewUpgradeExecutor()) +} + +type Executor interface { + Name() string + Do(types.NodeTaskRequest) (fsm.Event, error) +} + +type BaseExecutor struct { + name string + methods map[string]func(types.NodeTaskRequest) fsm.Event +} + +func (be *BaseExecutor) Name() string { + return be.name +} + +func NewBaseExecutor(name string, methods map[string]func(types.NodeTaskRequest) fsm.Event) *BaseExecutor { + return &BaseExecutor{ + name: name, + methods: methods, + } +} + +func (be *BaseExecutor) Do(taskReq types.NodeTaskRequest) (fsm.Event, error) { + method, ok := be.methods[taskReq.State] + if !ok { + err := fmt.Errorf("method %s in executor %s is not implemented", taskReq.State, taskReq.Type) + klog.Warning(err.Error()) + return fsm.Event{}, err + } + return method(taskReq), nil +} + +var ( + executors = make(map[string]Executor) + CommonMethods = map[string]func(types.NodeTaskRequest) fsm.Event{ + string(v1alpha1.TaskChecking): preCheck, + string(v1alpha1.TaskInit): normalInit, + } +) + +func Register(name string, executor Executor) { + if _, ok := executors[name]; ok { + klog.Warning("executor %s exists ", name) + } + executors[name] = executor +} + +func GetExecutor(name string) (Executor, error) { + executor, ok := executors[name] + if !ok { + return nil, fmt.Errorf("executor %s is not registered", name) + } + return executor, nil +} |
