summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com>2024-04-23 10:11:01 +0800
committerGitHub <noreply@github.com>2024-04-23 10:11:01 +0800
commit574f5d2ca5e8fee0a8c367d79041d2a54bcb0b8d (patch)
tree8b33b676372c25978f267b59e2aac1ff859eadc9
parentMerge pull request #5463 from luomengY/reset (diff)
parentfix: lint: unnecessary leading newline (diff)
downloadkubeedge-574f5d2ca5e8fee0a8c367d79041d2a54bcb0b8d.tar.gz
Merge pull request #5513 from micplus/feat/module-restart
support module restarting feature
-rw-r--r--edge/cmd/edgecore/app/server.go6
-rw-r--r--edge/pkg/edged/edged.go30
-rw-r--r--edge/pkg/metamanager/process.go30
-rw-r--r--pkg/features/features.go8
-rw-r--r--staging/src/github.com/kubeedge/beehive/pkg/core/core.go49
-rw-r--r--staging/src/github.com/kubeedge/beehive/pkg/core/module.go12
6 files changed, 113 insertions, 22 deletions
diff --git a/edge/cmd/edgecore/app/server.go b/edge/cmd/edgecore/app/server.go
index 8cd11902a..6af9719de 100644
--- a/edge/cmd/edgecore/app/server.go
+++ b/edge/cmd/edgecore/app/server.go
@@ -121,6 +121,12 @@ offering HTTP client capabilities to components of cloud to reach HTTP servers r
}
registerModules(config)
+
+ // enable module auto-restart feature
+ if features.DefaultFeatureGate.Enabled(features.ModuleRestart) {
+ core.EnableModuleRestart()
+ }
+
// start all modules
core.Run()
},
diff --git a/edge/pkg/edged/edged.go b/edge/pkg/edged/edged.go
index 11521841d..290e736f9 100644
--- a/edge/pkg/edged/edged.go
+++ b/edge/pkg/edged/edged.go
@@ -60,6 +60,7 @@ import (
metaclient "github.com/kubeedge/kubeedge/edge/pkg/metamanager/client"
"github.com/kubeedge/kubeedge/edge/pkg/metamanager/dao"
"github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha2"
+ kefeatures "github.com/kubeedge/kubeedge/pkg/features"
"github.com/kubeedge/kubeedge/pkg/version"
)
@@ -135,13 +136,33 @@ func (e *edged) Start() {
}
}
+ kubeletErrChan := make(chan error, 1)
go func() {
err := DefaultRunLiteKubelet(e.context, e.KubeletServer, e.KubeletDeps, e.FeatureGate)
if err != nil {
- klog.Errorf("Start edged failed, err: %v", err)
- os.Exit(1)
+ if !kefeatures.DefaultFeatureGate.Enabled(kefeatures.ModuleRestart) {
+ klog.Errorf("Start edged failed, err: %v", err)
+ os.Exit(1)
+ }
+ kubeletErrChan <- err
}
}()
+
+ // block until kubelet is ready to sync pods
+ startWaiter := time.NewTimer(10 * time.Second)
+ defer startWaiter.Stop()
+
+ select {
+ case <-beehiveContext.Done():
+ klog.Warning("Stop sync pod")
+ return
+ case err := <-kubeletErrChan:
+ klog.Errorf("Failed to start edged, err: %v", err)
+ return
+ case <-startWaiter.C:
+ klog.Info("Start sync pod")
+ }
+
e.syncPod(e.KubeletDeps.PodConfig)
}
@@ -221,18 +242,17 @@ func newEdged(enable bool, nodeName, namespace string) (*edged, error) {
}
func (e *edged) syncPod(podCfg *config.PodConfig) {
- time.Sleep(10 * time.Second)
-
//when starting, send msg to metamanager once to get existing pods
info := model.NewMessage("").BuildRouter(e.Name(), e.Group(), e.namespace+"/"+model.ResourceTypePod,
model.QueryOperation)
beehiveContext.Send(modules.MetaManagerModuleName, *info)
// rawUpdateChan receives the update events from metamanager or edgecontroller
rawUpdateChan := podCfg.Channel(beehiveContext.GetContext(), kubelettypes.ApiserverSource)
+
for {
select {
case <-beehiveContext.Done():
- klog.Warning("Sync pod stop")
+ klog.Warning("Stop sync pod")
return
default:
}
diff --git a/edge/pkg/metamanager/process.go b/edge/pkg/metamanager/process.go
index 6ab25e9e2..16bf596c7 100644
--- a/edge/pkg/metamanager/process.go
+++ b/edge/pkg/metamanager/process.go
@@ -421,21 +421,19 @@ func (m *metaManager) process(message model.Message) {
}
func (m *metaManager) runMetaManager() {
- go func() {
- for {
- select {
- case <-beehiveContext.Done():
- klog.Warning("MetaManager main loop stop")
- return
- default:
- }
- msg, err := beehiveContext.Receive(m.Name())
- if err != nil {
- klog.Errorf("get a message %+v: %v", msg, err)
- continue
- }
- klog.V(2).Infof("get a message %+v", msg)
- m.process(msg)
+ for {
+ select {
+ case <-beehiveContext.Done():
+ klog.Warning("MetaManager main loop stop")
+ return
+ default:
}
- }()
+ msg, err := beehiveContext.Receive(m.Name())
+ if err != nil {
+ klog.Errorf("get a message %+v: %v", msg, err)
+ continue
+ }
+ klog.V(2).Infof("get a message %+v", msg)
+ m.process(msg)
+ }
}
diff --git a/pkg/features/features.go b/pkg/features/features.go
index 896acf4a0..46b7548e4 100644
--- a/pkg/features/features.go
+++ b/pkg/features/features.go
@@ -26,6 +26,13 @@ const (
// alpha: v1.12
// owner: @vincentgoat
RequireAuthorization featuregate.Feature = "requireAuthorization"
+ // ModuleRestart supports automatic restarting for modules.
+ // If a module exits when running because of uncaught or external errors, BeeHive will try to keep the module running by restarting it.
+ // If moduleRestart enabled, modules will be kept running forever. The interval between starting a module increases whenever it exits,
+ // with maximum of 30s.
+ // alpha: v1.17
+ // owner: @micplus
+ ModuleRestart featuregate.Feature = "moduleRestart"
)
// defaultFeatureGates consists of all known Kubeedge-specific feature keys.
@@ -33,4 +40,5 @@ const (
// available throughout Kubeedge binaries.
var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
RequireAuthorization: {Default: false, PreRelease: featuregate.Alpha},
+ ModuleRestart: {Default: false, PreRelease: featuregate.Alpha},
}
diff --git a/staging/src/github.com/kubeedge/beehive/pkg/core/core.go b/staging/src/github.com/kubeedge/beehive/pkg/core/core.go
index 941e29da7..0b8ebdcd2 100644
--- a/staging/src/github.com/kubeedge/beehive/pkg/core/core.go
+++ b/staging/src/github.com/kubeedge/beehive/pkg/core/core.go
@@ -4,6 +4,7 @@ import (
"os"
"os/signal"
"syscall"
+ "time"
"k8s.io/klog/v2"
@@ -42,7 +43,12 @@ func StartModules() {
beehiveContext.AddModule(&m)
beehiveContext.AddModuleGroup(name, module.module.Group())
- go moduleKeeper(name, module, m)
+ if module.remote {
+ go moduleKeeper(name, module, m)
+ } else {
+ go localModuleKeeper(module)
+ }
+
klog.Infof("starting module %s", name)
}
}
@@ -84,3 +90,44 @@ func moduleKeeper(name string, moduleInfo *ModuleInfo, m common.ModuleInfo) {
beehiveContext.AddModuleGroup(name, moduleInfo.module.Group())
}
}
+
+// localModuleKeeper starts and tries to keep module running when module exited.
+// Call EnableModuleRestart() to enable auto-restarting feature in alpha version.
+func localModuleKeeper(m *ModuleInfo) {
+ if !moduleRestartEnabled {
+ m.module.Start()
+ return
+ }
+
+ ctx := beehiveContext.GetContext()
+ backoffDuration := time.Second
+
+ // do if module exits
+ afterFunc := func() {
+ if r := recover(); r != nil {
+ klog.Errorf("module %s panicking: %v", m.module.Name(), r)
+ }
+ klog.Errorf("module %s exited, will restart in %ds", m.module.Name(), int(backoffDuration.Seconds()))
+ }
+
+ for {
+ func() {
+ defer afterFunc()
+ m.module.Start()
+ }()
+
+ select {
+ case <-ctx.Done():
+ klog.Infof("module %s shutdown", m.module.Name())
+ return
+ case <-time.After(backoffDuration):
+ }
+
+ if backoffDuration < 30*time.Second {
+ backoffDuration *= 2
+ if backoffDuration > 30*time.Second {
+ backoffDuration = 30 * time.Second
+ }
+ }
+ }
+}
diff --git a/staging/src/github.com/kubeedge/beehive/pkg/core/module.go b/staging/src/github.com/kubeedge/beehive/pkg/core/module.go
index e44dfc255..275b5260e 100644
--- a/staging/src/github.com/kubeedge/beehive/pkg/core/module.go
+++ b/staging/src/github.com/kubeedge/beehive/pkg/core/module.go
@@ -19,6 +19,8 @@ var (
// Modules map
modules map[string]*ModuleInfo
disabledModules map[string]*ModuleInfo
+ // feature gates
+ moduleRestartEnabled bool
)
func init() {
@@ -78,3 +80,13 @@ func GetModuleExchange() *socket.ModuleExchange {
}
return &exchange
}
+
+// EnableModuleRestart enable feature for auto restarting modules
+func EnableModuleRestart() {
+ moduleRestartEnabled = true
+}
+
+// IsModuleRestartEnabled checks whether auto-restart feature is enabled.
+func IsModuleRestartEnabled() bool {
+ return moduleRestartEnabled
+}