diff options
| -rw-r--r-- | cloud/pkg/cloudhub/channelq/channelq.go | 50 | ||||
| -rw-r--r-- | cloud/pkg/cloudhub/common/model/types.go | 27 | ||||
| -rw-r--r-- | cloud/pkg/cloudhub/common/model/types_test.go | 20 | ||||
| -rw-r--r-- | cloud/pkg/cloudhub/handler/messagehandler.go | 246 | ||||
| -rw-r--r-- | cloud/pkg/edgecontroller/controller/downstream.go | 14 | ||||
| -rw-r--r-- | edge/pkg/edged/edged_status.go | 4 |
6 files changed, 187 insertions, 174 deletions
diff --git a/cloud/pkg/cloudhub/channelq/channelq.go b/cloud/pkg/cloudhub/channelq/channelq.go index 8c2b28c77..653b85f21 100644 --- a/cloud/pkg/cloudhub/channelq/channelq.go +++ b/cloud/pkg/cloudhub/channelq/channelq.go @@ -70,17 +70,8 @@ func (q *ChannelMessageQueue) DispatchMessage() { } func (q *ChannelMessageQueue) addListMessageToQueue(nodeID string, msg *beehiveModel.Message) { - nodeListQueue, err := q.GetNodeListQueue(nodeID) - if err != nil { - klog.Errorf("fail to get nodeListQueue for Node: %s", nodeID) - return - } - - nodeListStore, err := q.GetNodeListStore(nodeID) - if err != nil { - klog.Errorf("fail to get nodeListStore for Node: %s", nodeID) - return - } + nodeListQueue := q.GetNodeListQueue(nodeID) + nodeListStore := q.GetNodeListStore(nodeID) messageKey, _ := getListMsgKey(msg) @@ -93,17 +84,8 @@ func (q *ChannelMessageQueue) addMessageToQueue(nodeID string, msg *beehiveModel return } - nodeQueue, err := q.GetNodeQueue(nodeID) - if err != nil { - klog.Errorf("fail to get nodeQueue for Node: %s", nodeID) - return - } - - nodeStore, err := q.GetNodeStore(nodeID) - if err != nil { - klog.Errorf("fail to get nodeStore for Node: %s", nodeID) - return - } + nodeQueue := q.GetNodeQueue(nodeID) + nodeStore := q.GetNodeStore(nodeID) messageKey, err := getMsgKey(msg) if err != nil { @@ -289,59 +271,59 @@ func (q *ChannelMessageQueue) Publish(msg *beehiveModel.Message) error { } // GetNodeQueue returns the queue for given node -func (q *ChannelMessageQueue) GetNodeQueue(nodeID string) (workqueue.RateLimitingInterface, error) { +func (q *ChannelMessageQueue) GetNodeQueue(nodeID string) workqueue.RateLimitingInterface { queue, ok := q.queuePool.Load(nodeID) if !ok { klog.Warningf("nodeQueue for edge node %s not found and created now", nodeID) nodeQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), nodeID) q.queuePool.Store(nodeID, nodeQueue) - return nodeQueue, nil + return nodeQueue } nodeQueue := queue.(workqueue.RateLimitingInterface) - return nodeQueue, nil + return nodeQueue } // GetNodeListQueue returns the listQueue for given node -func (q *ChannelMessageQueue) GetNodeListQueue(nodeID string) (workqueue.RateLimitingInterface, error) { +func (q *ChannelMessageQueue) GetNodeListQueue(nodeID string) workqueue.RateLimitingInterface { queue, ok := q.listQueuePool.Load(nodeID) if !ok { klog.Warningf("nodeListQueue for edge node %s not found and created now", nodeID) nodeListQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), nodeID) q.listQueuePool.Store(nodeID, nodeListQueue) - return nodeListQueue, nil + return nodeListQueue } nodeListQueue := queue.(workqueue.RateLimitingInterface) - return nodeListQueue, nil + return nodeListQueue } // GetNodeStore returns the store for given node -func (q *ChannelMessageQueue) GetNodeStore(nodeID string) (cache.Store, error) { +func (q *ChannelMessageQueue) GetNodeStore(nodeID string) cache.Store { store, ok := q.storePool.Load(nodeID) if !ok { klog.Warningf("nodeStore for edge node %s not found and created now", nodeID) nodeStore := cache.NewStore(getMsgKey) q.storePool.Store(nodeID, nodeStore) - return nodeStore, nil + return nodeStore } nodeStore := store.(cache.Store) - return nodeStore, nil + return nodeStore } // GetNodeListStore returns the listStore for given node -func (q *ChannelMessageQueue) GetNodeListStore(nodeID string) (cache.Store, error) { +func (q *ChannelMessageQueue) GetNodeListStore(nodeID string) cache.Store { store, ok := q.listStorePool.Load(nodeID) if !ok { klog.Warningf("nodeListStore for edge node %s not found and created now", nodeID) nodeListStore := cache.NewStore(getListMsgKey) q.listStorePool.Store(nodeID, nodeListStore) - return nodeListStore, nil + return nodeListStore } nodeListStore := store.(cache.Store) - return nodeListStore, nil + return nodeListStore } // GetMessageUID returns the UID of the object in message diff --git a/cloud/pkg/cloudhub/common/model/types.go b/cloud/pkg/cloudhub/common/model/types.go index 7e8f52673..00bdd5dae 100644 --- a/cloud/pkg/cloudhub/common/model/types.go +++ b/cloud/pkg/cloudhub/common/model/types.go @@ -6,9 +6,9 @@ import ( "fmt" "strings" - "k8s.io/klog" - "github.com/kubeedge/beehive/pkg/core/model" + beehiveModel "github.com/kubeedge/beehive/pkg/core/model" + edgemessagelayer "github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller/messagelayer" ) // constants for resource types @@ -75,28 +75,15 @@ func NewResource(resType, resID string, info *HubInfo) string { // IsNodeStopped indicates if the node is stopped or running func IsNodeStopped(msg *model.Message) bool { - tokens := strings.Split(msg.Router.Resource, "/") - if len(tokens) != 2 || tokens[0] != ResNode { + resourceType, _ := edgemessagelayer.GetResourceType(*msg) + if resourceType != beehiveModel.ResourceTypeNode { return false } - if msg.Router.Operation == OpDelete { + + if msg.Router.Operation == model.DeleteOperation { return true } - if msg.Router.Operation != OpUpdate || msg.Content == nil { - return false - } - body, ok := msg.Content.(map[string]interface{}) - if !ok { - klog.Errorf("fail to decode node update message: %s, type is %T", msg.GetContent(), msg.Content) - // it can't be determined if the node has stopped - return false - } - // trust struct of json body - action, ok := body["action"] - if !ok || action.(string) != "stop" { - return false - } - return true + return false } // IsFromEdge judges if the event is sent from edge diff --git a/cloud/pkg/cloudhub/common/model/types_test.go b/cloud/pkg/cloudhub/common/model/types_test.go index ec21b6378..a5d848fe9 100644 --- a/cloud/pkg/cloudhub/common/model/types_test.go +++ b/cloud/pkg/cloudhub/common/model/types_test.go @@ -83,17 +83,12 @@ func TestIsNodeStopped(t *testing.T) { "timestamp": time.Now().Unix(), } content, _ := json.Marshal(body) - bodyAction := map[string]interface{}{ - "event_type": OpConnect, - "timestamp": time.Now().Unix(), - "action": "stop", - } + msgResource := modelMessage("", "", 0, "", "", "", "Resource1", nil) - msgOpDelete := modelMessage("", "", 0, "", "", OpDelete, "node/Node1", nil) - msgNoContent := modelMessage("", "", 0, "", "", "", "node/Node1", nil) - msgContent := modelMessage("", "", 0, "", "", OpUpdate, "node/Node1", content) - msgNoAction := modelMessage("", "", 0, "", "", OpUpdate, "node/Node1", body) - msgActionStop := modelMessage("", "", 0, "", "", OpUpdate, "node/Node1", bodyAction) + msgOpDelete := modelMessage("", "", 0, "", "", model.DeleteOperation, "node/edge-node/default/node/Node1", nil) + msgNoContent := modelMessage("", "", 0, "", "", "", "node/edge-node/default/node/Node1", nil) + msgContent := modelMessage("", "", 0, "", "", model.UpdateOperation, "node/edge-node/default/node/Node1", content) + msgNoAction := modelMessage("", "", 0, "", "", model.UpdateOperation, "node/edge-node/default/node/Node1", body) tests := []struct { name string msg *model.Message @@ -124,11 +119,6 @@ func TestIsNodeStopped(t *testing.T) { msg: &msgNoAction, errorWant: false, }, - { - name: "TestIsNodeStopped(): Case 6: msg.Content[action]=stop", - msg: &msgActionStop, - errorWant: true, - }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { diff --git a/cloud/pkg/cloudhub/handler/messagehandler.go b/cloud/pkg/cloudhub/handler/messagehandler.go index 529499939..af23817a0 100644 --- a/cloud/pkg/cloudhub/handler/messagehandler.go +++ b/cloud/pkg/cloudhub/handler/messagehandler.go @@ -11,7 +11,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" "k8s.io/klog" beehiveContext "github.com/kubeedge/beehive/pkg/core/context" @@ -57,6 +56,7 @@ type MessageHandle struct { Nodes sync.Map nodeConns sync.Map nodeLocks sync.Map + nodeRegistered sync.Map MessageQueue *channelq.ChannelMessageQueue Handlers []HandleFunc NodeLimit int @@ -64,7 +64,7 @@ type MessageHandle struct { MessageAcks sync.Map } -type HandleFunc func(hi hubio.CloudHubIO, info *model.HubInfo, exitServe chan ExitCode, stopSendMsg chan struct{}) +type HandleFunc func(info *model.HubInfo, exitServe chan ExitCode) var once sync.Once @@ -145,26 +145,36 @@ func (mh *MessageHandle) OnRegister(connection conn.Connection) { } io := &hubio.JSONIO{Connection: connection} - go mh.ServeConn(io, &model.HubInfo{ProjectID: projectID, NodeID: nodeID}) + + if _, ok := mh.nodeRegistered.Load(nodeID); ok { + if conn, exist := mh.nodeConns.Load(nodeID); exist { + conn.(hubio.CloudHubIO).Close() + } + mh.nodeConns.Store(nodeID, io) + return + } + mh.nodeConns.Store(nodeID, io) + go mh.ServeConn(&model.HubInfo{ProjectID: projectID, NodeID: nodeID}) } // KeepaliveCheckLoop checks whether the edge node is still alive -func (mh *MessageHandle) KeepaliveCheckLoop(hi hubio.CloudHubIO, info *model.HubInfo, stopServe chan ExitCode, stopSendMsg chan struct{}) { +func (mh *MessageHandle) KeepaliveCheckLoop(info *model.HubInfo, stopServe chan ExitCode) { keepaliveTicker := time.NewTimer(time.Duration(mh.KeepaliveInterval) * time.Second) for { select { case _, ok := <-mh.KeepaliveChannel[info.NodeID]: if !ok { + klog.Warningf("Stop keepalive check for node: %s", info.NodeID) return } klog.Infof("Node %s is still alive", info.NodeID) keepaliveTicker.Reset(time.Duration(mh.KeepaliveInterval) * time.Second) case <-keepaliveTicker.C: - klog.Warningf("Timeout to receive heart beat from edge node %s for project %s", - info.NodeID, info.ProjectID) - stopServe <- nodeDisconnect - close(stopSendMsg) - return + if conn, ok := mh.nodeConns.Load(info.NodeID); ok { + klog.Warningf("Timeout to receive heart beat from edge node %s for project %s", info.NodeID, info.ProjectID) + conn.(hubio.CloudHubIO).Close() + mh.nodeConns.Delete(info.NodeID) + } } } } @@ -241,8 +251,8 @@ func (mh *MessageHandle) hubIoWrite(hi hubio.CloudHubIO, nodeID string, msg *bee } // ServeConn starts serving the incoming connection -func (mh *MessageHandle) ServeConn(hi hubio.CloudHubIO, info *model.HubInfo) { - err := mh.RegisterNode(hi, info) +func (mh *MessageHandle) ServeConn(info *model.HubInfo) { + err := mh.RegisterNode(info) if err != nil { klog.Errorf("fail to register node %s, reason %s", info.NodeID, err.Error()) return @@ -250,21 +260,24 @@ func (mh *MessageHandle) ServeConn(hi hubio.CloudHubIO, info *model.HubInfo) { klog.Infof("edge node %s for project %s connected", info.NodeID, info.ProjectID) exitServe := make(chan ExitCode, 3) - stopSendMsg := make(chan struct{}) for _, handle := range mh.Handlers { - go handle(hi, info, exitServe, stopSendMsg) + go handle(info, exitServe) } code := <-exitServe - mh.UnregisterNode(hi, info, code) + mh.UnregisterNode(info, code) } // RegisterNode register node in cloudhub for the incoming connection -func (mh *MessageHandle) RegisterNode(hi hubio.CloudHubIO, info *model.HubInfo) error { +func (mh *MessageHandle) RegisterNode(info *model.HubInfo) error { + hi, err := mh.getNodeConnection(info.NodeID) + if err != nil { + return err + } mh.MessageQueue.Connect(info) - err := mh.MessageQueue.Publish(constructConnectMessage(info, true)) + err = mh.MessageQueue.Publish(constructConnectMessage(info, true)) if err != nil { klog.Errorf("fail to publish node connect event for node %s, reason %s", info.NodeID, err.Error()) notifyEventQueueError(hi, messageQueueDisconnect, info.NodeID) @@ -275,16 +288,22 @@ func (mh *MessageHandle) RegisterNode(hi hubio.CloudHubIO, info *model.HubInfo) return err } - mh.nodeConns.Store(info.NodeID, hi) mh.nodeLocks.Store(info.NodeID, &sync.Mutex{}) mh.Nodes.Store(info.NodeID, true) + mh.nodeRegistered.Store(info.NodeID, true) return nil } // UnregisterNode unregister node in cloudhub -func (mh *MessageHandle) UnregisterNode(hi hubio.CloudHubIO, info *model.HubInfo, code ExitCode) { +func (mh *MessageHandle) UnregisterNode(info *model.HubInfo, code ExitCode) { + if hi, err := mh.getNodeConnection(info.NodeID); err == nil { + notifyEventQueueError(hi, code, info.NodeID) + hi.Close() + } + mh.nodeLocks.Delete(info.NodeID) mh.nodeConns.Delete(info.NodeID) + mh.nodeRegistered.Delete(info.NodeID) close(mh.KeepaliveChannel[info.NodeID]) delete(mh.KeepaliveChannel, info.NodeID) @@ -292,9 +311,9 @@ func (mh *MessageHandle) UnregisterNode(hi hubio.CloudHubIO, info *model.HubInfo if err != nil { klog.Errorf("fail to publish node disconnect event for node %s, reason %s", info.NodeID, err.Error()) } - notifyEventQueueError(hi, code, info.NodeID) + mh.Nodes.Delete(info.NodeID) - err = hi.Close() + if err != nil { klog.Errorf("fail to close connection, reason: %s", err.Error()) } @@ -317,103 +336,105 @@ func (mh *MessageHandle) GetNodeCount() int { } // ListMessageWriteLoop processes all list type resource write requests -func (mh *MessageHandle) ListMessageWriteLoop(hi hubio.CloudHubIO, info *model.HubInfo, stopServe chan ExitCode, stopSendMsg chan struct{}) { - nodeListQueue, err := mh.MessageQueue.GetNodeListQueue(info.NodeID) - if err != nil { - klog.Errorf("Failed to get nodeQueue for node %s: %v", info.NodeID, err) - stopServe <- messageQueueDisconnect - return - } - nodeListStore, err := mh.MessageQueue.GetNodeListStore(info.NodeID) - if err != nil { - klog.Errorf("Failed to get nodeStore for node %s: %v", info.NodeID, err) - stopServe <- messageQueueDisconnect - return - } +func (mh *MessageHandle) ListMessageWriteLoop(info *model.HubInfo, stopServe chan ExitCode) { + nodeListQueue := mh.MessageQueue.GetNodeListQueue(info.NodeID) + nodeListStore := mh.MessageQueue.GetNodeListStore(info.NodeID) + nodeQueue := mh.MessageQueue.GetNodeQueue(info.NodeID) + for { - select { - case <-stopSendMsg: - klog.Errorf("Node %s disconnected and stopped sending messages", info.NodeID) + key, quit := nodeListQueue.Get() + if quit { + klog.Errorf("nodeListQueue for node %s has shutdown", info.NodeID) + return + } + + obj, exist, _ := nodeListStore.GetByKey(key.(string)) + if !exist { + klog.Errorf("nodeListStore for node %s doesn't exist", info.NodeID) + continue + } + msg := obj.(*beehiveModel.Message) + + if model.IsNodeStopped(msg) { + klog.Warningf("node %s is deleted, data for node will be cleaned up", info.NodeID) + nodeQueue.ShutDown() + nodeListQueue.ShutDown() + stopServe <- nodeStop return - default: - mh.handleMessage(nodeListQueue, nodeListStore, hi, info, stopServe, "listMessage") } + if !model.IsToEdge(msg) { + klog.Infof("skip only to cloud event for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content) + continue + } + klog.V(4).Infof("event to send for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content) + + trimMessage(msg) + + conn, ok := mh.nodeConns.Load(info.NodeID) + if !ok { + continue + } + + mh.send(conn.(hubio.CloudHubIO), info, msg) + + // delete successfully sent events from the queue/store + nodeListStore.Delete(msg) + + nodeListQueue.Forget(key.(string)) + nodeListQueue.Done(key) } } // MessageWriteLoop processes all write requests -func (mh *MessageHandle) MessageWriteLoop(hi hubio.CloudHubIO, info *model.HubInfo, stopServe chan ExitCode, stopSendMsg chan struct{}) { - nodeQueue, err := mh.MessageQueue.GetNodeQueue(info.NodeID) - if err != nil { - klog.Errorf("Failed to get nodeQueue for node %s: %v", info.NodeID, err) - stopServe <- messageQueueDisconnect - return - } - nodeStore, err := mh.MessageQueue.GetNodeStore(info.NodeID) - if err != nil { - klog.Errorf("Failed to get nodeStore for node %s: %v", info.NodeID, err) - stopServe <- messageQueueDisconnect - return - } +func (mh *MessageHandle) MessageWriteLoop(info *model.HubInfo, stopServe chan ExitCode) { + nodeQueue := mh.MessageQueue.GetNodeQueue(info.NodeID) + nodeStore := mh.MessageQueue.GetNodeStore(info.NodeID) for { - select { - case <-stopSendMsg: - klog.Errorf("Node %s disconnected and stopped sending messages", info.NodeID) + key, quit := nodeQueue.Get() + if quit { + klog.Errorf("nodeQueue for node %s has shutdown", info.NodeID) return - default: - mh.handleMessage(nodeQueue, nodeStore, hi, info, stopServe, "message") } - } -} -func (mh *MessageHandle) handleMessage(nodeQueue workqueue.RateLimitingInterface, - nodeStore cache.Store, hi hubio.CloudHubIO, - info *model.HubInfo, stopServe chan ExitCode, msgType string) { - key, quit := nodeQueue.Get() - if quit { - klog.Errorf("nodeQueue for node %s has shutdown", info.NodeID) - return - } - obj, exist, _ := nodeStore.GetByKey(key.(string)) - if !exist { - klog.Errorf("nodeStore for node %s doesn't exist", info.NodeID) - return - } + obj, exist, _ := nodeStore.GetByKey(key.(string)) + if !exist { + klog.Errorf("nodeStore for node %s doesn't exist", info.NodeID) + continue + } + msg := obj.(*beehiveModel.Message) - msg := obj.(*beehiveModel.Message) + if !model.IsToEdge(msg) { + klog.Infof("skip only to cloud event for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content) + continue + } + klog.V(4).Infof("event to send for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content) - if model.IsNodeStopped(msg) { - klog.Infof("node %s is stopped, will disconnect", info.NodeID) - stopServe <- nodeStop - return - } - if !model.IsToEdge(msg) { - klog.Infof("skip only to cloud event for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content) - return - } - klog.V(4).Infof("event to send for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content) + copyMsg := deepcopy(msg) + trimMessage(msg) - copyMsg := deepcopy(msg) - trimMessage(msg) - err := hi.SetWriteDeadline(time.Now().Add(time.Duration(mh.WriteTimeout) * time.Second)) - if err != nil { - klog.Errorf("SetWriteDeadline error, %s", err.Error()) - stopServe <- hubioWriteFail - return - } - if msgType == "listMessage" { - mh.send(hi, info, msg) - // delete successfully sent events from the queue/store - nodeStore.Delete(msg) - } else { - mh.sendMsg(hi, info, msg, copyMsg, nodeStore) - } + for { + conn, ok := mh.nodeConns.Load(info.NodeID) + if !ok { + time.Sleep(time.Second * 2) + continue + } + err := mh.sendMsg(conn.(hubio.CloudHubIO), info, msg, copyMsg, nodeStore) + if err != nil { + klog.Errorf("Failed to send event to node: %s, affected event: %s, err: %s", + info.NodeID, dumpMessageMetadata(msg), err.Error()) + nodeQueue.AddRateLimited(key.(string)) + time.Sleep(time.Second * 2) + } + break + } - nodeQueue.Done(key) + nodeQueue.Forget(key.(string)) + nodeQueue.Done(key) + } } -func (mh *MessageHandle) sendMsg(hi hubio.CloudHubIO, info *model.HubInfo, msg, copyMsg *beehiveModel.Message, nodeStore cache.Store) { +func (mh *MessageHandle) sendMsg(hi hubio.CloudHubIO, info *model.HubInfo, msg, copyMsg *beehiveModel.Message, nodeStore cache.Store) error { ackChan := make(chan struct{}) mh.MessageAcks.Store(msg.GetID(), ackChan) @@ -423,7 +444,10 @@ func (mh *MessageHandle) sendMsg(hi hubio.CloudHubIO, info *model.HubInfo, msg, retryInterval time.Duration = 5 ) ticker := time.NewTimer(retryInterval * time.Second) - mh.send(hi, info, msg) + err := mh.send(hi, info, msg) + if err != nil { + return err + } LOOP: for { @@ -435,20 +459,23 @@ LOOP: if retry == 4 { break LOOP } - mh.send(hi, info, msg) + err := mh.send(hi, info, msg) + if err != nil { + return err + } retry++ ticker.Reset(time.Second * retryInterval) } } + return nil } -func (mh *MessageHandle) send(hi hubio.CloudHubIO, info *model.HubInfo, msg *beehiveModel.Message) { +func (mh *MessageHandle) send(hi hubio.CloudHubIO, info *model.HubInfo, msg *beehiveModel.Message) error { err := mh.hubIoWrite(hi, info.NodeID, msg) if err != nil { - klog.Errorf("write error, connection for node %s will be closed, affected event %s, reason %s", - info.NodeID, dumpMessageMetadata(msg), err.Error()) - return + return err } + return nil } func (mh *MessageHandle) saveSuccessPoint(msg *beehiveModel.Message, info *model.HubInfo, nodeStore cache.Store) { @@ -513,6 +540,15 @@ func (mh *MessageHandle) deleteSuccessPoint(resourceNamespace, objectSyncName st mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Delete(objectSyncName, metav1.NewDeleteOptions(0)) } +func (mh *MessageHandle) getNodeConnection(nodeid string) (hubio.CloudHubIO, error) { + conn, ok := mh.nodeConns.Load(nodeid) + if !ok { + return nil, fmt.Errorf("Failed to get connection for node: %s", nodeid) + } + + return conn.(hubio.CloudHubIO), nil +} + func deepcopy(msg *beehiveModel.Message) *beehiveModel.Message { if msg == nil { return nil diff --git a/cloud/pkg/edgecontroller/controller/downstream.go b/cloud/pkg/edgecontroller/controller/downstream.go index 3d1c0c562..18227cb5c 100644 --- a/cloud/pkg/edgecontroller/controller/downstream.go +++ b/cloud/pkg/edgecontroller/controller/downstream.go @@ -266,6 +266,20 @@ func (dc *DownstreamController) syncEdgeNodes() { } case watch.Deleted: dc.lc.DeleteNode(node.ObjectMeta.Name) + + msg := model.NewMessage("") + resource, err := messagelayer.BuildResource(node.Name, "namespace", constants.ResourceNode, node.Name) + if err != nil { + klog.Warningf("Built message resource failed with error: %s", err) + break + } + msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.DeleteOperation) + err = dc.messageLayer.Send(*msg) + if err != nil { + klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource()) + } else { + klog.V(4).Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource()) + } default: // unsupported operation, no need to send to any node klog.Warningf("Node event type: %s unsupported", e.Type) diff --git a/edge/pkg/edged/edged_status.go b/edge/pkg/edged/edged_status.go index 6910e99fa..fa9614ce7 100644 --- a/edge/pkg/edged/edged_status.go +++ b/edge/pkg/edged/edged_status.go @@ -423,6 +423,10 @@ func (e *edged) syncNodeStatus() { if !e.registrationCompleted { if err := e.registerNode(); err != nil { klog.Errorf("Register node failed: %v", err) + return + } + if err := e.updateNodeStatus(); err != nil { + klog.Errorf("Unable to update node status: %v", err) } } else { if err := e.updateNodeStatus(); err != nil { |
