summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com>2020-06-22 23:32:58 +0800
committerGitHub <noreply@github.com>2020-06-22 23:32:58 +0800
commit2a64a950a791d8ff62f4540a02f1110e3e80d0da (patch)
treed15378338f0aaa8648c6756a709e6aa795fadc8c
parentMerge pull request #1852 from GsssC/automated-cherry-pick-of-#1809-upstream-r... (diff)
parentclean up goroutines when delete egdenode (diff)
downloadkubeedge-origin/release-1.3.tar.gz
Merge pull request #1860 from fisherxu/automated-cherry-pick-of-#1670-upstream-release-1.3origin/release-1.3
Automated cherry pick of #1670: fix too long time to get node ready when reconnect
-rw-r--r--cloud/pkg/cloudhub/channelq/channelq.go50
-rw-r--r--cloud/pkg/cloudhub/common/model/types.go27
-rw-r--r--cloud/pkg/cloudhub/common/model/types_test.go20
-rw-r--r--cloud/pkg/cloudhub/handler/messagehandler.go246
-rw-r--r--cloud/pkg/edgecontroller/controller/downstream.go14
-rw-r--r--edge/pkg/edged/edged_status.go4
6 files changed, 187 insertions, 174 deletions
diff --git a/cloud/pkg/cloudhub/channelq/channelq.go b/cloud/pkg/cloudhub/channelq/channelq.go
index 8c2b28c77..653b85f21 100644
--- a/cloud/pkg/cloudhub/channelq/channelq.go
+++ b/cloud/pkg/cloudhub/channelq/channelq.go
@@ -70,17 +70,8 @@ func (q *ChannelMessageQueue) DispatchMessage() {
}
func (q *ChannelMessageQueue) addListMessageToQueue(nodeID string, msg *beehiveModel.Message) {
- nodeListQueue, err := q.GetNodeListQueue(nodeID)
- if err != nil {
- klog.Errorf("fail to get nodeListQueue for Node: %s", nodeID)
- return
- }
-
- nodeListStore, err := q.GetNodeListStore(nodeID)
- if err != nil {
- klog.Errorf("fail to get nodeListStore for Node: %s", nodeID)
- return
- }
+ nodeListQueue := q.GetNodeListQueue(nodeID)
+ nodeListStore := q.GetNodeListStore(nodeID)
messageKey, _ := getListMsgKey(msg)
@@ -93,17 +84,8 @@ func (q *ChannelMessageQueue) addMessageToQueue(nodeID string, msg *beehiveModel
return
}
- nodeQueue, err := q.GetNodeQueue(nodeID)
- if err != nil {
- klog.Errorf("fail to get nodeQueue for Node: %s", nodeID)
- return
- }
-
- nodeStore, err := q.GetNodeStore(nodeID)
- if err != nil {
- klog.Errorf("fail to get nodeStore for Node: %s", nodeID)
- return
- }
+ nodeQueue := q.GetNodeQueue(nodeID)
+ nodeStore := q.GetNodeStore(nodeID)
messageKey, err := getMsgKey(msg)
if err != nil {
@@ -289,59 +271,59 @@ func (q *ChannelMessageQueue) Publish(msg *beehiveModel.Message) error {
}
// GetNodeQueue returns the queue for given node
-func (q *ChannelMessageQueue) GetNodeQueue(nodeID string) (workqueue.RateLimitingInterface, error) {
+func (q *ChannelMessageQueue) GetNodeQueue(nodeID string) workqueue.RateLimitingInterface {
queue, ok := q.queuePool.Load(nodeID)
if !ok {
klog.Warningf("nodeQueue for edge node %s not found and created now", nodeID)
nodeQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), nodeID)
q.queuePool.Store(nodeID, nodeQueue)
- return nodeQueue, nil
+ return nodeQueue
}
nodeQueue := queue.(workqueue.RateLimitingInterface)
- return nodeQueue, nil
+ return nodeQueue
}
// GetNodeListQueue returns the listQueue for given node
-func (q *ChannelMessageQueue) GetNodeListQueue(nodeID string) (workqueue.RateLimitingInterface, error) {
+func (q *ChannelMessageQueue) GetNodeListQueue(nodeID string) workqueue.RateLimitingInterface {
queue, ok := q.listQueuePool.Load(nodeID)
if !ok {
klog.Warningf("nodeListQueue for edge node %s not found and created now", nodeID)
nodeListQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), nodeID)
q.listQueuePool.Store(nodeID, nodeListQueue)
- return nodeListQueue, nil
+ return nodeListQueue
}
nodeListQueue := queue.(workqueue.RateLimitingInterface)
- return nodeListQueue, nil
+ return nodeListQueue
}
// GetNodeStore returns the store for given node
-func (q *ChannelMessageQueue) GetNodeStore(nodeID string) (cache.Store, error) {
+func (q *ChannelMessageQueue) GetNodeStore(nodeID string) cache.Store {
store, ok := q.storePool.Load(nodeID)
if !ok {
klog.Warningf("nodeStore for edge node %s not found and created now", nodeID)
nodeStore := cache.NewStore(getMsgKey)
q.storePool.Store(nodeID, nodeStore)
- return nodeStore, nil
+ return nodeStore
}
nodeStore := store.(cache.Store)
- return nodeStore, nil
+ return nodeStore
}
// GetNodeListStore returns the listStore for given node
-func (q *ChannelMessageQueue) GetNodeListStore(nodeID string) (cache.Store, error) {
+func (q *ChannelMessageQueue) GetNodeListStore(nodeID string) cache.Store {
store, ok := q.listStorePool.Load(nodeID)
if !ok {
klog.Warningf("nodeListStore for edge node %s not found and created now", nodeID)
nodeListStore := cache.NewStore(getListMsgKey)
q.listStorePool.Store(nodeID, nodeListStore)
- return nodeListStore, nil
+ return nodeListStore
}
nodeListStore := store.(cache.Store)
- return nodeListStore, nil
+ return nodeListStore
}
// GetMessageUID returns the UID of the object in message
diff --git a/cloud/pkg/cloudhub/common/model/types.go b/cloud/pkg/cloudhub/common/model/types.go
index 7e8f52673..00bdd5dae 100644
--- a/cloud/pkg/cloudhub/common/model/types.go
+++ b/cloud/pkg/cloudhub/common/model/types.go
@@ -6,9 +6,9 @@ import (
"fmt"
"strings"
- "k8s.io/klog"
-
"github.com/kubeedge/beehive/pkg/core/model"
+ beehiveModel "github.com/kubeedge/beehive/pkg/core/model"
+ edgemessagelayer "github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller/messagelayer"
)
// constants for resource types
@@ -75,28 +75,15 @@ func NewResource(resType, resID string, info *HubInfo) string {
// IsNodeStopped indicates if the node is stopped or running
func IsNodeStopped(msg *model.Message) bool {
- tokens := strings.Split(msg.Router.Resource, "/")
- if len(tokens) != 2 || tokens[0] != ResNode {
+ resourceType, _ := edgemessagelayer.GetResourceType(*msg)
+ if resourceType != beehiveModel.ResourceTypeNode {
return false
}
- if msg.Router.Operation == OpDelete {
+
+ if msg.Router.Operation == model.DeleteOperation {
return true
}
- if msg.Router.Operation != OpUpdate || msg.Content == nil {
- return false
- }
- body, ok := msg.Content.(map[string]interface{})
- if !ok {
- klog.Errorf("fail to decode node update message: %s, type is %T", msg.GetContent(), msg.Content)
- // it can't be determined if the node has stopped
- return false
- }
- // trust struct of json body
- action, ok := body["action"]
- if !ok || action.(string) != "stop" {
- return false
- }
- return true
+ return false
}
// IsFromEdge judges if the event is sent from edge
diff --git a/cloud/pkg/cloudhub/common/model/types_test.go b/cloud/pkg/cloudhub/common/model/types_test.go
index ec21b6378..a5d848fe9 100644
--- a/cloud/pkg/cloudhub/common/model/types_test.go
+++ b/cloud/pkg/cloudhub/common/model/types_test.go
@@ -83,17 +83,12 @@ func TestIsNodeStopped(t *testing.T) {
"timestamp": time.Now().Unix(),
}
content, _ := json.Marshal(body)
- bodyAction := map[string]interface{}{
- "event_type": OpConnect,
- "timestamp": time.Now().Unix(),
- "action": "stop",
- }
+
msgResource := modelMessage("", "", 0, "", "", "", "Resource1", nil)
- msgOpDelete := modelMessage("", "", 0, "", "", OpDelete, "node/Node1", nil)
- msgNoContent := modelMessage("", "", 0, "", "", "", "node/Node1", nil)
- msgContent := modelMessage("", "", 0, "", "", OpUpdate, "node/Node1", content)
- msgNoAction := modelMessage("", "", 0, "", "", OpUpdate, "node/Node1", body)
- msgActionStop := modelMessage("", "", 0, "", "", OpUpdate, "node/Node1", bodyAction)
+ msgOpDelete := modelMessage("", "", 0, "", "", model.DeleteOperation, "node/edge-node/default/node/Node1", nil)
+ msgNoContent := modelMessage("", "", 0, "", "", "", "node/edge-node/default/node/Node1", nil)
+ msgContent := modelMessage("", "", 0, "", "", model.UpdateOperation, "node/edge-node/default/node/Node1", content)
+ msgNoAction := modelMessage("", "", 0, "", "", model.UpdateOperation, "node/edge-node/default/node/Node1", body)
tests := []struct {
name string
msg *model.Message
@@ -124,11 +119,6 @@ func TestIsNodeStopped(t *testing.T) {
msg: &msgNoAction,
errorWant: false,
},
- {
- name: "TestIsNodeStopped(): Case 6: msg.Content[action]=stop",
- msg: &msgActionStop,
- errorWant: true,
- },
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
diff --git a/cloud/pkg/cloudhub/handler/messagehandler.go b/cloud/pkg/cloudhub/handler/messagehandler.go
index 529499939..af23817a0 100644
--- a/cloud/pkg/cloudhub/handler/messagehandler.go
+++ b/cloud/pkg/cloudhub/handler/messagehandler.go
@@ -11,7 +11,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
- "k8s.io/client-go/util/workqueue"
"k8s.io/klog"
beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
@@ -57,6 +56,7 @@ type MessageHandle struct {
Nodes sync.Map
nodeConns sync.Map
nodeLocks sync.Map
+ nodeRegistered sync.Map
MessageQueue *channelq.ChannelMessageQueue
Handlers []HandleFunc
NodeLimit int
@@ -64,7 +64,7 @@ type MessageHandle struct {
MessageAcks sync.Map
}
-type HandleFunc func(hi hubio.CloudHubIO, info *model.HubInfo, exitServe chan ExitCode, stopSendMsg chan struct{})
+type HandleFunc func(info *model.HubInfo, exitServe chan ExitCode)
var once sync.Once
@@ -145,26 +145,36 @@ func (mh *MessageHandle) OnRegister(connection conn.Connection) {
}
io := &hubio.JSONIO{Connection: connection}
- go mh.ServeConn(io, &model.HubInfo{ProjectID: projectID, NodeID: nodeID})
+
+ if _, ok := mh.nodeRegistered.Load(nodeID); ok {
+ if conn, exist := mh.nodeConns.Load(nodeID); exist {
+ conn.(hubio.CloudHubIO).Close()
+ }
+ mh.nodeConns.Store(nodeID, io)
+ return
+ }
+ mh.nodeConns.Store(nodeID, io)
+ go mh.ServeConn(&model.HubInfo{ProjectID: projectID, NodeID: nodeID})
}
// KeepaliveCheckLoop checks whether the edge node is still alive
-func (mh *MessageHandle) KeepaliveCheckLoop(hi hubio.CloudHubIO, info *model.HubInfo, stopServe chan ExitCode, stopSendMsg chan struct{}) {
+func (mh *MessageHandle) KeepaliveCheckLoop(info *model.HubInfo, stopServe chan ExitCode) {
keepaliveTicker := time.NewTimer(time.Duration(mh.KeepaliveInterval) * time.Second)
for {
select {
case _, ok := <-mh.KeepaliveChannel[info.NodeID]:
if !ok {
+ klog.Warningf("Stop keepalive check for node: %s", info.NodeID)
return
}
klog.Infof("Node %s is still alive", info.NodeID)
keepaliveTicker.Reset(time.Duration(mh.KeepaliveInterval) * time.Second)
case <-keepaliveTicker.C:
- klog.Warningf("Timeout to receive heart beat from edge node %s for project %s",
- info.NodeID, info.ProjectID)
- stopServe <- nodeDisconnect
- close(stopSendMsg)
- return
+ if conn, ok := mh.nodeConns.Load(info.NodeID); ok {
+ klog.Warningf("Timeout to receive heart beat from edge node %s for project %s", info.NodeID, info.ProjectID)
+ conn.(hubio.CloudHubIO).Close()
+ mh.nodeConns.Delete(info.NodeID)
+ }
}
}
}
@@ -241,8 +251,8 @@ func (mh *MessageHandle) hubIoWrite(hi hubio.CloudHubIO, nodeID string, msg *bee
}
// ServeConn starts serving the incoming connection
-func (mh *MessageHandle) ServeConn(hi hubio.CloudHubIO, info *model.HubInfo) {
- err := mh.RegisterNode(hi, info)
+func (mh *MessageHandle) ServeConn(info *model.HubInfo) {
+ err := mh.RegisterNode(info)
if err != nil {
klog.Errorf("fail to register node %s, reason %s", info.NodeID, err.Error())
return
@@ -250,21 +260,24 @@ func (mh *MessageHandle) ServeConn(hi hubio.CloudHubIO, info *model.HubInfo) {
klog.Infof("edge node %s for project %s connected", info.NodeID, info.ProjectID)
exitServe := make(chan ExitCode, 3)
- stopSendMsg := make(chan struct{})
for _, handle := range mh.Handlers {
- go handle(hi, info, exitServe, stopSendMsg)
+ go handle(info, exitServe)
}
code := <-exitServe
- mh.UnregisterNode(hi, info, code)
+ mh.UnregisterNode(info, code)
}
// RegisterNode register node in cloudhub for the incoming connection
-func (mh *MessageHandle) RegisterNode(hi hubio.CloudHubIO, info *model.HubInfo) error {
+func (mh *MessageHandle) RegisterNode(info *model.HubInfo) error {
+ hi, err := mh.getNodeConnection(info.NodeID)
+ if err != nil {
+ return err
+ }
mh.MessageQueue.Connect(info)
- err := mh.MessageQueue.Publish(constructConnectMessage(info, true))
+ err = mh.MessageQueue.Publish(constructConnectMessage(info, true))
if err != nil {
klog.Errorf("fail to publish node connect event for node %s, reason %s", info.NodeID, err.Error())
notifyEventQueueError(hi, messageQueueDisconnect, info.NodeID)
@@ -275,16 +288,22 @@ func (mh *MessageHandle) RegisterNode(hi hubio.CloudHubIO, info *model.HubInfo)
return err
}
- mh.nodeConns.Store(info.NodeID, hi)
mh.nodeLocks.Store(info.NodeID, &sync.Mutex{})
mh.Nodes.Store(info.NodeID, true)
+ mh.nodeRegistered.Store(info.NodeID, true)
return nil
}
// UnregisterNode unregister node in cloudhub
-func (mh *MessageHandle) UnregisterNode(hi hubio.CloudHubIO, info *model.HubInfo, code ExitCode) {
+func (mh *MessageHandle) UnregisterNode(info *model.HubInfo, code ExitCode) {
+ if hi, err := mh.getNodeConnection(info.NodeID); err == nil {
+ notifyEventQueueError(hi, code, info.NodeID)
+ hi.Close()
+ }
+
mh.nodeLocks.Delete(info.NodeID)
mh.nodeConns.Delete(info.NodeID)
+ mh.nodeRegistered.Delete(info.NodeID)
close(mh.KeepaliveChannel[info.NodeID])
delete(mh.KeepaliveChannel, info.NodeID)
@@ -292,9 +311,9 @@ func (mh *MessageHandle) UnregisterNode(hi hubio.CloudHubIO, info *model.HubInfo
if err != nil {
klog.Errorf("fail to publish node disconnect event for node %s, reason %s", info.NodeID, err.Error())
}
- notifyEventQueueError(hi, code, info.NodeID)
+
mh.Nodes.Delete(info.NodeID)
- err = hi.Close()
+
if err != nil {
klog.Errorf("fail to close connection, reason: %s", err.Error())
}
@@ -317,103 +336,105 @@ func (mh *MessageHandle) GetNodeCount() int {
}
// ListMessageWriteLoop processes all list type resource write requests
-func (mh *MessageHandle) ListMessageWriteLoop(hi hubio.CloudHubIO, info *model.HubInfo, stopServe chan ExitCode, stopSendMsg chan struct{}) {
- nodeListQueue, err := mh.MessageQueue.GetNodeListQueue(info.NodeID)
- if err != nil {
- klog.Errorf("Failed to get nodeQueue for node %s: %v", info.NodeID, err)
- stopServe <- messageQueueDisconnect
- return
- }
- nodeListStore, err := mh.MessageQueue.GetNodeListStore(info.NodeID)
- if err != nil {
- klog.Errorf("Failed to get nodeStore for node %s: %v", info.NodeID, err)
- stopServe <- messageQueueDisconnect
- return
- }
+func (mh *MessageHandle) ListMessageWriteLoop(info *model.HubInfo, stopServe chan ExitCode) {
+ nodeListQueue := mh.MessageQueue.GetNodeListQueue(info.NodeID)
+ nodeListStore := mh.MessageQueue.GetNodeListStore(info.NodeID)
+ nodeQueue := mh.MessageQueue.GetNodeQueue(info.NodeID)
+
for {
- select {
- case <-stopSendMsg:
- klog.Errorf("Node %s disconnected and stopped sending messages", info.NodeID)
+ key, quit := nodeListQueue.Get()
+ if quit {
+ klog.Errorf("nodeListQueue for node %s has shutdown", info.NodeID)
+ return
+ }
+
+ obj, exist, _ := nodeListStore.GetByKey(key.(string))
+ if !exist {
+ klog.Errorf("nodeListStore for node %s doesn't exist", info.NodeID)
+ continue
+ }
+ msg := obj.(*beehiveModel.Message)
+
+ if model.IsNodeStopped(msg) {
+ klog.Warningf("node %s is deleted, data for node will be cleaned up", info.NodeID)
+ nodeQueue.ShutDown()
+ nodeListQueue.ShutDown()
+ stopServe <- nodeStop
return
- default:
- mh.handleMessage(nodeListQueue, nodeListStore, hi, info, stopServe, "listMessage")
}
+ if !model.IsToEdge(msg) {
+ klog.Infof("skip only to cloud event for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
+ continue
+ }
+ klog.V(4).Infof("event to send for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
+
+ trimMessage(msg)
+
+ conn, ok := mh.nodeConns.Load(info.NodeID)
+ if !ok {
+ continue
+ }
+
+ mh.send(conn.(hubio.CloudHubIO), info, msg)
+
+ // delete successfully sent events from the queue/store
+ nodeListStore.Delete(msg)
+
+ nodeListQueue.Forget(key.(string))
+ nodeListQueue.Done(key)
}
}
// MessageWriteLoop processes all write requests
-func (mh *MessageHandle) MessageWriteLoop(hi hubio.CloudHubIO, info *model.HubInfo, stopServe chan ExitCode, stopSendMsg chan struct{}) {
- nodeQueue, err := mh.MessageQueue.GetNodeQueue(info.NodeID)
- if err != nil {
- klog.Errorf("Failed to get nodeQueue for node %s: %v", info.NodeID, err)
- stopServe <- messageQueueDisconnect
- return
- }
- nodeStore, err := mh.MessageQueue.GetNodeStore(info.NodeID)
- if err != nil {
- klog.Errorf("Failed to get nodeStore for node %s: %v", info.NodeID, err)
- stopServe <- messageQueueDisconnect
- return
- }
+func (mh *MessageHandle) MessageWriteLoop(info *model.HubInfo, stopServe chan ExitCode) {
+ nodeQueue := mh.MessageQueue.GetNodeQueue(info.NodeID)
+ nodeStore := mh.MessageQueue.GetNodeStore(info.NodeID)
for {
- select {
- case <-stopSendMsg:
- klog.Errorf("Node %s disconnected and stopped sending messages", info.NodeID)
+ key, quit := nodeQueue.Get()
+ if quit {
+ klog.Errorf("nodeQueue for node %s has shutdown", info.NodeID)
return
- default:
- mh.handleMessage(nodeQueue, nodeStore, hi, info, stopServe, "message")
}
- }
-}
-func (mh *MessageHandle) handleMessage(nodeQueue workqueue.RateLimitingInterface,
- nodeStore cache.Store, hi hubio.CloudHubIO,
- info *model.HubInfo, stopServe chan ExitCode, msgType string) {
- key, quit := nodeQueue.Get()
- if quit {
- klog.Errorf("nodeQueue for node %s has shutdown", info.NodeID)
- return
- }
- obj, exist, _ := nodeStore.GetByKey(key.(string))
- if !exist {
- klog.Errorf("nodeStore for node %s doesn't exist", info.NodeID)
- return
- }
+ obj, exist, _ := nodeStore.GetByKey(key.(string))
+ if !exist {
+ klog.Errorf("nodeStore for node %s doesn't exist", info.NodeID)
+ continue
+ }
+ msg := obj.(*beehiveModel.Message)
- msg := obj.(*beehiveModel.Message)
+ if !model.IsToEdge(msg) {
+ klog.Infof("skip only to cloud event for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
+ continue
+ }
+ klog.V(4).Infof("event to send for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
- if model.IsNodeStopped(msg) {
- klog.Infof("node %s is stopped, will disconnect", info.NodeID)
- stopServe <- nodeStop
- return
- }
- if !model.IsToEdge(msg) {
- klog.Infof("skip only to cloud event for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
- return
- }
- klog.V(4).Infof("event to send for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
+ copyMsg := deepcopy(msg)
+ trimMessage(msg)
- copyMsg := deepcopy(msg)
- trimMessage(msg)
- err := hi.SetWriteDeadline(time.Now().Add(time.Duration(mh.WriteTimeout) * time.Second))
- if err != nil {
- klog.Errorf("SetWriteDeadline error, %s", err.Error())
- stopServe <- hubioWriteFail
- return
- }
- if msgType == "listMessage" {
- mh.send(hi, info, msg)
- // delete successfully sent events from the queue/store
- nodeStore.Delete(msg)
- } else {
- mh.sendMsg(hi, info, msg, copyMsg, nodeStore)
- }
+ for {
+ conn, ok := mh.nodeConns.Load(info.NodeID)
+ if !ok {
+ time.Sleep(time.Second * 2)
+ continue
+ }
+ err := mh.sendMsg(conn.(hubio.CloudHubIO), info, msg, copyMsg, nodeStore)
+ if err != nil {
+ klog.Errorf("Failed to send event to node: %s, affected event: %s, err: %s",
+ info.NodeID, dumpMessageMetadata(msg), err.Error())
+ nodeQueue.AddRateLimited(key.(string))
+ time.Sleep(time.Second * 2)
+ }
+ break
+ }
- nodeQueue.Done(key)
+ nodeQueue.Forget(key.(string))
+ nodeQueue.Done(key)
+ }
}
-func (mh *MessageHandle) sendMsg(hi hubio.CloudHubIO, info *model.HubInfo, msg, copyMsg *beehiveModel.Message, nodeStore cache.Store) {
+func (mh *MessageHandle) sendMsg(hi hubio.CloudHubIO, info *model.HubInfo, msg, copyMsg *beehiveModel.Message, nodeStore cache.Store) error {
ackChan := make(chan struct{})
mh.MessageAcks.Store(msg.GetID(), ackChan)
@@ -423,7 +444,10 @@ func (mh *MessageHandle) sendMsg(hi hubio.CloudHubIO, info *model.HubInfo, msg,
retryInterval time.Duration = 5
)
ticker := time.NewTimer(retryInterval * time.Second)
- mh.send(hi, info, msg)
+ err := mh.send(hi, info, msg)
+ if err != nil {
+ return err
+ }
LOOP:
for {
@@ -435,20 +459,23 @@ LOOP:
if retry == 4 {
break LOOP
}
- mh.send(hi, info, msg)
+ err := mh.send(hi, info, msg)
+ if err != nil {
+ return err
+ }
retry++
ticker.Reset(time.Second * retryInterval)
}
}
+ return nil
}
-func (mh *MessageHandle) send(hi hubio.CloudHubIO, info *model.HubInfo, msg *beehiveModel.Message) {
+func (mh *MessageHandle) send(hi hubio.CloudHubIO, info *model.HubInfo, msg *beehiveModel.Message) error {
err := mh.hubIoWrite(hi, info.NodeID, msg)
if err != nil {
- klog.Errorf("write error, connection for node %s will be closed, affected event %s, reason %s",
- info.NodeID, dumpMessageMetadata(msg), err.Error())
- return
+ return err
}
+ return nil
}
func (mh *MessageHandle) saveSuccessPoint(msg *beehiveModel.Message, info *model.HubInfo, nodeStore cache.Store) {
@@ -513,6 +540,15 @@ func (mh *MessageHandle) deleteSuccessPoint(resourceNamespace, objectSyncName st
mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Delete(objectSyncName, metav1.NewDeleteOptions(0))
}
+func (mh *MessageHandle) getNodeConnection(nodeid string) (hubio.CloudHubIO, error) {
+ conn, ok := mh.nodeConns.Load(nodeid)
+ if !ok {
+ return nil, fmt.Errorf("Failed to get connection for node: %s", nodeid)
+ }
+
+ return conn.(hubio.CloudHubIO), nil
+}
+
func deepcopy(msg *beehiveModel.Message) *beehiveModel.Message {
if msg == nil {
return nil
diff --git a/cloud/pkg/edgecontroller/controller/downstream.go b/cloud/pkg/edgecontroller/controller/downstream.go
index 3d1c0c562..18227cb5c 100644
--- a/cloud/pkg/edgecontroller/controller/downstream.go
+++ b/cloud/pkg/edgecontroller/controller/downstream.go
@@ -266,6 +266,20 @@ func (dc *DownstreamController) syncEdgeNodes() {
}
case watch.Deleted:
dc.lc.DeleteNode(node.ObjectMeta.Name)
+
+ msg := model.NewMessage("")
+ resource, err := messagelayer.BuildResource(node.Name, "namespace", constants.ResourceNode, node.Name)
+ if err != nil {
+ klog.Warningf("Built message resource failed with error: %s", err)
+ break
+ }
+ msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.DeleteOperation)
+ err = dc.messageLayer.Send(*msg)
+ if err != nil {
+ klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
+ } else {
+ klog.V(4).Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
+ }
default:
// unsupported operation, no need to send to any node
klog.Warningf("Node event type: %s unsupported", e.Type)
diff --git a/edge/pkg/edged/edged_status.go b/edge/pkg/edged/edged_status.go
index 6910e99fa..fa9614ce7 100644
--- a/edge/pkg/edged/edged_status.go
+++ b/edge/pkg/edged/edged_status.go
@@ -423,6 +423,10 @@ func (e *edged) syncNodeStatus() {
if !e.registrationCompleted {
if err := e.registerNode(); err != nil {
klog.Errorf("Register node failed: %v", err)
+ return
+ }
+ if err := e.updateNodeStatus(); err != nil {
+ klog.Errorf("Unable to update node status: %v", err)
}
} else {
if err := e.updateNodeStatus(); err != nil {