diff options
| author | KubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com> | 2024-07-18 20:51:01 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-07-18 20:51:01 +0800 |
| commit | bc6a2c9c0e9fbae9e51d2eb3a74a2ffd9617ba78 (patch) | |
| tree | cb92b02996b697435b6db913cbf7a4cdc5769b8e | |
| parent | Merge pull request #5732 from tangming1996/automated-cherry-pick-of-#5523-ups... (diff) | |
| parent | edge node offline pod‘s status update (diff) | |
| download | kubeedge-bc6a2c9c0e9fbae9e51d2eb3a74a2ffd9617ba78.tar.gz | |
Merge pull request #5740 from luomengY/automated-cherry-pick-of-#5556-upstream-release-1.17v1.17.2origin/release-1.17
Automated cherry pick of #5556:fix:edge node offline pod‘s status update
| -rw-r--r-- | edge/pkg/metamanager/client/pod.go | 5 | ||||
| -rw-r--r-- | edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/store.go | 45 | ||||
| -rw-r--r-- | edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/util/patch.go | 178 | ||||
| -rw-r--r-- | edge/pkg/metamanager/process.go | 7 | ||||
| -rw-r--r-- | go.mod | 2 |
5 files changed, 234 insertions, 3 deletions
diff --git a/edge/pkg/metamanager/client/pod.go b/edge/pkg/metamanager/client/pod.go index 7ebc38fad..036e13785 100644 --- a/edge/pkg/metamanager/client/pod.go +++ b/edge/pkg/metamanager/client/pod.go @@ -2,6 +2,7 @@ package client import ( "encoding/json" + "errors" "fmt" "reflect" "strings" @@ -127,6 +128,10 @@ func (c *pods) Patch(name string, patchBytes []byte) (*corev1.Pod, error) { return nil, fmt.Errorf("parse message to pod failed, err: %v", err) } + if resp.Router.Operation == model.ResponseErrorOperation { + return nil, errors.New(string(content)) + } + return handlePodResp(resource, content) } diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/store.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/store.go index 70312a1ce..f42d19982 100644 --- a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/store.go +++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/store.go @@ -14,9 +14,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" + "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/edge/pkg/metamanager/dao" "github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator" + patchutil "github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/util" "github.com/kubeedge/kubeedge/pkg/metaserver" "github.com/kubeedge/kubeedge/pkg/metaserver/util" ) @@ -63,7 +67,16 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ob return err } unstrObj := objPtr.(*unstructured.Unstructured) - return runtime.DecodeInto(s.codec, []byte((*resp.Kvs)[0].Value), unstrObj) + if err = runtime.DecodeInto(s.codec, []byte((*resp.Kvs)[0].Value), unstrObj); err != nil { + return err + } + + if unstrObj.GetKind() == "Pod" { + if err = MergePatchedResource(ctx, unstrObj, model.ResourceTypePodPatch); err != nil { + return err + } + } + return nil } func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { @@ -91,6 +104,12 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption return err } + if unstrObj.GetKind() == "Pod" { + if err = MergePatchedResource(ctx, &unstrObj, model.ResourceTypePodPatch); err != nil { + return err + } + } + labelSet := labels.Set(unstrObj.GetLabels()) if !opts.Predicate.Label.Matches(labelSet) { continue @@ -141,3 +160,27 @@ func newStore() *store { } return &s } + +func MergePatchedResource(ctx context.Context, originalObj *unstructured.Unstructured, resourceTypePatch string) error { + resKey := fmt.Sprintf("%s/%s/%s", originalObj.GetNamespace(), resourceTypePatch, originalObj.GetName()) + var metas *[]string + metas, err := dao.QueryMeta("key", resKey) + if err != nil { + return err + } + if len(*metas) > 0 { + defaultScheme := scheme.Scheme + defaulter := runtime.ObjectDefaulter(defaultScheme) + updatedResource := new(unstructured.Unstructured) + GroupVersionKind := originalObj.GroupVersionKind() + schemaReferenceObj, err := defaultScheme.New(GroupVersionKind) + if err != nil { + return fmt.Errorf("failed to build schema reference object, err: %+v", err) + } + if err = patchutil.StrategicPatchObject(ctx, defaulter, originalObj, []byte((*metas)[0]), updatedResource, schemaReferenceObj, ""); err != nil { + return err + } + originalObj = updatedResource + } + return nil +} diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/util/patch.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/util/patch.go new file mode 100644 index 000000000..b023d288d --- /dev/null +++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/util/patch.go @@ -0,0 +1,178 @@ +/* +Copyright 2024 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +@CHANGELOG +KubeEdge Authors: To merge the patchBytes of StrategicMergePatchType into the original resource, +This file is derived from K8S apiserver code with reduced set of methods +Changes done are +1. Package util got some functions from "k8s.io/apiserver/pkg/endpoints/handlers/patch.go" +and made some variant +*/ + +package util + +import ( + "context" + "fmt" + "net/http" + "strings" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/mergepatch" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/apiserver/pkg/warning" + kjson "sigs.k8s.io/json" +) + +func StrategicPatchObject( + requestContext context.Context, + defaulter runtime.ObjectDefaulter, + originalObject runtime.Object, + patchBytes []byte, + objToUpdate runtime.Object, + schemaReferenceObj runtime.Object, + validationDirective string, +) error { + originalObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(originalObject) + if err != nil { + return err + } + + patchMap := make(map[string]interface{}) + var strictErrs []error + if validationDirective == metav1.FieldValidationWarn || validationDirective == metav1.FieldValidationStrict { + strictErrs, err = kjson.UnmarshalStrict(patchBytes, &patchMap) + if err != nil { + return errors.NewBadRequest(err.Error()) + } + } else { + if err = kjson.UnmarshalCaseSensitivePreserveInts(patchBytes, &patchMap); err != nil { + return errors.NewBadRequest(err.Error()) + } + } + + return applyPatchToObject(requestContext, defaulter, originalObjMap, patchMap, objToUpdate, schemaReferenceObj, strictErrs, validationDirective) +} + +// applyPatchToObject applies a strategic merge patch of <patchMap> to +// <originalMap> and stores the result in <objToUpdate>. +// NOTE: <objToUpdate> must be a versioned object. +func applyPatchToObject( + requestContext context.Context, + defaulter runtime.ObjectDefaulter, + originalMap map[string]interface{}, + patchMap map[string]interface{}, + objToUpdate runtime.Object, + schemaReferenceObj runtime.Object, + strictErrs []error, + validationDirective string, +) error { + patchedObjMap, err := strategicpatch.StrategicMergeMapPatch(originalMap, patchMap, schemaReferenceObj) + if err != nil { + return interpretStrategicMergePatchError(err) + } + + // Rather than serialize the patched map to JSON, then decode it to an object, we go directly from a map to an object + converter := runtime.DefaultUnstructuredConverter + returnUnknownFields := validationDirective == metav1.FieldValidationWarn || validationDirective == metav1.FieldValidationStrict + if err := converter.FromUnstructuredWithValidation(patchedObjMap, objToUpdate, returnUnknownFields); err != nil { + strictError, isStrictError := runtime.AsStrictDecodingError(err) + switch { + case !isStrictError: + // disregard any sttrictErrs, because it's an incomplete + // list of strict errors given that we don't know what fields were + // unknown because StrategicMergeMapPatch failed. + // Non-strict errors trump in this case. + return errors.NewInvalid(schema.GroupKind{}, "", field.ErrorList{ + field.Invalid(field.NewPath("patch"), fmt.Sprintf("%+v", patchMap), err.Error()), + }) + case validationDirective == metav1.FieldValidationWarn: + addStrictDecodingWarnings(requestContext, append(strictErrs, strictError.Errors()...)) + default: + strictDecodingError := runtime.NewStrictDecodingError(append(strictErrs, strictError.Errors()...)) + return errors.NewInvalid(schema.GroupKind{}, "", field.ErrorList{ + field.Invalid(field.NewPath("patch"), fmt.Sprintf("%+v", patchMap), strictDecodingError.Error()), + }) + } + } else if len(strictErrs) > 0 { + switch { + case validationDirective == metav1.FieldValidationWarn: + addStrictDecodingWarnings(requestContext, strictErrs) + default: + return errors.NewInvalid(schema.GroupKind{}, "", field.ErrorList{ + field.Invalid(field.NewPath("patch"), fmt.Sprintf("%+v", patchMap), runtime.NewStrictDecodingError(strictErrs).Error()), + }) + } + } + + // Decoding from JSON to a versioned object would apply defaults, so we do the same here + defaulter.Default(objToUpdate) + + return nil +} + +// addStrictDecodingWarnings confirms that the error is a strict decoding error +// and if so adds a warning for each strict decoding violation. +func addStrictDecodingWarnings(requestContext context.Context, errs []error) { + for _, e := range errs { + yamlWarnings := parseYAMLWarnings(e.Error()) + for _, w := range yamlWarnings { + warning.AddWarning(requestContext, "", w) + } + } +} + +// parseYAMLWarnings takes the strict decoding errors from the yaml decoder's output +// and parses each individual warnings, or leaves the warning as is if +// it does not look like a yaml strict decoding error. +func parseYAMLWarnings(errString string) []string { + var trimmedString string + if trimmedShortString := strings.TrimPrefix(errString, shortPrefix); len(trimmedShortString) < len(errString) { + trimmedString = trimmedShortString + } else if trimmedLongString := strings.TrimPrefix(errString, longPrefix); len(trimmedLongString) < len(errString) { + trimmedString = trimmedLongString + } else { + // not a yaml error, return as-is + return []string{errString} + } + + splitStrings := strings.Split(trimmedString, "\n") + for i, s := range splitStrings { + splitStrings[i] = strings.TrimSpace(s) + } + return splitStrings +} + +// interpretStrategicMergePatchError interprets the error type and returns an error with appropriate HTTP code. +func interpretStrategicMergePatchError(err error) error { + switch err { + case mergepatch.ErrBadJSONDoc, mergepatch.ErrBadPatchFormatForPrimitiveList, mergepatch.ErrBadPatchFormatForRetainKeys, mergepatch.ErrBadPatchFormatForSetElementOrderList, mergepatch.ErrUnsupportedStrategicMergePatchFormat: + return errors.NewBadRequest(err.Error()) + case mergepatch.ErrNoListOfLists, mergepatch.ErrPatchContentNotMatchRetainKeys: + return errors.NewGenericServerResponse(http.StatusUnprocessableEntity, "", schema.GroupResource{}, "", err.Error(), 0, false) + default: + return err + } +} + +const ( + // shortPrefix is one possible beginning of yaml unmarshal strict errors. + shortPrefix = "yaml: unmarshal errors:\n" + // longPrefix is the other possible beginning of yaml unmarshal strict errors. + longPrefix = "error converting YAML to JSON: yaml: unmarshal errors:\n" +) diff --git a/edge/pkg/metamanager/process.go b/edge/pkg/metamanager/process.go index 16bf596c7..dab76bd9c 100644 --- a/edge/pkg/metamanager/process.go +++ b/edge/pkg/metamanager/process.go @@ -257,7 +257,12 @@ func (m *metaManager) processPatch(message model.Message) { feedbackError(err, message) return } - sendToCloud(&message) + + if connect.IsConnected() { + sendToCloud(&message) + } else { + feedbackError(connect.ErrConnectionLost, message) + } } func (m *metaManager) processResponse(message model.Message) { @@ -276,7 +276,7 @@ require ( k8s.io/pod-security-admission v0.0.0 // indirect k8s.io/system-validators v1.8.0 // indirect oras.land/oras-go v1.2.3 // indirect - sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect + sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd sigs.k8s.io/kustomize/api v0.13.5-0.20230601165947-6ce0bf390ce3 // indirect sigs.k8s.io/kustomize/kyaml v0.14.3-0.20230601165947-6ce0bf390ce3 // indirect ) |
