summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com>2024-07-18 15:34:01 +0800
committerGitHub <noreply@github.com>2024-07-18 15:34:01 +0800
commit78aa2cce5285b00c3124ab342c7349bc068fc2d2 (patch)
tree074eacde6a760dbd7d52e7e4dacc3f6f92ce902a
parentMerge pull request #5585 from spambot000/cloudhub_enhance (diff)
parentSupports high availability of router messages. (diff)
downloadkubeedge-78aa2cce5285b00c3124ab342c7349bc068fc2d2.tar.gz
Merge pull request #5619 from luomengY/harouter
Supports high availability of router messages.
-rw-r--r--cloud/pkg/cloudhub/cloudhub.go10
-rw-r--r--cloud/pkg/cloudhub/handler/message_handler.go16
-rw-r--r--cloud/pkg/edgecontroller/controller/upstream.go39
-rw-r--r--cloud/pkg/router/listener/http.go244
-rw-r--r--cloud/pkg/router/provider/eventbus/eventbus.go9
-rw-r--r--cloud/pkg/router/provider/servicebus/servicebus.go9
-rw-r--r--common/constants/default.go3
7 files changed, 263 insertions, 67 deletions
diff --git a/cloud/pkg/cloudhub/cloudhub.go b/cloud/pkg/cloudhub/cloudhub.go
index 21f8af98c..41e0debfa 100644
--- a/cloud/pkg/cloudhub/cloudhub.go
+++ b/cloud/pkg/cloudhub/cloudhub.go
@@ -1,6 +1,7 @@
package cloudhub
import (
+ "errors"
"fmt"
"os"
@@ -25,6 +26,7 @@ import (
)
var DoneTLSTunnelCerts = make(chan bool, 1)
+var sessionMgr *session.Manager
type cloudHub struct {
enable bool
@@ -58,6 +60,7 @@ func newCloudHub(enable bool) *cloudHub {
int(hubconfig.Config.KeepaliveInterval),
sessionManager, client.GetCRDClient(),
messageDispatcher, authorizer)
+ sessionMgr = sessionManager
ch := &cloudHub{
enable: enable,
@@ -151,3 +154,10 @@ func getAuthConfig() authorization.Config {
VersionedInformerFactory: builtinInformerFactory,
}
}
+
+func GetSessionManager() (*session.Manager, error) {
+ if sessionMgr != nil {
+ return sessionMgr, nil
+ }
+ return nil, errors.New("cloudhub not initialized")
+}
diff --git a/cloud/pkg/cloudhub/handler/message_handler.go b/cloud/pkg/cloudhub/handler/message_handler.go
index a3ac829a2..8f56c0cf1 100644
--- a/cloud/pkg/cloudhub/handler/message_handler.go
+++ b/cloud/pkg/cloudhub/handler/message_handler.go
@@ -17,8 +17,10 @@ limitations under the License.
package handler
import (
+ "context"
"time"
+ "github.com/avast/retry-go"
"k8s.io/klog/v2"
"github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/authorization"
@@ -26,6 +28,7 @@ import (
"github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/common/model"
"github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/dispatcher"
"github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/session"
+ "github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller/controller"
reliableclient "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned"
"github.com/kubeedge/viaduct/pkg/conn"
"github.com/kubeedge/viaduct/pkg/mux"
@@ -149,6 +152,19 @@ func (mh *messageHandler) HandleConnection(connection conn.Connection) {
keepaliveInterval, nodeMessagePool, mh.reliableClient)
// add node session to the session manager
mh.SessionManager.AddSession(nodeSession)
+ go func() {
+ err := retry.Do(
+ func() error {
+ return controller.UpdateAnnotation(context.TODO(), nodeID)
+ },
+ retry.Delay(1*time.Second),
+ retry.Attempts(3),
+ retry.DelayType(retry.FixedDelay),
+ )
+ if err != nil {
+ klog.Errorf(err.Error())
+ }
+ }()
// start session for each edge node and it will keep running until
// it encounters some Transport Error from underlying connection.
diff --git a/cloud/pkg/edgecontroller/controller/upstream.go b/cloud/pkg/edgecontroller/controller/upstream.go
index e4bd726b0..d249814fe 100644
--- a/cloud/pkg/edgecontroller/controller/upstream.go
+++ b/cloud/pkg/edgecontroller/controller/upstream.go
@@ -59,12 +59,14 @@ import (
"github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller/constants"
"github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller/types"
routerrule "github.com/kubeedge/kubeedge/cloud/pkg/router/rule"
+ comconstants "github.com/kubeedge/kubeedge/common/constants"
common "github.com/kubeedge/kubeedge/common/constants"
edgeapi "github.com/kubeedge/kubeedge/common/types"
"github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1"
rulesv1 "github.com/kubeedge/kubeedge/pkg/apis/rules/v1"
crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned"
"github.com/kubeedge/kubeedge/pkg/metaserver/util"
+ kubeedgeutil "github.com/kubeedge/kubeedge/pkg/util"
)
// SortedContainerStatuses define A type to help sort container statuses based on container names.
@@ -491,7 +493,16 @@ func (uc *UpstreamController) createNode(nodeID, name string, node *v1.Node) (*v
}()
node.Name = name
- node, err := uc.kubeClient.CoreV1().Nodes().Create(utilcontext.WithEdgeNode(context.Background(), nodeID), node, metaV1.CreateOptions{})
+ hostnameOverride := kubeedgeutil.GetHostname()
+ localIP, err := kubeedgeutil.GetLocalIP(hostnameOverride)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get cloudcore localIP with err:%v", err)
+ }
+ if node.Annotations == nil {
+ node.Annotations = make(map[string]string)
+ }
+ node.Annotations[common.EdgeMappingCloudKey] = localIP
+ node, err = uc.kubeClient.CoreV1().Nodes().Create(utilcontext.WithEdgeNode(context.Background(), nodeID), node, metaV1.CreateOptions{})
if err == nil && len(kubernetesReversedLabels) > 0 {
patchBytes, err := json.Marshal(map[string]interface{}{"metadata": map[string]interface{}{"labels": kubernetesReversedLabels}})
if err == nil {
@@ -1509,6 +1520,32 @@ func (uc *UpstreamController) nodeMsgResponse(nodeName, namespace, content strin
}
}
+func UpdateAnnotation(ctx context.Context, nodeName string) error {
+ node, err := client.GetKubeClient().CoreV1().Nodes().Get(ctx, nodeName, metaV1.GetOptions{})
+ if err != nil {
+ return fmt.Errorf("failed to get node:%s,err:%v", nodeName, err)
+ }
+ hostnameOverride := kubeedgeutil.GetHostname()
+ localIP, err := kubeedgeutil.GetLocalIP(hostnameOverride)
+ if err != nil {
+ return fmt.Errorf("failed to get cloudcore localIP with err:%v", err)
+ }
+ if value, ok := node.Annotations[comconstants.EdgeMappingCloudKey]; ok {
+ if value == localIP {
+ return nil
+ }
+ }
+ if node.Annotations == nil {
+ node.Annotations = make(map[string]string)
+ }
+ node.Annotations[comconstants.EdgeMappingCloudKey] = localIP
+ _, err = client.GetKubeClient().CoreV1().Nodes().Update(ctx, node, metaV1.UpdateOptions{})
+ if err != nil {
+ return fmt.Errorf("failed to update node:%s with err:%v", nodeName, err)
+ }
+ return nil
+}
+
// NewUpstreamController create UpstreamController from config
func NewUpstreamController(config *v1alpha1.EdgeController, factory k8sinformer.SharedInformerFactory) (*UpstreamController, error) {
uc := &UpstreamController{
diff --git a/cloud/pkg/router/listener/http.go b/cloud/pkg/router/listener/http.go
index b7b703646..688db9be5 100644
--- a/cloud/pkg/router/listener/http.go
+++ b/cloud/pkg/router/listener/http.go
@@ -1,18 +1,27 @@
package listener
import (
+ "bytes"
+ "context"
+ "errors"
"fmt"
"io"
"net/http"
+ "strconv"
"strings"
"sync"
"time"
+ "github.com/avast/retry-go"
"github.com/google/uuid"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
routerConfig "github.com/kubeedge/kubeedge/cloud/pkg/router/config"
"github.com/kubeedge/kubeedge/cloud/pkg/router/utils"
+ "github.com/kubeedge/kubeedge/common/constants"
+ "github.com/kubeedge/kubeedge/pkg/util"
)
const MaxMessageBytes = 12 * (1 << 20)
@@ -98,85 +107,132 @@ func (rh *RestHandler) matchedPath(uri string) (string, bool) {
}
func (rh *RestHandler) httpHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Transfer-Encoding", "chunked")
uriSections := strings.Split(r.RequestURI, "/")
if len(uriSections) < 2 {
// URL format incorrect
- klog.Warningf("url format incorrect: %s", r.URL.String())
- w.WriteHeader(http.StatusNotFound)
- if _, err := w.Write([]byte("Request error")); err != nil {
- klog.Errorf("Response write error: %s, %s", r.RequestURI, err.Error())
- }
+ err := fmt.Errorf("url format incorrect: %s", r.URL.String())
+ writeErr(w, r, http.StatusNotFound, err)
return
}
- matchPath, exist := rh.matchedPath(r.RequestURI)
- if !exist {
- klog.Warningf("URL format incorrect: %s", r.RequestURI)
- w.WriteHeader(http.StatusNotFound)
- if _, err := w.Write([]byte("Request error")); err != nil {
- klog.Errorf("Response write error: %s, %s", r.RequestURI, err.Error())
- }
- return
- }
- v, ok := rh.handlers.Load(matchPath)
- if !ok {
- klog.Warningf("No matched handler for path: %s", matchPath)
- return
- }
- handle, ok := v.(Handle)
- if !ok {
- klog.Errorf("invalid convert to Handle. match path: %s", matchPath)
- return
- }
aReaderCloser := http.MaxBytesReader(w, r.Body, MaxMessageBytes)
b, err := io.ReadAll(aReaderCloser)
if err != nil {
- klog.Errorf("request error, write result: %v", err)
- w.WriteHeader(http.StatusBadRequest)
- if _, err = w.Write([]byte("Request error,body is null")); err != nil {
- klog.Errorf("Response write error: %s, %s", r.RequestURI, err.Error())
- }
+ writeErr(w, r, http.StatusBadRequest, err)
return
}
- if isNodeName(uriSections[1]) {
- params := make(map[string]interface{})
- msgID := uuid.New().String()
- params["messageID"] = msgID
- params["request"] = r
- params["timeout"] = rh.restTimeout
- params["data"] = b
+ edgeNodeName := uriSections[1]
+ err = retry.Do(
+ func() error {
+ targetCloudCoreIP, err := GetEdgeToCloudCoreIP(r.Context(), edgeNodeName)
+ if err != nil {
+ return err
+ }
- v, err := handle(params)
- if err != nil {
- klog.Errorf("handle request error, msg id: %s, err: %v", msgID, err)
- return
- }
- response, ok := v.(*http.Response)
- if !ok {
- klog.Errorf("response convert error, msg id: %s", msgID)
- return
- }
- body, err := io.ReadAll(io.LimitReader(response.Body, MaxMessageBytes))
- if err != nil {
- klog.Errorf("response body read error, msg id: %s, reason: %v", msgID, err)
- return
- }
- for key, values := range response.Header {
- for _, value := range values {
- w.Header().Add(key, value)
+ hostnameOverride := util.GetHostname()
+ localIP, err := util.GetLocalIP(hostnameOverride)
+ if err != nil {
+ return fmt.Errorf("failed to get cloudcore localIP with err:%v", err)
}
- }
- w.WriteHeader(response.StatusCode)
- if _, err = w.Write(body); err != nil {
- klog.Errorf("response body write error, msg id: %s, reason: %v", msgID, err)
- return
- }
- klog.Infof("response to client, msg id: %s, write result: success", msgID)
- } else {
- w.WriteHeader(http.StatusNotFound)
- _, err = w.Write([]byte("No rule match"))
- klog.Infof("no rule match, write result: %v", err)
+ if targetCloudCoreIP != localIP {
+ var url string
+ if r.TLS != nil {
+ url = "https://" + targetCloudCoreIP
+ } else {
+ url = "http://" + targetCloudCoreIP
+ }
+ url += ":" + strconv.Itoa(rh.port) + r.RequestURI
+ reqBody := io.NopCloser(bytes.NewBuffer(b))
+ forwardReq, err := http.NewRequest(r.Method, url, reqBody)
+ if err != nil {
+ return fmt.Errorf("failed to create forward request: %v", err)
+ }
+
+ forwardReq.TLS = r.TLS
+ forwardReq.Header = make(http.Header)
+ for key, values := range r.Header {
+ forwardReq.Header[key] = values
+ }
+ return requestForward(targetCloudCoreIP, w, forwardReq)
+ }
+
+ matchPath, exist := rh.matchedPath(r.RequestURI)
+ if !exist {
+ klog.Warningf("URL format incorrect: %s", r.RequestURI)
+ w.WriteHeader(http.StatusNotFound)
+ if _, err := w.Write([]byte("Request error")); err != nil {
+ klog.Errorf("Response write error: %s, %s", r.RequestURI, err.Error())
+ }
+ return nil
+ }
+ v, ok := rh.handlers.Load(matchPath)
+ if !ok {
+ klog.Warningf("No matched handler for path: %s", matchPath)
+ return nil
+ }
+ handle, ok := v.(Handle)
+ if !ok {
+ klog.Errorf("invalid convert to Handle. match path: %s", matchPath)
+ return nil
+ }
+
+ if isNodeName(uriSections[1]) {
+ params := make(map[string]interface{})
+ msgID := uuid.New().String()
+ params["messageID"] = msgID
+ params["request"] = r
+ params["timeout"] = rh.restTimeout
+ params["data"] = b
+
+ v, err := handle(params)
+ if err != nil {
+ klog.Errorf("handle request error, msg id: %s, err: %v", msgID, err)
+ return nil
+ }
+ response, ok := v.(*http.Response)
+ if !ok {
+ klog.Errorf("response convert error, msg id: %s", msgID)
+ return nil
+ }
+ body, err := io.ReadAll(io.LimitReader(response.Body, MaxMessageBytes))
+ if err != nil {
+ klog.Errorf("response body read error, msg id: %s, reason: %v", msgID, err)
+ return nil
+ }
+ for key, values := range response.Header {
+ for _, value := range values {
+ w.Header().Add(key, value)
+ }
+ }
+
+ if response.StatusCode != http.StatusOK {
+ errMsg := string(body)
+ return errors.New(errMsg)
+ }
+
+ w.WriteHeader(response.StatusCode)
+ if _, err = w.Write(body); err != nil {
+ klog.Errorf("response body write error, msg id: %s, reason: %v", msgID, err)
+ return nil
+ }
+ klog.Infof("response to client, msg id: %s, write result: success", msgID)
+ return nil
+ }
+ w.WriteHeader(http.StatusNotFound)
+ _, err = w.Write([]byte("No rule match"))
+ klog.Infof("no rule match, write result: %v", err)
+ return nil
+ },
+ retry.Delay(1*time.Second),
+ retry.Attempts(3),
+ retry.DelayType(retry.FixedDelay),
+ )
+
+ if err != nil {
+ writeErr(w, r, http.StatusInternalServerError, err)
+ return
}
}
@@ -196,3 +252,61 @@ func (rh *RestHandler) IsMatch(key interface{}, message interface{}) bool {
func isNodeName(_ string) bool {
return true
}
+
+func GetEdgeToCloudCoreIP(ctx context.Context, nodeName string) (string, error) {
+ node, err := client.GetKubeClient().CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+ if err != nil {
+ return "", fmt.Errorf("failed to get node:%s,err:%v", nodeName, err)
+ }
+ cloudCoreIP, ok := node.Annotations[constants.EdgeMappingCloudKey]
+ if !ok {
+ return "", fmt.Errorf("no corresponding cloudcore was found for edgeNode:%s", nodeName)
+ }
+ return cloudCoreIP, nil
+}
+
+func requestForward(targetCloudCoreIP string, w http.ResponseWriter, forwardReq *http.Request) error {
+ httpClient := &http.Client{}
+ resp, err := httpClient.Do(forwardReq)
+ if err != nil {
+ return fmt.Errorf("failed to forward request: %v", err)
+ }
+ defer func(Body io.ReadCloser) {
+ err := Body.Close()
+ if err != nil {
+ klog.Errorf("failed to close resp.Body with err:%v", err)
+ }
+ }(resp.Body)
+
+ for key, values := range resp.Header {
+ for _, value := range values {
+ w.Header().Add(key, value)
+ }
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ bodyBytes, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return fmt.Errorf("error reading body:%v", err)
+ }
+ errMsg := string(bodyBytes)
+ return errors.New(errMsg)
+ }
+
+ w.WriteHeader(resp.StatusCode)
+ _, err = io.Copy(w, resp.Body)
+ if err != nil {
+ return fmt.Errorf("failed to copy resp.Body to writer with err:%v", err)
+ }
+
+ klog.Infof("forwarded request to %s successfully", targetCloudCoreIP)
+ return nil
+}
+
+func writeErr(w http.ResponseWriter, r *http.Request, statusCode int, err error) {
+ klog.Errorf(err.Error())
+ w.WriteHeader(statusCode)
+ if _, err := w.Write([]byte(err.Error())); err != nil {
+ klog.Errorf("Response write error: %s, %s", r.RequestURI, err.Error())
+ }
+}
diff --git a/cloud/pkg/router/provider/eventbus/eventbus.go b/cloud/pkg/router/provider/eventbus/eventbus.go
index e6ca0012b..955587c23 100644
--- a/cloud/pkg/router/provider/eventbus/eventbus.go
+++ b/cloud/pkg/router/provider/eventbus/eventbus.go
@@ -9,6 +9,7 @@ import (
beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub"
"github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
"github.com/kubeedge/kubeedge/cloud/pkg/router/constants"
"github.com/kubeedge/kubeedge/cloud/pkg/router/listener"
@@ -143,6 +144,14 @@ func (eb *EventBus) GoToTarget(data map[string]interface{}, _ chan struct{}) (in
msg.SetResourceOperation(resource, publishOperation)
msg.FillBody(string(body))
msg.SetRoute(modules.RouterSourceEventBus, modules.UserGroup)
+
+ sessionMgr, err := cloudhub.GetSessionManager()
+ if err != nil {
+ return nil, err
+ }
+ if _, exists := sessionMgr.GetSession(nodeName); !exists {
+ return nil, fmt.Errorf("cloudcore doesn't have session for node:%s", nodeName)
+ }
beehiveContext.Send(modules.CloudHubModuleName, *msg)
return nil, nil
}
diff --git a/cloud/pkg/router/provider/servicebus/servicebus.go b/cloud/pkg/router/provider/servicebus/servicebus.go
index 250f39786..557ab6e51 100644
--- a/cloud/pkg/router/provider/servicebus/servicebus.go
+++ b/cloud/pkg/router/provider/servicebus/servicebus.go
@@ -12,6 +12,7 @@ import (
beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub"
"github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
"github.com/kubeedge/kubeedge/cloud/pkg/router/constants"
"github.com/kubeedge/kubeedge/cloud/pkg/router/listener"
@@ -146,6 +147,14 @@ func (sb *ServiceBus) GoToTarget(data map[string]interface{}, stop chan struct{}
msg.SetResourceOperation(resource, request.Method)
msg.FillBody(request)
msg.SetRoute(modules.RouterSourceServiceBus, modules.UserGroup)
+
+ sessionMgr, err := cloudhub.GetSessionManager()
+ if err != nil {
+ return nil, err
+ }
+ if _, exists := sessionMgr.GetSession(nodeName); !exists {
+ return nil, fmt.Errorf("cloudcore doesn't have session for node:%s", nodeName)
+ }
beehiveContext.Send(modules.CloudHubModuleName, *msg)
if stop != nil {
listener.MessageHandlerInstance.SetCallback(messageID, func(message *model.Message) {
diff --git a/common/constants/default.go b/common/constants/default.go
index 5e254d8fa..db91af7b3 100644
--- a/common/constants/default.go
+++ b/common/constants/default.go
@@ -16,7 +16,8 @@ const (
SystemName = "kubeedge"
SystemNamespace = SystemName
- CloudConfigMapName = "cloudcore"
+ CloudConfigMapName = "cloudcore"
+ EdgeMappingCloudKey = "cloudcore"
// runtime
DockerContainerRuntime = "docker"