summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com>2024-07-18 20:51:01 +0800
committerGitHub <noreply@github.com>2024-07-18 20:51:01 +0800
commitbc6a2c9c0e9fbae9e51d2eb3a74a2ffd9617ba78 (patch)
treecb92b02996b697435b6db913cbf7a4cdc5769b8e
parentMerge pull request #5732 from tangming1996/automated-cherry-pick-of-#5523-ups... (diff)
parentedge node offline pod‘s status update (diff)
downloadkubeedge-origin/release-1.17.tar.gz
Merge pull request #5740 from luomengY/automated-cherry-pick-of-#5556-upstream-release-1.17v1.17.2origin/release-1.17
Automated cherry pick of #5556:fix:edge node offline pod‘s status update
-rw-r--r--edge/pkg/metamanager/client/pod.go5
-rw-r--r--edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/store.go45
-rw-r--r--edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/util/patch.go178
-rw-r--r--edge/pkg/metamanager/process.go7
-rw-r--r--go.mod2
5 files changed, 234 insertions, 3 deletions
diff --git a/edge/pkg/metamanager/client/pod.go b/edge/pkg/metamanager/client/pod.go
index 7ebc38fad..036e13785 100644
--- a/edge/pkg/metamanager/client/pod.go
+++ b/edge/pkg/metamanager/client/pod.go
@@ -2,6 +2,7 @@ package client
import (
"encoding/json"
+ "errors"
"fmt"
"reflect"
"strings"
@@ -127,6 +128,10 @@ func (c *pods) Patch(name string, patchBytes []byte) (*corev1.Pod, error) {
return nil, fmt.Errorf("parse message to pod failed, err: %v", err)
}
+ if resp.Router.Operation == model.ResponseErrorOperation {
+ return nil, errors.New(string(content))
+ }
+
return handlePodResp(resource, content)
}
diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/store.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/store.go
index 70312a1ce..f42d19982 100644
--- a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/store.go
+++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/store.go
@@ -14,9 +14,13 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
+ "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
+ "github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/edge/pkg/metamanager/dao"
"github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator"
+ patchutil "github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/util"
"github.com/kubeedge/kubeedge/pkg/metaserver"
"github.com/kubeedge/kubeedge/pkg/metaserver/util"
)
@@ -63,7 +67,16 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ob
return err
}
unstrObj := objPtr.(*unstructured.Unstructured)
- return runtime.DecodeInto(s.codec, []byte((*resp.Kvs)[0].Value), unstrObj)
+ if err = runtime.DecodeInto(s.codec, []byte((*resp.Kvs)[0].Value), unstrObj); err != nil {
+ return err
+ }
+
+ if unstrObj.GetKind() == "Pod" {
+ if err = MergePatchedResource(ctx, unstrObj, model.ResourceTypePodPatch); err != nil {
+ return err
+ }
+ }
+ return nil
}
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
@@ -91,6 +104,12 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
return err
}
+ if unstrObj.GetKind() == "Pod" {
+ if err = MergePatchedResource(ctx, &unstrObj, model.ResourceTypePodPatch); err != nil {
+ return err
+ }
+ }
+
labelSet := labels.Set(unstrObj.GetLabels())
if !opts.Predicate.Label.Matches(labelSet) {
continue
@@ -141,3 +160,27 @@ func newStore() *store {
}
return &s
}
+
+func MergePatchedResource(ctx context.Context, originalObj *unstructured.Unstructured, resourceTypePatch string) error {
+ resKey := fmt.Sprintf("%s/%s/%s", originalObj.GetNamespace(), resourceTypePatch, originalObj.GetName())
+ var metas *[]string
+ metas, err := dao.QueryMeta("key", resKey)
+ if err != nil {
+ return err
+ }
+ if len(*metas) > 0 {
+ defaultScheme := scheme.Scheme
+ defaulter := runtime.ObjectDefaulter(defaultScheme)
+ updatedResource := new(unstructured.Unstructured)
+ GroupVersionKind := originalObj.GroupVersionKind()
+ schemaReferenceObj, err := defaultScheme.New(GroupVersionKind)
+ if err != nil {
+ return fmt.Errorf("failed to build schema reference object, err: %+v", err)
+ }
+ if err = patchutil.StrategicPatchObject(ctx, defaulter, originalObj, []byte((*metas)[0]), updatedResource, schemaReferenceObj, ""); err != nil {
+ return err
+ }
+ originalObj = updatedResource
+ }
+ return nil
+}
diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/util/patch.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/util/patch.go
new file mode 100644
index 000000000..b023d288d
--- /dev/null
+++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/util/patch.go
@@ -0,0 +1,178 @@
+/*
+Copyright 2024 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+@CHANGELOG
+KubeEdge Authors: To merge the patchBytes of StrategicMergePatchType into the original resource,
+This file is derived from K8S apiserver code with reduced set of methods
+Changes done are
+1. Package util got some functions from "k8s.io/apiserver/pkg/endpoints/handlers/patch.go"
+and made some variant
+*/
+
+package util
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "strings"
+
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/util/mergepatch"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
+ "k8s.io/apimachinery/pkg/util/validation/field"
+ "k8s.io/apiserver/pkg/warning"
+ kjson "sigs.k8s.io/json"
+)
+
+func StrategicPatchObject(
+ requestContext context.Context,
+ defaulter runtime.ObjectDefaulter,
+ originalObject runtime.Object,
+ patchBytes []byte,
+ objToUpdate runtime.Object,
+ schemaReferenceObj runtime.Object,
+ validationDirective string,
+) error {
+ originalObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(originalObject)
+ if err != nil {
+ return err
+ }
+
+ patchMap := make(map[string]interface{})
+ var strictErrs []error
+ if validationDirective == metav1.FieldValidationWarn || validationDirective == metav1.FieldValidationStrict {
+ strictErrs, err = kjson.UnmarshalStrict(patchBytes, &patchMap)
+ if err != nil {
+ return errors.NewBadRequest(err.Error())
+ }
+ } else {
+ if err = kjson.UnmarshalCaseSensitivePreserveInts(patchBytes, &patchMap); err != nil {
+ return errors.NewBadRequest(err.Error())
+ }
+ }
+
+ return applyPatchToObject(requestContext, defaulter, originalObjMap, patchMap, objToUpdate, schemaReferenceObj, strictErrs, validationDirective)
+}
+
+// applyPatchToObject applies a strategic merge patch of <patchMap> to
+// <originalMap> and stores the result in <objToUpdate>.
+// NOTE: <objToUpdate> must be a versioned object.
+func applyPatchToObject(
+ requestContext context.Context,
+ defaulter runtime.ObjectDefaulter,
+ originalMap map[string]interface{},
+ patchMap map[string]interface{},
+ objToUpdate runtime.Object,
+ schemaReferenceObj runtime.Object,
+ strictErrs []error,
+ validationDirective string,
+) error {
+ patchedObjMap, err := strategicpatch.StrategicMergeMapPatch(originalMap, patchMap, schemaReferenceObj)
+ if err != nil {
+ return interpretStrategicMergePatchError(err)
+ }
+
+ // Rather than serialize the patched map to JSON, then decode it to an object, we go directly from a map to an object
+ converter := runtime.DefaultUnstructuredConverter
+ returnUnknownFields := validationDirective == metav1.FieldValidationWarn || validationDirective == metav1.FieldValidationStrict
+ if err := converter.FromUnstructuredWithValidation(patchedObjMap, objToUpdate, returnUnknownFields); err != nil {
+ strictError, isStrictError := runtime.AsStrictDecodingError(err)
+ switch {
+ case !isStrictError:
+ // disregard any sttrictErrs, because it's an incomplete
+ // list of strict errors given that we don't know what fields were
+ // unknown because StrategicMergeMapPatch failed.
+ // Non-strict errors trump in this case.
+ return errors.NewInvalid(schema.GroupKind{}, "", field.ErrorList{
+ field.Invalid(field.NewPath("patch"), fmt.Sprintf("%+v", patchMap), err.Error()),
+ })
+ case validationDirective == metav1.FieldValidationWarn:
+ addStrictDecodingWarnings(requestContext, append(strictErrs, strictError.Errors()...))
+ default:
+ strictDecodingError := runtime.NewStrictDecodingError(append(strictErrs, strictError.Errors()...))
+ return errors.NewInvalid(schema.GroupKind{}, "", field.ErrorList{
+ field.Invalid(field.NewPath("patch"), fmt.Sprintf("%+v", patchMap), strictDecodingError.Error()),
+ })
+ }
+ } else if len(strictErrs) > 0 {
+ switch {
+ case validationDirective == metav1.FieldValidationWarn:
+ addStrictDecodingWarnings(requestContext, strictErrs)
+ default:
+ return errors.NewInvalid(schema.GroupKind{}, "", field.ErrorList{
+ field.Invalid(field.NewPath("patch"), fmt.Sprintf("%+v", patchMap), runtime.NewStrictDecodingError(strictErrs).Error()),
+ })
+ }
+ }
+
+ // Decoding from JSON to a versioned object would apply defaults, so we do the same here
+ defaulter.Default(objToUpdate)
+
+ return nil
+}
+
+// addStrictDecodingWarnings confirms that the error is a strict decoding error
+// and if so adds a warning for each strict decoding violation.
+func addStrictDecodingWarnings(requestContext context.Context, errs []error) {
+ for _, e := range errs {
+ yamlWarnings := parseYAMLWarnings(e.Error())
+ for _, w := range yamlWarnings {
+ warning.AddWarning(requestContext, "", w)
+ }
+ }
+}
+
+// parseYAMLWarnings takes the strict decoding errors from the yaml decoder's output
+// and parses each individual warnings, or leaves the warning as is if
+// it does not look like a yaml strict decoding error.
+func parseYAMLWarnings(errString string) []string {
+ var trimmedString string
+ if trimmedShortString := strings.TrimPrefix(errString, shortPrefix); len(trimmedShortString) < len(errString) {
+ trimmedString = trimmedShortString
+ } else if trimmedLongString := strings.TrimPrefix(errString, longPrefix); len(trimmedLongString) < len(errString) {
+ trimmedString = trimmedLongString
+ } else {
+ // not a yaml error, return as-is
+ return []string{errString}
+ }
+
+ splitStrings := strings.Split(trimmedString, "\n")
+ for i, s := range splitStrings {
+ splitStrings[i] = strings.TrimSpace(s)
+ }
+ return splitStrings
+}
+
+// interpretStrategicMergePatchError interprets the error type and returns an error with appropriate HTTP code.
+func interpretStrategicMergePatchError(err error) error {
+ switch err {
+ case mergepatch.ErrBadJSONDoc, mergepatch.ErrBadPatchFormatForPrimitiveList, mergepatch.ErrBadPatchFormatForRetainKeys, mergepatch.ErrBadPatchFormatForSetElementOrderList, mergepatch.ErrUnsupportedStrategicMergePatchFormat:
+ return errors.NewBadRequest(err.Error())
+ case mergepatch.ErrNoListOfLists, mergepatch.ErrPatchContentNotMatchRetainKeys:
+ return errors.NewGenericServerResponse(http.StatusUnprocessableEntity, "", schema.GroupResource{}, "", err.Error(), 0, false)
+ default:
+ return err
+ }
+}
+
+const (
+ // shortPrefix is one possible beginning of yaml unmarshal strict errors.
+ shortPrefix = "yaml: unmarshal errors:\n"
+ // longPrefix is the other possible beginning of yaml unmarshal strict errors.
+ longPrefix = "error converting YAML to JSON: yaml: unmarshal errors:\n"
+)
diff --git a/edge/pkg/metamanager/process.go b/edge/pkg/metamanager/process.go
index 16bf596c7..dab76bd9c 100644
--- a/edge/pkg/metamanager/process.go
+++ b/edge/pkg/metamanager/process.go
@@ -257,7 +257,12 @@ func (m *metaManager) processPatch(message model.Message) {
feedbackError(err, message)
return
}
- sendToCloud(&message)
+
+ if connect.IsConnected() {
+ sendToCloud(&message)
+ } else {
+ feedbackError(connect.ErrConnectionLost, message)
+ }
}
func (m *metaManager) processResponse(message model.Message) {
diff --git a/go.mod b/go.mod
index 76025a15c..44b91662d 100644
--- a/go.mod
+++ b/go.mod
@@ -276,7 +276,7 @@ require (
k8s.io/pod-security-admission v0.0.0 // indirect
k8s.io/system-validators v1.8.0 // indirect
oras.land/oras-go v1.2.3 // indirect
- sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
+ sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd
sigs.k8s.io/kustomize/api v0.13.5-0.20230601165947-6ce0bf390ce3 // indirect
sigs.k8s.io/kustomize/kyaml v0.14.3-0.20230601165947-6ce0bf390ce3 // indirect
)