diff options
| author | KubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com> | 2024-07-18 15:34:01 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-07-18 15:34:01 +0800 |
| commit | 78aa2cce5285b00c3124ab342c7349bc068fc2d2 (patch) | |
| tree | 074eacde6a760dbd7d52e7e4dacc3f6f92ce902a | |
| parent | Merge pull request #5585 from spambot000/cloudhub_enhance (diff) | |
| parent | Supports high availability of router messages. (diff) | |
| download | kubeedge-78aa2cce5285b00c3124ab342c7349bc068fc2d2.tar.gz | |
Merge pull request #5619 from luomengY/harouter
Supports high availability of router messages.
| -rw-r--r-- | cloud/pkg/cloudhub/cloudhub.go | 10 | ||||
| -rw-r--r-- | cloud/pkg/cloudhub/handler/message_handler.go | 16 | ||||
| -rw-r--r-- | cloud/pkg/edgecontroller/controller/upstream.go | 39 | ||||
| -rw-r--r-- | cloud/pkg/router/listener/http.go | 244 | ||||
| -rw-r--r-- | cloud/pkg/router/provider/eventbus/eventbus.go | 9 | ||||
| -rw-r--r-- | cloud/pkg/router/provider/servicebus/servicebus.go | 9 | ||||
| -rw-r--r-- | common/constants/default.go | 3 |
7 files changed, 263 insertions, 67 deletions
diff --git a/cloud/pkg/cloudhub/cloudhub.go b/cloud/pkg/cloudhub/cloudhub.go index 21f8af98c..41e0debfa 100644 --- a/cloud/pkg/cloudhub/cloudhub.go +++ b/cloud/pkg/cloudhub/cloudhub.go @@ -1,6 +1,7 @@ package cloudhub import ( + "errors" "fmt" "os" @@ -25,6 +26,7 @@ import ( ) var DoneTLSTunnelCerts = make(chan bool, 1) +var sessionMgr *session.Manager type cloudHub struct { enable bool @@ -58,6 +60,7 @@ func newCloudHub(enable bool) *cloudHub { int(hubconfig.Config.KeepaliveInterval), sessionManager, client.GetCRDClient(), messageDispatcher, authorizer) + sessionMgr = sessionManager ch := &cloudHub{ enable: enable, @@ -151,3 +154,10 @@ func getAuthConfig() authorization.Config { VersionedInformerFactory: builtinInformerFactory, } } + +func GetSessionManager() (*session.Manager, error) { + if sessionMgr != nil { + return sessionMgr, nil + } + return nil, errors.New("cloudhub not initialized") +} diff --git a/cloud/pkg/cloudhub/handler/message_handler.go b/cloud/pkg/cloudhub/handler/message_handler.go index a3ac829a2..8f56c0cf1 100644 --- a/cloud/pkg/cloudhub/handler/message_handler.go +++ b/cloud/pkg/cloudhub/handler/message_handler.go @@ -17,8 +17,10 @@ limitations under the License. package handler import ( + "context" "time" + "github.com/avast/retry-go" "k8s.io/klog/v2" "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/authorization" @@ -26,6 +28,7 @@ import ( "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/common/model" "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/dispatcher" "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/session" + "github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller/controller" reliableclient "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" "github.com/kubeedge/viaduct/pkg/conn" "github.com/kubeedge/viaduct/pkg/mux" @@ -149,6 +152,19 @@ func (mh *messageHandler) HandleConnection(connection conn.Connection) { keepaliveInterval, nodeMessagePool, mh.reliableClient) // add node session to the session manager mh.SessionManager.AddSession(nodeSession) + go func() { + err := retry.Do( + func() error { + return controller.UpdateAnnotation(context.TODO(), nodeID) + }, + retry.Delay(1*time.Second), + retry.Attempts(3), + retry.DelayType(retry.FixedDelay), + ) + if err != nil { + klog.Errorf(err.Error()) + } + }() // start session for each edge node and it will keep running until // it encounters some Transport Error from underlying connection. diff --git a/cloud/pkg/edgecontroller/controller/upstream.go b/cloud/pkg/edgecontroller/controller/upstream.go index e4bd726b0..d249814fe 100644 --- a/cloud/pkg/edgecontroller/controller/upstream.go +++ b/cloud/pkg/edgecontroller/controller/upstream.go @@ -59,12 +59,14 @@ import ( "github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller/constants" "github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller/types" routerrule "github.com/kubeedge/kubeedge/cloud/pkg/router/rule" + comconstants "github.com/kubeedge/kubeedge/common/constants" common "github.com/kubeedge/kubeedge/common/constants" edgeapi "github.com/kubeedge/kubeedge/common/types" "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" rulesv1 "github.com/kubeedge/kubeedge/pkg/apis/rules/v1" crdClientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" "github.com/kubeedge/kubeedge/pkg/metaserver/util" + kubeedgeutil "github.com/kubeedge/kubeedge/pkg/util" ) // SortedContainerStatuses define A type to help sort container statuses based on container names. @@ -491,7 +493,16 @@ func (uc *UpstreamController) createNode(nodeID, name string, node *v1.Node) (*v }() node.Name = name - node, err := uc.kubeClient.CoreV1().Nodes().Create(utilcontext.WithEdgeNode(context.Background(), nodeID), node, metaV1.CreateOptions{}) + hostnameOverride := kubeedgeutil.GetHostname() + localIP, err := kubeedgeutil.GetLocalIP(hostnameOverride) + if err != nil { + return nil, fmt.Errorf("failed to get cloudcore localIP with err:%v", err) + } + if node.Annotations == nil { + node.Annotations = make(map[string]string) + } + node.Annotations[common.EdgeMappingCloudKey] = localIP + node, err = uc.kubeClient.CoreV1().Nodes().Create(utilcontext.WithEdgeNode(context.Background(), nodeID), node, metaV1.CreateOptions{}) if err == nil && len(kubernetesReversedLabels) > 0 { patchBytes, err := json.Marshal(map[string]interface{}{"metadata": map[string]interface{}{"labels": kubernetesReversedLabels}}) if err == nil { @@ -1509,6 +1520,32 @@ func (uc *UpstreamController) nodeMsgResponse(nodeName, namespace, content strin } } +func UpdateAnnotation(ctx context.Context, nodeName string) error { + node, err := client.GetKubeClient().CoreV1().Nodes().Get(ctx, nodeName, metaV1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get node:%s,err:%v", nodeName, err) + } + hostnameOverride := kubeedgeutil.GetHostname() + localIP, err := kubeedgeutil.GetLocalIP(hostnameOverride) + if err != nil { + return fmt.Errorf("failed to get cloudcore localIP with err:%v", err) + } + if value, ok := node.Annotations[comconstants.EdgeMappingCloudKey]; ok { + if value == localIP { + return nil + } + } + if node.Annotations == nil { + node.Annotations = make(map[string]string) + } + node.Annotations[comconstants.EdgeMappingCloudKey] = localIP + _, err = client.GetKubeClient().CoreV1().Nodes().Update(ctx, node, metaV1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update node:%s with err:%v", nodeName, err) + } + return nil +} + // NewUpstreamController create UpstreamController from config func NewUpstreamController(config *v1alpha1.EdgeController, factory k8sinformer.SharedInformerFactory) (*UpstreamController, error) { uc := &UpstreamController{ diff --git a/cloud/pkg/router/listener/http.go b/cloud/pkg/router/listener/http.go index b7b703646..688db9be5 100644 --- a/cloud/pkg/router/listener/http.go +++ b/cloud/pkg/router/listener/http.go @@ -1,18 +1,27 @@ package listener import ( + "bytes" + "context" + "errors" "fmt" "io" "net/http" + "strconv" "strings" "sync" "time" + "github.com/avast/retry-go" "github.com/google/uuid" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" + "github.com/kubeedge/kubeedge/cloud/pkg/common/client" routerConfig "github.com/kubeedge/kubeedge/cloud/pkg/router/config" "github.com/kubeedge/kubeedge/cloud/pkg/router/utils" + "github.com/kubeedge/kubeedge/common/constants" + "github.com/kubeedge/kubeedge/pkg/util" ) const MaxMessageBytes = 12 * (1 << 20) @@ -98,85 +107,132 @@ func (rh *RestHandler) matchedPath(uri string) (string, bool) { } func (rh *RestHandler) httpHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Transfer-Encoding", "chunked") uriSections := strings.Split(r.RequestURI, "/") if len(uriSections) < 2 { // URL format incorrect - klog.Warningf("url format incorrect: %s", r.URL.String()) - w.WriteHeader(http.StatusNotFound) - if _, err := w.Write([]byte("Request error")); err != nil { - klog.Errorf("Response write error: %s, %s", r.RequestURI, err.Error()) - } + err := fmt.Errorf("url format incorrect: %s", r.URL.String()) + writeErr(w, r, http.StatusNotFound, err) return } - matchPath, exist := rh.matchedPath(r.RequestURI) - if !exist { - klog.Warningf("URL format incorrect: %s", r.RequestURI) - w.WriteHeader(http.StatusNotFound) - if _, err := w.Write([]byte("Request error")); err != nil { - klog.Errorf("Response write error: %s, %s", r.RequestURI, err.Error()) - } - return - } - v, ok := rh.handlers.Load(matchPath) - if !ok { - klog.Warningf("No matched handler for path: %s", matchPath) - return - } - handle, ok := v.(Handle) - if !ok { - klog.Errorf("invalid convert to Handle. match path: %s", matchPath) - return - } aReaderCloser := http.MaxBytesReader(w, r.Body, MaxMessageBytes) b, err := io.ReadAll(aReaderCloser) if err != nil { - klog.Errorf("request error, write result: %v", err) - w.WriteHeader(http.StatusBadRequest) - if _, err = w.Write([]byte("Request error,body is null")); err != nil { - klog.Errorf("Response write error: %s, %s", r.RequestURI, err.Error()) - } + writeErr(w, r, http.StatusBadRequest, err) return } - if isNodeName(uriSections[1]) { - params := make(map[string]interface{}) - msgID := uuid.New().String() - params["messageID"] = msgID - params["request"] = r - params["timeout"] = rh.restTimeout - params["data"] = b + edgeNodeName := uriSections[1] + err = retry.Do( + func() error { + targetCloudCoreIP, err := GetEdgeToCloudCoreIP(r.Context(), edgeNodeName) + if err != nil { + return err + } - v, err := handle(params) - if err != nil { - klog.Errorf("handle request error, msg id: %s, err: %v", msgID, err) - return - } - response, ok := v.(*http.Response) - if !ok { - klog.Errorf("response convert error, msg id: %s", msgID) - return - } - body, err := io.ReadAll(io.LimitReader(response.Body, MaxMessageBytes)) - if err != nil { - klog.Errorf("response body read error, msg id: %s, reason: %v", msgID, err) - return - } - for key, values := range response.Header { - for _, value := range values { - w.Header().Add(key, value) + hostnameOverride := util.GetHostname() + localIP, err := util.GetLocalIP(hostnameOverride) + if err != nil { + return fmt.Errorf("failed to get cloudcore localIP with err:%v", err) } - } - w.WriteHeader(response.StatusCode) - if _, err = w.Write(body); err != nil { - klog.Errorf("response body write error, msg id: %s, reason: %v", msgID, err) - return - } - klog.Infof("response to client, msg id: %s, write result: success", msgID) - } else { - w.WriteHeader(http.StatusNotFound) - _, err = w.Write([]byte("No rule match")) - klog.Infof("no rule match, write result: %v", err) + if targetCloudCoreIP != localIP { + var url string + if r.TLS != nil { + url = "https://" + targetCloudCoreIP + } else { + url = "http://" + targetCloudCoreIP + } + url += ":" + strconv.Itoa(rh.port) + r.RequestURI + reqBody := io.NopCloser(bytes.NewBuffer(b)) + forwardReq, err := http.NewRequest(r.Method, url, reqBody) + if err != nil { + return fmt.Errorf("failed to create forward request: %v", err) + } + + forwardReq.TLS = r.TLS + forwardReq.Header = make(http.Header) + for key, values := range r.Header { + forwardReq.Header[key] = values + } + return requestForward(targetCloudCoreIP, w, forwardReq) + } + + matchPath, exist := rh.matchedPath(r.RequestURI) + if !exist { + klog.Warningf("URL format incorrect: %s", r.RequestURI) + w.WriteHeader(http.StatusNotFound) + if _, err := w.Write([]byte("Request error")); err != nil { + klog.Errorf("Response write error: %s, %s", r.RequestURI, err.Error()) + } + return nil + } + v, ok := rh.handlers.Load(matchPath) + if !ok { + klog.Warningf("No matched handler for path: %s", matchPath) + return nil + } + handle, ok := v.(Handle) + if !ok { + klog.Errorf("invalid convert to Handle. match path: %s", matchPath) + return nil + } + + if isNodeName(uriSections[1]) { + params := make(map[string]interface{}) + msgID := uuid.New().String() + params["messageID"] = msgID + params["request"] = r + params["timeout"] = rh.restTimeout + params["data"] = b + + v, err := handle(params) + if err != nil { + klog.Errorf("handle request error, msg id: %s, err: %v", msgID, err) + return nil + } + response, ok := v.(*http.Response) + if !ok { + klog.Errorf("response convert error, msg id: %s", msgID) + return nil + } + body, err := io.ReadAll(io.LimitReader(response.Body, MaxMessageBytes)) + if err != nil { + klog.Errorf("response body read error, msg id: %s, reason: %v", msgID, err) + return nil + } + for key, values := range response.Header { + for _, value := range values { + w.Header().Add(key, value) + } + } + + if response.StatusCode != http.StatusOK { + errMsg := string(body) + return errors.New(errMsg) + } + + w.WriteHeader(response.StatusCode) + if _, err = w.Write(body); err != nil { + klog.Errorf("response body write error, msg id: %s, reason: %v", msgID, err) + return nil + } + klog.Infof("response to client, msg id: %s, write result: success", msgID) + return nil + } + w.WriteHeader(http.StatusNotFound) + _, err = w.Write([]byte("No rule match")) + klog.Infof("no rule match, write result: %v", err) + return nil + }, + retry.Delay(1*time.Second), + retry.Attempts(3), + retry.DelayType(retry.FixedDelay), + ) + + if err != nil { + writeErr(w, r, http.StatusInternalServerError, err) + return } } @@ -196,3 +252,61 @@ func (rh *RestHandler) IsMatch(key interface{}, message interface{}) bool { func isNodeName(_ string) bool { return true } + +func GetEdgeToCloudCoreIP(ctx context.Context, nodeName string) (string, error) { + node, err := client.GetKubeClient().CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("failed to get node:%s,err:%v", nodeName, err) + } + cloudCoreIP, ok := node.Annotations[constants.EdgeMappingCloudKey] + if !ok { + return "", fmt.Errorf("no corresponding cloudcore was found for edgeNode:%s", nodeName) + } + return cloudCoreIP, nil +} + +func requestForward(targetCloudCoreIP string, w http.ResponseWriter, forwardReq *http.Request) error { + httpClient := &http.Client{} + resp, err := httpClient.Do(forwardReq) + if err != nil { + return fmt.Errorf("failed to forward request: %v", err) + } + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + klog.Errorf("failed to close resp.Body with err:%v", err) + } + }(resp.Body) + + for key, values := range resp.Header { + for _, value := range values { + w.Header().Add(key, value) + } + } + + if resp.StatusCode != http.StatusOK { + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading body:%v", err) + } + errMsg := string(bodyBytes) + return errors.New(errMsg) + } + + w.WriteHeader(resp.StatusCode) + _, err = io.Copy(w, resp.Body) + if err != nil { + return fmt.Errorf("failed to copy resp.Body to writer with err:%v", err) + } + + klog.Infof("forwarded request to %s successfully", targetCloudCoreIP) + return nil +} + +func writeErr(w http.ResponseWriter, r *http.Request, statusCode int, err error) { + klog.Errorf(err.Error()) + w.WriteHeader(statusCode) + if _, err := w.Write([]byte(err.Error())); err != nil { + klog.Errorf("Response write error: %s, %s", r.RequestURI, err.Error()) + } +} diff --git a/cloud/pkg/router/provider/eventbus/eventbus.go b/cloud/pkg/router/provider/eventbus/eventbus.go index e6ca0012b..955587c23 100644 --- a/cloud/pkg/router/provider/eventbus/eventbus.go +++ b/cloud/pkg/router/provider/eventbus/eventbus.go @@ -9,6 +9,7 @@ import ( beehiveContext "github.com/kubeedge/beehive/pkg/core/context" "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub" "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" "github.com/kubeedge/kubeedge/cloud/pkg/router/constants" "github.com/kubeedge/kubeedge/cloud/pkg/router/listener" @@ -143,6 +144,14 @@ func (eb *EventBus) GoToTarget(data map[string]interface{}, _ chan struct{}) (in msg.SetResourceOperation(resource, publishOperation) msg.FillBody(string(body)) msg.SetRoute(modules.RouterSourceEventBus, modules.UserGroup) + + sessionMgr, err := cloudhub.GetSessionManager() + if err != nil { + return nil, err + } + if _, exists := sessionMgr.GetSession(nodeName); !exists { + return nil, fmt.Errorf("cloudcore doesn't have session for node:%s", nodeName) + } beehiveContext.Send(modules.CloudHubModuleName, *msg) return nil, nil } diff --git a/cloud/pkg/router/provider/servicebus/servicebus.go b/cloud/pkg/router/provider/servicebus/servicebus.go index 250f39786..557ab6e51 100644 --- a/cloud/pkg/router/provider/servicebus/servicebus.go +++ b/cloud/pkg/router/provider/servicebus/servicebus.go @@ -12,6 +12,7 @@ import ( beehiveContext "github.com/kubeedge/beehive/pkg/core/context" "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub" "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" "github.com/kubeedge/kubeedge/cloud/pkg/router/constants" "github.com/kubeedge/kubeedge/cloud/pkg/router/listener" @@ -146,6 +147,14 @@ func (sb *ServiceBus) GoToTarget(data map[string]interface{}, stop chan struct{} msg.SetResourceOperation(resource, request.Method) msg.FillBody(request) msg.SetRoute(modules.RouterSourceServiceBus, modules.UserGroup) + + sessionMgr, err := cloudhub.GetSessionManager() + if err != nil { + return nil, err + } + if _, exists := sessionMgr.GetSession(nodeName); !exists { + return nil, fmt.Errorf("cloudcore doesn't have session for node:%s", nodeName) + } beehiveContext.Send(modules.CloudHubModuleName, *msg) if stop != nil { listener.MessageHandlerInstance.SetCallback(messageID, func(message *model.Message) { diff --git a/common/constants/default.go b/common/constants/default.go index 5e254d8fa..db91af7b3 100644 --- a/common/constants/default.go +++ b/common/constants/default.go @@ -16,7 +16,8 @@ const ( SystemName = "kubeedge" SystemNamespace = SystemName - CloudConfigMapName = "cloudcore" + CloudConfigMapName = "cloudcore" + EdgeMappingCloudKey = "cloudcore" // runtime DockerContainerRuntime = "docker" |
