summaryrefslogtreecommitdiff
path: root/edge/pkg
diff options
context:
space:
mode:
authorzhengxinwei <zhengxinwei@huawei.com>2024-01-04 15:27:04 +0800
committerzhengxinwei-f <zhengxinwei@huawei.com>2024-01-16 09:35:44 +0800
commitb1c1c6a8e5442fff2b4d932060648e009f639df1 (patch)
tree575f35f9559894e995060f15b029ff73f2d23282 /edge/pkg
parentnotify the task manager of the task status (diff)
downloadkubeedge-b1c1c6a8e5442fff2b4d932060648e009f639df1.tar.gz
Implement task manager to complete cloud edge task execution
Signed-off-by: zhengxinwei-f <zhengxinwei@huawei.com>
Diffstat (limited to 'edge/pkg')
-rw-r--r--edge/pkg/common/util/util.go75
-rw-r--r--edge/pkg/edgehub/edgehub.go4
-rw-r--r--edge/pkg/edgehub/task/task_handler.go57
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/pre_check.go142
-rw-r--r--edge/pkg/edgehub/task/taskexecutor/task_executor.go69
5 files changed, 345 insertions, 2 deletions
diff --git a/edge/pkg/common/util/util.go b/edge/pkg/common/util/util.go
new file mode 100644
index 000000000..9bf697c00
--- /dev/null
+++ b/edge/pkg/common/util/util.go
@@ -0,0 +1,75 @@
+package util
+
+import (
+ "bytes"
+ "crypto/tls"
+ "crypto/x509"
+ "encoding/json"
+ "fmt"
+ "net"
+ "net/http"
+ "os"
+ "time"
+
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2"
+ "github.com/kubeedge/kubeedge/pkg/apis/operations/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+const ISO8601UTC = "2006-01-02T15:04:05Z"
+
+func ReportUpgradeResult(config *v1alpha2.EdgeCoreConfig, taskType, taskID string, event fsm.Event) error {
+ resp := &v1alpha1.TaskStatus{
+ NodeName: config.Modules.Edged.HostnameOverride,
+ Event: event.Type,
+ Action: event.Action,
+ Time: time.Now().Format(ISO8601UTC),
+ Reason: event.ErrorMsg,
+ }
+ edgeHub := config.Modules.EdgeHub
+ var caCrt []byte
+ caCertPath := edgeHub.TLSCAFile
+ caCrt, err := os.ReadFile(caCertPath)
+ if err != nil {
+ return fmt.Errorf("failed to read ca: %v", err)
+ }
+
+ rootCAs := x509.NewCertPool()
+ rootCAs.AppendCertsFromPEM(caCrt)
+
+ certFile := edgeHub.TLSCertFile
+ keyFile := edgeHub.TLSPrivateKeyFile
+ cliCrt, err := tls.LoadX509KeyPair(certFile, keyFile)
+
+ transport := &http.Transport{
+ DialContext: (&net.Dialer{
+ Timeout: 30 * time.Second,
+ KeepAlive: 30 * time.Second,
+ }).DialContext,
+ // use TLS configuration
+ TLSClientConfig: &tls.Config{
+ RootCAs: rootCAs,
+ InsecureSkipVerify: false,
+ Certificates: []tls.Certificate{cliCrt},
+ },
+ }
+
+ client := &http.Client{Transport: transport, Timeout: 30 * time.Second}
+
+ respData, err := json.Marshal(resp)
+ if err != nil {
+ return fmt.Errorf("marshal failed: %v", err)
+ }
+ url := edgeHub.HTTPServer + fmt.Sprintf("/task/%s/name/%s/node/%s/status", taskType, taskID, config.Modules.Edged.HostnameOverride)
+ result, err := client.Post(url, "application/json", bytes.NewReader(respData))
+
+ if err != nil {
+ return fmt.Errorf("post http request failed: %v", err)
+ }
+ klog.Error("report result ", result)
+ defer result.Body.Close()
+
+ return nil
+}
diff --git a/edge/pkg/edgehub/edgehub.go b/edge/pkg/edgehub/edgehub.go
index 2498d07f2..bf63f1f9e 100644
--- a/edge/pkg/edgehub/edgehub.go
+++ b/edge/pkg/edgehub/edgehub.go
@@ -13,8 +13,8 @@ import (
"github.com/kubeedge/kubeedge/edge/pkg/edgehub/certificate"
"github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients"
"github.com/kubeedge/kubeedge/edge/pkg/edgehub/config"
- // register Upgrade handler
- _ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/upgrade"
+ // register Task handler
+ _ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/task"
"github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2"
)
diff --git a/edge/pkg/edgehub/task/task_handler.go b/edge/pkg/edgehub/task/task_handler.go
new file mode 100644
index 000000000..bd1588234
--- /dev/null
+++ b/edge/pkg/edgehub/task/task_handler.go
@@ -0,0 +1,57 @@
+package task
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
+ commontypes "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/edge/cmd/edgecore/app/options"
+ "github.com/kubeedge/kubeedge/edge/pkg/common/util"
+ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients"
+ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/common/msghandler"
+ "github.com/kubeedge/kubeedge/edge/pkg/edgehub/task/taskexecutor"
+)
+
+func init() {
+ handler := &taskHandler{}
+ msghandler.RegisterHandler(handler)
+}
+
+type taskHandler struct{}
+
+func (th *taskHandler) Filter(message *model.Message) bool {
+ name := message.GetGroup()
+ return name == modules.TaskManagerModuleName
+}
+
+func (th *taskHandler) Process(message *model.Message, clientHub clients.Adapter) error {
+ taskReq := &commontypes.NodeTaskRequest{}
+ data, err := message.GetContentData()
+ if err != nil {
+ return fmt.Errorf("failed to get content data: %v", err)
+ }
+ err = json.Unmarshal(data, taskReq)
+ if err != nil {
+ return fmt.Errorf("unmarshal failed: %v", err)
+ }
+ executor, err := taskexecutor.GetExecutor(taskReq.Type)
+ if err != nil {
+ return err
+ }
+ event, err := executor.Do(*taskReq)
+ if err != nil {
+ return err
+ }
+
+ // use external tool like keadm
+ if event.Action == "" {
+ return nil
+ }
+ err = util.ReportUpgradeResult(options.GetEdgeCoreConfig(), taskReq.Type, taskReq.TaskID, event)
+ if err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/edge/pkg/edgehub/task/taskexecutor/pre_check.go b/edge/pkg/edgehub/task/taskexecutor/pre_check.go
new file mode 100644
index 000000000..9a96c5a70
--- /dev/null
+++ b/edge/pkg/edgehub/task/taskexecutor/pre_check.go
@@ -0,0 +1,142 @@
+package taskexecutor
+
+import (
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "github.com/shirou/gopsutil/cpu"
+ "github.com/shirou/gopsutil/disk"
+ "github.com/shirou/gopsutil/v3/mem"
+
+ "github.com/kubeedge/kubeedge/common/types"
+ api "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+const (
+ MaxCPUUsage float64 = 80
+ MaxMemUsage float64 = 80
+ MaxDiskUsage float64 = 80
+)
+
+func preCheck(taskReq types.NodeTaskRequest) fsm.Event {
+ event := fsm.Event{
+ Type: "Check",
+ Action: api.ActionSuccess,
+ ErrorMsg: "",
+ }
+
+ data, err := json.Marshal(taskReq.Item)
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.ErrorMsg = err.Error()
+ return event
+ }
+ var checkItems types.NodePreCheckRequest
+ err = json.Unmarshal(data, &checkItems)
+ if err != nil {
+ event.Action = api.ActionFailure
+ event.ErrorMsg = err.Error()
+ return event
+ }
+
+ var failed bool
+ var checkResult = map[string]string{}
+ var checkFunc = map[string]func() error{
+ "cpu": checkCPU,
+ "mem": checkMem,
+ "disk": checkDisk,
+ }
+ for _, item := range checkItems.CheckItem {
+ f, ok := checkFunc[item]
+ if !ok {
+ checkResult[item] = "check item not support"
+ continue
+ }
+ err = f()
+ if err != nil {
+ failed = true
+ checkResult[item] = err.Error()
+ continue
+ }
+ checkResult[item] = "ok"
+ }
+ if !failed {
+ return event
+ }
+ event.Action = api.ActionFailure
+ result, err := json.Marshal(checkResult)
+ if err != nil {
+ event.ErrorMsg = err.Error()
+ return event
+ }
+ event.ErrorMsg = string(result)
+ return event
+}
+
+func checkCPU() error {
+ cpuUsage, err := cpu.Percent(100*time.Millisecond, false)
+ if err != nil {
+ return err
+ }
+ var usage float64
+ for _, percpu := range cpuUsage {
+ usage += percpu / float64(len(cpuUsage))
+ }
+ if usage > MaxCPUUsage {
+ return fmt.Errorf("current cpu usage is %f, which exceeds the maximum allowed usage %f", usage, MaxCPUUsage)
+ }
+ return nil
+}
+
+func checkMem() error {
+ memInfo, err := mem.VirtualMemory()
+ if err != nil {
+ return err
+ }
+ memUsedPercent := memInfo.UsedPercent
+ if memUsedPercent > MaxMemUsage {
+ return fmt.Errorf("current mem usage is %f, which exceeds the maximum allowed usage %f", memUsedPercent, MaxMemUsage)
+ }
+ return nil
+}
+
+func checkDisk() error {
+ partitions, err := disk.Partitions(true)
+ if err != nil {
+ return err
+ }
+ var failed bool
+ var diskUsages = map[string]string{}
+ for _, part := range partitions {
+ usage, err := disk.Usage(part.Mountpoint)
+ if err != nil {
+ failed = true
+ diskUsages[part.Device] = err.Error()
+ continue
+ }
+ if usage.UsedPercent > MaxDiskUsage {
+ failed = true
+ diskUsages[part.Device] = fmt.Sprintf("current disk usage is %f, which exceeds the maximum allowed usage %f", usage.UsedPercent, MaxMemUsage)
+ continue
+ }
+ diskUsages[part.Device] = fmt.Sprintf("%f", usage.UsedPercent)
+ }
+ if !failed {
+ return nil
+ }
+ result, err := json.Marshal(diskUsages)
+ if err != nil {
+ return err
+ }
+ return fmt.Errorf(string(result))
+}
+
+func normalInit(types.NodeTaskRequest) fsm.Event {
+ return fsm.Event{
+ Type: "Init",
+ Action: api.ActionSuccess,
+ ErrorMsg: "",
+ }
+}
diff --git a/edge/pkg/edgehub/task/taskexecutor/task_executor.go b/edge/pkg/edgehub/task/taskexecutor/task_executor.go
new file mode 100644
index 000000000..bf88fdc37
--- /dev/null
+++ b/edge/pkg/edgehub/task/taskexecutor/task_executor.go
@@ -0,0 +1,69 @@
+package taskexecutor
+
+import (
+ "fmt"
+
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/kubeedge/common/types"
+ "github.com/kubeedge/kubeedge/pkg/apis/fsm/v1alpha1"
+ "github.com/kubeedge/kubeedge/pkg/util/fsm"
+)
+
+func init() {
+ Register(TaskUpgrade, NewUpgradeExecutor())
+}
+
+type Executor interface {
+ Name() string
+ Do(types.NodeTaskRequest) (fsm.Event, error)
+}
+
+type BaseExecutor struct {
+ name string
+ methods map[string]func(types.NodeTaskRequest) fsm.Event
+}
+
+func (be *BaseExecutor) Name() string {
+ return be.name
+}
+
+func NewBaseExecutor(name string, methods map[string]func(types.NodeTaskRequest) fsm.Event) *BaseExecutor {
+ return &BaseExecutor{
+ name: name,
+ methods: methods,
+ }
+}
+
+func (be *BaseExecutor) Do(taskReq types.NodeTaskRequest) (fsm.Event, error) {
+ method, ok := be.methods[taskReq.State]
+ if !ok {
+ err := fmt.Errorf("method %s in executor %s is not implemented", taskReq.State, taskReq.Type)
+ klog.Warning(err.Error())
+ return fsm.Event{}, err
+ }
+ return method(taskReq), nil
+}
+
+var (
+ executors = make(map[string]Executor)
+ CommonMethods = map[string]func(types.NodeTaskRequest) fsm.Event{
+ string(v1alpha1.TaskChecking): preCheck,
+ string(v1alpha1.TaskInit): normalInit,
+ }
+)
+
+func Register(name string, executor Executor) {
+ if _, ok := executors[name]; ok {
+ klog.Warning("executor %s exists ", name)
+ }
+ executors[name] = executor
+}
+
+func GetExecutor(name string) (Executor, error) {
+ executor, ok := executors[name]
+ if !ok {
+ return nil, fmt.Errorf("executor %s is not registered", name)
+ }
+ return executor, nil
+}