diff options
| author | Congrool <chpzhangyifei@qq.com> | 2022-04-05 17:14:20 +0800 |
|---|---|---|
| committer | Congrool <chpzhangyifei@zju.edu.cn> | 2022-05-19 17:08:58 +0800 |
| commit | 60e540a47345585110706ff0c27e1b0ad4f1b9e4 (patch) | |
| tree | 049da8b4dde6fc836c3368b4f1b20e34fec37fcf | |
| parent | init edgeapplication (diff) | |
| download | kubeedge-60e540a47345585110706ff0c27e1b0ad4f1b9e4.tar.gz | |
controller manager fix
Signed-off-by: Congrool <chpzhangyifei@qq.com>
Signed-off-by: Congrool <chpzhangyifei@zju.edu.cn>
63 files changed, 2046 insertions, 1287 deletions
@@ -13,7 +13,8 @@ BINARIES=cloudcore \ keadm \ csidriver \ iptablesmanager \ - edgemark + edgemark \ + controllermanager COMPONENTS=cloud \ edge diff --git a/build/controllermanager/Dockerfile b/build/controllermanager/Dockerfile new file mode 100644 index 000000000..a6e4f743d --- /dev/null +++ b/build/controllermanager/Dockerfile @@ -0,0 +1,15 @@ +FROM golang:1.16-alpine3.13 AS builder + +ARG GO_LDFLAGS + +COPY . /go/src/github.com/kubeedge/kubeedge + +RUN CGO_ENABLED=0 GO111MODULE=off go build -v -o /usr/local/bin/controllermanager -ldflags "$GO_LDFLAGS -w -s" \ + github.com/kubeedge/kubeedge/cloud/cmd/controllermanager + + +FROM alpine:3.13 + +COPY --from=builder /usr/local/bin/controllermanager /usr/local/bin/controllermanager + +ENTRYPOINT ["controllermanager"] diff --git a/build/controllermanager/controllermanager-deploy.yaml b/build/controllermanager/controllermanager-deploy.yaml new file mode 100644 index 000000000..95538cb13 --- /dev/null +++ b/build/controllermanager/controllermanager-deploy.yaml @@ -0,0 +1,36 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + k8s-app: kubeedge + kubeedge: controller-manager + name: kubeedge-controller-manager + namespace: kubeedge +spec: + selector: + matchLabels: + k8s-app: kubeedge + kubeedge: controller-manager + template: + metadata: + labels: + k8s-app: kubeedge + kubeedge: controller-manager + spec: + containers: + - name: controller-manager + image: kubeedge/controller-manager:latest + imagePullPolicy: IfNotPresent + restartPolicy: Always + serviceAccountName: controller-manager + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node-role.kubernetes.io/edge + operator: DoesNotExist + tolerations: + - key: "node-role.kubernetes.io/master" + operator: "Exists" + effect: "NoSchedule" diff --git a/build/controllermanager/controllermanager-rbac.yaml b/build/controllermanager/controllermanager-rbac.yaml new file mode 100644 index 000000000..145441448 --- /dev/null +++ b/build/controllermanager/controllermanager-rbac.yaml @@ -0,0 +1,54 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: controller-manager + namespace: kubeedge + labels: + k8s-app: kubeedge + kubeedge: controller-manager + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: controller-manager + labels: + k8s-app: kubeedge + kubeedge: controller-manager +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: controller-manager +subjects: +- kind: ServiceAccount + name: controller-manager + namespace: kubeedge + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: controller-manager + labels: + k8s-app: kubeedge + kubeedge: controller-manager +rules: +- apiGroups: ["apps.kubeedge.io"] + resources: ["nodegroups", "nodegroups/status"] + verbs: ["*"] +- apiGroups: [""] + resources: ["nodes"] + verbs: ["list", "watch", "patch", "get"] +- apiGroups: [""] + resources: ["pods"] + verbs: ["list", "watch", "get", "delete"] +- apiGroups: ["apps.kubeedge.io"] + resources: ["edgeapplications", "edgeapplications/status"] + verbs: ["*"] +- apiGroups: ["apps"] + resources: ["deployments"] + verbs: ["list", "watch", "create", "update", "patch", "delete", "get"] +- apiGroups: [""] + resources: ["services"] + verbs: ["list", "watch", "create", "update", "patch", "delete", "get"] diff --git a/build/crds/grouping/grouping_v1alpha1_edgeapplication.yaml b/build/crds/apps/apps_v1alpha1_edgeapplication.yaml index 4f2e67b36..0b720c231 100644 --- a/build/crds/grouping/grouping_v1alpha1_edgeapplication.yaml +++ b/build/crds/apps/apps_v1alpha1_edgeapplication.yaml @@ -6,9 +6,9 @@ metadata: annotations: controller-gen.kubebuilder.io/version: v0.6.2 creationTimestamp: null - name: edgeapplications.grouping.kubeedge.io + name: edgeapplications.apps.kubeedge.io spec: - group: grouping.kubeedge.io + group: apps.kubeedge.io names: kind: EdgeApplication listKind: EdgeApplicationList @@ -57,8 +57,8 @@ spec: description: Overriders represents the override rules that would apply on workload. properties: - imageOverrider: - description: ImageOverrider represents the rules dedicated + imageOverriders: + description: ImageOverriders represents the rules dedicated to handling image overrides. items: description: ImageOverrider represents the rules dedicated @@ -194,12 +194,7 @@ spec: description: Version is the version of the resource. type: string required: - - kind - - name - - namespace - ordinal - - resource - - version type: object required: - identifier diff --git a/build/crds/grouping/grouping_v1alpha1_nodegroup.yaml b/build/crds/apps/apps_v1alpha1_nodegroup.yaml index 1c2741eaf..559d4ba97 100644 --- a/build/crds/grouping/grouping_v1alpha1_nodegroup.yaml +++ b/build/crds/apps/apps_v1alpha1_nodegroup.yaml @@ -6,9 +6,9 @@ metadata: annotations: controller-gen.kubebuilder.io/version: v0.6.2 creationTimestamp: null - name: nodegroups.grouping.kubeedge.io + name: nodegroups.apps.kubeedge.io spec: - group: grouping.kubeedge.io + group: apps.kubeedge.io names: kind: NodeGroup listKind: NodeGroupList diff --git a/build/helm/charts/cloudcore/crds/grouping_v1alpha1_nodegroup.yaml b/build/helm/charts/cloudcore/crds/grouping_v1alpha1_nodegroup.yaml deleted file mode 100644 index a9dfa23f8..000000000 --- a/build/helm/charts/cloudcore/crds/grouping_v1alpha1_nodegroup.yaml +++ /dev/null @@ -1,94 +0,0 @@ - ---- -apiVersion: apiextensions.k8s.io/v1 -kind: CustomResourceDefinition -metadata: - annotations: - controller-gen.kubebuilder.io/version: v0.6.2 - creationTimestamp: null - name: nodegroups.grouping.kubeedge.io -spec: - group: grouping.kubeedge.io - names: - kind: NodeGroup - listKind: NodeGroupList - plural: nodegroups - shortNames: - - ng - singular: nodegroup - scope: cluster - versions: - - name: v1alpha1 - schema: - openAPIV3Schema: - description: NodeGroup is the Schema for the nodegroups API - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - spec: - description: Spec represents the specification of the desired behavior - of member nodegroup. - properties: - matchLabels: - additionalProperties: - type: string - description: MatchLabels are used to select nodes that have these - labels. - type: object - nodes: - description: Nodes contains names of all the nodes in the nodegroup. - items: - type: string - type: array - type: object - status: - description: Status represents the status of member nodegroup. - properties: - nodeStatuses: - description: NodeStatuses is a status list of all selected nodes. - items: - description: NodeStatus contains status of node that selected by - this NodeGroup. - properties: - nodeName: - description: NodeName contains name of this node. - type: string - readyStatus: - description: ReadyStatus contains ready status of this node. - type: string - selectionStatus: - description: SelectionStatus contains status of the selection - result for this node. - type: string - selectionStatusReason: - description: SelectionStatusReason contains human-readable reason - for this SelectionStatus. - type: string - required: - - nodeName - - readyStatus - - selectionStatus - type: object - type: array - type: object - type: object - served: true - storage: true - subresources: - status: {} -status: - acceptedNames: - kind: "" - plural: "" - conditions: [] - storedVersions: [] diff --git a/cloud/cmd/controllermanager/app/controllermanager.go b/cloud/cmd/controllermanager/app/controllermanager.go new file mode 100644 index 000000000..f99010772 --- /dev/null +++ b/cloud/cmd/controllermanager/app/controllermanager.go @@ -0,0 +1,52 @@ +package app + +import ( + "context" + "flag" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "k8s.io/component-base/cli/globalflag" + "k8s.io/klog/v2" + + // set --kubeconfig option + _ "sigs.k8s.io/controller-runtime/pkg/client/config" + + "github.com/kubeedge/kubeedge/cloud/cmd/controllermanager/app/options" + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager" + "github.com/kubeedge/kubeedge/pkg/version/verflag" +) + +func NewControllerManagerCommand(ctx context.Context) *cobra.Command { + opts := options.NewControllerManagerOptions() + cmd := &cobra.Command{ + Use: "controller-manager", + Long: `The node group controller manager run a bunch of controllers`, + Run: func(cmd *cobra.Command, args []string) { + Run(ctx) + }, + } + + pflag.CommandLine.AddGoFlagSet(flag.CommandLine) + fs := cmd.Flags() + namedFs := opts.Flags() + verflag.AddFlags(namedFs.FlagSet("global")) + globalflag.AddGlobalFlags(namedFs.FlagSet("global"), cmd.Name()) + for _, f := range namedFs.FlagSets { + fs.AddFlagSet(f) + } + + return cmd +} + +func Run(ctx context.Context) { + mgr, err := controllermanager.NewAppsControllerManager(ctx) + if err != nil { + klog.Fatalf("failed to get controller manager, %v", err) + } + + // mgr.Start will block until the manager has stopped + if err := mgr.Start(ctx); err != nil { + klog.Fatalf("failed to start controller manager, %v", err) + } +} diff --git a/cloud/cmd/controllermanager/app/options/options.go b/cloud/cmd/controllermanager/app/options/options.go new file mode 100644 index 000000000..269b56c11 --- /dev/null +++ b/cloud/cmd/controllermanager/app/options/options.go @@ -0,0 +1,19 @@ +package options + +import ( + cliflag "k8s.io/component-base/cli/flag" +) + +type ControllerManagerOptions struct { + UseServerSideApply bool +} + +func NewControllerManagerOptions() *ControllerManagerOptions { + return &ControllerManagerOptions{} +} + +func (o *ControllerManagerOptions) Flags() (fss cliflag.NamedFlagSets) { + fs := fss.FlagSet("ControllerManager") + fs.BoolVar(&o.UseServerSideApply, "use-server-side-apply", o.UseServerSideApply, "If use server-side apply when updating templates") + return +} diff --git a/cloud/cmd/controllermanager/controllermanager.go b/cloud/cmd/controllermanager/controllermanager.go new file mode 100644 index 000000000..813c7c57c --- /dev/null +++ b/cloud/cmd/controllermanager/controllermanager.go @@ -0,0 +1,21 @@ +package main + +import ( + "os" + + apiserver "k8s.io/apiserver/pkg/server" + "k8s.io/component-base/logs" + + "github.com/kubeedge/kubeedge/cloud/cmd/controllermanager/app" +) + +func main() { + logs.InitLogs() + defer logs.FlushLogs() + + ctx := apiserver.SetupSignalContext() + + if err := app.NewControllerManagerCommand(ctx).Execute(); err != nil { + os.Exit(1) + } +} diff --git a/cloud/pkg/controllermanager/controllermanager.go b/cloud/pkg/controllermanager/controllermanager.go new file mode 100644 index 000000000..92b258ca5 --- /dev/null +++ b/cloud/pkg/controllermanager/controllermanager.go @@ -0,0 +1,84 @@ +package controllermanager + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer/json" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication" + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication/overridemanager" + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication/statusmanager" + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/nodegroup" + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" +) + +var appsScheme = runtime.NewScheme() + +func init() { + utilruntime.Must(scheme.AddToScheme(appsScheme)) + utilruntime.Must(appsv1alpha1.AddToScheme(appsScheme)) +} + +func NewAppsControllerManager(ctx context.Context) (manager.Manager, error) { + kubeCfg, err := controllerruntime.GetConfig() + if err != nil { + return nil, fmt.Errorf("failed to get kubeconfig, %v", err) + } + controllerManager, err := controllerruntime.NewManager(kubeCfg, controllerruntime.Options{ + Scheme: appsScheme, + // TODO: leader election + // TODO: /healthz + + }) + if err != nil { + return nil, fmt.Errorf("failed to create controller manager, %v", err) + } + + if err := setupControllers(ctx, controllerManager); err != nil { + return nil, err + } + return controllerManager, nil +} + +func setupControllers(ctx context.Context, mgr manager.Manager) error { + Serializer := json.NewYAMLSerializer(json.DefaultMetaFactory, appsScheme, appsScheme) + // TODO: add cacheReader for unstructured + // This returned cli will directly acquire the unstructured objects from API Server which + // have not be registered in the appsScheme. Currently, we only support deployment in + // EdgeApplication, so there's no problem. We have to add cacheReader for unstructured + // obj if we want to support more types, such as CRDs. + cli := mgr.GetClient() + nodeGroupController := &nodegroup.Controller{ + Client: cli, + } + + edgeApplicationControllere := &edgeapplication.Controller{ + Client: cli, + Serializer: Serializer, + StatusManager: statusmanager.NewStatusManager(ctx, mgr, cli, Serializer), + Overrider: &overridemanager.OverrideManager{ + Overriders: []overridemanager.Overrider{ + &overridemanager.NameOverrider{}, + &overridemanager.ReplicasOverrider{}, + &overridemanager.ImageOverrider{}, + &overridemanager.NodeSelectorOverrider{}, + }, + }, + } + + klog.Info("setup nodegroup controller") + if err := nodeGroupController.SetupWithManager(ctx, mgr); err != nil { + return fmt.Errorf("failed to setup nodegroup controller, %v", err) + } + if err := edgeApplicationControllere.SetupWithManager(mgr); err != nil { + return fmt.Errorf("failed to setup edgeapplication controller, %v", err) + } + return nil +} diff --git a/cloud/pkg/controllermanager/edgeapplication/constants/constants.go b/cloud/pkg/controllermanager/edgeapplication/constants/constants.go new file mode 100644 index 000000000..7d52ced49 --- /dev/null +++ b/cloud/pkg/controllermanager/edgeapplication/constants/constants.go @@ -0,0 +1,23 @@ +package constants + +import "k8s.io/apimachinery/pkg/runtime/schema" + +const ( + LastAppliedTemplateAnnotationKey = "apps.kubeedge.io/last-applied-template" + LastContainedResourcesAnnotationKey = "apps.kubeedge.io/last-contained-resources" +) + +var OverriderTargetGVK map[schema.GroupVersionKind]struct{} = map[schema.GroupVersionKind]struct{}{ + DeploymentGVK: {}, +} + +var ServiceGVK = schema.GroupVersionKind{ + Version: "v1", + Kind: "Service", +} + +var DeploymentGVK = schema.GroupVersionKind{ + Group: "apps", + Version: "v1", + Kind: "Deployment", +} diff --git a/cloud/pkg/controllermanager/edgeapplication/edgeapplicationcontroller.go b/cloud/pkg/controllermanager/edgeapplication/edgeapplicationcontroller.go new file mode 100644 index 000000000..3ac8a4499 --- /dev/null +++ b/cloud/pkg/controllermanager/edgeapplication/edgeapplicationcontroller.go @@ -0,0 +1,591 @@ +package edgeapplication + +import ( + "context" + "encoding/json" + "fmt" + "sort" + + jsonpatch "github.com/evanphx/json-patch" + "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication/constants" + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication/overridemanager" + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication/statusmanager" + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication/utils" + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/nodegroup" + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" +) + +// Controller is to sync EdgeApplication. +type Controller struct { + client.Client + runtime.Serializer + overridemanager.Overrider + statusmanager.StatusManager + UseServerSideApply bool + ReconcileTriggerChan chan event.GenericEvent +} + +// Reconcile performs a full reconciliation for the object referred to by the Request. +// The Controller will requeue the Request to be processed again if an error is non-nil or +// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. +func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { + klog.Infof("Reconciling EdgeApplication %s/%s", req.NamespacedName.Namespace, req.NamespacedName.Name) + + edgeApp := &appsv1alpha1.EdgeApplication{} + if err := c.Client.Get(ctx, req.NamespacedName, edgeApp); err != nil { + // The resource may no longer exist, in which case we stop processing. + if apierrors.IsNotFound(err) { + return controllerruntime.Result{}, nil + } + + klog.Errorf("failed to get edgeapplication %s/%s, %v", req.NamespacedName.Namespace, req.NamespacedName.Name, err) + return controllerruntime.Result{Requeue: true}, err + } + + if !edgeApp.DeletionTimestamp.IsZero() { + // foreground cascade deletion of OwnerReference + // will take the responsibility of removing created resources. + return controllerruntime.Result{}, nil + } + + return c.syncEdgeApplication(ctx, edgeApp) +} + +// SetupWithManager creates a controller and register to controller manager. +func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { + if c.Client == nil { + return fmt.Errorf("client of edgeapplication controller cannot be nil") + } + if c.Serializer == nil { + return fmt.Errorf("serializer of edgeapplication controller cannot be nil") + } + if c.StatusManager == nil { + return fmt.Errorf("status manager of edgeapplication controller cannot be nil") + } + if c.Overrider == nil { + return fmt.Errorf("overrider of edgeapplication controller cannot be nil") + } + c.ReconcileTriggerChan = make(chan event.GenericEvent) + c.StatusManager.SetReconcileTriggerChan(c.ReconcileTriggerChan) + // start the StatusManager + if err := c.StatusManager.Start(); err != nil { + return fmt.Errorf("fail to start StatusManager, %v", err) + } + return controllerruntime.NewControllerManagedBy(mgr). + For(&appsv1alpha1.EdgeApplication{}). + Watches(&source.Channel{Source: c.ReconcileTriggerChan}, &handler.EnqueueRequestForObject{}). + Complete(c) +} + +func (c *Controller) syncEdgeApplication(ctx context.Context, edgeApp *appsv1alpha1.EdgeApplication) (controllerruntime.Result, error) { + // 1. get manifests, set ownerReference and apply overrides to all target resources + // It will traverse all templates in EdgeApplication. If error occurs during traverse, + // it will log the error and continue. + modifiedTmplInfos := []*utils.TemplateInfo{} + errs := []error{} + overriderInfos := utils.GetAllOverriders(edgeApp) + tmplInfos, err := utils.GetTemplatesInfosOfEdgeApp(edgeApp, c.Serializer) + if err != nil { + klog.Errorf("failed to get all templates from edgeapp %s/%s, %v, continue with what got", edgeApp.Namespace, edgeApp.Name, err) + errs = append(errs, err) + } + for _, tmplInfo := range tmplInfos { + tmpl := tmplInfo.Template + setOwnerReference(tmpl, edgeApp) + if tmpl.GroupVersionKind() == constants.ServiceGVK { + addRangeNodeGroupAnnotation(tmpl) + modifiedTmplInfos = append(modifiedTmplInfos, tmplInfo) + continue + } + + if !needOverride(tmpl) { + klog.V(4).Infof("obj %s/%s of gvk %s does not need override, skip override", + tmpl.GetNamespace(), tmpl.GetName(), tmpl.GroupVersionKind()) + modifiedTmplInfos = append(modifiedTmplInfos, tmplInfo) + continue + } + + // apply overriders + // + // TODO: consider the situation that not all the overrides have been applied successfully + // If one succeeded and another failed, the status of edgeApp will only contain the successful + // one, and have no status about the failed one. + for _, info := range overriderInfos { + copy := tmpl.DeepCopy() + klog.V(4).Infof("override obj %s/%s of gvk %s", copy.GetNamespace(), copy.GetName(), copy.GroupVersionKind()) + if err := c.Overrider.ApplyOverrides(copy, info); err != nil { + klog.Errorf("failed to apply override of nodegroup %s to obj %s/%s of gvk %s, %v", + info.TargetNodeGroup, copy.GetNamespace(), copy.GetName(), copy.GroupVersionKind(), err) + errs = append(errs, err) + continue + } + modifiedTmplInfos = append(modifiedTmplInfos, &utils.TemplateInfo{Ordinal: tmplInfo.Ordinal, Template: copy}) + } + } + + // 2. remove status that do not need + if err := c.updateStatus(ctx, edgeApp, modifiedTmplInfos); err != nil { + klog.Errorf("failed to update status for EdgeApplication %s/%s, %v", edgeApp.Namespace, edgeApp.Name, err) + errs = append(errs, err) + } + + // 3. apply all templates + // It will create/update the resource in the template and notify the status manager + // to monitor its status. + for _, tmplInfo := range modifiedTmplInfos { + tmpl := tmplInfo.Template + if err := c.applyTemplate(ctx, tmpl); err != nil { + klog.Errorf("failed to apply overridden template of EdgeApplication %s/%s, %v, template: %v", edgeApp.Namespace, edgeApp.Name, err, tmpl) + errs = append(errs, err) + continue + } + klog.V(4).Infof("successfully applied overridden template of EdgeApplication %s/%s, template: %v", edgeApp.Namespace, edgeApp.Name, tmpl) + } + + // 4. delete resources that have been removed from the manifests + if err := c.deleteRedundantResources(ctx, edgeApp, modifiedTmplInfos); err != nil { + klog.Errorf("failed to delete redundant resource for EdgeApplication %s/%s, %v", edgeApp.Namespace, edgeApp.Name, err) + errs = append(errs, err) + } + + // 5. update the LastContainedResourcesAnnotation + if err := c.addOrUpdateLastContainedResourcesAnnotation(ctx, edgeApp, modifiedTmplInfos); err != nil { + klog.Errorf("failed to update annotation of EdgeApplication %s/%s, %v", edgeApp.Namespace, edgeApp.Name, err) + errs = append(errs, err) + } + + return controllerruntime.Result{}, errors.NewAggregate(errs) +} + +func (c *Controller) deleteRedundantResources(ctx context.Context, edgeApp *appsv1alpha1.EdgeApplication, currentTmplInfos []*utils.TemplateInfo) error { + lastContainedResourcesInfos, err := c.getLastContainedResourceInfos(edgeApp) + if err != nil { + klog.Errorf("failed to get infos of last contained resources in EdgeApplication %s/%s, %v", + edgeApp.Namespace, edgeApp.Name, err) + return err + } + currentContainedResourceInfos := make([]utils.ResourceInfo, len(currentTmplInfos)) + for i := range currentTmplInfos { + currentContainedResourceInfos[i] = utils.GetResourceInfoOfTemplateInfo(currentTmplInfos[i]) + } + deleted := getDeletedResources(lastContainedResourcesInfos, currentContainedResourceInfos) + for _, info := range deleted { + if err := c.removeResource(ctx, info); err != nil { + klog.Errorf("failed to remove resource %s/%s of gvk %s, %v", + info.Namespace, info.Name, schema.GroupVersionKind{Group: info.Group, Version: info.Version, Kind: info.Kind}, err) + return err + } + klog.V(4).Infof("successfully remove resource %s/%s of gvk %s, %v", + info.Namespace, info.Name, schema.GroupVersionKind{Group: info.Group, Version: info.Version, Kind: info.Kind}, err) + } + return nil +} + +func (c *Controller) updateStatus(ctx context.Context, edgeApp *appsv1alpha1.EdgeApplication, tmplInfos []*utils.TemplateInfo) error { + newStatus := []appsv1alpha1.ManifestStatus{} + tmplMap := map[int][]*utils.TemplateInfo{} + for _, tmplInfo := range tmplInfos { + ordinal := tmplInfo.Ordinal + if _, ok := tmplMap[ordinal]; !ok { + tmplMap[ordinal] = []*utils.TemplateInfo{tmplInfo} + } else { + tmplMap[ordinal] = append(tmplMap[ordinal], tmplInfo) + } + } + + // remove redundant status entries, these status do not have any corresponding + // template in this edgeapplication + statusExists := map[string]struct{}{} + for _, status := range edgeApp.Status.WorkloadStatus { + id := status.Identifier + for _, tmplInfo := range tmplMap[id.Ordinal] { + resourceInfo := utils.GetResourceInfoOfTemplateInfo(tmplInfo) + if utils.IsIdentifierSameAsResourceInfo(id, resourceInfo) { + // this status still need to retain + statusExists[resourceInfo.String()] = struct{}{} + newStatus = append(newStatus, status) + } + } + } + + // add missed status entry to ensure each passed-in template have its corresponding status + for _, tmplInfo := range tmplInfos { + resourceInfo := utils.GetResourceInfoOfTemplateInfo(tmplInfo) + if _, ok := statusExists[resourceInfo.String()]; !ok { + // this tmpl does not have relate status entry, add a new entry for it + newStatus = append(newStatus, appsv1alpha1.ManifestStatus{ + Condition: appsv1alpha1.EdgeAppProcessing, + Identifier: appsv1alpha1.ResourceIdentifier{ + Ordinal: resourceInfo.Ordinal, + Group: resourceInfo.Group, + Version: resourceInfo.Kind, + Kind: resourceInfo.Kind, + Namespace: resourceInfo.Namespace, + Name: resourceInfo.Name, + }, + }) + } + } + + // ensure each template have its corresponding status + // Because of error, some entries in edgeApp.Spec.WorkloadTemplate.Manifests cannot + // be parsed as an template object or cannot applied override to it. These entries should + // also have its status, though they are not elements of passed-in argument tmplInfos. + for ordinal := 0; ordinal < len(edgeApp.Spec.WorkloadTemplate.Manifests); ordinal++ { + find := false + for _, status := range newStatus { + if status.Identifier.Ordinal == ordinal { + find = true + break + } + } + if !find { + newStatus = append(newStatus, appsv1alpha1.ManifestStatus{ + Condition: appsv1alpha1.EdgeAppProcessing, + Identifier: appsv1alpha1.ResourceIdentifier{ + Ordinal: ordinal, + }, + }) + } + } + + sort.Slice(newStatus, func(i, j int) bool { + if newStatus[i].Identifier.Ordinal != newStatus[j].Identifier.Ordinal { + return newStatus[i].Identifier.Ordinal < newStatus[j].Identifier.Ordinal + } + return newStatus[i].Identifier.Name < newStatus[j].Identifier.Name + }) + + if equality.Semantic.DeepEqual(newStatus, edgeApp.Status.WorkloadStatus) { + klog.V(4).Infof("newStatus is same as the current status in edgeApp %s/%s, skip update status", + edgeApp.Namespace, edgeApp.Name) + return nil + } + + edgeApp.Status.WorkloadStatus = newStatus + return c.Client.Status().Update(ctx, edgeApp) +} + +func (c *Controller) ifObjExists(ctx context.Context, obj *unstructured.Unstructured) (bool, *unstructured.Unstructured, error) { + ns, name := obj.GetNamespace(), obj.GetName() + gvk := obj.GetObjectKind().GroupVersionKind() + unstructuredObj := &unstructured.Unstructured{} + unstructuredObj.SetGroupVersionKind(gvk) + if err := c.Client.Get(ctx, client.ObjectKey{Namespace: ns, Name: name}, unstructuredObj); err != nil { + if apierrors.IsNotFound(err) { + return false, nil, nil + } + return false, nil, fmt.Errorf("failed to get obj %s/%s of gvk %s, %v", ns, name, gvk, err) + } + return true, unstructuredObj, nil +} + +func (c *Controller) updateTemplate(ctx context.Context, tmpl *unstructured.Unstructured, curObj *unstructured.Unstructured) error { + if _, ok := curObj.GetAnnotations()[constants.LastAppliedTemplateAnnotationKey]; !ok { + klog.Warningf("cannot find LastAppliedTemplateAnnotation on obj %s/%s of gvk %s, update it with new template", + curObj.GetNamespace(), curObj.GetName(), curObj.GroupVersionKind()) + if err := c.Client.Update(ctx, tmpl); err != nil { + return fmt.Errorf("failed to update object with template %s, %v", tmpl, err) + } + return nil + } + + same, err := isSameAsLastApplied(tmpl, curObj) + if err != nil { + // error occurs when comparing the overridden template with the last applied template + return err + } else if err == nil && same { + // nothing to do for this template + return nil + } + + // The existing object has different last applied template than what is specified in the EdgeApplication. + // Update the object with the template in EdgeApplication, and update its LastAppliedTemplateAnnotation. + if err := addOrUpdateLastAppliedTemplateAnnotation(tmpl); err != nil { + return fmt.Errorf("failed to add LastAppliedTemplateAnnotation to obj %s/%s of gvk %s, %v", + tmpl.GetNamespace(), tmpl.GetName(), tmpl.GroupVersionKind(), err) + } + if err := c.update(ctx, tmpl, curObj); err != nil { + return fmt.Errorf("failed to update object %s/%s of gvk %s, %v", + curObj.GetNamespace(), curObj.GetName(), curObj.GroupVersionKind(), err) + } + return nil +} + +// applyTemplate will apply the passed-in template +// If the object has already existed, it will update it when it is different from what specified in the template +// If the object does not exist, it will create it according to the template +func (c *Controller) applyTemplate(ctx context.Context, tmpl *unstructured.Unstructured) error { + ns, name := tmpl.GetNamespace(), tmpl.GetName() + gvk := tmpl.GroupVersionKind() + exists, curObj, err := c.ifObjExists(ctx, tmpl) + if err != nil { + klog.Errorf("failed to check the existence of obj %s/%s, gvk: %s, %v", ns, name, gvk, err) + return err + } + + if exists { + // the obj has already exited in the cluster + // try to update it + klog.V(4).Infof("object %s/%s of gvk %s has already existed, try to update it with template: %v", ns, name, gvk, tmpl) + if err := c.updateTemplate(ctx, tmpl, curObj); err != nil { + klog.Errorf("failed to update the object %s/%s, gvk: %s, %v", ns, name, gvk, err) + return err + } + } else { + klog.V(4).Infof("try to create object %s/%s of gvk %s with template: %v", ns, name, gvk, tmpl) + if err := addOrUpdateLastAppliedTemplateAnnotation(tmpl); err != nil { + return fmt.Errorf("failed to add LastAppliedTemplateAnnotation to obj %s/%s of gvk %s, %v", + tmpl.GetNamespace(), tmpl.GetName(), tmpl.GroupVersionKind(), err) + } + if err := c.Client.Create(ctx, tmpl); err != nil { + klog.Errorf("failed to create the object %s/%s of gvk %s with template: %v, %v", ns, name, gvk, tmpl, err) + return err + } + } + // notify the StatusManager to watch its status. + return c.StatusManager.WatchStatus(utils.ResourceInfo{ + Group: gvk.Group, + Version: gvk.Version, + Kind: gvk.Kind, + Namespace: ns, + Name: name, + }) +} + +func (c *Controller) removeResource(ctx context.Context, info utils.ResourceInfo) error { + unstructuredObj := &unstructured.Unstructured{} + gvk := schema.GroupVersionKind{ + Group: info.Group, + Version: info.Version, + Kind: info.Kind, + } + unstructuredObj.SetGroupVersionKind(gvk) + if err := c.Client.Get(ctx, types.NamespacedName{Namespace: info.Namespace, Name: info.Name}, unstructuredObj); err != nil && apierrors.IsNotFound(err) { + return fmt.Errorf("failed to get obj %s/%s of gvk %s, %v", info.Namespace, info.Name, gvk, err) + } + + if err := c.Client.Delete(ctx, unstructuredObj); err != nil && apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete obj %s/%s of gvk %s, %v", info.Namespace, info.Name, gvk, err) + } + return nil +} + +func (c *Controller) getLastContainedResourceInfos(edgeApp *appsv1alpha1.EdgeApplication) ([]utils.ResourceInfo, error) { + anno := edgeApp.Annotations + if anno == nil || anno[constants.LastContainedResourcesAnnotationKey] == "" { + klog.Infof("cannot get last contained resources of EdgeApplication %s/%s for annotation not existing, possibly it is a new-created edgeapp", + edgeApp.Namespace, edgeApp.Name) + return []utils.ResourceInfo{}, nil + } + + infos := []utils.ResourceInfo{} + annoValue := anno[constants.LastContainedResourcesAnnotationKey] + if err := json.Unmarshal([]byte(annoValue), &infos); err != nil { + return nil, fmt.Errorf("failed to unmarshal LastContainedResourcesAnnotation on edgeapp %s/%s", edgeApp.Namespace, edgeApp.Name) + } + return infos, nil +} + +// addOrUpdateLastContainedResourcesAnnotation will add the ContainedResourcesAnnotation to the EdgeApplication, +// if the annotation has already existed, it will be updated it according to resources in manifests. +func (c *Controller) addOrUpdateLastContainedResourcesAnnotation(ctx context.Context, edgeApp *appsv1alpha1.EdgeApplication, tmplInfos []*utils.TemplateInfo) error { + if edgeApp.Annotations == nil { + edgeApp.Annotations = make(map[string]string) + } + + resourceInfos := make([]*utils.ResourceInfo, len(tmplInfos)) + for i := range tmplInfos { + info := utils.GetResourceInfoOfTemplateInfo(tmplInfos[i]) + resourceInfos[i] = &info + } + sort.Slice(resourceInfos, func(i, j int) bool { return resourceInfos[i].String() < resourceInfos[j].String() }) + infosJSON, err := json.Marshal(resourceInfos) + if err != nil { + return fmt.Errorf("failed to marshal infos %v, %v", resourceInfos, err) + } + + oldAnno := edgeApp.Annotations[constants.LastContainedResourcesAnnotationKey] + if oldAnno == string(infosJSON) { + klog.V(4).Infof("skip update last-applied-resources annotation of edgeapp %s/%s for same value", edgeApp.Namespace, edgeApp.Name) + return nil + } + edgeApp.Annotations[constants.LastContainedResourcesAnnotationKey] = string(infosJSON) + return c.Client.Update(ctx, edgeApp) +} + +func (c *Controller) update(ctx context.Context, tmpl *unstructured.Unstructured, curObj *unstructured.Unstructured) error { + if c.UseServerSideApply { + if err := c.Client.Update(ctx, tmpl); err != nil { + return fmt.Errorf("failed to update object with template %s, %v", tmpl, err) + } + return nil + } + + // use client-side apply + var oldJSON, newJSON, curObjectJSON, newObjectJSON []byte + var err error + anno, ok := curObj.GetAnnotations()[constants.LastAppliedTemplateAnnotationKey] + if !ok { + return fmt.Errorf("cannot find last-applied-template annotation on obj %s/%s of gvk %s", + curObj.GetNamespace(), curObj.GetName(), curObj.GroupVersionKind()) + } + oldJSON = []byte(anno) + if newJSON, err = tmpl.MarshalJSON(); err != nil { + return fmt.Errorf("failed to serialize template as json %v, %s", tmpl, err) + } + mergePatch, err := jsonpatch.CreateMergePatch(oldJSON, newJSON) + if err != nil { + return fmt.Errorf("cannot get merge patch for error %v, old json: %s, new json: %s", err, oldJSON, newJSON) + } + + if curObjectJSON, err = curObj.MarshalJSON(); err != nil { + return fmt.Errorf("failed to serialize current obj as json, which is %s/%s of gvk %s, err: %v", + curObj.GetNamespace(), curObj.GetName(), curObj.GroupVersionKind(), err) + } + if newObjectJSON, err = jsonpatch.MergePatch(curObjectJSON, mergePatch); err != nil { + return fmt.Errorf("failed to apply json merge patch to current obj %s/%s of gvk %s, merge patch: %s, err: %v", + curObj.GetNamespace(), curObj.GetName(), curObj.GroupVersionKind(), string(mergePatch), err) + } + newObj := &unstructured.Unstructured{} + if _, _, err = c.Serializer.Decode(newObjectJSON, nil, newObj); err != nil { + return fmt.Errorf("failed to decode json of new object as new object for error: %v, json: %s", err, string(newObjectJSON)) + } + + if err := c.Client.Patch(ctx, newObj, client.MergeFrom(curObj)); err != nil { + return fmt.Errorf("failed to update obj as %v, %v", newObj, err) + } + return nil +} + +func addOrUpdateLastAppliedTemplateAnnotation(obj *unstructured.Unstructured) error { + objJSON, err := obj.MarshalJSON() + if err != nil { + return fmt.Errorf("failed to unmarshal obj %s/%s of gvk %s, %v", + obj.GetNamespace(), obj.GetName(), obj.GroupVersionKind(), err) + } + if obj.GetAnnotations() == nil { + annotations := map[string]string{ + constants.LastAppliedTemplateAnnotationKey: string(objJSON), + } + obj.SetAnnotations(annotations) + return nil + } + annotations := obj.GetAnnotations() + annotations[constants.LastAppliedTemplateAnnotationKey] = string(objJSON) + obj.SetAnnotations(annotations) + return nil +} + +func setOwnerReference(obj *unstructured.Unstructured, edgeApp *appsv1alpha1.EdgeApplication) { + toAdd := metav1.OwnerReference{ + APIVersion: edgeApp.APIVersion, + BlockOwnerDeletion: pointer.BoolPtr(true), + Controller: pointer.BoolPtr(true), + Kind: edgeApp.Kind, + Name: edgeApp.Name, + UID: edgeApp.UID, + } + ownerReferences := obj.GetOwnerReferences() + if ownerReferences == nil { + ownerReferences = []metav1.OwnerReference{toAdd} + obj.SetOwnerReferences(ownerReferences) + return + } + + // check if the OwnerReference has already existed + for i := range ownerReferences { + ownerReference := &ownerReferences[i] + if ownerReference.APIVersion == edgeApp.APIVersion && + *ownerReference.Controller && + ownerReference.Kind == edgeApp.Kind { + // one obj can only have one edgeApp as its owner + // so we overwrite this entry. + ownerReference.Name = edgeApp.Name + ownerReference.UID = edgeApp.UID + obj.SetOwnerReferences(ownerReferences) + return + } + } + + // add a new entry to its OwnerReferences + ownerReferences = append(ownerReferences, toAdd) + obj.SetOwnerReferences(ownerReferences) +} + +// isSameAsLastApplied will check if the curObj has the same specified fileds as objInEdgeApp. +// It assumes that fields of the obj in cluster are same as the value of last-applied-template annotation. +func isSameAsLastApplied(objInEdgeApp *unstructured.Unstructured, curObj runtime.Object) (bool, error) { + accessor := meta.NewAccessor() + annots, err := accessor.Annotations(curObj) + if err != nil { + return false, fmt.Errorf("failed to get annotations of object, %v", err) + } + + objJSON, err := objInEdgeApp.MarshalJSON() + if err != nil { + return false, fmt.Errorf("failed to marshal json of obj %s/%s, gvk: %s, %v", + objInEdgeApp.GetNamespace(), objInEdgeApp.GetName(), objInEdgeApp.GroupVersionKind(), err) + } + + if lastApplied, ok := annots[constants.LastAppliedTemplateAnnotationKey]; ok { + if string(objJSON) == lastApplied { + return true, nil + } + return false, nil + } + + return false, fmt.Errorf("cannot find last applied template in annotation, %v, possibly it is not created by EdgeApplication Controller", err) +} + +// needOverride determines if a obj needs override, according to its gvk. +func needOverride(obj runtime.Object) bool { + gvk := obj.GetObjectKind().GroupVersionKind() + _, ok := constants.OverriderTargetGVK[gvk] + return ok +} + +// getDeletedResources will return a slice of all deleted resourceInfo, which +// are in oldInfos but not in newInfos. +func getDeletedResources(oldInfos, newInfos []utils.ResourceInfo) []utils.ResourceInfo { + deleted := []utils.ResourceInfo{} + newInfoStrs := make(map[string]struct{}) + for _, info := range newInfos { + newInfoStrs[info.String()] = struct{}{} + } + for _, info := range oldInfos { + if _, ok := newInfoStrs[info.String()]; !ok { + deleted = append(deleted, info) + } + } + return deleted +} + +func addRangeNodeGroupAnnotation(obj *unstructured.Unstructured) { + anno := obj.GetAnnotations() + if anno == nil { + obj.SetAnnotations( + map[string]string{nodegroup.ServiceTopologyAnnotation: nodegroup.ServiceTopologyRangeNodegroup}, + ) + return + } + anno[nodegroup.ServiceTopologyAnnotation] = nodegroup.ServiceTopologyRangeNodegroup + obj.SetAnnotations(anno) +} diff --git a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/constants.go b/cloud/pkg/controllermanager/edgeapplication/overridemanager/constants.go index f4bb7d9fb..f4bb7d9fb 100644 --- a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/constants.go +++ b/cloud/pkg/controllermanager/edgeapplication/overridemanager/constants.go diff --git a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/helper.go b/cloud/pkg/controllermanager/edgeapplication/overridemanager/helper.go index 63990c969..63990c969 100644 --- a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/helper.go +++ b/cloud/pkg/controllermanager/edgeapplication/overridemanager/helper.go diff --git a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/imageoverrider.go b/cloud/pkg/controllermanager/edgeapplication/overridemanager/imageoverrider.go index 62e98e3d7..e4d1e9dc7 100644 --- a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/imageoverrider.go +++ b/cloud/pkg/controllermanager/edgeapplication/overridemanager/imageoverrider.go @@ -15,8 +15,8 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/klog/v2" - "github.com/kubeedge/kubeedge/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/imageparser" - groupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication/overridemanager/imageparser" + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" ) const ( @@ -30,7 +30,7 @@ type ImageOverrider struct{} var _ Overrider = &ImageOverrider{} func (o *ImageOverrider) ApplyOverrides(rawObj *unstructured.Unstructured, overriders OverriderInfo) error { - imageOverriders := overriders.Overriders.ImageOverrider + imageOverriders := overriders.Overriders.ImageOverriders for index := range imageOverriders { patches, err := buildPatches(rawObj, &imageOverriders[index]) if err != nil { @@ -48,7 +48,7 @@ func (o *ImageOverrider) ApplyOverrides(rawObj *unstructured.Unstructured, overr } // buildPatches parse JSON patches from resource object by imageOverriders -func buildPatches(rawObj *unstructured.Unstructured, imageOverrider *groupingv1alpha1.ImageOverrider) ([]overrideOption, error) { +func buildPatches(rawObj *unstructured.Unstructured, imageOverrider *appsv1alpha1.ImageOverrider) ([]overrideOption, error) { if imageOverrider.Predicate == nil { return buildPatchesWithEmptyPredicate(rawObj, imageOverrider) } @@ -56,7 +56,7 @@ func buildPatches(rawObj *unstructured.Unstructured, imageOverrider *groupingv1a return buildPatchesWithPredicate(rawObj, imageOverrider) } -func buildPatchesWithEmptyPredicate(rawObj *unstructured.Unstructured, imageOverrider *groupingv1alpha1.ImageOverrider) ([]overrideOption, error) { +func buildPatchesWithEmptyPredicate(rawObj *unstructured.Unstructured, imageOverrider *appsv1alpha1.ImageOverrider) ([]overrideOption, error) { switch rawObj.GetKind() { case PodKind: podObj, err := ConvertToPod(rawObj) @@ -99,7 +99,7 @@ func buildPatchesWithEmptyPredicate(rawObj *unstructured.Unstructured, imageOver return nil, nil } -func extractPatchesBy(podSpec corev1.PodSpec, prefixPath string, imageOverrider *groupingv1alpha1.ImageOverrider) ([]overrideOption, error) { +func extractPatchesBy(podSpec corev1.PodSpec, prefixPath string, imageOverrider *appsv1alpha1.ImageOverrider) ([]overrideOption, error) { patches := make([]overrideOption, 0) for containerIndex, container := range podSpec.Containers { @@ -118,7 +118,7 @@ func spliceImagePath(prefixPath string, containerIndex int) string { return fmt.Sprintf("%s/containers/%d/image", prefixPath, containerIndex) } -func buildPatchesWithPredicate(rawObj *unstructured.Unstructured, imageOverrider *groupingv1alpha1.ImageOverrider) ([]overrideOption, error) { +func buildPatchesWithPredicate(rawObj *unstructured.Unstructured, imageOverrider *appsv1alpha1.ImageOverrider) ([]overrideOption, error) { patches := make([]overrideOption, 0) imageValue, err := obtainImageValue(rawObj, imageOverrider.Predicate.Path) @@ -165,7 +165,7 @@ func obtainImageValue(rawObj *unstructured.Unstructured, predicatePath string) ( return imageValue, nil } -func acquireOverrideOption(imagePath, curImage string, imageOverrider *groupingv1alpha1.ImageOverrider) (overrideOption, error) { +func acquireOverrideOption(imagePath, curImage string, imageOverrider *appsv1alpha1.ImageOverrider) (overrideOption, error) { if !strings.HasPrefix(imagePath, pathSplit) { return overrideOption{}, fmt.Errorf("imagePath should be start with / character") } @@ -176,46 +176,46 @@ func acquireOverrideOption(imagePath, curImage string, imageOverrider *groupingv } return overrideOption{ - Op: string(groupingv1alpha1.OverriderOpReplace), + Op: string(appsv1alpha1.OverriderOpReplace), Path: imagePath, Value: newImage, }, nil } -func overrideImage(curImage string, imageOverrider *groupingv1alpha1.ImageOverrider) (string, error) { +func overrideImage(curImage string, imageOverrider *appsv1alpha1.ImageOverrider) (string, error) { imageComponent, err := imageparser.Parse(curImage) if err != nil { return "", fmt.Errorf("failed to parse image value(%s), error: %v", curImage, err) } switch imageOverrider.Component { - case groupingv1alpha1.Registry: + case appsv1alpha1.Registry: switch imageOverrider.Operator { - case groupingv1alpha1.OverriderOpAdd: + case appsv1alpha1.OverriderOpAdd: imageComponent.SetHostname(imageComponent.Hostname() + imageOverrider.Value) - case groupingv1alpha1.OverriderOpReplace: + case appsv1alpha1.OverriderOpReplace: imageComponent.SetHostname(imageOverrider.Value) - case groupingv1alpha1.OverriderOpRemove: + case appsv1alpha1.OverriderOpRemove: imageComponent.RemoveHostname() } return imageComponent.String(), nil - case groupingv1alpha1.Repository: + case appsv1alpha1.Repository: switch imageOverrider.Operator { - case groupingv1alpha1.OverriderOpAdd: + case appsv1alpha1.OverriderOpAdd: imageComponent.SetRepository(imageComponent.Repository() + imageOverrider.Value) - case groupingv1alpha1.OverriderOpReplace: + case appsv1alpha1.OverriderOpReplace: imageComponent.SetRepository(imageOverrider.Value) - case groupingv1alpha1.OverriderOpRemove: + case appsv1alpha1.OverriderOpRemove: imageComponent.RemoveRepository() } return imageComponent.String(), nil - case groupingv1alpha1.Tag: + case appsv1alpha1.Tag: switch imageOverrider.Operator { - case groupingv1alpha1.OverriderOpAdd: + case appsv1alpha1.OverriderOpAdd: imageComponent.SetTagOrDigest(imageComponent.TagOrDigest() + imageOverrider.Value) - case groupingv1alpha1.OverriderOpReplace: + case appsv1alpha1.OverriderOpReplace: imageComponent.SetTagOrDigest(imageOverrider.Value) - case groupingv1alpha1.OverriderOpRemove: + case appsv1alpha1.OverriderOpRemove: imageComponent.RemoveTagOrDigest() } return imageComponent.String(), nil diff --git a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/imageparser/imageparser.go b/cloud/pkg/controllermanager/edgeapplication/overridemanager/imageparser/imageparser.go index 5747ea708..5747ea708 100644 --- a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/imageparser/imageparser.go +++ b/cloud/pkg/controllermanager/edgeapplication/overridemanager/imageparser/imageparser.go diff --git a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/imageparser/lifted.go b/cloud/pkg/controllermanager/edgeapplication/overridemanager/imageparser/lifted.go index bb15ee180..bb15ee180 100644 --- a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/imageparser/lifted.go +++ b/cloud/pkg/controllermanager/edgeapplication/overridemanager/imageparser/lifted.go diff --git a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/nameoverrider.go b/cloud/pkg/controllermanager/edgeapplication/overridemanager/nameoverrider.go index 6812cb7c5..1edafbf07 100644 --- a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/nameoverrider.go +++ b/cloud/pkg/controllermanager/edgeapplication/overridemanager/nameoverrider.go @@ -9,8 +9,7 @@ import ( type NameOverrider struct{} func (o *NameOverrider) ApplyOverrides(rawObj *unstructured.Unstructured, overriders OverriderInfo) error { - // TODO: - // consider how to override if oldName is empty + // TODO: consider how to override if oldName is empty oldName := rawObj.GetName() newName := fmt.Sprintf("%s-%s", oldName, overriders.TargetNodeGroup) rawObj.SetName(newName) diff --git a/cloud/pkg/controllermanager/edgeapplication/overridemanager/nodeselectoroverrider.go b/cloud/pkg/controllermanager/edgeapplication/overridemanager/nodeselectoroverrider.go new file mode 100644 index 000000000..23cbd79c0 --- /dev/null +++ b/cloud/pkg/controllermanager/edgeapplication/overridemanager/nodeselectoroverrider.go @@ -0,0 +1,34 @@ +package overridemanager + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/nodegroup" +) + +type NodeSelectorOverrider struct{} + +func (o *NodeSelectorOverrider) ApplyOverrides(rawObj *unstructured.Unstructured, overriders OverriderInfo) error { + switch rawObj.GetKind() { + case DeploymentKind: + deploymentObj, err := ConvertToDeployment(rawObj) + if err != nil { + return fmt.Errorf("failed to convert Deployment from unstructured object: %v", err) + } + nodeGroupLabel := map[string]string{ + nodegroup.LabelBelongingTo: overriders.TargetNodeGroup, + } + deploymentObj.Spec.Template.Spec.NodeSelector = nodeGroupLabel + unstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(deploymentObj) + if err != nil { + return fmt.Errorf("failed to convert Deployment to unstructrued object: %v", err) + } + rawObj.Object = unstructured + default: + return fmt.Errorf("cannot override nodeselector for obj of gvk %s", rawObj.GroupVersionKind()) + } + return nil +} diff --git a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/overridemanager.go b/cloud/pkg/controllermanager/edgeapplication/overridemanager/overridemanager.go index ce2e5eb66..42d47d0df 100644 --- a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/overridemanager.go +++ b/cloud/pkg/controllermanager/edgeapplication/overridemanager/overridemanager.go @@ -8,7 +8,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" errorutil "k8s.io/apimachinery/pkg/util/errors" - groupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" ) // overrideOption define the JSONPatch operator @@ -25,7 +25,7 @@ type Overrider interface { type OverriderInfo struct { TargetNodeGroup string - Overriders *groupingv1alpha1.Overriders + Overriders *appsv1alpha1.Overriders } type OverrideManager struct { diff --git a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/replicasoverrider.go b/cloud/pkg/controllermanager/edgeapplication/overridemanager/replicasoverrider.go index 5c7a7f860..5ebd1e1ba 100644 --- a/cloud/pkg/groupingcontroller/edgeapplication/overridemanager/replicasoverrider.go +++ b/cloud/pkg/controllermanager/edgeapplication/overridemanager/replicasoverrider.go @@ -5,7 +5,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - groupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" + apppsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" ) const ( @@ -17,10 +17,13 @@ type ReplicasOverrider struct{} func (o *ReplicasOverrider) ApplyOverrides(rawObj *unstructured.Unstructured, overriders OverriderInfo) error { switch rawObj.GetKind() { case DeploymentKind: + if overriders.Overriders.Replicas == nil { + return nil + } patch := overrideOption{ - Op: string(groupingv1alpha1.OverriderOpReplace), + Op: string(apppsv1alpha1.OverriderOpReplace), Path: deploymentReplicasPath, - Value: overriders.Overriders.Replicas, + Value: *overriders.Overriders.Replicas, } if err := applyJSONPatch(rawObj, []overrideOption{patch}); err != nil { return fmt.Errorf("failed to apply replicas override on deployment %s/%s, %v", diff --git a/cloud/pkg/controllermanager/edgeapplication/statusmanager/reconciler.go b/cloud/pkg/controllermanager/edgeapplication/statusmanager/reconciler.go new file mode 100644 index 000000000..fbebe1858 --- /dev/null +++ b/cloud/pkg/controllermanager/edgeapplication/statusmanager/reconciler.go @@ -0,0 +1,219 @@ +package statusmanager + +import ( + "context" + "fmt" + "sort" + + appsv1 "k8s.io/api/apps/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication/constants" + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication/overridemanager" + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication/utils" + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" +) + +type statusReconciler struct { + schema.GroupVersionKind + runtime.Serializer + client.Client + overridemanager.Overrider + ReoncileTriggerChan chan event.GenericEvent +} + +func (r *statusReconciler) Reconcile(ctx context.Context, request controllerruntime.Request) (controllerruntime.Result, error) { + edgeApp := &appsv1alpha1.EdgeApplication{} + if err := r.Client.Get(ctx, request.NamespacedName, edgeApp); err != nil { + if apierrors.IsNotFound(err) { + return controllerruntime.Result{}, nil + } + klog.Errorf("failed to get edgeApp %s/%s, %v", request.Namespace, request.Name, err) + return controllerruntime.Result{Requeue: true}, err + } + + if !edgeApp.GetDeletionTimestamp().IsZero() { + return controllerruntime.Result{}, nil + } + + return r.sync(ctx, edgeApp) +} + +func (r *statusReconciler) sync(ctx context.Context, edgeApp *appsv1alpha1.EdgeApplication) (controllerruntime.Result, error) { + tmplInfos, err := utils.GetTemplatesInfosOfEdgeApp(edgeApp, r.Serializer) + if err != nil { + klog.Errorf("failed to get infos of templates in edgeApp %s/%s, continue with what has been got", edgeApp.Namespace, edgeApp.Name, err) + } + + for _, tmplInfo := range tmplInfos { + tmpl := tmplInfo.Template + gvk := tmpl.GroupVersionKind() + if gvk != r.GroupVersionKind { + // it's not managed by this reconciler + continue + } + if _, ok := constants.OverriderTargetGVK[gvk]; !ok { + if err := r.updateStatus(ctx, edgeApp, tmplInfo, availableIfExists{}); err != nil { + klog.Errorf("failed to update status for edgeApp %s/%s, %v", edgeApp.Namespace, edgeApp.Name, err) + return controllerruntime.Result{Requeue: true}, err + } + } else { + // Apply overriders to get the actual template that applied to + // each nodegroup. Currently, only NameOverrider is applied. + overrideInfos := utils.GetAllOverriders(edgeApp) + for _, overrideInfo := range overrideInfos { + copy := tmpl.DeepCopy() + if err := r.Overrider.ApplyOverrides(copy, overrideInfo); err != nil { + klog.Errorf("failed to apply overrides to template %s/%s of gvk %s when updating status of edgeapp %s/%s, %v", + copy.GetNamespace(), copy.GetName(), gvk, edgeApp.Namespace, edgeApp.Name, err) + continue + } + // TODO: + // Currently, only deployment will be override. So, we just use the deploymentAvailable here. + // It a temporary strategy for convinence. When we need to support more GVK, a generic strategy + // is needed. + newTmplInfo := &utils.TemplateInfo{Ordinal: tmplInfo.Ordinal, Template: copy} + if err := r.updateStatus(ctx, edgeApp, newTmplInfo, deploymentAvailable{}); err != nil { + klog.Errorf("failed to update status for edgeApp %s/%s, %v", edgeApp.Namespace, edgeApp.Name, err) + return controllerruntime.Result{Requeue: true}, err + } + } + } + } + // Trigger reconciliation of edgeapplication controller + // Considering that if an object created by edgeapplication controller is deleted by others, + // only this controller watches its event, thus if it status in edgeapplication resource is unchanged, + // the deleted object will not be created until the next resync. + r.ReoncileTriggerChan <- event.GenericEvent{Object: edgeApp.DeepCopy()} + return controllerruntime.Result{}, nil +} + +func (r *statusReconciler) updateStatus( + ctx context.Context, + edgeApp *appsv1alpha1.EdgeApplication, + tmplInfo *utils.TemplateInfo, + available available) error { + info := utils.GetResourceInfoOfTemplateInfo(tmplInfo) + isAvailable, err := available.IsAvailable(ctx, r.Client, info) + if err != nil { + klog.Errorf("failed to check the availability of obj %s/%s, %s/%s, kind: %s, %v", + info.Namespace, info.Name, info.Group, info.Version, info.Kind, err) + } + if isAvailable { + return r.update(ctx, edgeApp, info, appsv1alpha1.EdgeAppAvailable) + } + return r.update(ctx, edgeApp, info, appsv1alpha1.EdgeAppProcessing) +} + +func (r *statusReconciler) update( + ctx context.Context, + edgeApp *appsv1alpha1.EdgeApplication, + info utils.ResourceInfo, + status appsv1alpha1.ManifestCondition) error { + newStatus := appsv1alpha1.ManifestStatus{ + Identifier: appsv1alpha1.ResourceIdentifier{ + Ordinal: info.Ordinal, + Group: info.Group, + Version: info.Version, + Kind: info.Kind, + Namespace: info.Namespace, + Name: info.Name, + }, + Condition: status, + } + + var statusInEdgeApp *appsv1alpha1.ManifestStatus + for i, status := range edgeApp.Status.WorkloadStatus { + // find the existing status that need update + if status.Identifier.Ordinal == info.Ordinal { + if utils.IsIdentifierSameAsResourceInfo(status.Identifier, info) || utils.IsInitStatus(&status) { + statusInEdgeApp = &edgeApp.Status.WorkloadStatus[i] + break + } + } + } + if statusInEdgeApp == nil { + // not found, add a new entry for it + edgeApp.Status.WorkloadStatus = append(edgeApp.Status.WorkloadStatus, newStatus) + } else if utils.IsInitStatus(statusInEdgeApp) || statusInEdgeApp.Condition != status { + // the existing status needs to be updated + *statusInEdgeApp = newStatus + } else { + // no need to update status + klog.V(4).Infof("obj %s/%s of gvk %s/%s, %s has same status as what in edgeapp %s/%s, skip update status", + info.Namespace, info.Name, info.Group, info.Version, info.Kind, edgeApp.Namespace, edgeApp.Name) + return nil + } + + sort.Slice(edgeApp.Status.WorkloadStatus, func(i, j int) bool { + return edgeApp.Status.WorkloadStatus[i].Identifier.Ordinal < edgeApp.Status.WorkloadStatus[j].Identifier.Ordinal + }) + if err := r.Client.Status().Update(ctx, edgeApp); err != nil { + return fmt.Errorf("failed to update status of EdgeApplication %s/%s, %v", edgeApp.Namespace, edgeApp.Name, err) + } + klog.V(4).Infof("successfully update status of edgeapp %s/%s for obj %s/%s of gvk %s/%s, Kind %s with value %s", + edgeApp.Namespace, edgeApp.Name, info.Namespace, info.Name, info.Group, info.Version, info.Kind, status) + return nil +} + +type available interface { + IsAvailable(context.Context, client.Client, utils.ResourceInfo) (bool, error) +} + +var _ available = availableIfExists{} +var _ available = deploymentAvailable{} + +type availableIfExists struct{} + +func (e availableIfExists) IsAvailable(ctx context.Context, client client.Client, info utils.ResourceInfo) (bool, error) { + obj, err := getObjAccordingToResourceInfo(ctx, client, info) + if err != nil { + return false, err + } + if obj == nil { + return false, nil + } + return true, nil +} + +type deploymentAvailable struct{} + +func (d deploymentAvailable) IsAvailable(ctx context.Context, client client.Client, info utils.ResourceInfo) (bool, error) { + obj, err := getObjAccordingToResourceInfo(ctx, client, info) + if err != nil { + return false, err + } + if obj == nil { + return false, err + } + + deploy := &appsv1.Deployment{} + if err := client.Scheme().Convert(obj, deploy, nil); err != nil { + return false, fmt.Errorf("failed to convert unstructured to deployment for %s/%s, %v", info.Namespace, info.Name, err) + } + + return deploy.Status.ReadyReplicas == *deploy.Spec.Replicas, nil +} + +func getObjAccordingToResourceInfo(ctx context.Context, client client.Client, info utils.ResourceInfo) (*unstructured.Unstructured, error) { + gvk := schema.GroupVersionKind{Group: info.Group, Version: info.Version, Kind: info.Kind} + curObj := &unstructured.Unstructured{} + curObj.SetGroupVersionKind(gvk) + if err := client.Get(ctx, types.NamespacedName{Namespace: info.Namespace, Name: info.Name}, curObj); err != nil { + if apierrors.IsNotFound(err) { + klog.V(4).Infof("cannot find obj %s/%s of gvk %s", info.Namespace, info.Name, gvk) + return nil, nil + } + klog.Errorf("failed to get obj %s/%s, gvk: %s, %v", info.Namespace, info.Name, gvk, err) + return nil, err + } + return curObj, nil +} diff --git a/cloud/pkg/groupingcontroller/edgeapplication/statusmanager/statusmanager.go b/cloud/pkg/controllermanager/edgeapplication/statusmanager/statusmanager.go index b976b8ad5..a65907380 100644 --- a/cloud/pkg/groupingcontroller/edgeapplication/statusmanager/statusmanager.go +++ b/cloud/pkg/controllermanager/edgeapplication/statusmanager/statusmanager.go @@ -13,41 +13,34 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/source" - groupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication/overridemanager" + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication/utils" + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" ) type StatusManager interface { - WatchStatus(ResourceInfo) error - CancelWatch(ResourceInfo) error - Start() -} - -type ResourceInfo struct { - Group string `json:"group"` - Version string `json:"version"` - Kind string `json:"kind"` - Namespace string `json:"namespace"` - Name string `json:"name"` -} - -func (c *ResourceInfo) String() string { - return fmt.Sprintf("%s/%s, kind=%s, namespace=%s, name=%s", c.Group, c.Version, c.Kind, c.Namespace, c.Name) + WatchStatus(utils.ResourceInfo) error + CancelWatch(utils.ResourceInfo) error + SetReconcileTriggerChan(chan event.GenericEvent) + Start() error } type statusManager struct { - ctx context.Context - mtx sync.Mutex - mgr manager.Manager - client client.Client - serializer runtime.Serializer - watching map[schema.GroupVersionKind]context.CancelFunc - watchCh chan ResourceInfo - cancelCh chan ResourceInfo - started bool + ctx context.Context + mtx sync.Mutex + mgr manager.Manager + client client.Client + serializer runtime.Serializer + watching map[schema.GroupVersionKind]context.CancelFunc + watchCh chan schema.GroupVersionKind + cancelCh chan schema.GroupVersionKind + reconcileTrigger chan event.GenericEvent + started bool } func NewStatusManager(ctx context.Context, mgr manager.Manager, client client.Client, serializer runtime.Serializer) StatusManager { @@ -58,61 +51,69 @@ func NewStatusManager(ctx context.Context, mgr manager.Manager, client client.Cl client: client, serializer: serializer, watching: make(map[schema.GroupVersionKind]context.CancelFunc), - watchCh: make(chan ResourceInfo, 1024), - cancelCh: make(chan ResourceInfo, 1024), + watchCh: make(chan schema.GroupVersionKind, 1024), + cancelCh: make(chan schema.GroupVersionKind, 1024), } } -func (s *statusManager) WatchStatus(info ResourceInfo) error { +func (s *statusManager) WatchStatus(info utils.ResourceInfo) error { if !s.started { return fmt.Errorf("status manager has not started") } select { - case s.watchCh <- info: + case s.watchCh <- infoToGVK(info): default: - return fmt.Errorf("the wathCh of status manager is full, drop the info %s", info) + return fmt.Errorf("the wathCh of status manager is full, drop the info %s", info.String()) } return nil } -func (s *statusManager) CancelWatch(info ResourceInfo) error { +func (s *statusManager) CancelWatch(info utils.ResourceInfo) error { if !s.started { return fmt.Errorf("status manager has not started") } select { - case s.cancelCh <- info: + case s.cancelCh <- infoToGVK(info): default: - return fmt.Errorf("the cancelCh of status manager is full, drop the info %s", info) + return fmt.Errorf("the cancelCh of status manager is full, drop the info %s", info.String()) } return nil } -func (s *statusManager) Start() { +func (s *statusManager) Start() error { + if s.reconcileTrigger == nil { + return fmt.Errorf("reoncileTriger cannot be nil") + } s.started = true go s.watchStatusWorker() go s.cancelWatchWorker() go s.waitForTerminatingWorkers() go wait.Until(s.watchControllersGC, 5*time.Minute, s.ctx.Done()) + return nil +} + +func (s *statusManager) SetReconcileTriggerChan(ch chan event.GenericEvent) { + s.reconcileTrigger = ch } func (s *statusManager) watchStatusWorker() { - for info := range s.watchCh { - if s.isWatching(info) { + for gvk := range s.watchCh { + if s.isWatching(gvk) { continue } ctx, cancel := context.WithCancel(s.ctx) - s.markAsWatching(info, cancel) - if err := s.startToWatch(ctx, info); err != nil { - s.unmarkWatching(info) - klog.Errorf("failed to start to watch status for gvk %s, %v", info.Name, err) - // TODO: - // if need retry + s.markAsWatching(gvk, cancel) + if err := s.startToWatch(ctx, gvk); err != nil { + s.unmarkWatching(gvk) + klog.Errorf("failed to start to watch status for gvk %s, %v", gvk, err) + // TODO: if need retry continue } + klog.V(4).Infof("start to watch status of gvk %s", gvk) } klog.Info("watchStatusWorker exited") } @@ -142,35 +143,38 @@ func (s *statusManager) waitForTerminatingWorkers() { close(s.cancelCh) } -func (s *statusManager) isWatching(info ResourceInfo) bool { +func (s *statusManager) isWatching(gvk schema.GroupVersionKind) bool { s.mtx.Lock() defer s.mtx.Unlock() - gvk := infoToGVK(info) _, ok := s.watching[gvk] return ok } -func (s *statusManager) markAsWatching(info ResourceInfo, cancel context.CancelFunc) { +func (s *statusManager) markAsWatching(gvk schema.GroupVersionKind, cancel context.CancelFunc) { s.mtx.Lock() defer s.mtx.Unlock() - gvk := infoToGVK(info) s.watching[gvk] = cancel } -func (s *statusManager) unmarkWatching(info ResourceInfo) { +func (s *statusManager) unmarkWatching(gvk schema.GroupVersionKind) { s.mtx.Lock() defer s.mtx.Unlock() - if cancel := s.watching[infoToGVK(info)]; cancel != nil { + if cancel := s.watching[gvk]; cancel != nil { cancel() } - delete(s.watching, infoToGVK(info)) + delete(s.watching, gvk) } -func (s *statusManager) startToWatch(ctx context.Context, info ResourceInfo) error { - gvk := infoToGVK(info) +func (s *statusManager) startToWatch(ctx context.Context, gvk schema.GroupVersionKind) error { controllerName := fmt.Sprintf("status-controller-for-%s/%s/%s", gvk.Group, gvk.Version, gvk.Kind) controller, err := controller.NewUnmanaged(controllerName, s.mgr, controller.Options{ - Reconciler: &statusReconciler{Client: s.client, GroupVersionKind: gvk, Serializer: s.serializer}, + Reconciler: &statusReconciler{ + Client: s.client, + GroupVersionKind: gvk, + Serializer: s.serializer, + Overrider: &overridemanager.NameOverrider{}, + ReoncileTriggerChan: s.reconcileTrigger, + }, }) if err != nil { klog.Errorf("failed to get new unmanaged controller for gvk %s, %v", gvk, err) @@ -180,7 +184,7 @@ func (s *statusManager) startToWatch(ctx context.Context, info ResourceInfo) err watchObj := &unstructured.Unstructured{} watchObj.SetGroupVersionKind(gvk) if err := controller.Watch(&source.Kind{Type: watchObj}, &handler.EnqueueRequestForOwner{ - OwnerType: &groupingv1alpha1.EdgeApplication{}, + OwnerType: &appsv1alpha1.EdgeApplication{}, IsController: true, }); err != nil { klog.Errorf("failed to add delete event watch to controller for gvk: %s, %v", gvk, err) @@ -199,72 +203,43 @@ func (s *statusManager) startToWatch(ctx context.Context, info ResourceInfo) err } func (s *statusManager) watchControllersGC() { - edgeAppList := &groupingv1alpha1.EdgeApplicationList{} + edgeAppList := &appsv1alpha1.EdgeApplicationList{} if err := s.client.List(s.ctx, edgeAppList); err != nil { klog.Errorf("failed to list EdgeApplication") return } - infoMap := make(map[ResourceInfo]struct{}) + infoMap := make(map[schema.GroupVersionKind]struct{}) for _, edgeApp := range edgeAppList.Items { - infos, err := GetContainedResourceInfos(&edgeApp, s.serializer) + infos, err := utils.GetContainedResourceInfos(&edgeApp, s.serializer) if err != nil { klog.Errorf("failed to get resourceInfos from edgeApp %s/%s, %v", edgeApp.Namespace, edgeApp.Name, err) continue } for _, info := range infos { - infoMap[info] = struct{}{} + infoMap[infoToGVK(info)] = struct{}{} } } - for info := range infoMap { - if err := s.CancelWatch(info); err != nil { - klog.Errorf("failed to cancel watch for gvk %s, %v", infoToGVK(info), err) - continue + s.mtx.Lock() + defer s.mtx.Unlock() + for gvk := range s.watching { + if _, ok := infoMap[gvk]; !ok { + // no edgeapplication need to watch status of this gvk, so cancel watch of it + if err := s.CancelWatch(utils.ResourceInfo{Group: gvk.Group, Version: gvk.Version, Kind: gvk.Kind}); err != nil { + klog.Errorf("statusControllersGC failed to cancel watch of gvk %s, %v", gvk, err) + continue + } + klog.V(4).Infof("statusControllerGC cancel watch of gvk %s", gvk) } - klog.V(4).Infof("cancel watching status for gvk %s", infoToGVK(info)) } } -func infoToGVK(info ResourceInfo) schema.GroupVersionKind { +func infoToGVK(info utils.ResourceInfo) schema.GroupVersionKind { return schema.GroupVersionKind{ Group: info.Group, Version: info.Version, Kind: info.Kind, } } - -func GetContainedResourceInfos(edgeApp *groupingv1alpha1.EdgeApplication, yamlSerializer runtime.Serializer) ([]ResourceInfo, error) { - objs, err := GetContainedResourceObjs(edgeApp, yamlSerializer) - if err != nil { - return nil, fmt.Errorf("failed to get contained objs, %v", err) - } - infos := []ResourceInfo{} - for _, obj := range objs { - gvk := obj.GroupVersionKind() - info := ResourceInfo{ - Group: gvk.Group, - Version: gvk.Version, - Kind: gvk.Kind, - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - } - infos = append(infos, info) - } - return infos, nil -} - -func GetContainedResourceObjs(edgeApp *groupingv1alpha1.EdgeApplication, yamlSerializer runtime.Serializer) ([]*unstructured.Unstructured, error) { - objs := []*unstructured.Unstructured{} - for _, manifest := range edgeApp.Spec.WorkloadTemplate.Manifests { - obj := &unstructured.Unstructured{} - _, _, err := yamlSerializer.Decode(manifest.Raw, nil, obj) - if err != nil { - return nil, fmt.Errorf("failed to decode manifest of edgeapp %s/%s, %v, manifest: %s", - edgeApp.Namespace, edgeApp.Name, err, manifest) - } - objs = append(objs, obj) - } - return objs, nil -} diff --git a/cloud/pkg/controllermanager/edgeapplication/utils/utils.go b/cloud/pkg/controllermanager/edgeapplication/utils/utils.go new file mode 100644 index 000000000..3f0fc9524 --- /dev/null +++ b/cloud/pkg/controllermanager/edgeapplication/utils/utils.go @@ -0,0 +1,106 @@ +package utils + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/klog/v2" + + "github.com/kubeedge/kubeedge/cloud/pkg/controllermanager/edgeapplication/overridemanager" + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" +) + +type ResourceInfo struct { + // Ordinal is the index of the template of this resource in + // the manifetsts of EdgeApplication. + Ordinal int `json:"ordinal"` + Group string `json:"group"` + Version string `json:"version"` + Kind string `json:"kind"` + Namespace string `json:"namespace"` + Name string `json:"name"` +} + +func (c *ResourceInfo) String() string { + return fmt.Sprintf("%d, %s/%s, kind=%s, namespace=%s, name=%s", c.Ordinal, c.Group, c.Version, c.Kind, c.Namespace, c.Name) +} + +type TemplateInfo struct { + Ordinal int + Template *unstructured.Unstructured +} + +func GetAllOverriders(edgeApp *appsv1alpha1.EdgeApplication) []overridemanager.OverriderInfo { + infos := make([]overridemanager.OverriderInfo, 0, len(edgeApp.Spec.WorkloadScope.TargetNodeGroups)) + for index := range edgeApp.Spec.WorkloadScope.TargetNodeGroups { + copied := edgeApp.Spec.WorkloadScope.TargetNodeGroups[index].Overriders.DeepCopy() + infos = append(infos, overridemanager.OverriderInfo{ + TargetNodeGroup: edgeApp.Spec.WorkloadScope.TargetNodeGroups[index].Name, + Overriders: copied, + }) + } + return infos +} + +func GetContainedResourceInfos(edgeApp *appsv1alpha1.EdgeApplication, yamlSerializer runtime.Serializer) ([]ResourceInfo, error) { + tmplInfos, err := GetTemplatesInfosOfEdgeApp(edgeApp, yamlSerializer) + if err != nil { + return nil, fmt.Errorf("failed to get contained objs, %v", err) + } + infos := []ResourceInfo{} + for _, tmplInfo := range tmplInfos { + infos = append(infos, GetResourceInfoOfTemplateInfo(tmplInfo)) + } + return infos, nil +} + +func GetResourceInfoOfTemplateInfo(tmplInfo *TemplateInfo) ResourceInfo { + tmpl := tmplInfo.Template + gvk := tmpl.GroupVersionKind() + info := ResourceInfo{ + Ordinal: tmplInfo.Ordinal, + Group: gvk.Group, + Version: gvk.Version, + Kind: gvk.Kind, + Namespace: tmpl.GetNamespace(), + Name: tmpl.GetName(), + } + return info +} + +func GetTemplatesInfosOfEdgeApp(edgeApp *appsv1alpha1.EdgeApplication, yamlSerializer runtime.Serializer) ([]*TemplateInfo, error) { + tmplInfos := []*TemplateInfo{} + errs := []error{} + for index, manifest := range edgeApp.Spec.WorkloadTemplate.Manifests { + obj := &unstructured.Unstructured{} + _, _, err := yamlSerializer.Decode(manifest.Raw, nil, obj) + if err != nil { + klog.Errorf("failed to decode manifest of edgeapp %s/%s, %v, manifest: %s", + edgeApp.Namespace, edgeApp.Name, err, manifest) + errs = append(errs, err) + continue + } + tmplInfos = append(tmplInfos, &TemplateInfo{Ordinal: index, Template: obj}) + } + return tmplInfos, errors.NewAggregate(errs) +} + +func IsInitStatus(status *appsv1alpha1.ManifestStatus) bool { + identifier := status.Identifier + return identifier.Group == "" && + identifier.Version == "" && + identifier.Kind == "" && + identifier.Resource == "" && + identifier.Namespace == "" && + identifier.Name == "" +} + +func IsIdentifierSameAsResourceInfo(identifier appsv1alpha1.ResourceIdentifier, info ResourceInfo) bool { + return identifier.Group == info.Group && + identifier.Version == info.Version && + identifier.Kind == info.Kind && + identifier.Namespace == info.Namespace && + identifier.Name == info.Name +} diff --git a/cloud/pkg/controllermanager/nodegroup/nodegroupcontroller.go b/cloud/pkg/controllermanager/nodegroup/nodegroupcontroller.go new file mode 100644 index 000000000..a09692f7f --- /dev/null +++ b/cloud/pkg/controllermanager/nodegroup/nodegroupcontroller.go @@ -0,0 +1,433 @@ +package nodegroup + +import ( + "context" + "fmt" + "sort" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/source" + + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" +) + +const ( + // ControllerName is the controller name that will be used when reporting events. + ControllerName = "nodegroup-controller" + + LabelBelongingTo = "apps.kubeedge.io/belonging-to" + NodeGroupControllerFinalizer = "apps.kubeedge.io/nodegroup-controller" + ServiceTopologyAnnotation = "apps.kubeedge.io/service-topology" + ServiceTopologyRangeNodegroup = "range-nodegroup" +) + +var ( + conditionStatusReadyStatusMap = map[corev1.ConditionStatus]appsv1alpha1.ReadyStatus{ + corev1.ConditionTrue: appsv1alpha1.NodeReady, + corev1.ConditionFalse: appsv1alpha1.NodeNotReady, + corev1.ConditionUnknown: appsv1alpha1.Unknown, + // for the convinence of processing the situation that node has no ready condition + "": appsv1alpha1.Unknown, + } +) + +// Controller is to sync NodeGroup. +type Controller struct { + client.Client +} + +// Reconcile performs a full reconciliation for the object referred to by the Request. +// The Controller will requeue the Request to be processed again if an error is non-nil or +// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. +func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { + klog.Infof("Reconciling nodeGroup %s", req.NamespacedName.Name) + + nodeGroup := &appsv1alpha1.NodeGroup{} + if err := c.Client.Get(ctx, req.NamespacedName, nodeGroup); err != nil { + // The resource may no longer exist, in which case we stop processing. + if apierrors.IsNotFound(err) { + return controllerruntime.Result{}, nil + } + + return controllerruntime.Result{Requeue: true}, err + } + + if !nodeGroup.DeletionTimestamp.IsZero() { + // remove labels it added to nodes before deleting this NodeGroup + klog.Infof("begin to remove node group label on nodes selected by nodegroup %s", nodeGroup.Name) + if err := c.evictNodesInNodegroup(ctx, nodeGroup.Name); err != nil { + return controllerruntime.Result{Requeue: true}, err + } + // this NodeGroup can be deleted now + if err := c.removeFinalizer(ctx, nodeGroup); err != nil { + return controllerruntime.Result{Requeue: true}, err + } + return controllerruntime.Result{}, nil + } + + if !controllerutil.ContainsFinalizer(nodeGroup, NodeGroupControllerFinalizer) { + controllerutil.AddFinalizer(nodeGroup, NodeGroupControllerFinalizer) + if err := c.Client.Update(ctx, nodeGroup); err != nil { + klog.Errorf("failed to add finalizer for nodegroup %s, %s", nodeGroup.Name, err) + return controllerruntime.Result{Requeue: true}, err + } + } + + return c.syncNodeGroup(ctx, nodeGroup) +} + +func (c *Controller) syncNodeGroup(ctx context.Context, nodeGroup *appsv1alpha1.NodeGroup) (controllerruntime.Result, error) { + debugLogNodes := func(msg string, nodes []corev1.Node) { + if klog.V(4).Enabled() { + if len(nodes) == 0 { + klog.Infof("%s: get no nodes when syncing nodegroup %s", msg, nodeGroup.Name) + return + } + nodeNames := []string{} + for i := range nodes { + nodeNames = append(nodeNames, nodes[i].Name) + } + klog.Infof("%s: get %d nodes %v when syncing nodegroup %s", msg, len(nodes), nodeNames, nodeGroup.Name) + } + } + + newNodes, err := c.getNodesSelectedBy(ctx, nodeGroup) + if err != nil { + klog.Errorf("failed to get all new nodes, %s, continue with what have found.", err) + } + debugLogNodes("get new nodes", newNodes) + + oldNodes, err := c.getNodesByLabels(ctx, map[string]string{LabelBelongingTo: nodeGroup.Name}) + if err != nil { + klog.Errorf("failed to get old nodes for nodegroup %s, %s.", nodeGroup.Name, err) + return controllerruntime.Result{Requeue: true}, err + } + debugLogNodes("get current nodes", oldNodes) + + // delete belonging label on nodes that do not belong to this node group + nodesDeleted, _ := nodesDiff(oldNodes, newNodes) + debugLogNodes("get nodes to delete label", nodesDeleted) + + if err := c.evictNodes(ctx, nodesDeleted); err != nil { + klog.Errorf("failed to evict nodes that do not belong to this nodegroup anymore, %s", err) + return controllerruntime.Result{Requeue: true}, err + } + + // This loop will + // 1. add or update belonging label for nodes + // 2. prepare NodeStatus for NodeGroup + nodeStatusList := []appsv1alpha1.NodeStatus{} + existingNodes := sets.NewString() + for _, node := range newNodes { + existingNodes = existingNodes.Insert(node.Name) + nodeStatus := appsv1alpha1.NodeStatus{ + NodeName: node.Name, + } + // update ReadyStatus + nodeReadyConditionStatus, _ := getNodeReadyConditionFromNode(&node) + nodeStatus.ReadyStatus = conditionStatusReadyStatusMap[nodeReadyConditionStatus] + klog.V(4).Infof("get status %s for node %s, when reconciling nodegroup %s", nodeStatus.ReadyStatus, node.Name, nodeGroup.Name) + + // try to add node group label to this node + if err := c.addOrUpdateNodeLabel(ctx, &node, nodeGroup.Name); err != nil { + klog.Errorf("failed to update belonging label for node %s in nodegroup %s, %s, continue to reconcile other nodes", node.Name, nodeGroup.Name, err) + nodeStatus.SelectionStatus = appsv1alpha1.FailedSelection + nodeStatus.SelectionStatusReason = err.Error() + } else { + nodeStatus.SelectionStatus = appsv1alpha1.SucceededSelection + } + nodeStatusList = append(nodeStatusList, nodeStatus) + } + // update status for nodes that do not exist but specified by node name. + nonExistingNodes := sets.NewString(nodeGroup.Spec.Nodes...).Difference(existingNodes) + for node := range nonExistingNodes { + nodeStatusList = append(nodeStatusList, appsv1alpha1.NodeStatus{ + NodeName: node, + SelectionStatus: appsv1alpha1.FailedSelection, + SelectionStatusReason: "node does not exist", + ReadyStatus: appsv1alpha1.Unknown, + }) + } + sort.Slice(nodeStatusList, func(i, j int) bool { + return nodeStatusList[i].NodeName < nodeStatusList[j].NodeName + }) + if equality.Semantic.DeepEqual(nodeGroup.Status.NodeStatuses, nodeStatusList) { + klog.V(4).Infof("status of nodegroup is unchanged, skip update") + return controllerruntime.Result{}, nil + } + klog.V(4).Infof("status of nodegroup has changed, old: %v, new: %v", nodeGroup.Status.NodeStatuses, nodeStatusList) + nodeGroup.Status.NodeStatuses = nodeStatusList + if err := c.Status().Update(ctx, nodeGroup); err != nil { + klog.Errorf("failed to update status for nodegroup %s, %s", nodeGroup.Name, err) + return controllerruntime.Result{Requeue: true}, nil + } + return controllerruntime.Result{}, nil +} + +// SetupWithManager creates a controller and register to controller manager. +func (c *Controller) SetupWithManager(ctx context.Context, mgr controllerruntime.Manager) error { + if err := mgr.GetFieldIndexer().IndexField(ctx, &corev1.Pod{}, "spec.nodeName", func(o client.Object) []string { + pod := o.(*corev1.Pod) + return []string{pod.Spec.NodeName} + }); err != nil { + return fmt.Errorf("failed to set nodeName field selector for manager, %v", err) + } + return controllerruntime.NewControllerManagedBy(mgr). + For(&appsv1alpha1.NodeGroup{}). + Watches(&source.Kind{Type: &corev1.Node{}}, handler.EnqueueRequestsFromMapFunc(c.nodeMapFunc)). + Complete(c) +} + +// evictNodes will remove the belonging-to label from nodes and evict pods +// that should run in the nodegroup which the node was used to belong to. +func (c *Controller) evictNodes(ctx context.Context, nodes []corev1.Node) error { + errs := []error{} + for _, node := range nodes { + n := node.DeepCopy() + ng := n.Labels[LabelBelongingTo] + delete(n.Labels, LabelBelongingTo) + if err := c.Client.Patch(ctx, n, client.MergeFrom(&node)); err != nil { + klog.Errorf("failed to remove belonging label of nodegroup %s on node %s, %v", ng, node.Name, err) + errs = append(errs, err) + } + + if err := c.evictPodsShouldNotRunOnNode(ctx, n, ng); err != nil { + klog.Errorf("failed to evict pods running on node %s in nodegroup %s, %v", node.Name, ng, err) + errs = append(errs, err) + } + } + return utilerrors.NewAggregate(errs) +} + +func (c *Controller) evictPodsShouldNotRunOnNode(ctx context.Context, node *corev1.Node, nodegroup string) error { + // find all pods running on this node + runningPods := &corev1.PodList{} + nodeNameSelector := fields.OneTermEqualSelector("spec.nodeName", node.Name) + if err := c.Client.List(ctx, runningPods, client.MatchingFieldsSelector{Selector: nodeNameSelector}); err != nil { + return fmt.Errorf("failed to get pods running on node %s, %v", node.Name, err) + } + + // evict pods + errs := []error{} + for i := range runningPods.Items { + pod := &runningPods.Items[i] + nodeSelector := pod.Spec.NodeSelector + if v, ok := nodeSelector[LabelBelongingTo]; ok && v == nodegroup { + // TODO: in an async way? + // Delete pod seems to block until the pod has actually stopped + if err := c.Client.Delete(ctx, pod); err != nil { + errs = append(errs, fmt.Errorf("failed to delete pod %s/%s, %v", pod.Namespace, pod.Name, err)) + } + } + } + return utilerrors.NewAggregate(errs) +} + +func (c *Controller) removeFinalizer(ctx context.Context, nodeGroup *appsv1alpha1.NodeGroup) error { + if !controllerutil.ContainsFinalizer(nodeGroup, NodeGroupControllerFinalizer) { + return nil + } + controllerutil.RemoveFinalizer(nodeGroup, NodeGroupControllerFinalizer) + if err := c.Client.Update(ctx, nodeGroup); err != nil { + klog.Errorf("failed to remove finalizer on nodegroup %s, %s", nodeGroup.Name, err) + return err + } + return nil +} + +func (c *Controller) getNodesSelectedBy(ctx context.Context, nodeGroup *appsv1alpha1.NodeGroup) ([]corev1.Node, error) { + errs := []error{} + nodesByLabel, err := c.getNodesByLabels(ctx, nodeGroup.Spec.MatchLabels) + if err != nil { + klog.Errorf("failed to get nodes by MatchLabels %v, %s", nodeGroup.Spec.MatchLabels, err) + errs = append(errs, err) + } + klog.V(4).Infof("get %d nodes that match labels in nodegroup %s", len(nodesByLabel), nodeGroup.Name) + + nodesByName, err := c.getNodesByNodeName(ctx, nodeGroup.Spec.Nodes) + if err != nil { + klog.Errorf("failed to get all nodes specified in the NodeGroup.Spec.Nodes, %s.", err) + errs = append(errs, err) + } + klog.V(4).Infof("get %d nodes that specified by name in nodegroup %s", len(nodesByName), nodeGroup.Name) + // remove duplicate nodes + return nodesUnion(nodesByLabel, nodesByName), utilerrors.NewAggregate(errs) +} + +// We can assume that one node can only be in one of following conditions: +// 1. This node is an orphan, do not and will not belong to any NodeGroup. +// 2. This node is or will be a member of one NodeGroup. +func (c *Controller) nodeMapFunc(obj client.Object) []controllerruntime.Request { + node := obj.(*corev1.Node) + if nodeGroupName, ok := node.Labels[LabelBelongingTo]; ok { + return []controllerruntime.Request{ + { + NamespacedName: types.NamespacedName{ + Name: nodeGroupName, + }, + }, + } + } + // node do not have belonging label, either a new node will be add to a node group or an orphan node + nodegroupList := &appsv1alpha1.NodeGroupList{} + if err := c.Client.List(context.TODO(), nodegroupList); err != nil { + klog.Errorf("failed to list all nodegroups, %s", err) + return nil + } + + for _, nodegroup := range nodegroupList.Items { + if IfMatchNodeGroup(node, &nodegroup) { + // this node will be added into a node group + return []controllerruntime.Request{ + { + NamespacedName: types.NamespacedName{ + Name: nodegroup.Name, + }, + }, + } + } + } + + // an orphan node, do not reconcile + return []controllerruntime.Request{} +} + +func (c *Controller) evictNodesInNodegroup(ctx context.Context, nodeGroupName string) error { + selector := labels.SelectorFromSet(labels.Set( + map[string]string{LabelBelongingTo: nodeGroupName}, + )) + nodeList := &corev1.NodeList{} + err := c.Client.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector}) + if err != nil { + return err + } + return c.evictNodes(ctx, nodeList.Items) +} + +// getNodesByLabels can get all nodes matching these labels. +func (c *Controller) getNodesByLabels(ctx context.Context, matchLabels map[string]string) ([]corev1.Node, error) { + if matchLabels == nil { + // Return empty when matchLabels is nil + // Otherwise, it will select all nodes, it's not what we want + return []corev1.Node{}, nil + } + selector := labels.SelectorFromSet(labels.Set(matchLabels)) + nodeList := &corev1.NodeList{} + err := c.Client.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector}) + if err != nil { + return nil, err + } + return nodeList.Items, nil +} + +// getNodesByNodeName can get all nodes specified by node names. +func (c *Controller) getNodesByNodeName(ctx context.Context, nodeNames []string) ([]corev1.Node, error) { + errs := []error{} + nodes := []corev1.Node{} + for _, name := range nodeNames { + node := &corev1.Node{} + if err := c.Client.Get(ctx, types.NamespacedName{Name: name}, node); err != nil { + klog.Errorf("failed to get node %s, %s", name, err) + errs = append(errs, err) + continue + } + nodes = append(nodes, *node) + } + + return nodes, utilerrors.NewAggregate(errs) +} + +func (c *Controller) addOrUpdateNodeLabel(ctx context.Context, node *corev1.Node, nodeGroupName string) error { + nodeLabels := node.Labels + v, ok := nodeLabels[LabelBelongingTo] + if ok && v == nodeGroupName { + // nothing to do + return nil + } + if ok && v != nodeGroupName { + return fmt.Errorf("node %s has already belonged to NodeGroup %s", node.Name, nodeGroupName) + } + + // !ok + // add new label to this node + newnode := node.DeepCopy() + if newnode.Labels == nil { + newnode.Labels = map[string]string{} + } + newnode.Labels[LabelBelongingTo] = nodeGroupName + if err := c.Client.Patch(ctx, newnode, client.MergeFrom(node)); err != nil { + klog.Errorf("failed to add label %s=%s to node %s, %s", LabelBelongingTo, nodeGroupName, node.Name, err) + return err + } + return nil +} + +// IfMatchNodeGroup will check if the node is selected by the nodegroup. +func IfMatchNodeGroup(node *corev1.Node, nodegroup *appsv1alpha1.NodeGroup) bool { + // check if nodename is in the nodegroup.Spec.Nodes + for _, nodeName := range nodegroup.Spec.Nodes { + if nodeName == node.Name { + return true + } + } + // check if labels of this node selected by nodegroup.Spec.MatchLabels + selector := labels.SelectorFromSet(labels.Set(nodegroup.Spec.MatchLabels)) + return selector.Matches(labels.Set(node.Labels)) +} + +func getNodeReadyConditionFromNode(node *corev1.Node) (corev1.ConditionStatus, bool) { + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady { + return condition.Status, true + } + } + return "", false +} + +func nodesDiff(oldNodes []corev1.Node, newNodes []corev1.Node) ([]corev1.Node, []corev1.Node) { + nodesDeleted, nodesAdded := []corev1.Node{}, []corev1.Node{} + m := map[string]corev1.Node{} + for _, n := range oldNodes { + m[n.Name] = n + } + for _, n := range newNodes { + _, exist := m[n.Name] + if exist { + delete(m, n.Name) + } else { + nodesAdded = append(nodesAdded, n) + } + } + for _, n := range m { + nodesDeleted = append(nodesDeleted, n) + } + return nodesDeleted, nodesAdded +} + +func nodesUnion(a []corev1.Node, b []corev1.Node) []corev1.Node { + nodesMap := map[string]*corev1.Node{} + for i := range a { + nodesMap[a[i].Name] = &a[i] + } + for i := range b { + nodesMap[b[i].Name] = &b[i] + } + + nodes := []corev1.Node{} + for _, node := range nodesMap { + nodes = append(nodes, *node) + } + return nodes +} diff --git a/cloud/pkg/controllermanager/nodegroup/nodegroupcontroller_test.go b/cloud/pkg/controllermanager/nodegroup/nodegroupcontroller_test.go new file mode 100644 index 000000000..6465411b7 --- /dev/null +++ b/cloud/pkg/controllermanager/nodegroup/nodegroupcontroller_test.go @@ -0,0 +1,126 @@ +package nodegroup + +import ( + "sort" + "testing" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestNodesUnion(t *testing.T) { + cases := map[string]struct { + list1 []corev1.Node + list2 []corev1.Node + want []corev1.Node + }{ + "nil-nil": { + list1: nil, + list2: nil, + want: []corev1.Node{}, + }, + "nil-empty": { + list1: nil, + list2: []corev1.Node{}, + want: []corev1.Node{}, + }, + "nil-normal": { + list1: nil, + list2: []corev1.Node{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "node1", + }, + }, + }, + want: []corev1.Node{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "node1", + }, + }, + }, + }, + "normal-normal-different": { + list1: []corev1.Node{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "node1", + }, + }, + }, + list2: []corev1.Node{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "node2", + }, + }, + }, + want: []corev1.Node{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "node1", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "node2", + }, + }, + }, + }, + "normal-normal-intersection": { + list1: []corev1.Node{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "node1", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "node2", + }, + }, + }, + list2: []corev1.Node{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "node2", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "node3", + }, + }, + }, + want: []corev1.Node{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "node1", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "node2", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "node3", + }, + }, + }, + }, + } + for n, c := range cases { + results := nodesUnion(c.list1, c.list2) + sort.Slice(results, func(i, j int) bool { + return results[i].Name < results[j].Name + }) + if !equality.Semantic.DeepEqual(results, c.want) { + t.Errorf("failed at case: %s, want: %v, got: %v", n, c.want, results) + } + } +} diff --git a/cloud/pkg/groupingcontroller/edgeapplication/edgeapplicationcontroller.go b/cloud/pkg/groupingcontroller/edgeapplication/edgeapplicationcontroller.go deleted file mode 100644 index ef5c6ab62..000000000 --- a/cloud/pkg/groupingcontroller/edgeapplication/edgeapplicationcontroller.go +++ /dev/null @@ -1,411 +0,0 @@ -package edgeapplication - -import ( - "context" - "encoding/json" - "fmt" - "strings" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" - "k8s.io/utils/pointer" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/kubeedge/kubeedge/cloud/pkg/groupingcontroller/edgeapplication/overridemanager" - "github.com/kubeedge/kubeedge/cloud/pkg/groupingcontroller/edgeapplication/statusmanager" - groupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" -) - -const ( - LastAppliedTemplateAnnotationKey = "grouping.kubeedge.io/last-applied-template" - LastContainedResourcesAnnotationKey = "grouping.kubeedge.io/last-contained-resources" -) - -var overriderTargetGVK map[schema.GroupVersionKind]struct{} = map[schema.GroupVersionKind]struct{}{ - {Group: "apps", Version: "v1", Kind: "Deployment"}: {}, -} - -// Controller is to sync EdgeApplication. -type Controller struct { - client.Client - runtime.Serializer - overridemanager.Overrider - statusmanager.StatusManager -} - -// Reconcile performs a full reconciliation for the object referred to by the Request. -// The Controller will requeue the Request to be processed again if an error is non-nil or -// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. -func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { - klog.Infof("Reconciling EdgeApplication %s/%s", req.NamespacedName.Namespace, req.NamespacedName.Name) - - edgeApp := &groupingv1alpha1.EdgeApplication{} - if err := c.Client.Get(ctx, req.NamespacedName, edgeApp); err != nil { - // The resource may no longer exist, in which case we stop processing. - if apierrors.IsNotFound(err) { - return controllerruntime.Result{}, nil - } - - klog.Errorf("failed to get edgeapplication %s/%s, %v", req.NamespacedName.Namespace, req.NamespacedName.Name, err) - return controllerruntime.Result{Requeue: true}, err - } - - if !edgeApp.DeletionTimestamp.IsZero() { - // foreground cascade deletion of OwnerReference - // will take the responsibility of removing created resources. - return controllerruntime.Result{}, nil - } - - annotations := edgeApp.Annotations - if annotations == nil || annotations[LastContainedResourcesAnnotationKey] == "" { - // it is a new created EdgeApplication - // add LastContainedResourcesAnnotation for it - if err := c.addOrUpdateLastContainedResourcesAnnotation(ctx, edgeApp); err != nil { - klog.Errorf("failed to add LastContainedResourcesAnnotation to EdgeApplication %s/%s, %v", - edgeApp.Namespace, edgeApp.Name, err) - return controllerruntime.Result{Requeue: true}, err - } - // it will reconcile at the next time for update - return controllerruntime.Result{}, nil - } - - return c.syncEdgeApplication(ctx, edgeApp) -} - -// SetupWithManager creates a controller and register to controller manager. -func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { - if c.Client == nil { - return fmt.Errorf("client of edgeapplication controller cannot be nil") - } - if c.Serializer == nil { - return fmt.Errorf("serializer of edgeapplication controller cannot be nil") - } - if c.StatusManager == nil { - return fmt.Errorf("status manager of edgeapplication controller cannot be nil") - } - if c.Overrider == nil { - return fmt.Errorf("overrider of edgeapplication controller cannot be nil") - } - // start the StatusManager - c.StatusManager.Start() - return controllerruntime.NewControllerManagedBy(mgr). - For(&groupingv1alpha1.EdgeApplication{}). - Complete(c) -} - -func (c *Controller) syncEdgeApplication(ctx context.Context, edgeApp *groupingv1alpha1.EdgeApplication) (controllerruntime.Result, error) { - // 1. get manifests, set ownerReference and apply overrides to all target resources - manifests := edgeApp.Spec.WorkloadTemplate.Manifests - overriderInfos := getAllOverriders(edgeApp) - var modifiedTmpls []*unstructured.Unstructured - for _, manifest := range manifests { - copy := manifest.DeepCopy() - unstructuredObj := &unstructured.Unstructured{} - if _, _, err := c.Serializer.Decode(copy.Raw, nil, unstructuredObj); err != nil { - klog.Errorf("failed to get the unstructured of manifest: %s, %v", copy.Raw, err) - return controllerruntime.Result{Requeue: true}, err - } - setOwnerReference(unstructuredObj, edgeApp) - if !needOverride(manifest.Object) { - modifiedTmpls = append(modifiedTmpls, unstructuredObj) - continue - } - - // apply overriders - for _, info := range overriderInfos { - if err := c.Overrider.ApplyOverrides(unstructuredObj, info); err != nil { - klog.Errorf("failed to apply override of %s to template, %v, template: %s", info.TargetNodeGroup, err, string(copy.Raw)) - return controllerruntime.Result{Requeue: true}, err - } - modifiedTmpls = append(modifiedTmpls, unstructuredObj) - } - } - - // 2. apply all templates - for _, tmpl := range modifiedTmpls { - if err := c.applyTemplate(ctx, tmpl); err != nil { - klog.Errorf("failed to apply overridden template of EdgeApplication %s/%s, %v, template: %v", edgeApp.Namespace, edgeApp.Name, err, tmpl) - return controllerruntime.Result{Requeue: true}, err - } - klog.V(4).Infof("successfully applied overridden template of EdgeApplication %s/%s, template: %v", edgeApp.Namespace, edgeApp.Name, tmpl) - } - - // 3. delete resources that have been removed from the manifests - lastContainedResourcesInfos, err := c.getLastContainedResourceInfos(edgeApp) - if err != nil { - klog.Errorf("failed to get infos of last contained resources in EdgeApplication %s/%s, %v", - edgeApp.Namespace, edgeApp.Name, err) - return controllerruntime.Result{Requeue: true}, err - } - currentContainedResourcesInfos, err := statusmanager.GetContainedResourceInfos(edgeApp, c.Serializer) - if err != nil { - klog.Errorf("failed to get infos of current contained resources in EdgeApplication %s/%s, %v", - edgeApp.Namespace, edgeApp.Name, err) - return controllerruntime.Result{Requeue: true}, err - } - deleted := getDeletedResources(lastContainedResourcesInfos, currentContainedResourcesInfos) - for _, info := range deleted { - if err := c.removeResource(ctx, info); err != nil { - klog.Errorf("failed to remove resource %s/%s of gvk %s, %v", - info.Namespace, info.Name, schema.GroupVersionKind{Group: info.Group, Version: info.Version, Kind: info.Kind}, err) - return controllerruntime.Result{Requeue: true}, err - } - klog.V(4).Infof("successfully remove resource %s/%s of gvk %s, %v", - info.Namespace, info.Name, schema.GroupVersionKind{Group: info.Group, Version: info.Version, Kind: info.Kind}, err) - } - - // 4. update the LastContainedResourcesAnnotation - if err := c.addOrUpdateLastContainedResourcesAnnotation(ctx, edgeApp); err != nil { - klog.Errorf("failed to update annotation of EdgeApplication %s/%s, %v", edgeApp.Namespace, edgeApp.Name, err) - return controllerruntime.Result{Requeue: true}, err - } - return controllerruntime.Result{}, nil -} - -func (c *Controller) ifObjExists(ctx context.Context, obj *unstructured.Unstructured) (bool, runtime.Object, error) { - ns, name := obj.GetNamespace(), obj.GetName() - gvk := obj.GetObjectKind().GroupVersionKind() - unstructuredObj := &unstructured.Unstructured{} - unstructuredObj.SetGroupVersionKind(gvk) - if err := c.Client.Get(ctx, client.ObjectKey{Namespace: ns, Name: name}, unstructuredObj); err != nil { - if apierrors.IsNotFound(err) { - return false, nil, nil - } - return false, nil, fmt.Errorf("failed to get obj %s/%s of gvk %s, %v", ns, name, gvk, err) - } - return true, unstructuredObj, nil -} - -func (c *Controller) update(ctx context.Context, tmpl *unstructured.Unstructured, curObj runtime.Object) error { - same, err := isSameAsLastApplied(tmpl, curObj) - if err != nil { - // error occurs when comparing the overridden template with the last applied template - return err - } else if err == nil && same { - // nothing to do for this template - return nil - } - - // The existing object has different last applied template than what is specified in the EdgeApplication. - // Update the object with the template in EdgeApplication. - // - // TODO: - // Currently, we just use the new template in EdgeApplication to overwrite the existing object. - // Maybe we should do it with strategic merge patch. - if err := c.Client.Update(ctx, tmpl); err != nil { - return fmt.Errorf("failed to update object with template %s, %v", tmpl, err) - } - - return nil -} - -// applyTemplate will apply the passed-in template -// If the object has already existed, it will update it when it is different from what specified in the template -// If the object does not exist, it will create it according to the template -func (c *Controller) applyTemplate(ctx context.Context, tmpl *unstructured.Unstructured) error { - ns, name := tmpl.GetNamespace(), tmpl.GetName() - gvk := tmpl.GroupVersionKind() - exists, curObj, err := c.ifObjExists(ctx, tmpl) - if err != nil { - klog.Errorf("failed to check the existence of obj %s/%s, gvk: %s, %v", ns, name, gvk, err) - return err - } - - if exists { - // the obj has already exited in the cluster - // try to update it - klog.V(4).Infof("object %s/%s of gvk %s has already existed, try to update it with template: %v", ns, name, gvk, tmpl) - if err := c.update(ctx, tmpl, curObj); err != nil { - klog.Errorf("failed to update the object %s/%s, gvk: %s, %v", ns, name, gvk, err) - return err - } - return nil - } - - klog.V(4).Infof("try to create object %s/%s of gvk %s with template: %v", ns, name, gvk, tmpl) - if err := c.Client.Create(ctx, tmpl); err != nil { - klog.Errorf("failed to create the object %s/%s of gvk %s with template: %v, %v", ns, name, gvk, tmpl, err) - return err - } - // create the object successfully, notify the StatusManager to - // watch its status. - return c.StatusManager.WatchStatus(statusmanager.ResourceInfo{ - Group: gvk.Group, - Version: gvk.Version, - Kind: gvk.Kind, - Namespace: ns, - Name: name, - }) -} - -func (c *Controller) removeResource(ctx context.Context, info statusmanager.ResourceInfo) error { - unstructuredObj := &unstructured.Unstructured{} - gvk := schema.GroupVersionKind{ - Group: info.Group, - Version: info.Version, - Kind: info.Kind, - } - unstructuredObj.SetGroupVersionKind(gvk) - if err := c.Client.Get(ctx, types.NamespacedName{Namespace: info.Namespace, Name: info.Name}, unstructuredObj); err != nil && apierrors.IsNotFound(err) { - return fmt.Errorf("failed to get obj %s/%s of gvk %s, %v", info.Namespace, info.Name, gvk, err) - } - - if err := c.Client.Delete(ctx, unstructuredObj); err != nil && apierrors.IsNotFound(err) { - return fmt.Errorf("failed to delete obj %s/%s of gvk %s, %v", info.Namespace, info.Name, gvk, err) - } - return nil -} - -func (c *Controller) getLastContainedResourceInfos(edgeApp *groupingv1alpha1.EdgeApplication) ([]statusmanager.ResourceInfo, error) { - anno := edgeApp.Annotations - if anno == nil || anno[LastContainedResourcesAnnotationKey] == "" { - return nil, fmt.Errorf("failed to get last contained resources of EdgeApplication %s/%s for annotation not existing", - edgeApp.Namespace, edgeApp.Name) - } - - infos := []statusmanager.ResourceInfo{} - annoValue := anno[LastContainedResourcesAnnotationKey] - lastContainedResourceJsons := strings.Split(annoValue, ",") - for _, js := range lastContainedResourceJsons { - info := &statusmanager.ResourceInfo{} - if err := json.Unmarshal([]byte(js), info); err != nil { - return nil, fmt.Errorf("failed to unmarshal containedResourceInfo %s, %v", string(js), err) - } - - infos = append(infos, *info) - } - return infos, nil -} - -// addOrUpdateLastContainedResourcesAnnotation will add the ContainedResourcesAnnotation to the EdgeApplication, -// if the annotation has already existed, it will be updated it according to resources in manifests. -func (c *Controller) addOrUpdateLastContainedResourcesAnnotation(ctx context.Context, edgeApp *groupingv1alpha1.EdgeApplication) error { - if edgeApp.Annotations == nil { - edgeApp.Annotations = make(map[string]string) - } - - infos, err := statusmanager.GetContainedResourceInfos(edgeApp, c.Serializer) - if err != nil { - return fmt.Errorf("failed to get infos of current contained resources, %v", err) - } - - containedResources := []string{} - for _, info := range infos { - infoJSON, err := json.Marshal(info) - if err != nil { - return fmt.Errorf("failed to marshal containedResourceInfo %s in edgeApp %s/%s, %v", - info, edgeApp.Namespace, edgeApp.Name, err) - } - containedResources = append(containedResources, string(infoJSON)) - } - - edgeApp.Annotations[LastContainedResourcesAnnotationKey] = strings.Join(containedResources, ",") - if err := c.Client.Update(ctx, edgeApp); err != nil { - return fmt.Errorf("failed to update edgeApp, %v", err) - } - - return nil -} - -func setOwnerReference(obj *unstructured.Unstructured, edgeApp *groupingv1alpha1.EdgeApplication) { - toAdd := metav1.OwnerReference{ - APIVersion: edgeApp.APIVersion, - BlockOwnerDeletion: pointer.BoolPtr(true), - Controller: pointer.BoolPtr(true), - Kind: edgeApp.Kind, - Name: edgeApp.Name, - UID: edgeApp.UID, - } - ownerReferences := obj.GetOwnerReferences() - if ownerReferences == nil { - ownerReferences = []metav1.OwnerReference{toAdd} - obj.SetOwnerReferences(ownerReferences) - return - } - - // check if the OwnerReference has already existed - for i := range ownerReferences { - ownerReference := &ownerReferences[i] - if ownerReference.APIVersion == edgeApp.APIVersion && - *ownerReference.Controller && - ownerReference.Kind == edgeApp.Kind { - // one obj can only have one edgeApp as its owner - // so we overwrite this entry. - ownerReference.Name = edgeApp.Name - ownerReference.UID = edgeApp.UID - obj.SetOwnerReferences(ownerReferences) - return - } - } - - // add a new entry to its OwnerReferences - ownerReferences = append(ownerReferences, toAdd) - obj.SetOwnerReferences(ownerReferences) -} - -// isSameAsLastApplied will check if the curObj has the same specified fileds as objInEdgeApp. -// It assumes that fields of the obj in cluster are same as the value of last-applied-template annotation. -func isSameAsLastApplied(objInEdgeApp *unstructured.Unstructured, curObj runtime.Object) (bool, error) { - accessor := meta.NewAccessor() - annots, err := accessor.Annotations(curObj) - if err != nil { - return false, fmt.Errorf("failed to get annotations of object, %v", err) - } - - objJSON, err := objInEdgeApp.MarshalJSON() - if err != nil { - return false, fmt.Errorf("failed to marshal json of obj %s/%s, gvk: %s, %v", - objInEdgeApp.GetNamespace(), objInEdgeApp.GetName(), objInEdgeApp.GroupVersionKind(), err) - } - - if lastApplied, ok := annots[LastAppliedTemplateAnnotationKey]; ok { - if string(objJSON) == lastApplied { - return true, nil - } - return false, nil - } - - return false, fmt.Errorf("cannot find last applied template in annotation, %v, possibly it is not created by EdgeApplication Controller", err) -} - -func getAllOverriders(edgeApp *groupingv1alpha1.EdgeApplication) []overridemanager.OverriderInfo { - infos := make([]overridemanager.OverriderInfo, 0, len(edgeApp.Spec.WorkloadScope.TargetNodeGroups)) - for index := range edgeApp.Spec.WorkloadScope.TargetNodeGroups { - copied := edgeApp.Spec.WorkloadScope.TargetNodeGroups[index].Overriders.DeepCopy() - infos = append(infos, overridemanager.OverriderInfo{ - TargetNodeGroup: edgeApp.Spec.WorkloadScope.TargetNodeGroups[index].Name, - Overriders: copied, - }) - } - return infos -} - -// needOverride determines if a obj needs override, according to its gvk. -func needOverride(obj runtime.Object) bool { - gvk := obj.GetObjectKind().GroupVersionKind() - _, ok := overriderTargetGVK[gvk] - return ok -} - -// getDeletedResources will return a slice of all deleted resourceInfo, which -// are in oldInfos but not in newInfos. -func getDeletedResources(oldInfos, newInfos []statusmanager.ResourceInfo) []statusmanager.ResourceInfo { - deleted := []statusmanager.ResourceInfo{} - newInfoStrs := make(map[string]struct{}) - for _, info := range newInfos { - newInfoStrs[info.String()] = struct{}{} - } - for _, info := range oldInfos { - if _, ok := newInfoStrs[info.String()]; !ok { - deleted = append(deleted, info) - } - } - return deleted -} diff --git a/cloud/pkg/groupingcontroller/edgeapplication/statusmanager/reconciler.go b/cloud/pkg/groupingcontroller/edgeapplication/statusmanager/reconciler.go deleted file mode 100644 index 5b3963cfc..000000000 --- a/cloud/pkg/groupingcontroller/edgeapplication/statusmanager/reconciler.go +++ /dev/null @@ -1,180 +0,0 @@ -package statusmanager - -import ( - "context" - "fmt" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/apis/apps" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - groupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" -) - -var caredGVKs map[schema.GroupVersionKind]struct{} = map[schema.GroupVersionKind]struct{}{ - {Group: "apps", Version: "v1", Kind: "Deployment"}: {}, -} - -type statusReconciler struct { - schema.GroupVersionKind - runtime.Serializer - client.Client -} - -func (r *statusReconciler) Reconcile(ctx context.Context, request controllerruntime.Request) (controllerruntime.Result, error) { - edgeApp := &groupingv1alpha1.EdgeApplication{} - if err := r.Client.Get(ctx, request.NamespacedName, edgeApp); err != nil { - if apierrors.IsNotFound(err) { - return controllerruntime.Result{}, nil - } - klog.Errorf("failed to get edgeApp %s/%s, %v", request.Namespace, request.Name, err) - return controllerruntime.Result{Requeue: true}, err - } - - if !edgeApp.GetDeletionTimestamp().IsZero() { - return controllerruntime.Result{}, nil - } - - return r.sync(ctx, edgeApp) -} - -func (r *statusReconciler) sync(ctx context.Context, edgeApp *groupingv1alpha1.EdgeApplication) (controllerruntime.Result, error) { - objs, err := GetContainedResourceObjs(edgeApp, r.Serializer) - if err != nil { - klog.Errorf("failed to get objects of manifests in edgeApp, %v", err) - return controllerruntime.Result{Requeue: true}, err - } - - for _, obj := range objs { - gvk := obj.GroupVersionKind() - if gvk != r.GroupVersionKind { - // it's not managed by this reconciler - continue - } - if _, ok := caredGVKs[gvk]; !ok { - available := availableIfExists{} - if err := r.updateStatus(ctx, edgeApp, obj, available); err != nil { - klog.Errorf("failed to update status for edgeApp %s/%s, %v", edgeApp.Namespace, edgeApp.Name, err) - return controllerruntime.Result{Requeue: true}, err - } - } else { - // Currently only deployment are cared - available := deploymentAvailable{} - if err := r.updateStatus(ctx, edgeApp, obj, available); err != nil { - klog.Errorf("failed to update status for edgeApp %s/%s, %v", edgeApp.Namespace, edgeApp.Name, err) - return controllerruntime.Result{Requeue: true}, err - } - } - } - return controllerruntime.Result{}, nil -} - -func (r *statusReconciler) updateStatus( - ctx context.Context, - edgeApp *groupingv1alpha1.EdgeApplication, - obj *unstructured.Unstructured, - available available) error { - isAvailable, err := available.IsAvailable(ctx, r.Client, obj) - if err != nil { - return fmt.Errorf("failed to check the availability of obj %s/%s, gvk: %s, %v", - obj.GetNamespace(), obj.GetName(), obj.GroupVersionKind(), err) - } - if isAvailable { - return r.update(ctx, edgeApp, obj, groupingv1alpha1.EdgeAppAvailable) - } - return r.update(ctx, edgeApp, obj, groupingv1alpha1.EdgeAppProcessing) -} - -func (r *statusReconciler) update( - ctx context.Context, - edgeApp *groupingv1alpha1.EdgeApplication, - obj *unstructured.Unstructured, - status groupingv1alpha1.ManifestCondition) error { - isSame := func(identifier groupingv1alpha1.ResourceIdentifier, obj *unstructured.Unstructured) bool { - return identifier.Group == obj.GroupVersionKind().Group && - identifier.Version == obj.GroupVersionKind().Version && - identifier.Kind == obj.GroupVersionKind().Kind && - identifier.Namespace == obj.GetNamespace() && - identifier.Name == obj.GetName() - } - - var statusInEdgeApp *groupingv1alpha1.ManifestStatus - for i, status := range edgeApp.Status.WorkloadStatus { - if isSame(status.Identifier, obj) { - statusInEdgeApp = &edgeApp.Status.WorkloadStatus[i] - break - } - } - if statusInEdgeApp == nil { - // not found, add a new entry for it - edgeApp.Status.WorkloadStatus = append(edgeApp.Status.WorkloadStatus, groupingv1alpha1.ManifestStatus{ - Identifier: groupingv1alpha1.ResourceIdentifier{ - Group: obj.GroupVersionKind().Group, - Version: obj.GroupVersionKind().Version, - Kind: obj.GroupVersionKind().Kind, - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - }, - Condition: status, - }) - } else if statusInEdgeApp.Condition == status { - // no need to update status - return nil - } else { - statusInEdgeApp.Condition = status - } - - if err := r.Client.Update(ctx, edgeApp); err != nil { - return fmt.Errorf("failed to update status of EdgeApplication %s/%s, %v", edgeApp.Namespace, edgeApp.Name, err) - } - return nil -} - -type available interface { - IsAvailable(context.Context, client.Client, runtime.Object) (bool, error) -} - -type availableIfExists struct{} - -func (e availableIfExists) IsAvailable(ctx context.Context, client client.Client, obj runtime.Object) (bool, error) { - unstructuredObj, ok := obj.(*unstructured.Unstructured) - if !ok { - return false, fmt.Errorf("failed to convert object to unstructured") - } - - gvk := unstructuredObj.GroupVersionKind() - ns, name := unstructuredObj.GetNamespace(), unstructuredObj.GetName() - curObj := &unstructured.Unstructured{} - curObj.SetGroupVersionKind(gvk) - if err := client.Get(ctx, types.NamespacedName{Namespace: ns, Name: name}, curObj); err != nil { - if apierrors.IsNotFound(err) { - return false, nil - } - return false, fmt.Errorf("failed to get obj %s/%s, gvk: %s, %v", ns, name, gvk, err) - } - return true, nil -} - -type deploymentAvailable struct{} - -func (d deploymentAvailable) IsAvailable(ctx context.Context, client client.Client, obj runtime.Object) (bool, error) { - deploy, ok := obj.(*apps.Deployment) - if !ok { - return false, fmt.Errorf("failed to convert objecto to deployment") - } - curDeploy := &apps.Deployment{} - if err := client.Get(ctx, types.NamespacedName{Namespace: deploy.Namespace, Name: deploy.Name}, curDeploy); err != nil { - if apierrors.IsNotFound(err) { - return false, nil - } - return false, fmt.Errorf("failed to get deployment %s/%s, %v", deploy.Namespace, deploy.Name, err) - } - - return deploy.Status.ReadyReplicas == deploy.Spec.Replicas, nil -} diff --git a/cloud/pkg/groupingcontroller/nodegroup/nodegroupcontroller.go b/cloud/pkg/groupingcontroller/nodegroup/nodegroupcontroller.go deleted file mode 100644 index ffb2283b3..000000000 --- a/cloud/pkg/groupingcontroller/nodegroup/nodegroupcontroller.go +++ /dev/null @@ -1,351 +0,0 @@ -package nodegroup - -import ( - "context" - "fmt" - "sort" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - utilerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/klog/v2" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/source" - - groupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" -) - -const ( - // ControllerName is the controller name that will be used when reporting events. - ControllerName = "nodegroup-controller" - - LabelBelongingTo = "grouping.kubeedge.io/belonging-to" - NodeGroupControllerFinalizer = "grouping.kubeedge.io/nodegroup-controller" -) - -var ( - conditionStatusReadyStatusMap = map[corev1.ConditionStatus]groupingv1alpha1.ReadyStatus{ - corev1.ConditionTrue: groupingv1alpha1.NodeReady, - corev1.ConditionFalse: groupingv1alpha1.NodeNotReady, - corev1.ConditionUnknown: groupingv1alpha1.Unknown, - // for the convinence of processing the situation that node has no ready condition - "": groupingv1alpha1.Unknown, - } -) - -// nodeGroupStatusSort implements sort.Interface for NodeGroupStatus -type nodeGroupStatusSort struct { - list []groupingv1alpha1.NodeStatus -} - -func (n *nodeGroupStatusSort) Len() int { return len(n.list) } -func (n *nodeGroupStatusSort) Less(i, j int) bool { return n.list[i].NodeName < n.list[j].NodeName } -func (n *nodeGroupStatusSort) Swap(i, j int) { - tmp := n.list[i].DeepCopy() - n.list[i] = *n.list[j].DeepCopy() - n.list[j] = *tmp -} - -// Controller is to sync NodeGroup. -type Controller struct { - client.Client -} - -// Reconcile performs a full reconciliation for the object referred to by the Request. -// The Controller will requeue the Request to be processed again if an error is non-nil or -// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. -func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { - klog.Infof("Reconciling nodeGroup %s", req.NamespacedName.Name) - - nodeGroup := &groupingv1alpha1.NodeGroup{} - if err := c.Client.Get(context.TODO(), req.NamespacedName, nodeGroup); err != nil { - // The resource may no longer exist, in which case we stop processing. - if apierrors.IsNotFound(err) { - return controllerruntime.Result{}, nil - } - - return controllerruntime.Result{Requeue: true}, err - } - - if !nodeGroup.DeletionTimestamp.IsZero() { - // remove labels it added to nodes before deleting this NodeGroup - klog.Infof("begin to remove node group label on nodes selected by nodegroup %s", nodeGroup.Name) - if err := c.removeBelongingLabelOfNodeGroup(nodeGroup.Name); err != nil { - return controllerruntime.Result{Requeue: true}, err - } - // this NodeGroup can be deleted now - if err := c.removeFinalizer(nodeGroup); err != nil { - return controllerruntime.Result{Requeue: true}, err - } - return controllerruntime.Result{}, nil - } - - if !controllerutil.ContainsFinalizer(nodeGroup, NodeGroupControllerFinalizer) { - controllerutil.AddFinalizer(nodeGroup, NodeGroupControllerFinalizer) - if err := c.Client.Update(context.TODO(), nodeGroup); err != nil { - klog.Errorf("failed to add finalizer for nodegroup %s, %s", nodeGroup.Name, err) - return controllerruntime.Result{Requeue: true}, err - } - } - - return c.syncNodeGroup(nodeGroup) -} - -func (c *Controller) syncNodeGroup(nodeGroup *groupingv1alpha1.NodeGroup) (controllerruntime.Result, error) { - newNodes, err := c.getNodesSelectedBy(nodeGroup) - if err != nil { - klog.Errorf("failed to get all new nodes, %s, continue with what have found.", err) - } - - oldNodes, err := c.getNodesByLabels(map[string]string{ - LabelBelongingTo: nodeGroup.Name, - }) - if err != nil { - klog.Errorf("failed to get old nodes for nodegroup %s, %s.", nodeGroup.Name, err) - return controllerruntime.Result{Requeue: true}, err - } - - // delete belonging label on nodes that do not belong to this node group - nodesDeleted, _ := nodesDiff(oldNodes, newNodes) - if err := c.removeBelongingLabelOnNodes(nodesDeleted); err != nil { - klog.Errorf("failed to remove label on nodes that do not belong to this nodegroup, %s", err) - return controllerruntime.Result{Requeue: true}, err - } - - // collect statuses of nodes - nodeStatusList := []groupingv1alpha1.NodeStatus{} - for _, node := range newNodes { - nodeStatus := groupingv1alpha1.NodeStatus{ - NodeName: node.Name, - } - // update ReadyStatus - nodeReadyConditionStatus, _ := getNodeReadyConditionFromNode(&node) - nodeStatus.ReadyStatus = conditionStatusReadyStatusMap[nodeReadyConditionStatus] - - // try to add node group label to this node - if err := c.addOrUpdateNodeLabel(&node, nodeGroup.Name); err != nil { - klog.Errorf("failed to update belonging label for node %s in nodegroup %s, %s, continue to reconcile other nodes", node, nodeGroup.Name, err) - nodeStatus.SelectionStatus = groupingv1alpha1.FailedSelection - nodeStatus.SelectionStatusReason = err.Error() - } - } - - sort.Sort(&nodeGroupStatusSort{nodeStatusList}) - newNodeGroupStatus := groupingv1alpha1.NodeGroupStatus{NodeStatuses: nodeStatusList} - if !equality.Semantic.DeepEqual(nodeGroup.Status, newNodeGroupStatus) { - // the status of this NodeGroup has changed, try to update status - nodeGroup.Status = newNodeGroupStatus - if err := c.Status().Update(context.TODO(), nodeGroup); err != nil { - klog.Errorf("failed to update status for nodegroup %s, %s", nodeGroup.Name, err) - return controllerruntime.Result{Requeue: true}, nil - } - } - - return controllerruntime.Result{}, nil -} - -// SetupWithManager creates a controller and register to controller manager. -func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { - return controllerruntime.NewControllerManagedBy(mgr). - For(&groupingv1alpha1.NodeGroup{}). - Watches(&source.Kind{Type: &corev1.Node{}}, handler.EnqueueRequestsFromMapFunc(c.nodeMapFunc), nil). - Complete(c) -} - -func (c *Controller) removeBelongingLabelOnNodes(nodes []corev1.Node) error { - errs := []error{} - for _, node := range nodes { - n := node.DeepCopy() - delete(n.Labels, LabelBelongingTo) - if err := c.Client.Update(context.TODO(), n); err != nil { - errs = append(errs, err) - } - } - return utilerrors.NewAggregate(errs) -} - -func (c *Controller) removeFinalizer(nodeGroup *groupingv1alpha1.NodeGroup) error { - if !controllerutil.ContainsFinalizer(nodeGroup, LabelBelongingTo) { - return nil - } - controllerutil.RemoveFinalizer(nodeGroup, NodeGroupControllerFinalizer) - if err := c.Client.Update(context.TODO(), nodeGroup); err != nil { - klog.Errorf("failed to remove finalizer on nodegroup %s, %s", nodeGroup, err) - return err - } - return nil -} - -func (c *Controller) getNodesSelectedBy(nodeGroup *groupingv1alpha1.NodeGroup) ([]corev1.Node, error) { - errs := []error{} - allNodes, err := c.getNodesByLabels(nodeGroup.Spec.MatchLabels) - if err != nil { - klog.Errorf("failed to get nodes by MatchLabels %v, %s", nodeGroup.Spec.MatchLabels, err) - errs = append(errs, err) - } - - nodes, err := c.getNodesByNodeName(nodeGroup.Spec.Nodes) - if err != nil { - klog.Errorf("failed to get all nodes specified in the NodeGroup.Spec.Nodes, %s.", err) - errs = append(errs, err) - } - allNodes = append(allNodes, nodes...) - return allNodes, utilerrors.NewAggregate(errs) -} - -// We can assume that one node can only be in one of following conditions: -// 1. This node is an orphan, do not and will not belong to any NodeGroup. -// 2. This node is or will be a member of one NodeGroup. -func (c *Controller) nodeMapFunc(obj client.Object) []controllerruntime.Request { - node := obj.(*corev1.Node) - if nodeGroupName, ok := node.Labels[LabelBelongingTo]; ok { - return []controllerruntime.Request{ - { - NamespacedName: types.NamespacedName{ - Name: nodeGroupName, - }, - }, - } - } - // node do not have belonging label, either a new node will be add to a node group or an orphan node - nodegroupList := &groupingv1alpha1.NodeGroupList{} - if err := c.Client.List(context.TODO(), nodegroupList); err != nil { - klog.Errorf("failed to list all nodegroups, %s", err) - return nil - } - - for _, nodegroup := range nodegroupList.Items { - if IfMatchNodeGroup(node, &nodegroup) { - // this node will be added into a node group - return []controllerruntime.Request{ - { - NamespacedName: types.NamespacedName{ - Name: nodegroup.Name, - }, - }, - } - } - } - - // an orphan node, do not reconcile - return []controllerruntime.Request{} -} - -func (c *Controller) removeBelongingLabelOfNodeGroup(nodeGroupName string) error { - selector := labels.SelectorFromSet(labels.Set( - map[string]string{LabelBelongingTo: nodeGroupName}, - )) - nodeList := &corev1.NodeList{} - err := c.Client.List(context.TODO(), nodeList, &client.ListOptions{LabelSelector: selector}) - if err != nil { - return err - } - - errs := []error{} - for _, node := range nodeList.Items { - newNode := node.DeepCopy() - delete(newNode.Labels, LabelBelongingTo) - if err := c.Client.Update(context.TODO(), newNode); err != nil { - klog.Errorf("failed to delete node group label of %s on node %s, %s", nodeGroupName, node, err) - } - } - return utilerrors.NewAggregate(errs) -} - -// getNodesByLabels can get all nodes matching these labels. -func (c *Controller) getNodesByLabels(matchLabels map[string]string) ([]corev1.Node, error) { - selector := labels.SelectorFromSet(labels.Set(matchLabels)) - nodeList := &corev1.NodeList{} - err := c.Client.List(context.TODO(), nodeList, &client.ListOptions{LabelSelector: selector}) - if err != nil { - return nil, err - } - return nodeList.Items, nil -} - -// getNodesByNodeName can get all nodes specified by node names. -func (c *Controller) getNodesByNodeName(nodeNames []string) ([]corev1.Node, error) { - errs := []error{} - nodes := []corev1.Node{} - for _, name := range nodeNames { - node := &corev1.Node{} - if err := c.Client.Get(context.TODO(), types.NamespacedName{Name: name}, node); err != nil { - klog.Errorf("failed to get node %s, %s", name, err) - errs = append(errs, err) - continue - } - nodes = append(nodes, *node) - } - - return nodes, utilerrors.NewAggregate(errs) -} - -func (c *Controller) addOrUpdateNodeLabel(node *corev1.Node, nodeGroupName string) error { - nodeLabels := node.Labels - v, ok := nodeLabels[LabelBelongingTo] - if ok && v == nodeGroupName { - // nothing to do - return nil - } - if ok && v != nodeGroupName { - return fmt.Errorf("node %s has already belonged to NodeGroup %s", node.Name, nodeGroupName) - } - - // !ok - // add new label to this node - newnode := node.DeepCopy() - newnode.Labels[LabelBelongingTo] = nodeGroupName - if err := c.Client.Update(context.TODO(), newnode); err != nil { - klog.Errorf("failed to add label %s=%s to node %s, %s", LabelBelongingTo, nodeGroupName, node.Name, err) - return err - } - return nil -} - -// IfMatchNodeGroup will check if the node is selected by the nodegroup. -func IfMatchNodeGroup(node *corev1.Node, nodegroup *groupingv1alpha1.NodeGroup) bool { - // check if nodename is in the nodegroup.Spec.Nodes - for _, nodeName := range nodegroup.Spec.Nodes { - if nodeName == node.Name { - return true - } - } - // check if labels of this node selected by nodegroup.Spec.MatchLabels - selector := labels.SelectorFromSet(labels.Set(nodegroup.Spec.MatchLabels)) - return selector.Matches(labels.Set(node.Labels)) -} - -func getNodeReadyConditionFromNode(node *corev1.Node) (corev1.ConditionStatus, bool) { - for _, condition := range node.Status.Conditions { - if condition.Type == corev1.NodeReady { - return condition.Status, true - } - } - return "", false -} - -func nodesDiff(oldNodes []corev1.Node, newNodes []corev1.Node) ([]corev1.Node, []corev1.Node) { - nodesDeleted, nodesAdded := []corev1.Node{}, []corev1.Node{} - m := map[string]corev1.Node{} - for _, n := range oldNodes { - m[n.Name] = n - } - for _, n := range newNodes { - _, exist := m[n.Name] - if exist { - delete(m, n.Name) - } else { - nodesAdded = append(nodesAdded, n) - } - } - for _, n := range m { - nodesDeleted = append(nodesDeleted, n) - } - return nodesDeleted, nodesAdded -} diff --git a/hack/generate-crds.sh b/hack/generate-crds.sh index ec3534047..8eba11a75 100755 --- a/hack/generate-crds.sh +++ b/hack/generate-crds.sh @@ -22,7 +22,7 @@ CRD_VERSIONS=v1 CRD_OUTPUTS=build/crds DEVICES_VERSION=v1alpha2 RELIABLESYNCS_VERSION=v1alpha1 -GROUPING_VERSION=v1alpha1 +APPS_VERSION=v1alpha1 HELM_CRDS_DIR=manifests/charts/cloudcore/crds ROUTER_DIR=build/crds/router @@ -79,7 +79,7 @@ function :copy:to:destination { # rename files, copy files mkdir -p ${CRD_OUTPUTS}/devices mkdir -p ${CRD_OUTPUTS}/reliablesyncs - mkdir -p ${CRD_OUTPUTS}/grouping + mkdir -p ${CRD_OUTPUTS}/apps for entry in `ls /tmp/crds/*.yaml`; do CRD_NAME=$(echo ${entry} | cut -d'.' -f3 | cut -d'_' -f2) @@ -90,8 +90,8 @@ function :copy:to:destination { cp -v ${entry} ${HELM_CRDS_DIR}/devices_${DEVICES_VERSION}_${CRD_NAME}.yaml elif [ "$CRD_NAME" == "edgeapplications" ] || [ "$CRD_NAME" == "nodegroups" ]; then CRD_NAME=$(remove_suffix_s "$CRD_NAME") - cp -v ${entry} ${CRD_OUTPUTS}/grouping/grouping_${GROUPING_VERSION}_${CRD_NAME}.yaml - cp -v ${entry} ${HELM_CRDS_DIR}/grouping_${GROUPING_VERSION}_${CRD_NAME}.yaml + cp -v ${entry} ${CRD_OUTPUTS}/apps/apps_${APPS_VERSION}_${CRD_NAME}.yaml + cp -v ${entry} ${HELM_CRDS_DIR}/apps_${APPS_VERSION}_${CRD_NAME}.yaml elif [ "$CRD_NAME" == "clusterobjectsyncs" ]; then cp -v ${entry} ${CRD_OUTPUTS}/reliablesyncs/cluster_objectsync_${RELIABLESYNCS_VERSION}.yaml cp -v ${entry} ${HELM_CRDS_DIR}/cluster_objectsync_${RELIABLESYNCS_VERSION}.yaml diff --git a/hack/lib/golang.sh b/hack/lib/golang.sh index 7d4717050..deda28e7d 100755 --- a/hack/lib/golang.sh +++ b/hack/lib/golang.sh @@ -174,6 +174,7 @@ ALL_BINARIES_AND_TARGETS=( csidriver:cloud/cmd/csidriver iptablesmanager:cloud/cmd/iptablesmanager edgemark:edge/cmd/edgemark + controllermanager:cloud/cmd/controllermanager ) kubeedge::golang::get_target_by_binary() { diff --git a/hack/make-rules/image.sh b/hack/make-rules/image.sh index f93d5c6f3..275a55dbc 100755 --- a/hack/make-rules/image.sh +++ b/hack/make-rules/image.sh @@ -32,6 +32,7 @@ ALL_IMAGES_AND_TARGETS=( csidriver:csidriver:build/csidriver/Dockerfile iptablesmanager:iptables-manager:build/iptablesmanager/Dockerfile edgemark:edgemark:build/edgemark/Dockerfile + controllermanager:controller-manager:build/controllermanager/Dockerfile installation-package:installation-package:build/docker/installation-package/installation-package.dockerfile ) diff --git a/hack/update-codegen.sh b/hack/update-codegen.sh index 0541515b5..f8d485c7e 100755 --- a/hack/update-codegen.sh +++ b/hack/update-codegen.sh @@ -24,5 +24,5 @@ export GOPATH="${GOPATH:-$(go env GOPATH)}" ${KUBEEDGE_ROOT}/hack/generate-groups.sh "deepcopy,client,informer,lister" \ github.com/kubeedge/kubeedge/pkg/client github.com/kubeedge/kubeedge/pkg/apis \ -"devices:v1alpha2 reliablesyncs:v1alpha1 rules:v1 grouping:v1alpha1" \ +"devices:v1alpha2 reliablesyncs:v1alpha1 rules:v1 apps:v1alpha1" \ --go-header-file ${KUBEEDGE_ROOT}/hack/boilerplate/boilerplate.txt diff --git a/manifests/charts/cloudcore/crds/grouping_v1alpha1_edgeapplication.yaml b/manifests/charts/cloudcore/crds/apps_v1alpha1_edgeapplication.yaml index 4f2e67b36..0b720c231 100644 --- a/manifests/charts/cloudcore/crds/grouping_v1alpha1_edgeapplication.yaml +++ b/manifests/charts/cloudcore/crds/apps_v1alpha1_edgeapplication.yaml @@ -6,9 +6,9 @@ metadata: annotations: controller-gen.kubebuilder.io/version: v0.6.2 creationTimestamp: null - name: edgeapplications.grouping.kubeedge.io + name: edgeapplications.apps.kubeedge.io spec: - group: grouping.kubeedge.io + group: apps.kubeedge.io names: kind: EdgeApplication listKind: EdgeApplicationList @@ -57,8 +57,8 @@ spec: description: Overriders represents the override rules that would apply on workload. properties: - imageOverrider: - description: ImageOverrider represents the rules dedicated + imageOverriders: + description: ImageOverriders represents the rules dedicated to handling image overrides. items: description: ImageOverrider represents the rules dedicated @@ -194,12 +194,7 @@ spec: description: Version is the version of the resource. type: string required: - - kind - - name - - namespace - ordinal - - resource - - version type: object required: - identifier diff --git a/manifests/charts/cloudcore/crds/grouping_v1alpha1_nodegroup.yaml b/manifests/charts/cloudcore/crds/apps_v1alpha1_nodegroup.yaml index 1c2741eaf..559d4ba97 100644 --- a/manifests/charts/cloudcore/crds/grouping_v1alpha1_nodegroup.yaml +++ b/manifests/charts/cloudcore/crds/apps_v1alpha1_nodegroup.yaml @@ -6,9 +6,9 @@ metadata: annotations: controller-gen.kubebuilder.io/version: v0.6.2 creationTimestamp: null - name: nodegroups.grouping.kubeedge.io + name: nodegroups.apps.kubeedge.io spec: - group: grouping.kubeedge.io + group: apps.kubeedge.io names: kind: NodeGroup listKind: NodeGroupList diff --git a/pkg/apis/grouping/v1alpha1/doc.go b/pkg/apis/apps/v1alpha1/doc.go index fea03623c..27e7c9713 100644 --- a/pkg/apis/grouping/v1alpha1/doc.go +++ b/pkg/apis/apps/v1alpha1/doc.go @@ -17,5 +17,5 @@ limitations under the License. // Package v1alpha1 is the v1alpha1 version of the API. // +k8s:openapi-gen=true // +k8s:deepcopy-gen=package,register -// +groupName=grouping.kubeedge.io +// +groupName=apps.kubeedge.io package v1alpha1 diff --git a/pkg/apis/grouping/v1alpha1/edgeapplication_types.go b/pkg/apis/apps/v1alpha1/edgeapplication_types.go index a75142b65..f62212ea0 100644 --- a/pkg/apis/grouping/v1alpha1/edgeapplication_types.go +++ b/pkg/apis/apps/v1alpha1/edgeapplication_types.go @@ -58,10 +58,10 @@ type ResourceTemplate struct { type Overriders struct { // Replicas will override the replicas field of deployment // +optional - Replicas int `json:"replicas,omitempty"` - // ImageOverrider represents the rules dedicated to handling image overrides. + Replicas *int `json:"replicas,omitempty"` + // ImageOverriders represents the rules dedicated to handling image overrides. // +optional - ImageOverrider []ImageOverrider `json:"imageOverrider,omitempty"` + ImageOverriders []ImageOverrider `json:"imageOverriders,omitempty"` } // ImageOverrider represents the rules dedicated to handling image overrides. @@ -173,25 +173,32 @@ type ResourceIdentifier struct { // Ordinal represents an index in manifests list, so the condition can still be linked // to a manifest even though manifest cannot be parsed successfully. // +kubebuilder:validation:Minimum=0 + // +required Ordinal int `json:"ordinal"` // Group is the group of the resource. + // +optional Group string `json:"group,omitempty"` // Version is the version of the resource. - Version string `json:"version"` + // +optional + Version string `json:"version,omitempty"` // Kind is the kind of the resource. - Kind string `json:"kind"` + // +optional + Kind string `json:"kind,omitempty"` // Resource is the resource type of the resource - Resource string `json:"resource"` + // +optional + Resource string `json:"resource,omitempty"` // Namespace is the namespace of the resource - Namespace string `json:"namespace"` + // +optional + Namespace string `json:"namespace,omitempty"` // Name is the name of the resource - Name string `json:"name"` + // +optional + Name string `json:"name,omitempty"` } type ManifestCondition string diff --git a/pkg/apis/grouping/v1alpha1/nodegroup_types.go b/pkg/apis/apps/v1alpha1/nodegroup_types.go index 8dd3e47a5..8dd3e47a5 100644 --- a/pkg/apis/grouping/v1alpha1/nodegroup_types.go +++ b/pkg/apis/apps/v1alpha1/nodegroup_types.go diff --git a/pkg/apis/grouping/v1alpha1/register.go b/pkg/apis/apps/v1alpha1/register.go index 448288993..df43c8cdb 100644 --- a/pkg/apis/grouping/v1alpha1/register.go +++ b/pkg/apis/apps/v1alpha1/register.go @@ -25,7 +25,7 @@ import ( ) // GroupName specifies the group name used to register the objects. -const GroupName = "grouping.kubeedge.io" +const GroupName = "apps.kubeedge.io" // GroupVersion specifies the group and the version used to register the objects. var GroupVersion = v1.GroupVersion{Group: GroupName, Version: "v1alpha1"} diff --git a/pkg/apis/grouping/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/apps/v1alpha1/zz_generated.deepcopy.go index ca4598791..664b9bff4 100644 --- a/pkg/apis/grouping/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -324,8 +324,13 @@ func (in *NodeStatus) DeepCopy() *NodeStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Overriders) DeepCopyInto(out *Overriders) { *out = *in - if in.ImageOverrider != nil { - in, out := &in.ImageOverrider, &out.ImageOverrider + if in.Replicas != nil { + in, out := &in.Replicas, &out.Replicas + *out = new(int) + **out = **in + } + if in.ImageOverriders != nil { + in, out := &in.ImageOverriders, &out.ImageOverriders *out = make([]ImageOverrider, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) diff --git a/pkg/client/clientset/versioned/clientset.go b/pkg/client/clientset/versioned/clientset.go index e06747052..053c1df2b 100644 --- a/pkg/client/clientset/versioned/clientset.go +++ b/pkg/client/clientset/versioned/clientset.go @@ -21,8 +21,8 @@ package versioned import ( "fmt" + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/apps/v1alpha1" devicesv1alpha2 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/devices/v1alpha2" - groupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/grouping/v1alpha1" reliablesyncsv1alpha1 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/reliablesyncs/v1alpha1" rulesv1 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/rules/v1" discovery "k8s.io/client-go/discovery" @@ -32,8 +32,8 @@ import ( type Interface interface { Discovery() discovery.DiscoveryInterface + AppsV1alpha1() appsv1alpha1.AppsV1alpha1Interface DevicesV1alpha2() devicesv1alpha2.DevicesV1alpha2Interface - GroupingV1alpha1() groupingv1alpha1.GroupingV1alpha1Interface ReliablesyncsV1alpha1() reliablesyncsv1alpha1.ReliablesyncsV1alpha1Interface RulesV1() rulesv1.RulesV1Interface } @@ -42,22 +42,22 @@ type Interface interface { // version included in a Clientset. type Clientset struct { *discovery.DiscoveryClient + appsV1alpha1 *appsv1alpha1.AppsV1alpha1Client devicesV1alpha2 *devicesv1alpha2.DevicesV1alpha2Client - groupingV1alpha1 *groupingv1alpha1.GroupingV1alpha1Client reliablesyncsV1alpha1 *reliablesyncsv1alpha1.ReliablesyncsV1alpha1Client rulesV1 *rulesv1.RulesV1Client } +// AppsV1alpha1 retrieves the AppsV1alpha1Client +func (c *Clientset) AppsV1alpha1() appsv1alpha1.AppsV1alpha1Interface { + return c.appsV1alpha1 +} + // DevicesV1alpha2 retrieves the DevicesV1alpha2Client func (c *Clientset) DevicesV1alpha2() devicesv1alpha2.DevicesV1alpha2Interface { return c.devicesV1alpha2 } -// GroupingV1alpha1 retrieves the GroupingV1alpha1Client -func (c *Clientset) GroupingV1alpha1() groupingv1alpha1.GroupingV1alpha1Interface { - return c.groupingV1alpha1 -} - // ReliablesyncsV1alpha1 retrieves the ReliablesyncsV1alpha1Client func (c *Clientset) ReliablesyncsV1alpha1() reliablesyncsv1alpha1.ReliablesyncsV1alpha1Interface { return c.reliablesyncsV1alpha1 @@ -89,11 +89,11 @@ func NewForConfig(c *rest.Config) (*Clientset, error) { } var cs Clientset var err error - cs.devicesV1alpha2, err = devicesv1alpha2.NewForConfig(&configShallowCopy) + cs.appsV1alpha1, err = appsv1alpha1.NewForConfig(&configShallowCopy) if err != nil { return nil, err } - cs.groupingV1alpha1, err = groupingv1alpha1.NewForConfig(&configShallowCopy) + cs.devicesV1alpha2, err = devicesv1alpha2.NewForConfig(&configShallowCopy) if err != nil { return nil, err } @@ -117,8 +117,8 @@ func NewForConfig(c *rest.Config) (*Clientset, error) { // panics if there is an error in the config. func NewForConfigOrDie(c *rest.Config) *Clientset { var cs Clientset + cs.appsV1alpha1 = appsv1alpha1.NewForConfigOrDie(c) cs.devicesV1alpha2 = devicesv1alpha2.NewForConfigOrDie(c) - cs.groupingV1alpha1 = groupingv1alpha1.NewForConfigOrDie(c) cs.reliablesyncsV1alpha1 = reliablesyncsv1alpha1.NewForConfigOrDie(c) cs.rulesV1 = rulesv1.NewForConfigOrDie(c) @@ -129,8 +129,8 @@ func NewForConfigOrDie(c *rest.Config) *Clientset { // New creates a new Clientset for the given RESTClient. func New(c rest.Interface) *Clientset { var cs Clientset + cs.appsV1alpha1 = appsv1alpha1.New(c) cs.devicesV1alpha2 = devicesv1alpha2.New(c) - cs.groupingV1alpha1 = groupingv1alpha1.New(c) cs.reliablesyncsV1alpha1 = reliablesyncsv1alpha1.New(c) cs.rulesV1 = rulesv1.New(c) diff --git a/pkg/client/clientset/versioned/fake/clientset_generated.go b/pkg/client/clientset/versioned/fake/clientset_generated.go index 1ead541fe..cf687ef14 100644 --- a/pkg/client/clientset/versioned/fake/clientset_generated.go +++ b/pkg/client/clientset/versioned/fake/clientset_generated.go @@ -20,10 +20,10 @@ package fake import ( clientset "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/apps/v1alpha1" + fakeappsv1alpha1 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/apps/v1alpha1/fake" devicesv1alpha2 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/devices/v1alpha2" fakedevicesv1alpha2 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/devices/v1alpha2/fake" - groupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/grouping/v1alpha1" - fakegroupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/grouping/v1alpha1/fake" reliablesyncsv1alpha1 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/reliablesyncs/v1alpha1" fakereliablesyncsv1alpha1 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/reliablesyncs/v1alpha1/fake" rulesv1 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/rules/v1" @@ -85,16 +85,16 @@ var ( _ testing.FakeClient = &Clientset{} ) +// AppsV1alpha1 retrieves the AppsV1alpha1Client +func (c *Clientset) AppsV1alpha1() appsv1alpha1.AppsV1alpha1Interface { + return &fakeappsv1alpha1.FakeAppsV1alpha1{Fake: &c.Fake} +} + // DevicesV1alpha2 retrieves the DevicesV1alpha2Client func (c *Clientset) DevicesV1alpha2() devicesv1alpha2.DevicesV1alpha2Interface { return &fakedevicesv1alpha2.FakeDevicesV1alpha2{Fake: &c.Fake} } -// GroupingV1alpha1 retrieves the GroupingV1alpha1Client -func (c *Clientset) GroupingV1alpha1() groupingv1alpha1.GroupingV1alpha1Interface { - return &fakegroupingv1alpha1.FakeGroupingV1alpha1{Fake: &c.Fake} -} - // ReliablesyncsV1alpha1 retrieves the ReliablesyncsV1alpha1Client func (c *Clientset) ReliablesyncsV1alpha1() reliablesyncsv1alpha1.ReliablesyncsV1alpha1Interface { return &fakereliablesyncsv1alpha1.FakeReliablesyncsV1alpha1{Fake: &c.Fake} diff --git a/pkg/client/clientset/versioned/fake/register.go b/pkg/client/clientset/versioned/fake/register.go index 0260a08f7..eaae1bb89 100644 --- a/pkg/client/clientset/versioned/fake/register.go +++ b/pkg/client/clientset/versioned/fake/register.go @@ -19,8 +19,8 @@ limitations under the License. package fake import ( + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" devicesv1alpha2 "github.com/kubeedge/kubeedge/pkg/apis/devices/v1alpha2" - groupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" reliablesyncsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/reliablesyncs/v1alpha1" rulesv1 "github.com/kubeedge/kubeedge/pkg/apis/rules/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -34,8 +34,8 @@ var scheme = runtime.NewScheme() var codecs = serializer.NewCodecFactory(scheme) var localSchemeBuilder = runtime.SchemeBuilder{ + appsv1alpha1.AddToScheme, devicesv1alpha2.AddToScheme, - groupingv1alpha1.AddToScheme, reliablesyncsv1alpha1.AddToScheme, rulesv1.AddToScheme, } diff --git a/pkg/client/clientset/versioned/scheme/register.go b/pkg/client/clientset/versioned/scheme/register.go index a1cc00655..39cc9cdde 100644 --- a/pkg/client/clientset/versioned/scheme/register.go +++ b/pkg/client/clientset/versioned/scheme/register.go @@ -19,8 +19,8 @@ limitations under the License. package scheme import ( + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" devicesv1alpha2 "github.com/kubeedge/kubeedge/pkg/apis/devices/v1alpha2" - groupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" reliablesyncsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/reliablesyncs/v1alpha1" rulesv1 "github.com/kubeedge/kubeedge/pkg/apis/rules/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -34,8 +34,8 @@ var Scheme = runtime.NewScheme() var Codecs = serializer.NewCodecFactory(Scheme) var ParameterCodec = runtime.NewParameterCodec(Scheme) var localSchemeBuilder = runtime.SchemeBuilder{ + appsv1alpha1.AddToScheme, devicesv1alpha2.AddToScheme, - groupingv1alpha1.AddToScheme, reliablesyncsv1alpha1.AddToScheme, rulesv1.AddToScheme, } diff --git a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/grouping_client.go b/pkg/client/clientset/versioned/typed/apps/v1alpha1/apps_client.go index de22ab0f3..5a59d909d 100644 --- a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/grouping_client.go +++ b/pkg/client/clientset/versioned/typed/apps/v1alpha1/apps_client.go @@ -19,32 +19,32 @@ limitations under the License. package v1alpha1 import ( - v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" + v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/scheme" rest "k8s.io/client-go/rest" ) -type GroupingV1alpha1Interface interface { +type AppsV1alpha1Interface interface { RESTClient() rest.Interface EdgeApplicationsGetter NodeGroupsGetter } -// GroupingV1alpha1Client is used to interact with features provided by the grouping.kubeedge.io group. -type GroupingV1alpha1Client struct { +// AppsV1alpha1Client is used to interact with features provided by the apps.kubeedge.io group. +type AppsV1alpha1Client struct { restClient rest.Interface } -func (c *GroupingV1alpha1Client) EdgeApplications(namespace string) EdgeApplicationInterface { +func (c *AppsV1alpha1Client) EdgeApplications(namespace string) EdgeApplicationInterface { return newEdgeApplications(c, namespace) } -func (c *GroupingV1alpha1Client) NodeGroups() NodeGroupInterface { +func (c *AppsV1alpha1Client) NodeGroups() NodeGroupInterface { return newNodeGroups(c) } -// NewForConfig creates a new GroupingV1alpha1Client for the given config. -func NewForConfig(c *rest.Config) (*GroupingV1alpha1Client, error) { +// NewForConfig creates a new AppsV1alpha1Client for the given config. +func NewForConfig(c *rest.Config) (*AppsV1alpha1Client, error) { config := *c if err := setConfigDefaults(&config); err != nil { return nil, err @@ -53,12 +53,12 @@ func NewForConfig(c *rest.Config) (*GroupingV1alpha1Client, error) { if err != nil { return nil, err } - return &GroupingV1alpha1Client{client}, nil + return &AppsV1alpha1Client{client}, nil } -// NewForConfigOrDie creates a new GroupingV1alpha1Client for the given config and +// NewForConfigOrDie creates a new AppsV1alpha1Client for the given config and // panics if there is an error in the config. -func NewForConfigOrDie(c *rest.Config) *GroupingV1alpha1Client { +func NewForConfigOrDie(c *rest.Config) *AppsV1alpha1Client { client, err := NewForConfig(c) if err != nil { panic(err) @@ -66,9 +66,9 @@ func NewForConfigOrDie(c *rest.Config) *GroupingV1alpha1Client { return client } -// New creates a new GroupingV1alpha1Client for the given RESTClient. -func New(c rest.Interface) *GroupingV1alpha1Client { - return &GroupingV1alpha1Client{c} +// New creates a new AppsV1alpha1Client for the given RESTClient. +func New(c rest.Interface) *AppsV1alpha1Client { + return &AppsV1alpha1Client{c} } func setConfigDefaults(config *rest.Config) error { @@ -86,7 +86,7 @@ func setConfigDefaults(config *rest.Config) error { // RESTClient returns a RESTClient that is used to communicate // with API server by this client implementation. -func (c *GroupingV1alpha1Client) RESTClient() rest.Interface { +func (c *AppsV1alpha1Client) RESTClient() rest.Interface { if c == nil { return nil } diff --git a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/doc.go b/pkg/client/clientset/versioned/typed/apps/v1alpha1/doc.go index 62dbf54f4..62dbf54f4 100644 --- a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/doc.go +++ b/pkg/client/clientset/versioned/typed/apps/v1alpha1/doc.go diff --git a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/edgeapplication.go b/pkg/client/clientset/versioned/typed/apps/v1alpha1/edgeapplication.go index 5637c8a31..211794139 100644 --- a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/edgeapplication.go +++ b/pkg/client/clientset/versioned/typed/apps/v1alpha1/edgeapplication.go @@ -22,7 +22,7 @@ import ( "context" "time" - v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" + v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" scheme "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/scheme" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" types "k8s.io/apimachinery/pkg/types" @@ -57,7 +57,7 @@ type edgeApplications struct { } // newEdgeApplications returns a EdgeApplications -func newEdgeApplications(c *GroupingV1alpha1Client, namespace string) *edgeApplications { +func newEdgeApplications(c *AppsV1alpha1Client, namespace string) *edgeApplications { return &edgeApplications{ client: c.RESTClient(), ns: namespace, diff --git a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/fake/doc.go b/pkg/client/clientset/versioned/typed/apps/v1alpha1/fake/doc.go index 937651e2e..937651e2e 100644 --- a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/fake/doc.go +++ b/pkg/client/clientset/versioned/typed/apps/v1alpha1/fake/doc.go diff --git a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/fake/fake_grouping_client.go b/pkg/client/clientset/versioned/typed/apps/v1alpha1/fake/fake_apps_client.go index 620225b0c..28008a704 100644 --- a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/fake/fake_grouping_client.go +++ b/pkg/client/clientset/versioned/typed/apps/v1alpha1/fake/fake_apps_client.go @@ -19,26 +19,26 @@ limitations under the License. package fake import ( - v1alpha1 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/grouping/v1alpha1" + v1alpha1 "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/typed/apps/v1alpha1" rest "k8s.io/client-go/rest" testing "k8s.io/client-go/testing" ) -type FakeGroupingV1alpha1 struct { +type FakeAppsV1alpha1 struct { *testing.Fake } -func (c *FakeGroupingV1alpha1) EdgeApplications(namespace string) v1alpha1.EdgeApplicationInterface { +func (c *FakeAppsV1alpha1) EdgeApplications(namespace string) v1alpha1.EdgeApplicationInterface { return &FakeEdgeApplications{c, namespace} } -func (c *FakeGroupingV1alpha1) NodeGroups() v1alpha1.NodeGroupInterface { +func (c *FakeAppsV1alpha1) NodeGroups() v1alpha1.NodeGroupInterface { return &FakeNodeGroups{c} } // RESTClient returns a RESTClient that is used to communicate // with API server by this client implementation. -func (c *FakeGroupingV1alpha1) RESTClient() rest.Interface { +func (c *FakeAppsV1alpha1) RESTClient() rest.Interface { var ret *rest.RESTClient return ret } diff --git a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/fake/fake_edgeapplication.go b/pkg/client/clientset/versioned/typed/apps/v1alpha1/fake/fake_edgeapplication.go index 29bfef30b..068250cf2 100644 --- a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/fake/fake_edgeapplication.go +++ b/pkg/client/clientset/versioned/typed/apps/v1alpha1/fake/fake_edgeapplication.go @@ -21,7 +21,7 @@ package fake import ( "context" - v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" + v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" labels "k8s.io/apimachinery/pkg/labels" schema "k8s.io/apimachinery/pkg/runtime/schema" @@ -32,13 +32,13 @@ import ( // FakeEdgeApplications implements EdgeApplicationInterface type FakeEdgeApplications struct { - Fake *FakeGroupingV1alpha1 + Fake *FakeAppsV1alpha1 ns string } -var edgeapplicationsResource = schema.GroupVersionResource{Group: "grouping.kubeedge.io", Version: "v1alpha1", Resource: "edgeapplications"} +var edgeapplicationsResource = schema.GroupVersionResource{Group: "apps.kubeedge.io", Version: "v1alpha1", Resource: "edgeapplications"} -var edgeapplicationsKind = schema.GroupVersionKind{Group: "grouping.kubeedge.io", Version: "v1alpha1", Kind: "EdgeApplication"} +var edgeapplicationsKind = schema.GroupVersionKind{Group: "apps.kubeedge.io", Version: "v1alpha1", Kind: "EdgeApplication"} // Get takes name of the edgeApplication, and returns the corresponding edgeApplication object, and an error if there is any. func (c *FakeEdgeApplications) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.EdgeApplication, err error) { diff --git a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/fake/fake_nodegroup.go b/pkg/client/clientset/versioned/typed/apps/v1alpha1/fake/fake_nodegroup.go index d93077f79..6e77ba3d8 100644 --- a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/fake/fake_nodegroup.go +++ b/pkg/client/clientset/versioned/typed/apps/v1alpha1/fake/fake_nodegroup.go @@ -21,7 +21,7 @@ package fake import ( "context" - v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" + v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" labels "k8s.io/apimachinery/pkg/labels" schema "k8s.io/apimachinery/pkg/runtime/schema" @@ -32,12 +32,12 @@ import ( // FakeNodeGroups implements NodeGroupInterface type FakeNodeGroups struct { - Fake *FakeGroupingV1alpha1 + Fake *FakeAppsV1alpha1 } -var nodegroupsResource = schema.GroupVersionResource{Group: "grouping.kubeedge.io", Version: "v1alpha1", Resource: "nodegroups"} +var nodegroupsResource = schema.GroupVersionResource{Group: "apps.kubeedge.io", Version: "v1alpha1", Resource: "nodegroups"} -var nodegroupsKind = schema.GroupVersionKind{Group: "grouping.kubeedge.io", Version: "v1alpha1", Kind: "NodeGroup"} +var nodegroupsKind = schema.GroupVersionKind{Group: "apps.kubeedge.io", Version: "v1alpha1", Kind: "NodeGroup"} // Get takes name of the nodeGroup, and returns the corresponding nodeGroup object, and an error if there is any. func (c *FakeNodeGroups) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.NodeGroup, err error) { diff --git a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/apps/v1alpha1/generated_expansion.go index 6111ecca1..6111ecca1 100644 --- a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/apps/v1alpha1/generated_expansion.go diff --git a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/nodegroup.go b/pkg/client/clientset/versioned/typed/apps/v1alpha1/nodegroup.go index 29b0b43c0..de35a2401 100644 --- a/pkg/client/clientset/versioned/typed/grouping/v1alpha1/nodegroup.go +++ b/pkg/client/clientset/versioned/typed/apps/v1alpha1/nodegroup.go @@ -22,7 +22,7 @@ import ( "context" "time" - v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" + v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" scheme "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned/scheme" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" types "k8s.io/apimachinery/pkg/types" @@ -56,7 +56,7 @@ type nodeGroups struct { } // newNodeGroups returns a NodeGroups -func newNodeGroups(c *GroupingV1alpha1Client) *nodeGroups { +func newNodeGroups(c *AppsV1alpha1Client) *nodeGroups { return &nodeGroups{ client: c.RESTClient(), } diff --git a/pkg/client/informers/externalversions/grouping/interface.go b/pkg/client/informers/externalversions/apps/interface.go index d17516c70..dfc550dcc 100644 --- a/pkg/client/informers/externalversions/grouping/interface.go +++ b/pkg/client/informers/externalversions/apps/interface.go @@ -16,10 +16,10 @@ limitations under the License. // Code generated by informer-gen. DO NOT EDIT. -package grouping +package apps import ( - v1alpha1 "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions/grouping/v1alpha1" + v1alpha1 "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions/apps/v1alpha1" internalinterfaces "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions/internalinterfaces" ) diff --git a/pkg/client/informers/externalversions/grouping/v1alpha1/edgeapplication.go b/pkg/client/informers/externalversions/apps/v1alpha1/edgeapplication.go index cc2ef91e0..b6fff4001 100644 --- a/pkg/client/informers/externalversions/grouping/v1alpha1/edgeapplication.go +++ b/pkg/client/informers/externalversions/apps/v1alpha1/edgeapplication.go @@ -22,10 +22,10 @@ import ( "context" time "time" - groupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" versioned "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" internalinterfaces "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions/internalinterfaces" - v1alpha1 "github.com/kubeedge/kubeedge/pkg/client/listers/grouping/v1alpha1" + v1alpha1 "github.com/kubeedge/kubeedge/pkg/client/listers/apps/v1alpha1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" watch "k8s.io/apimachinery/pkg/watch" @@ -62,16 +62,16 @@ func NewFilteredEdgeApplicationInformer(client versioned.Interface, namespace st if tweakListOptions != nil { tweakListOptions(&options) } - return client.GroupingV1alpha1().EdgeApplications(namespace).List(context.TODO(), options) + return client.AppsV1alpha1().EdgeApplications(namespace).List(context.TODO(), options) }, WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } - return client.GroupingV1alpha1().EdgeApplications(namespace).Watch(context.TODO(), options) + return client.AppsV1alpha1().EdgeApplications(namespace).Watch(context.TODO(), options) }, }, - &groupingv1alpha1.EdgeApplication{}, + &appsv1alpha1.EdgeApplication{}, resyncPeriod, indexers, ) @@ -82,7 +82,7 @@ func (f *edgeApplicationInformer) defaultInformer(client versioned.Interface, re } func (f *edgeApplicationInformer) Informer() cache.SharedIndexInformer { - return f.factory.InformerFor(&groupingv1alpha1.EdgeApplication{}, f.defaultInformer) + return f.factory.InformerFor(&appsv1alpha1.EdgeApplication{}, f.defaultInformer) } func (f *edgeApplicationInformer) Lister() v1alpha1.EdgeApplicationLister { diff --git a/pkg/client/informers/externalversions/grouping/v1alpha1/interface.go b/pkg/client/informers/externalversions/apps/v1alpha1/interface.go index 51c90bc2b..51c90bc2b 100644 --- a/pkg/client/informers/externalversions/grouping/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/apps/v1alpha1/interface.go diff --git a/pkg/client/informers/externalversions/grouping/v1alpha1/nodegroup.go b/pkg/client/informers/externalversions/apps/v1alpha1/nodegroup.go index a95a6dece..380e57cf7 100644 --- a/pkg/client/informers/externalversions/grouping/v1alpha1/nodegroup.go +++ b/pkg/client/informers/externalversions/apps/v1alpha1/nodegroup.go @@ -22,10 +22,10 @@ import ( "context" time "time" - groupingv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" + appsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" versioned "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" internalinterfaces "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions/internalinterfaces" - v1alpha1 "github.com/kubeedge/kubeedge/pkg/client/listers/grouping/v1alpha1" + v1alpha1 "github.com/kubeedge/kubeedge/pkg/client/listers/apps/v1alpha1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" watch "k8s.io/apimachinery/pkg/watch" @@ -61,16 +61,16 @@ func NewFilteredNodeGroupInformer(client versioned.Interface, resyncPeriod time. if tweakListOptions != nil { tweakListOptions(&options) } - return client.GroupingV1alpha1().NodeGroups().List(context.TODO(), options) + return client.AppsV1alpha1().NodeGroups().List(context.TODO(), options) }, WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } - return client.GroupingV1alpha1().NodeGroups().Watch(context.TODO(), options) + return client.AppsV1alpha1().NodeGroups().Watch(context.TODO(), options) }, }, - &groupingv1alpha1.NodeGroup{}, + &appsv1alpha1.NodeGroup{}, resyncPeriod, indexers, ) @@ -81,7 +81,7 @@ func (f *nodeGroupInformer) defaultInformer(client versioned.Interface, resyncPe } func (f *nodeGroupInformer) Informer() cache.SharedIndexInformer { - return f.factory.InformerFor(&groupingv1alpha1.NodeGroup{}, f.defaultInformer) + return f.factory.InformerFor(&appsv1alpha1.NodeGroup{}, f.defaultInformer) } func (f *nodeGroupInformer) Lister() v1alpha1.NodeGroupLister { diff --git a/pkg/client/informers/externalversions/factory.go b/pkg/client/informers/externalversions/factory.go index a79e0b6f3..93d352e6c 100644 --- a/pkg/client/informers/externalversions/factory.go +++ b/pkg/client/informers/externalversions/factory.go @@ -24,8 +24,8 @@ import ( time "time" versioned "github.com/kubeedge/kubeedge/pkg/client/clientset/versioned" + apps "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions/apps" devices "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions/devices" - grouping "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions/grouping" internalinterfaces "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions/internalinterfaces" reliablesyncs "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions/reliablesyncs" rules "github.com/kubeedge/kubeedge/pkg/client/informers/externalversions/rules" @@ -175,18 +175,18 @@ type SharedInformerFactory interface { ForResource(resource schema.GroupVersionResource) (GenericInformer, error) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool + Apps() apps.Interface Devices() devices.Interface - Grouping() grouping.Interface Reliablesyncs() reliablesyncs.Interface Rules() rules.Interface } -func (f *sharedInformerFactory) Devices() devices.Interface { - return devices.New(f, f.namespace, f.tweakListOptions) +func (f *sharedInformerFactory) Apps() apps.Interface { + return apps.New(f, f.namespace, f.tweakListOptions) } -func (f *sharedInformerFactory) Grouping() grouping.Interface { - return grouping.New(f, f.namespace, f.tweakListOptions) +func (f *sharedInformerFactory) Devices() devices.Interface { + return devices.New(f, f.namespace, f.tweakListOptions) } func (f *sharedInformerFactory) Reliablesyncs() reliablesyncs.Interface { diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index 070702122..cd93d6553 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -21,8 +21,8 @@ package externalversions import ( "fmt" + v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" v1alpha2 "github.com/kubeedge/kubeedge/pkg/apis/devices/v1alpha2" - v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" reliablesyncsv1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/reliablesyncs/v1alpha1" v1 "github.com/kubeedge/kubeedge/pkg/apis/rules/v1" schema "k8s.io/apimachinery/pkg/runtime/schema" @@ -55,18 +55,18 @@ func (f *genericInformer) Lister() cache.GenericLister { // TODO extend this to unknown resources with a client pool func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) { switch resource { - // Group=devices, Version=v1alpha2 + // Group=apps.kubeedge.io, Version=v1alpha1 + case v1alpha1.SchemeGroupVersion.WithResource("edgeapplications"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Apps().V1alpha1().EdgeApplications().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("nodegroups"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Apps().V1alpha1().NodeGroups().Informer()}, nil + + // Group=devices, Version=v1alpha2 case v1alpha2.SchemeGroupVersion.WithResource("devices"): return &genericInformer{resource: resource.GroupResource(), informer: f.Devices().V1alpha2().Devices().Informer()}, nil case v1alpha2.SchemeGroupVersion.WithResource("devicemodels"): return &genericInformer{resource: resource.GroupResource(), informer: f.Devices().V1alpha2().DeviceModels().Informer()}, nil - // Group=grouping.kubeedge.io, Version=v1alpha1 - case v1alpha1.SchemeGroupVersion.WithResource("edgeapplications"): - return &genericInformer{resource: resource.GroupResource(), informer: f.Grouping().V1alpha1().EdgeApplications().Informer()}, nil - case v1alpha1.SchemeGroupVersion.WithResource("nodegroups"): - return &genericInformer{resource: resource.GroupResource(), informer: f.Grouping().V1alpha1().NodeGroups().Informer()}, nil - // Group=reliablesyncs.kubeedge.io, Version=v1alpha1 case reliablesyncsv1alpha1.SchemeGroupVersion.WithResource("clusterobjectsyncs"): return &genericInformer{resource: resource.GroupResource(), informer: f.Reliablesyncs().V1alpha1().ClusterObjectSyncs().Informer()}, nil diff --git a/pkg/client/listers/grouping/v1alpha1/edgeapplication.go b/pkg/client/listers/apps/v1alpha1/edgeapplication.go index 2667b787f..b5e215f3c 100644 --- a/pkg/client/listers/grouping/v1alpha1/edgeapplication.go +++ b/pkg/client/listers/apps/v1alpha1/edgeapplication.go @@ -19,7 +19,7 @@ limitations under the License. package v1alpha1 import ( - v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" + v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" diff --git a/pkg/client/listers/grouping/v1alpha1/expansion_generated.go b/pkg/client/listers/apps/v1alpha1/expansion_generated.go index efe2db7a3..efe2db7a3 100644 --- a/pkg/client/listers/grouping/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/apps/v1alpha1/expansion_generated.go diff --git a/pkg/client/listers/grouping/v1alpha1/nodegroup.go b/pkg/client/listers/apps/v1alpha1/nodegroup.go index eeb274459..7c9cd600c 100644 --- a/pkg/client/listers/grouping/v1alpha1/nodegroup.go +++ b/pkg/client/listers/apps/v1alpha1/nodegroup.go @@ -19,7 +19,7 @@ limitations under the License. package v1alpha1 import ( - v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/grouping/v1alpha1" + v1alpha1 "github.com/kubeedge/kubeedge/pkg/apis/apps/v1alpha1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" |
