diff options
25 files changed, 182 insertions, 1605 deletions
diff --git a/cloud/cmd/cloudcore/app/options/options.go b/cloud/cmd/cloudcore/app/options/options.go index b23366dfb..b36900d64 100644 --- a/cloud/cmd/cloudcore/app/options/options.go +++ b/cloud/cmd/cloudcore/app/options/options.go @@ -32,6 +32,11 @@ type CloudCoreOptions struct { ConfigFile string } +type TunnelPortRecord struct { + IPTunnelPort map[string]int `json:"ipTunnelPort"` + Port map[int]bool `json:"port"` +} + func NewCloudCoreOptions() *CloudCoreOptions { return &CloudCoreOptions{ ConfigFile: path.Join(constants.DefaultConfigDir, "cloudcore.yaml"), diff --git a/cloud/cmd/cloudcore/app/server.go b/cloud/cmd/cloudcore/app/server.go index 507c69a45..6009ffa47 100644 --- a/cloud/cmd/cloudcore/app/server.go +++ b/cloud/cmd/cloudcore/app/server.go @@ -1,10 +1,18 @@ package app import ( + "context" + "encoding/json" + "errors" "fmt" + "math/rand" + "os" "time" "github.com/spf13/cobra" + v1 "k8s.io/api/core/v1" + apierror "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" cliflag "k8s.io/component-base/cli/flag" "k8s.io/component-base/cli/globalflag" "k8s.io/component-base/term" @@ -14,16 +22,17 @@ import ( beehiveContext "github.com/kubeedge/beehive/pkg/core/context" "github.com/kubeedge/kubeedge/cloud/cmd/cloudcore/app/options" "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub" - hubconfig "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/config" + "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/servers/httpserver" "github.com/kubeedge/kubeedge/cloud/pkg/cloudstream" "github.com/kubeedge/kubeedge/cloud/pkg/common/client" "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" + "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" "github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller" "github.com/kubeedge/kubeedge/cloud/pkg/dynamiccontroller" "github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller" - kele "github.com/kubeedge/kubeedge/cloud/pkg/leaderelection" "github.com/kubeedge/kubeedge/cloud/pkg/router" "github.com/kubeedge/kubeedge/cloud/pkg/synccontroller" + "github.com/kubeedge/kubeedge/common/constants" "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1/validation" "github.com/kubeedge/kubeedge/pkg/util" @@ -63,17 +72,21 @@ kubernetes controller which manages devices so that the device metadata/status d // To help debugging, immediately log version klog.Infof("Version: %+v", version.Get()) client.InitKubeEdgeClient(config.KubeAPIConfig) - gis := informers.GetInformersManager() - registerModules(config) - // If leader election is enabled, runCommand via LeaderElector until done and exit. - if config.LeaderElection.LeaderElect { - electionChecker := kele.NewLeaderReadyzAdaptor(time.Second * 20) - hubconfig.Config.Checker = electionChecker - kele.Run(config, electionChecker) - return + // Negotiate TunnelPort for multi cloudcore instances + waitTime := rand.Int31n(10) + time.Sleep(time.Duration(waitTime) * time.Second) + tunnelport, err := NegotiateTunnelPort() + if err != nil { + panic(err) } + config.CommonConfig.TunnelPort = *tunnelport + + gis := informers.GetInformersManager() + + registerModules(config) + // Start all modules if disable leader election core.StartModules() gis.Start(beehiveContext.Done()) @@ -107,10 +120,113 @@ kubernetes controller which manages devices so that the device metadata/status d // registerModules register all the modules started in cloudcore func registerModules(c *v1alpha1.CloudCoreConfig) { cloudhub.Register(c.Modules.CloudHub) - edgecontroller.Register(c.Modules.EdgeController) + edgecontroller.Register(c.Modules.EdgeController, c.CommonConfig) devicecontroller.Register(c.Modules.DeviceController) synccontroller.Register(c.Modules.SyncController) cloudstream.Register(c.Modules.CloudStream) router.Register(c.Modules.Router) dynamiccontroller.Register(c.Modules.DynamicController) } + +func NegotiateTunnelPort() (*int, error) { + kubeClient := client.GetKubeClient() + err := httpserver.CreateNamespaceIfNeeded(kubeClient, modules.NamespaceSystem) + if err != nil { + return nil, errors.New("failed to create system namespace") + } + + tunnelPort, err := kubeClient.CoreV1().ConfigMaps(modules.NamespaceSystem).Get(context.TODO(), modules.TunnelPort, metav1.GetOptions{}) + + if err != nil && !apierror.IsNotFound(err) { + return nil, err + } + + localIP := getLocalIP() + + var record options.TunnelPortRecord + if err == nil { + recordStr, found := tunnelPort.Annotations[modules.TunnelPortRecordAnnotationKey] + recordBytes := []byte(recordStr) + if !found { + return nil, errors.New("failed to get tunnel port record") + } + + if err := json.Unmarshal(recordBytes, &record); err != nil { + return nil, err + } + + _, found = record.IPTunnelPort[localIP] + if !found { + port := negotiatePort(record.Port) + + record.IPTunnelPort[localIP] = port + record.Port[port] = true + + recordBytes, err := json.Marshal(record) + if err != nil { + return nil, err + } + + tunnelPort.Annotations[modules.TunnelPortRecordAnnotationKey] = string(recordBytes) + + _, err = kubeClient.CoreV1().ConfigMaps(modules.NamespaceSystem).Update(context.TODO(), tunnelPort, metav1.UpdateOptions{}) + if err != nil { + return nil, err + } + + return &port, nil + } + } + + if apierror.IsNotFound(err) { + record := options.TunnelPortRecord{ + IPTunnelPort: map[string]int{ + localIP: constants.ServerPort, + }, + Port: map[int]bool{ + constants.ServerPort: true, + }, + } + recordBytes, err := json.Marshal(record) + if err != nil { + return nil, err + } + + _, err = kubeClient.CoreV1().ConfigMaps(modules.NamespaceSystem).Create(context.TODO(), &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: modules.TunnelPort, + Namespace: modules.NamespaceSystem, + Annotations: map[string]string{ + modules.TunnelPortRecordAnnotationKey: string(recordBytes), + }, + }, + }, metav1.CreateOptions{}) + + if err != nil { + return nil, err + } + + port := constants.ServerPort + return &port, nil + } + + return nil, nil +} + +func negotiatePort(portRecord map[int]bool) int { + for port := constants.ServerPort; ; { + if _, found := portRecord[port]; !found { + return port + } + port++ + } +} + +func getLocalIP() string { + hostnameOverride, err := os.Hostname() + if err != nil { + hostnameOverride = constants.DefaultHostnameOverride + } + localIP, _ := util.GetLocalIP(hostnameOverride) + return localIP +} diff --git a/cloud/pkg/cloudhub/config/config.go b/cloud/pkg/cloudhub/config/config.go index 3e45cf01d..714f701d0 100644 --- a/cloud/pkg/cloudhub/config/config.go +++ b/cloud/pkg/cloudhub/config/config.go @@ -11,7 +11,6 @@ import ( "github.com/kubeedge/kubeedge/cloud/pkg/client/clientset/versioned" syncinformer "github.com/kubeedge/kubeedge/cloud/pkg/client/informers/externalversions/reliablesyncs/v1alpha1" synclister "github.com/kubeedge/kubeedge/cloud/pkg/client/listers/reliablesyncs/v1alpha1" - kele "github.com/kubeedge/kubeedge/cloud/pkg/leaderelection" "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" ) @@ -25,7 +24,6 @@ type Configure struct { CaKey []byte Cert []byte Key []byte - Checker *kele.ReadyzAdaptor } func InitConfigure(hub *v1alpha1.CloudHub) { diff --git a/cloud/pkg/cloudhub/handler/messagehandler.go b/cloud/pkg/cloudhub/handler/messagehandler.go index 9f450d986..e76f045ac 100644 --- a/cloud/pkg/cloudhub/handler/messagehandler.go +++ b/cloud/pkg/cloudhub/handler/messagehandler.go @@ -446,10 +446,20 @@ func (mh *MessageHandle) MessageWriteLoop(info *model.HubInfo, stopServe chan Ex copyMsg := deepcopy(msg) trimMessage(copyMsg) + // initialize timer and retry count for sending message + var ( + retry = 0 + retryInterval time.Duration = 2 + ) + for { conn, ok := mh.nodeConns.Load(info.NodeID) if !ok { - time.Sleep(time.Second * 2) + if retry == 1 { + break + } + retry++ + time.Sleep(time.Second * retryInterval) continue } err := mh.sendMsg(conn.(hubio.CloudHubIO), info, copyMsg, msg, nodeStore) diff --git a/cloud/pkg/cloudhub/servers/httpserver/secretsutil.go b/cloud/pkg/cloudhub/servers/httpserver/secretsutil.go index 7b2e56e6a..cdca41411 100644 --- a/cloud/pkg/cloudhub/servers/httpserver/secretsutil.go +++ b/cloud/pkg/cloudhub/servers/httpserver/secretsutil.go @@ -10,11 +10,10 @@ import ( "k8s.io/client-go/kubernetes" "github.com/kubeedge/kubeedge/cloud/pkg/common/client" + "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" ) const ( - NamespaceSystem string = "kubeedge" - TokenSecretName string = "tokensecret" TokenDataName string = "tokendata" CaSecretName string = "casecret" @@ -53,7 +52,7 @@ func CreateTokenSecret(caHashAndToken []byte) error { TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{ Name: TokenSecretName, - Namespace: NamespaceSystem, + Namespace: modules.NamespaceSystem, }, Data: map[string][]byte{ TokenDataName: caHashAndToken, @@ -61,7 +60,7 @@ func CreateTokenSecret(caHashAndToken []byte) error { StringData: map[string]string{}, Type: "Opaque", } - return CreateSecret(token, NamespaceSystem) + return CreateSecret(token, modules.NamespaceSystem) } func CreateCaSecret(certDER, key []byte) error { @@ -69,7 +68,7 @@ func CreateCaSecret(certDER, key []byte) error { TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{ Name: CaSecretName, - Namespace: NamespaceSystem, + Namespace: modules.NamespaceSystem, }, Data: map[string][]byte{ CaDataName: certDER, @@ -78,7 +77,7 @@ func CreateCaSecret(certDER, key []byte) error { StringData: map[string]string{}, Type: "Opaque", } - return CreateSecret(caSecret, NamespaceSystem) + return CreateSecret(caSecret, modules.NamespaceSystem) } func CreateCloudCoreSecret(certDER, key []byte) error { @@ -86,7 +85,7 @@ func CreateCloudCoreSecret(certDER, key []byte) error { TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{ Name: CloudCoreSecretName, - Namespace: NamespaceSystem, + Namespace: modules.NamespaceSystem, }, Data: map[string][]byte{ CloudCoreCertName: certDER, @@ -95,7 +94,7 @@ func CreateCloudCoreSecret(certDER, key []byte) error { StringData: map[string]string{}, Type: "Opaque", } - return CreateSecret(cloudCoreCert, NamespaceSystem) + return CreateSecret(cloudCoreCert, modules.NamespaceSystem) } func CreateNamespaceIfNeeded(cli kubernetes.Interface, ns string) error { diff --git a/cloud/pkg/cloudhub/servers/httpserver/server.go b/cloud/pkg/cloudhub/servers/httpserver/server.go index 832c3efdb..88b97a9d1 100644 --- a/cloud/pkg/cloudhub/servers/httpserver/server.go +++ b/cloud/pkg/cloudhub/servers/httpserver/server.go @@ -34,6 +34,7 @@ import ( "k8s.io/klog/v2" hubconfig "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/config" + "github.com/kubeedge/kubeedge/cloud/pkg/common/modules" "github.com/kubeedge/kubeedge/common/constants" ) @@ -42,7 +43,6 @@ func StartHTTPServer() { router := mux.NewRouter() router.HandleFunc(constants.DefaultCertURL, edgeCoreClientCert).Methods("GET") router.HandleFunc(constants.DefaultCAURL, getCA).Methods("GET") - router.HandleFunc(constants.DefaultCloudCoreReadyCheckURL, electionHandler).Methods("GET") addr := fmt.Sprintf("%s:%d", hubconfig.Config.HTTPS.Address, hubconfig.Config.HTTPS.Port) @@ -71,29 +71,6 @@ func getCA(w http.ResponseWriter, r *http.Request) { } } -//electionHandler returns the status whether the cloudcore is ready -func electionHandler(w http.ResponseWriter, r *http.Request) { - checker := hubconfig.Config.Checker - if checker == nil { - w.WriteHeader(http.StatusOK) - if _, err := w.Write([]byte("Cloudcore is ready with no leaderelection")); err != nil { - klog.Errorf("failed to write http response, err: %v", err) - } - return - } - if checker.Check(r) != nil { - w.WriteHeader(http.StatusNotFound) - if _, err := w.Write([]byte("Cloudcore is not ready")); err != nil { - klog.Errorf("failed to write http response, err: %v", err) - } - } else { - w.WriteHeader(http.StatusOK) - if _, err := w.Write([]byte("Cloudcore is ready")); err != nil { - klog.Errorf("failed to write http response, err: %v", err) - } - } -} - // EncodeCertPEM returns PEM-encoded certificate data func EncodeCertPEM(cert *x509.Certificate) []byte { block := pem.Block{ @@ -244,7 +221,7 @@ func signCerts(subInfo pkix.Name, pbKey crypto.PublicKey) ([]byte, error) { // CheckCaExistsFromSecret checks ca from secret func CheckCaExistsFromSecret() bool { - if _, err := GetSecret(CaSecretName, NamespaceSystem); err != nil { + if _, err := GetSecret(CaSecretName, modules.NamespaceSystem); err != nil { return false } return true @@ -252,7 +229,7 @@ func CheckCaExistsFromSecret() bool { // CheckCertExistsFromSecret checks CloudCore certificate from secret func CheckCertExistsFromSecret() bool { - if _, err := GetSecret(CloudCoreSecretName, NamespaceSystem); err != nil { + if _, err := GetSecret(CloudCoreSecretName, modules.NamespaceSystem); err != nil { return false } return true @@ -288,7 +265,7 @@ func PrepareAllCerts() error { UpdateConfig(caDER, caKeyDER, nil, nil) } else { - s, err := GetSecret(CaSecretName, NamespaceSystem) + s, err := GetSecret(CaSecretName, modules.NamespaceSystem) if err != nil { klog.Errorf("failed to get CaSecret, error: %v", err) return err @@ -327,7 +304,7 @@ func PrepareAllCerts() error { UpdateConfig(nil, nil, certDER, keyDER) } else { - s, err := GetSecret(CloudCoreSecretName, NamespaceSystem) + s, err := GetSecret(CloudCoreSecretName, modules.NamespaceSystem) if err != nil { klog.Errorf("failed to get CloudCore secret, error: %v", err) return err diff --git a/cloud/pkg/common/modules/modules.go b/cloud/pkg/common/modules/modules.go index be45bfc4e..a59ffec38 100644 --- a/cloud/pkg/common/modules/modules.go +++ b/cloud/pkg/common/modules/modules.go @@ -23,4 +23,9 @@ const ( RouterGroupName = "router" UserGroup = "user" + + NamespaceSystem string = "kubeedge" + TunnelPort string = "tunnelport" + + TunnelPortRecordAnnotationKey string = "tunnelportrecord.kubeedge.io" ) diff --git a/cloud/pkg/edgecontroller/controller/upstream.go b/cloud/pkg/edgecontroller/controller/upstream.go index edeec1c2b..93e56a64f 100644 --- a/cloud/pkg/edgecontroller/controller/upstream.go +++ b/cloud/pkg/edgecontroller/controller/upstream.go @@ -88,6 +88,7 @@ type UpstreamController struct { kubeClient kubernetes.Interface messageLayer messagelayer.MessageLayer crdClient crdClientset.Interface + TunnelPort int // message channel nodeStatusChan chan model.Message @@ -402,6 +403,7 @@ func (uc *UpstreamController) updatePodStatus() { // createNode create new edge node to kubernetes func (uc *UpstreamController) createNode(name string, node *v1.Node) (*v1.Node, error) { node.Name = name + node.Status.DaemonEndpoints.KubeletEndpoint.Port = int32(uc.TunnelPort) return uc.kubeClient.CoreV1().Nodes().Create(context.Background(), node, metaV1.CreateOptions{}) } @@ -524,6 +526,9 @@ func (uc *UpstreamController) updateNodeStatus() { nodeStatusRequest.Status.VolumesAttached = getNode.Status.VolumesAttached getNode.Status = nodeStatusRequest.Status + + getNode.Status.DaemonEndpoints.KubeletEndpoint.Port = int32(uc.TunnelPort) + node, err := uc.kubeClient.CoreV1().Nodes().UpdateStatus(context.Background(), getNode, metaV1.UpdateOptions{}) if err != nil { klog.Warningf("message: %s process failure, update node failed with error: %s, namespace: %s, name: %s", msg.GetID(), err, getNode.Namespace, getNode.Name) diff --git a/cloud/pkg/edgecontroller/edgecontroller.go b/cloud/pkg/edgecontroller/edgecontroller.go index 6f83d46c0..3f6923b67 100644 --- a/cloud/pkg/edgecontroller/edgecontroller.go +++ b/cloud/pkg/edgecontroller/edgecontroller.go @@ -18,7 +18,7 @@ type EdgeController struct { downstream *controller.DownstreamController } -func newEdgeController(enable bool) *EdgeController { +func newEdgeController(enable bool, tunnelPort int) *EdgeController { ec := &EdgeController{enable: enable} if !enable { return ec @@ -28,6 +28,8 @@ func newEdgeController(enable bool) *EdgeController { if err != nil { klog.Fatalf("new upstream controller failed with error: %s", err) } + ec.upstream.TunnelPort = tunnelPort + ec.downstream, err = controller.NewDownstreamController(informers.GetInformersManager().GetK8sInformerFactory(), informers.GetInformersManager(), informers.GetInformersManager().GetCRDInformerFactory()) if err != nil { klog.Fatalf("new downstream controller failed with error: %s", err) @@ -35,10 +37,10 @@ func newEdgeController(enable bool) *EdgeController { return ec } -func Register(ec *v1alpha1.EdgeController) { +func Register(ec *v1alpha1.EdgeController, commonConfig *v1alpha1.CommonConfig) { // TODO move module config into EdgeController struct @kadisi config.InitConfigure(ec) - core.Register(newEdgeController(ec.Enable)) + core.Register(newEdgeController(ec.Enable, commonConfig.TunnelPort)) } // Name of controller diff --git a/cloud/pkg/leaderelection/leaderelection.go b/cloud/pkg/leaderelection/leaderelection.go deleted file mode 100644 index c3608ffb1..000000000 --- a/cloud/pkg/leaderelection/leaderelection.go +++ /dev/null @@ -1,227 +0,0 @@ -package leaderelection - -import ( - "context" - "encoding/json" - "fmt" - "os" - "syscall" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/apimachinery/pkg/util/uuid" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/client-go/tools/record" - componentbaseconfig "k8s.io/component-base/config" - "k8s.io/klog/v2" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" - - "github.com/kubeedge/beehive/pkg/core" - beehiveContext "github.com/kubeedge/beehive/pkg/core/context" - "github.com/kubeedge/kubeedge/cloud/pkg/common/client" - "github.com/kubeedge/kubeedge/cloud/pkg/common/informers" - config "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" -) - -func Run(cfg *config.CloudCoreConfig, readyzAdaptor *ReadyzAdaptor) { - // To help debugging, immediately log config for LeaderElection - klog.Infof("Config for LeaderElection : %v", *cfg.LeaderElection) - // Init Context for leaderElection - beehiveContext.InitContext(beehiveContext.MsgCtxTypeChannel) - // Init podReadinessGate to false at the begin of Run - if err := TryToPatchPodReadinessGate(corev1.ConditionFalse); err != nil { - klog.Errorf("Error init pod readinessGate: %v", err) - } - - cli := client.GetKubeClient() - if err := CreateNamespaceIfNeeded(cli, "kubeedge"); err != nil { - klog.Warningf("Create Namespace kubeedge failed with error: %s", err) - return - } - - coreBroadcaster := record.NewBroadcaster() - coreBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: cli.CoreV1().Events("")}) - coreRecorder := coreBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "CloudCore"}) - - leaderElectionConfig, err := makeLeaderElectionConfig(*cfg.LeaderElection, cli, coreRecorder) - if err != nil { - klog.Errorf("couldn't create leaderElectorConfig: %v", err) - return - } - - leaderElectionConfig.Callbacks = leaderelection.LeaderCallbacks{ - OnStartedLeading: func(ctx context.Context) { - // Start all modules, - core.StartModules() - informers.GetInformersManager().Start(beehiveContext.Done()) - - // Patch PodReadinessGate if program run in pod - if err := TryToPatchPodReadinessGate(corev1.ConditionTrue); err != nil { - // Terminate the program gracefully - klog.Errorf("Error patching pod readinessGate: %v", err) - if err := TriggerGracefulShutdown(); err != nil { - klog.Fatalf("failed to gracefully terminate program: %v", err) - } - } - }, - OnStoppedLeading: func() { - klog.Errorf("leaderelection lost, gracefully terminate program") - - // Reset PodReadinessGate to false if cloudcore stop - if err := TryToPatchPodReadinessGate(corev1.ConditionFalse); err != nil { - klog.Errorf("Error reset pod readinessGate: %v", err) - } - - // Trigger core.GracefulShutdown() - if err := TriggerGracefulShutdown(); err != nil { - klog.Fatalf("failed to gracefully terminate program: %v", err) - } - }, - } - - leaderElector, err := leaderelection.NewLeaderElector(*leaderElectionConfig) - if err != nil { - klog.Errorf("couldn't create leader elector: %v", err) - return - } - readyzAdaptor.SetLeaderElection(leaderElector) - - // Start leaderElection until becoming leader, terminate program if leader lost or context.cancel - go leaderElector.Run(beehiveContext.GetContext()) - - // Monitor system signal and shutdown gracefully and it should be in main gorutine - core.GracefulShutdown() -} - -// makeLeaderElectionConfig builds a leader election configuration. It will -// create a new resource lock associated with the configuration. -func makeLeaderElectionConfig(config componentbaseconfig.LeaderElectionConfiguration, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) { - hostname, err := os.Hostname() - if err != nil { - return nil, fmt.Errorf("unable to get hostname: %v", err) - } - // add a uniquifier so that two processes on the same host don't accidentally both become active - id := hostname + "_" + string(uuid.NewUUID()) - - rl, err := resourcelock.New(config.ResourceLock, - config.ResourceNamespace, - config.ResourceName, - client.CoreV1(), - client.CoordinationV1(), - resourcelock.ResourceLockConfig{ - Identity: id, - EventRecorder: recorder, - }) - if err != nil { - return nil, fmt.Errorf("couldn't create resource lock: %v", err) - } - - return &leaderelection.LeaderElectionConfig{ - Lock: rl, - LeaseDuration: config.LeaseDuration.Duration, - RenewDeadline: config.RenewDeadline.Duration, - RetryPeriod: config.RetryPeriod.Duration, - WatchDog: nil, - Name: "cloudcore", - }, nil -} - -// Try to patch PodReadinessGate if program runs in pod -func TryToPatchPodReadinessGate(status corev1.ConditionStatus) error { - podname, isInPod := os.LookupEnv("CLOUDCORE_POD_NAME") - if !isInPod { - klog.Infoln("CloudCore is not running in pod") - return nil - } - - namespace := os.Getenv("CLOUDCORE_POD_NAMESPACE") - klog.Infof("CloudCore is running in pod %s/%s, try to patch PodReadinessGate", namespace, podname) - client := client.GetKubeClient() - - //Creat patchBytes - getPod, err := client.CoreV1().Pods(namespace).Get(context.Background(), podname, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get pod(%s/%s): %v", namespace, podname, err) - } - originalJSON, err := json.Marshal(getPod) - if err != nil { - return fmt.Errorf("failed to marshal original pod %q into JSON: %v", podname, err) - } - - //Todo: Read PodReadinessGate from CloudCore configuration or env - condition := corev1.PodCondition{Type: "kubeedge.io/CloudCoreIsLeader", Status: status} - podutil.UpdatePodCondition(&getPod.Status, &condition) - newJSON, err := json.Marshal(getPod) - if err != nil { - return fmt.Errorf("failed to marshal modified pod %q into JSON: %v", podname, err) - } - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(originalJSON, newJSON, corev1.Pod{}) - if err != nil { - return fmt.Errorf("failed to create two way merge patch: %v", err) - } - - var maxRetries = 3 - for i := 1; i <= maxRetries; i++ { - _, err = client.CoreV1().Pods(namespace).Patch(context.Background(), podname, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") - if err == nil { - klog.Infof("Successfully patching podReadinessGate: kubeedge.io/CloudCoreIsLeader to pod %q through apiserver", podname) - return nil - } - if !errors.IsConflict(err) { - return err - } - - // If the patch failure is due to update conflict, the necessary retransmission is performed - if i >= maxRetries { - klog.Errorf("updateMaxRetries(%d) has reached, failed to patching podReadinessGate: kubeedge.io/CloudCoreIsLeader because of update conflict", maxRetries) - } - continue - } - - return err -} - -// TriggerGracefulShutdown triggers core.GracefulShutdown() -func TriggerGracefulShutdown() error { - if beehiveContext.GetContext().Err() != nil { - klog.Infoln("Program is in gracefully shutdown") - return nil - } - - klog.Infoln("Trigger graceful shutdown!") - p, err := os.FindProcess(syscall.Getpid()) - if err != nil { - return fmt.Errorf("Failed to find self process: %v", err) - } - - if err := p.Signal(os.Interrupt); err != nil { - return fmt.Errorf("Failed to trigger graceful shutdown: %v", err) - } - return nil -} - -func CreateNamespaceIfNeeded(cli clientset.Interface, ns string) error { - c := cli.CoreV1() - if _, err := c.Namespaces().Get(context.Background(), ns, metav1.GetOptions{}); err == nil { - // the namespace already exists - return nil - } - newNs := &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: ns, - Namespace: "", - }, - } - _, err := c.Namespaces().Create(context.Background(), newNs, metav1.CreateOptions{}) - if err != nil && errors.IsAlreadyExists(err) { - err = nil - } - return err -} diff --git a/cloud/pkg/leaderelection/readyzadaptor.go b/cloud/pkg/leaderelection/readyzadaptor.go deleted file mode 100644 index 6501817c3..000000000 --- a/cloud/pkg/leaderelection/readyzadaptor.go +++ /dev/null @@ -1,59 +0,0 @@ -package leaderelection - -import ( - "fmt" - "net/http" - "sync" - "time" - - "k8s.io/client-go/tools/leaderelection" -) - -// ReadyzAdaptor associates the /readyz endpoint with the LeaderElection object. -// It helps deal with the /readyz endpoint being set up prior to the LeaderElection. -// This contains the code needed to act as an adaptor between the leader -// election code the health check code. It allows us to provide readyz -// status about the leader election. Most specifically about if the leader -// has failed to renew without exiting the process. In that case we should -// report not healthy and rely on the kubelet to take down the process. -type ReadyzAdaptor struct { - pointerLock sync.Mutex - le *leaderelection.LeaderElector - timeout time.Duration -} - -// Name returns the name of the health check we are implementing. -func (l *ReadyzAdaptor) Name() string { - return "leaderElection" -} - -// Check is called by the readyz endpoint handler. -// It fails (returns an error) if we own the lease but had not been able to renew it. -func (l *ReadyzAdaptor) Check(req *http.Request) error { - l.pointerLock.Lock() - defer l.pointerLock.Unlock() - if l.le == nil { - return fmt.Errorf("leaderElection is not setting") - } - if !l.le.IsLeader() { - return fmt.Errorf("not yet a leader") - } - return l.le.Check(l.timeout) -} - -// SetLeaderElection ties a leader election object to a ReadyzAdaptor -func (l *ReadyzAdaptor) SetLeaderElection(le *leaderelection.LeaderElector) { - l.pointerLock.Lock() - defer l.pointerLock.Unlock() - l.le = le -} - -// NewLeaderReadyzAdaptor creates a basic healthz adaptor to monitor a leader election. -// timeout determines the time beyond the lease expiry to be allowed for timeout. -// checks within the timeout period after the lease expires will still return healthy. -func NewLeaderReadyzAdaptor(timeout time.Duration) *ReadyzAdaptor { - result := &ReadyzAdaptor{ - timeout: timeout, - } - return result -} diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go index 26d384b6a..815de9cf9 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go @@ -18,11 +18,9 @@ package v1alpha1 import ( "path" - "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilnet "k8s.io/apimachinery/pkg/util/net" - componentbaseconfig "k8s.io/component-base/config" "github.com/kubeedge/kubeedge/common/constants" metaconfig "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/meta/v1alpha1" @@ -37,6 +35,9 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig { Kind: Kind, APIVersion: path.Join(GroupName, APIVersion), }, + CommonConfig: &CommonConfig{ + TunnelPort: constants.ServerPort, + }, KubeAPIConfig: &KubeAPIConfig{ Master: "", ContentType: constants.DefaultKubeContentType, @@ -165,15 +166,6 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig { RestTimeout: 60, }, }, - LeaderElection: &componentbaseconfig.LeaderElectionConfiguration{ - LeaderElect: false, - LeaseDuration: metav1.Duration{Duration: 15 * time.Second}, - RenewDeadline: metav1.Duration{Duration: 10 * time.Second}, - RetryPeriod: metav1.Duration{Duration: 2 * time.Second}, - ResourceLock: "endpointsleases", - ResourceNamespace: constants.KubeEdgeNameSpace, - ResourceName: "cloudcorelease", - }, } return c } @@ -221,8 +213,5 @@ func NewMinCloudCoreConfig() *CloudCoreConfig { RestTimeout: 60, }, }, - LeaderElection: &componentbaseconfig.LeaderElectionConfiguration{ - LeaderElect: false, - }, } } diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go index 462c4c01b..e2211ddec 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go @@ -20,7 +20,6 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - componentbaseconfig "k8s.io/component-base/config" metaconfig "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/meta/v1alpha1" ) @@ -28,14 +27,21 @@ import ( // CloudCoreConfig indicates the config of cloudCore which get from cloudCore config file type CloudCoreConfig struct { metav1.TypeMeta + // CommonConfig indicates common config for all modules + // +Required + CommonConfig *CommonConfig `json:"commonConfig,omitempty"` // KubeAPIConfig indicates the kubernetes cluster info which cloudCore will connected // +Required KubeAPIConfig *KubeAPIConfig `json:"kubeAPIConfig,omitempty"` // Modules indicates cloudCore modules config // +Required Modules *Modules `json:"modules,omitempty"` - // Configuration for LeaderElection - LeaderElection *componentbaseconfig.LeaderElectionConfiguration `json:"leaderelection,omitempty"` +} + +// KubeAPIConfig indicates the configuration for interacting with k8s server +type CommonConfig struct { + // TunnelPort indicates the port that the cloudcore tunnel listened + TunnelPort int `json:"tunnelPort,omitempty"` } // KubeAPIConfig indicates the configuration for interacting with k8s server diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/validation/validation.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/validation/validation.go index 5088a94ff..5ac584168 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/validation/validation.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/validation/validation.go @@ -41,7 +41,6 @@ func ValidateCloudCoreConfiguration(c *v1alpha1.CloudCoreConfig) field.ErrorList allErrs = append(allErrs, ValidateModuleDeviceController(*c.Modules.DeviceController)...) allErrs = append(allErrs, ValidateModuleSyncController(*c.Modules.SyncController)...) allErrs = append(allErrs, ValidateModuleDynamicController(*c.Modules.DynamicController)...) - allErrs = append(allErrs, ValidateLeaderElectionConfiguration(*c.LeaderElection)...) allErrs = append(allErrs, ValidateModuleCloudStream(*c.Modules.CloudStream)...) return allErrs } diff --git a/tests/integration/framework/util.go b/tests/integration/framework/util.go index c906064f5..6bb583800 100644 --- a/tests/integration/framework/util.go +++ b/tests/integration/framework/util.go @@ -3,53 +3,10 @@ package framework import ( "k8s.io/klog/v2" - "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub" - "github.com/kubeedge/kubeedge/cloud/pkg/cloudstream" - "github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller" - "github.com/kubeedge/kubeedge/cloud/pkg/dynamiccontroller" - "github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller" - "github.com/kubeedge/kubeedge/cloud/pkg/router" - "github.com/kubeedge/kubeedge/cloud/pkg/synccontroller" - "github.com/kubeedge/kubeedge/edge/pkg/common/dbm" - "github.com/kubeedge/kubeedge/edge/pkg/devicetwin" - "github.com/kubeedge/kubeedge/edge/pkg/edgehub" - "github.com/kubeedge/kubeedge/edge/pkg/edgestream" - "github.com/kubeedge/kubeedge/edge/pkg/eventbus" - "github.com/kubeedge/kubeedge/edge/pkg/metamanager" - "github.com/kubeedge/kubeedge/edge/pkg/servicebus" - "github.com/kubeedge/kubeedge/edge/test" cloudconfig "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" edgeconfig "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha1" ) -// registerModules register all the modules -func registerModules(i interface{}) { - switch c := i.(type) { - case *cloudconfig.CloudCoreConfig: - cloudhub.Register(c.Modules.CloudHub) - edgecontroller.Register(c.Modules.EdgeController) - devicecontroller.Register(c.Modules.DeviceController) - synccontroller.Register(c.Modules.SyncController) - cloudstream.Register(c.Modules.CloudStream) - router.Register(c.Modules.Router) - dynamiccontroller.Register(c.Modules.DynamicController) - case *edgeconfig.EdgeCoreConfig: - devicetwin.Register(c.Modules.DeviceTwin, c.Modules.Edged.HostnameOverride) - //edged.Register(c.Modules.Edged) - edgehub.Register(c.Modules.EdgeHub, c.Modules.Edged.HostnameOverride) - eventbus.Register(c.Modules.EventBus, c.Modules.Edged.HostnameOverride) - //edgemesh.Register(c.Modules.EdgeMesh) - metamanager.Register(c.Modules.MetaManager) - servicebus.Register(c.Modules.ServiceBus) - edgestream.Register(c.Modules.EdgeStream, c.Modules.Edged.HostnameOverride, c.Modules.Edged.NodeIP) - test.Register(c.Modules.DBTest) - // Note: Need to put it to the end, and wait for all models to register before executing - dbm.InitDBConfig(c.DataBase.DriverName, c.DataBase.AliasName, c.DataBase.DataSource) - default: - klog.Error("unsupport config type!") - } -} - func DisableAllModules(i interface{}) { switch config := i.(type) { case *cloudconfig.CloudCoreConfig: diff --git a/vendor/k8s.io/client-go/tools/leaderelection/OWNERS b/vendor/k8s.io/client-go/tools/leaderelection/OWNERS deleted file mode 100644 index 9ece5e1ea..000000000 --- a/vendor/k8s.io/client-go/tools/leaderelection/OWNERS +++ /dev/null @@ -1,13 +0,0 @@ -# See the OWNERS docs at https://go.k8s.io/owners - -approvers: -- mikedanese -- timothysc -reviewers: -- wojtek-t -- deads2k -- mikedanese -- gmarek -- timothysc -- ingvagabund -- resouer diff --git a/vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go b/vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go deleted file mode 100644 index b93537291..000000000 --- a/vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package leaderelection - -import ( - "net/http" - "sync" - "time" -) - -// HealthzAdaptor associates the /healthz endpoint with the LeaderElection object. -// It helps deal with the /healthz endpoint being set up prior to the LeaderElection. -// This contains the code needed to act as an adaptor between the leader -// election code the health check code. It allows us to provide health -// status about the leader election. Most specifically about if the leader -// has failed to renew without exiting the process. In that case we should -// report not healthy and rely on the kubelet to take down the process. -type HealthzAdaptor struct { - pointerLock sync.Mutex - le *LeaderElector - timeout time.Duration -} - -// Name returns the name of the health check we are implementing. -func (l *HealthzAdaptor) Name() string { - return "leaderElection" -} - -// Check is called by the healthz endpoint handler. -// It fails (returns an error) if we own the lease but had not been able to renew it. -func (l *HealthzAdaptor) Check(req *http.Request) error { - l.pointerLock.Lock() - defer l.pointerLock.Unlock() - if l.le == nil { - return nil - } - return l.le.Check(l.timeout) -} - -// SetLeaderElection ties a leader election object to a HealthzAdaptor -func (l *HealthzAdaptor) SetLeaderElection(le *LeaderElector) { - l.pointerLock.Lock() - defer l.pointerLock.Unlock() - l.le = le -} - -// NewLeaderHealthzAdaptor creates a basic healthz adaptor to monitor a leader election. -// timeout determines the time beyond the lease expiry to be allowed for timeout. -// checks within the timeout period after the lease expires will still return healthy. -func NewLeaderHealthzAdaptor(timeout time.Duration) *HealthzAdaptor { - result := &HealthzAdaptor{ - timeout: timeout, - } - return result -} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go b/vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go deleted file mode 100644 index 483f0cba4..000000000 --- a/vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go +++ /dev/null @@ -1,394 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package leaderelection implements leader election of a set of endpoints. -// It uses an annotation in the endpoints object to store the record of the -// election state. This implementation does not guarantee that only one -// client is acting as a leader (a.k.a. fencing). -// -// A client only acts on timestamps captured locally to infer the state of the -// leader election. The client does not consider timestamps in the leader -// election record to be accurate because these timestamps may not have been -// produced by a local clock. The implemention does not depend on their -// accuracy and only uses their change to indicate that another client has -// renewed the leader lease. Thus the implementation is tolerant to arbitrary -// clock skew, but is not tolerant to arbitrary clock skew rate. -// -// However the level of tolerance to skew rate can be configured by setting -// RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a -// maximum tolerated ratio of time passed on the fastest node to time passed on -// the slowest node can be approximately achieved with a configuration that sets -// the same ratio of LeaseDuration to RenewDeadline. For example if a user wanted -// to tolerate some nodes progressing forward in time twice as fast as other nodes, -// the user could set LeaseDuration to 60 seconds and RenewDeadline to 30 seconds. -// -// While not required, some method of clock synchronization between nodes in the -// cluster is highly recommended. It's important to keep in mind when configuring -// this client that the tolerance to skew rate varies inversely to master -// availability. -// -// Larger clusters often have a more lenient SLA for API latency. This should be -// taken into account when configuring the client. The rate of leader transitions -// should be monitored and RetryPeriod and LeaseDuration should be increased -// until the rate is stable and acceptably low. It's important to keep in mind -// when configuring this client that the tolerance to API latency varies inversely -// to master availability. -// -// DISCLAIMER: this is an alpha API. This library will likely change significantly -// or even be removed entirely in subsequent releases. Depend on this API at -// your own risk. -package leaderelection - -import ( - "bytes" - "context" - "fmt" - "time" - - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - rl "k8s.io/client-go/tools/leaderelection/resourcelock" - - "k8s.io/klog/v2" -) - -const ( - JitterFactor = 1.2 -) - -// NewLeaderElector creates a LeaderElector from a LeaderElectionConfig -func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) { - if lec.LeaseDuration <= lec.RenewDeadline { - return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline") - } - if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) { - return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor") - } - if lec.LeaseDuration < 1 { - return nil, fmt.Errorf("leaseDuration must be greater than zero") - } - if lec.RenewDeadline < 1 { - return nil, fmt.Errorf("renewDeadline must be greater than zero") - } - if lec.RetryPeriod < 1 { - return nil, fmt.Errorf("retryPeriod must be greater than zero") - } - if lec.Callbacks.OnStartedLeading == nil { - return nil, fmt.Errorf("OnStartedLeading callback must not be nil") - } - if lec.Callbacks.OnStoppedLeading == nil { - return nil, fmt.Errorf("OnStoppedLeading callback must not be nil") - } - - if lec.Lock == nil { - return nil, fmt.Errorf("Lock must not be nil.") - } - le := LeaderElector{ - config: lec, - clock: clock.RealClock{}, - metrics: globalMetricsFactory.newLeaderMetrics(), - } - le.metrics.leaderOff(le.config.Name) - return &le, nil -} - -type LeaderElectionConfig struct { - // Lock is the resource that will be used for locking - Lock rl.Interface - - // LeaseDuration is the duration that non-leader candidates will - // wait to force acquire leadership. This is measured against time of - // last observed ack. - // - // A client needs to wait a full LeaseDuration without observing a change to - // the record before it can attempt to take over. When all clients are - // shutdown and a new set of clients are started with different names against - // the same leader record, they must wait the full LeaseDuration before - // attempting to acquire the lease. Thus LeaseDuration should be as short as - // possible (within your tolerance for clock skew rate) to avoid a possible - // long waits in the scenario. - // - // Core clients default this value to 15 seconds. - LeaseDuration time.Duration - // RenewDeadline is the duration that the acting master will retry - // refreshing leadership before giving up. - // - // Core clients default this value to 10 seconds. - RenewDeadline time.Duration - // RetryPeriod is the duration the LeaderElector clients should wait - // between tries of actions. - // - // Core clients default this value to 2 seconds. - RetryPeriod time.Duration - - // Callbacks are callbacks that are triggered during certain lifecycle - // events of the LeaderElector - Callbacks LeaderCallbacks - - // WatchDog is the associated health checker - // WatchDog may be null if its not needed/configured. - WatchDog *HealthzAdaptor - - // ReleaseOnCancel should be set true if the lock should be released - // when the run context is cancelled. If you set this to true, you must - // ensure all code guarded by this lease has successfully completed - // prior to cancelling the context, or you may have two processes - // simultaneously acting on the critical path. - ReleaseOnCancel bool - - // Name is the name of the resource lock for debugging - Name string -} - -// LeaderCallbacks are callbacks that are triggered during certain -// lifecycle events of the LeaderElector. These are invoked asynchronously. -// -// possible future callbacks: -// * OnChallenge() -type LeaderCallbacks struct { - // OnStartedLeading is called when a LeaderElector client starts leading - OnStartedLeading func(context.Context) - // OnStoppedLeading is called when a LeaderElector client stops leading - OnStoppedLeading func() - // OnNewLeader is called when the client observes a leader that is - // not the previously observed leader. This includes the first observed - // leader when the client starts. - OnNewLeader func(identity string) -} - -// LeaderElector is a leader election client. -type LeaderElector struct { - config LeaderElectionConfig - // internal bookkeeping - observedRecord rl.LeaderElectionRecord - observedRawRecord []byte - observedTime time.Time - // used to implement OnNewLeader(), may lag slightly from the - // value observedRecord.HolderIdentity if the transition has - // not yet been reported. - reportedLeader string - - // clock is wrapper around time to allow for less flaky testing - clock clock.Clock - - metrics leaderMetricsAdapter - - // name is the name of the resource lock for debugging - name string -} - -// Run starts the leader election loop -func (le *LeaderElector) Run(ctx context.Context) { - defer runtime.HandleCrash() - defer func() { - le.config.Callbacks.OnStoppedLeading() - }() - - if !le.acquire(ctx) { - return // ctx signalled done - } - ctx, cancel := context.WithCancel(ctx) - defer cancel() - go le.config.Callbacks.OnStartedLeading(ctx) - le.renew(ctx) -} - -// RunOrDie starts a client with the provided config or panics if the config -// fails to validate. -func RunOrDie(ctx context.Context, lec LeaderElectionConfig) { - le, err := NewLeaderElector(lec) - if err != nil { - panic(err) - } - if lec.WatchDog != nil { - lec.WatchDog.SetLeaderElection(le) - } - le.Run(ctx) -} - -// GetLeader returns the identity of the last observed leader or returns the empty string if -// no leader has yet been observed. -func (le *LeaderElector) GetLeader() string { - return le.observedRecord.HolderIdentity -} - -// IsLeader returns true if the last observed leader was this client else returns false. -func (le *LeaderElector) IsLeader() bool { - return le.observedRecord.HolderIdentity == le.config.Lock.Identity() -} - -// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds. -// Returns false if ctx signals done. -func (le *LeaderElector) acquire(ctx context.Context) bool { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - succeeded := false - desc := le.config.Lock.Describe() - klog.Infof("attempting to acquire leader lease %v...", desc) - wait.JitterUntil(func() { - succeeded = le.tryAcquireOrRenew(ctx) - le.maybeReportTransition() - if !succeeded { - klog.V(4).Infof("failed to acquire lease %v", desc) - return - } - le.config.Lock.RecordEvent("became leader") - le.metrics.leaderOn(le.config.Name) - klog.Infof("successfully acquired lease %v", desc) - cancel() - }, le.config.RetryPeriod, JitterFactor, true, ctx.Done()) - return succeeded -} - -// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done. -func (le *LeaderElector) renew(ctx context.Context) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - wait.Until(func() { - timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline) - defer timeoutCancel() - err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) { - return le.tryAcquireOrRenew(timeoutCtx), nil - }, timeoutCtx.Done()) - - le.maybeReportTransition() - desc := le.config.Lock.Describe() - if err == nil { - klog.V(5).Infof("successfully renewed lease %v", desc) - return - } - le.config.Lock.RecordEvent("stopped leading") - le.metrics.leaderOff(le.config.Name) - klog.Infof("failed to renew lease %v: %v", desc, err) - cancel() - }, le.config.RetryPeriod, ctx.Done()) - - // if we hold the lease, give it up - if le.config.ReleaseOnCancel { - le.release() - } -} - -// release attempts to release the leader lease if we have acquired it. -func (le *LeaderElector) release() bool { - if !le.IsLeader() { - return true - } - now := metav1.Now() - leaderElectionRecord := rl.LeaderElectionRecord{ - LeaderTransitions: le.observedRecord.LeaderTransitions, - LeaseDurationSeconds: 1, - RenewTime: now, - AcquireTime: now, - } - if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil { - klog.Errorf("Failed to release lock: %v", err) - return false - } - le.observedRecord = leaderElectionRecord - le.observedTime = le.clock.Now() - return true -} - -// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, -// else it tries to renew the lease if it has already been acquired. Returns true -// on success else returns false. -func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool { - now := metav1.Now() - leaderElectionRecord := rl.LeaderElectionRecord{ - HolderIdentity: le.config.Lock.Identity(), - LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), - RenewTime: now, - AcquireTime: now, - } - - // 1. obtain or create the ElectionRecord - oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx) - if err != nil { - if !errors.IsNotFound(err) { - klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err) - return false - } - if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil { - klog.Errorf("error initially creating leader election record: %v", err) - return false - } - le.observedRecord = leaderElectionRecord - le.observedTime = le.clock.Now() - return true - } - - // 2. Record obtained, check the Identity & Time - if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) { - le.observedRecord = *oldLeaderElectionRecord - le.observedRawRecord = oldLeaderElectionRawRecord - le.observedTime = le.clock.Now() - } - if len(oldLeaderElectionRecord.HolderIdentity) > 0 && - le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && - !le.IsLeader() { - klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) - return false - } - - // 3. We're going to try to update. The leaderElectionRecord is set to it's default - // here. Let's correct it before updating. - if le.IsLeader() { - leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime - leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions - } else { - leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1 - } - - // update the lock itself - if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil { - klog.Errorf("Failed to update lock: %v", err) - return false - } - - le.observedRecord = leaderElectionRecord - le.observedTime = le.clock.Now() - return true -} - -func (le *LeaderElector) maybeReportTransition() { - if le.observedRecord.HolderIdentity == le.reportedLeader { - return - } - le.reportedLeader = le.observedRecord.HolderIdentity - if le.config.Callbacks.OnNewLeader != nil { - go le.config.Callbacks.OnNewLeader(le.reportedLeader) - } -} - -// Check will determine if the current lease is expired by more than timeout. -func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error { - if !le.IsLeader() { - // Currently not concerned with the case that we are hot standby - return nil - } - // If we are more than timeout seconds after the lease duration that is past the timeout - // on the lease renew. Time to start reporting ourselves as unhealthy. We should have - // died but conditions like deadlock can prevent this. (See #70819) - if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease { - return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name) - } - - return nil -} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/metrics.go b/vendor/k8s.io/client-go/tools/leaderelection/metrics.go deleted file mode 100644 index 65917bf88..000000000 --- a/vendor/k8s.io/client-go/tools/leaderelection/metrics.go +++ /dev/null @@ -1,109 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package leaderelection - -import ( - "sync" -) - -// This file provides abstractions for setting the provider (e.g., prometheus) -// of metrics. - -type leaderMetricsAdapter interface { - leaderOn(name string) - leaderOff(name string) -} - -// GaugeMetric represents a single numerical value that can arbitrarily go up -// and down. -type SwitchMetric interface { - On(name string) - Off(name string) -} - -type noopMetric struct{} - -func (noopMetric) On(name string) {} -func (noopMetric) Off(name string) {} - -// defaultLeaderMetrics expects the caller to lock before setting any metrics. -type defaultLeaderMetrics struct { - // leader's value indicates if the current process is the owner of name lease - leader SwitchMetric -} - -func (m *defaultLeaderMetrics) leaderOn(name string) { - if m == nil { - return - } - m.leader.On(name) -} - -func (m *defaultLeaderMetrics) leaderOff(name string) { - if m == nil { - return - } - m.leader.Off(name) -} - -type noMetrics struct{} - -func (noMetrics) leaderOn(name string) {} -func (noMetrics) leaderOff(name string) {} - -// MetricsProvider generates various metrics used by the leader election. -type MetricsProvider interface { - NewLeaderMetric() SwitchMetric -} - -type noopMetricsProvider struct{} - -func (_ noopMetricsProvider) NewLeaderMetric() SwitchMetric { - return noopMetric{} -} - -var globalMetricsFactory = leaderMetricsFactory{ - metricsProvider: noopMetricsProvider{}, -} - -type leaderMetricsFactory struct { - metricsProvider MetricsProvider - - onlyOnce sync.Once -} - -func (f *leaderMetricsFactory) setProvider(mp MetricsProvider) { - f.onlyOnce.Do(func() { - f.metricsProvider = mp - }) -} - -func (f *leaderMetricsFactory) newLeaderMetrics() leaderMetricsAdapter { - mp := f.metricsProvider - if mp == (noopMetricsProvider{}) { - return noMetrics{} - } - return &defaultLeaderMetrics{ - leader: mp.NewLeaderMetric(), - } -} - -// SetProvider sets the metrics provider for all subsequently created work -// queues. Only the first call has an effect. -func SetProvider(metricsProvider MetricsProvider) { - globalMetricsFactory.setProvider(metricsProvider) -} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go deleted file mode 100644 index 8c28026ba..000000000 --- a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go +++ /dev/null @@ -1,121 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resourcelock - -import ( - "context" - "encoding/json" - "errors" - "fmt" - - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" -) - -// TODO: This is almost a exact replica of Endpoints lock. -// going forwards as we self host more and more components -// and use ConfigMaps as the means to pass that configuration -// data we will likely move to deprecate the Endpoints lock. - -type ConfigMapLock struct { - // ConfigMapMeta should contain a Name and a Namespace of a - // ConfigMapMeta object that the LeaderElector will attempt to lead. - ConfigMapMeta metav1.ObjectMeta - Client corev1client.ConfigMapsGetter - LockConfig ResourceLockConfig - cm *v1.ConfigMap -} - -// Get returns the election record from a ConfigMap Annotation -func (cml *ConfigMapLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) { - var record LeaderElectionRecord - var err error - cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{}) - if err != nil { - return nil, nil, err - } - if cml.cm.Annotations == nil { - cml.cm.Annotations = make(map[string]string) - } - recordBytes, found := cml.cm.Annotations[LeaderElectionRecordAnnotationKey] - if found { - if err := json.Unmarshal([]byte(recordBytes), &record); err != nil { - return nil, nil, err - } - } - return &record, []byte(recordBytes), nil -} - -// Create attempts to create a LeaderElectionRecord annotation -func (cml *ConfigMapLock) Create(ctx context.Context, ler LeaderElectionRecord) error { - recordBytes, err := json.Marshal(ler) - if err != nil { - return err - } - cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(ctx, &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: cml.ConfigMapMeta.Name, - Namespace: cml.ConfigMapMeta.Namespace, - Annotations: map[string]string{ - LeaderElectionRecordAnnotationKey: string(recordBytes), - }, - }, - }, metav1.CreateOptions{}) - return err -} - -// Update will update an existing annotation on a given resource. -func (cml *ConfigMapLock) Update(ctx context.Context, ler LeaderElectionRecord) error { - if cml.cm == nil { - return errors.New("configmap not initialized, call get or create first") - } - recordBytes, err := json.Marshal(ler) - if err != nil { - return err - } - if cml.cm.Annotations == nil { - cml.cm.Annotations = make(map[string]string) - } - cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes) - cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{}) - if err != nil { - return err - } - cml.cm = cm - return nil -} - -// RecordEvent in leader election while adding meta-data -func (cml *ConfigMapLock) RecordEvent(s string) { - if cml.LockConfig.EventRecorder == nil { - return - } - events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s) - cml.LockConfig.EventRecorder.Eventf(&v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) -} - -// Describe is used to convert details on current resource lock -// into a string -func (cml *ConfigMapLock) Describe() string { - return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name) -} - -// Identity returns the Identity of the lock -func (cml *ConfigMapLock) Identity() string { - return cml.LockConfig.Identity -} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go deleted file mode 100644 index d11e43e33..000000000 --- a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go +++ /dev/null @@ -1,116 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resourcelock - -import ( - "context" - "encoding/json" - "errors" - "fmt" - - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" -) - -type EndpointsLock struct { - // EndpointsMeta should contain a Name and a Namespace of an - // Endpoints object that the LeaderElector will attempt to lead. - EndpointsMeta metav1.ObjectMeta - Client corev1client.EndpointsGetter - LockConfig ResourceLockConfig - e *v1.Endpoints -} - -// Get returns the election record from a Endpoints Annotation -func (el *EndpointsLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) { - var record LeaderElectionRecord - var err error - el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(ctx, el.EndpointsMeta.Name, metav1.GetOptions{}) - if err != nil { - return nil, nil, err - } - if el.e.Annotations == nil { - el.e.Annotations = make(map[string]string) - } - recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey] - if found { - if err := json.Unmarshal([]byte(recordBytes), &record); err != nil { - return nil, nil, err - } - } - return &record, []byte(recordBytes), nil -} - -// Create attempts to create a LeaderElectionRecord annotation -func (el *EndpointsLock) Create(ctx context.Context, ler LeaderElectionRecord) error { - recordBytes, err := json.Marshal(ler) - if err != nil { - return err - } - el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(ctx, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: el.EndpointsMeta.Name, - Namespace: el.EndpointsMeta.Namespace, - Annotations: map[string]string{ - LeaderElectionRecordAnnotationKey: string(recordBytes), - }, - }, - }, metav1.CreateOptions{}) - return err -} - -// Update will update and existing annotation on a given resource. -func (el *EndpointsLock) Update(ctx context.Context, ler LeaderElectionRecord) error { - if el.e == nil { - return errors.New("endpoint not initialized, call get or create first") - } - recordBytes, err := json.Marshal(ler) - if err != nil { - return err - } - if el.e.Annotations == nil { - el.e.Annotations = make(map[string]string) - } - el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes) - e, err := el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{}) - if err != nil { - return err - } - el.e = e - return nil -} - -// RecordEvent in leader election while adding meta-data -func (el *EndpointsLock) RecordEvent(s string) { - if el.LockConfig.EventRecorder == nil { - return - } - events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s) - el.LockConfig.EventRecorder.Eventf(&v1.Endpoints{ObjectMeta: el.e.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) -} - -// Describe is used to convert details on current resource lock -// into a string -func (el *EndpointsLock) Describe() string { - return fmt.Sprintf("%v/%v", el.EndpointsMeta.Namespace, el.EndpointsMeta.Name) -} - -// Identity returns the Identity of the lock -func (el *EndpointsLock) Identity() string { - return el.LockConfig.Identity -} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go deleted file mode 100644 index 74630a31f..000000000 --- a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go +++ /dev/null @@ -1,142 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resourcelock - -import ( - "context" - "fmt" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" -) - -const ( - LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader" - EndpointsResourceLock = "endpoints" - ConfigMapsResourceLock = "configmaps" - LeasesResourceLock = "leases" - EndpointsLeasesResourceLock = "endpointsleases" - ConfigMapsLeasesResourceLock = "configmapsleases" -) - -// LeaderElectionRecord is the record that is stored in the leader election annotation. -// This information should be used for observational purposes only and could be replaced -// with a random string (e.g. UUID) with only slight modification of this code. -// TODO(mikedanese): this should potentially be versioned -type LeaderElectionRecord struct { - // HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and - // all callers may acquire. Versions of this library prior to Kubernetes 1.14 will not - // attempt to acquire leases with empty identities and will wait for the full lease - // interval to expire before attempting to reacquire. This value is set to empty when - // a client voluntarily steps down. - HolderIdentity string `json:"holderIdentity"` - LeaseDurationSeconds int `json:"leaseDurationSeconds"` - AcquireTime metav1.Time `json:"acquireTime"` - RenewTime metav1.Time `json:"renewTime"` - LeaderTransitions int `json:"leaderTransitions"` -} - -// EventRecorder records a change in the ResourceLock. -type EventRecorder interface { - Eventf(obj runtime.Object, eventType, reason, message string, args ...interface{}) -} - -// ResourceLockConfig common data that exists across different -// resource locks -type ResourceLockConfig struct { - // Identity is the unique string identifying a lease holder across - // all participants in an election. - Identity string - // EventRecorder is optional. - EventRecorder EventRecorder -} - -// Interface offers a common interface for locking on arbitrary -// resources used in leader election. The Interface is used -// to hide the details on specific implementations in order to allow -// them to change over time. This interface is strictly for use -// by the leaderelection code. -type Interface interface { - // Get returns the LeaderElectionRecord - Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) - - // Create attempts to create a LeaderElectionRecord - Create(ctx context.Context, ler LeaderElectionRecord) error - - // Update will update and existing LeaderElectionRecord - Update(ctx context.Context, ler LeaderElectionRecord) error - - // RecordEvent is used to record events - RecordEvent(string) - - // Identity will return the locks Identity - Identity() string - - // Describe is used to convert details on current resource lock - // into a string - Describe() string -} - -// Manufacture will create a lock of a given type according to the input parameters -func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) { - endpointsLock := &EndpointsLock{ - EndpointsMeta: metav1.ObjectMeta{ - Namespace: ns, - Name: name, - }, - Client: coreClient, - LockConfig: rlc, - } - configmapLock := &ConfigMapLock{ - ConfigMapMeta: metav1.ObjectMeta{ - Namespace: ns, - Name: name, - }, - Client: coreClient, - LockConfig: rlc, - } - leaseLock := &LeaseLock{ - LeaseMeta: metav1.ObjectMeta{ - Namespace: ns, - Name: name, - }, - Client: coordinationClient, - LockConfig: rlc, - } - switch lockType { - case EndpointsResourceLock: - return endpointsLock, nil - case ConfigMapsResourceLock: - return configmapLock, nil - case LeasesResourceLock: - return leaseLock, nil - case EndpointsLeasesResourceLock: - return &MultiLock{ - Primary: endpointsLock, - Secondary: leaseLock, - }, nil - case ConfigMapsLeasesResourceLock: - return &MultiLock{ - Primary: configmapLock, - Secondary: leaseLock, - }, nil - default: - return nil, fmt.Errorf("Invalid lock-type %s", lockType) - } -} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go deleted file mode 100644 index a40349727..000000000 --- a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go +++ /dev/null @@ -1,135 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resourcelock - -import ( - "context" - "encoding/json" - "errors" - "fmt" - - coordinationv1 "k8s.io/api/coordination/v1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" -) - -type LeaseLock struct { - // LeaseMeta should contain a Name and a Namespace of a - // LeaseMeta object that the LeaderElector will attempt to lead. - LeaseMeta metav1.ObjectMeta - Client coordinationv1client.LeasesGetter - LockConfig ResourceLockConfig - lease *coordinationv1.Lease -} - -// Get returns the election record from a Lease spec -func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) { - var err error - ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{}) - if err != nil { - return nil, nil, err - } - record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec) - recordByte, err := json.Marshal(*record) - if err != nil { - return nil, nil, err - } - return record, recordByte, nil -} - -// Create attempts to create a Lease -func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error { - var err error - ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{ - ObjectMeta: metav1.ObjectMeta{ - Name: ll.LeaseMeta.Name, - Namespace: ll.LeaseMeta.Namespace, - }, - Spec: LeaderElectionRecordToLeaseSpec(&ler), - }, metav1.CreateOptions{}) - return err -} - -// Update will update an existing Lease spec. -func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error { - if ll.lease == nil { - return errors.New("lease not initialized, call get or create first") - } - ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler) - - lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{}) - if err != nil { - return err - } - - ll.lease = lease - return nil -} - -// RecordEvent in leader election while adding meta-data -func (ll *LeaseLock) RecordEvent(s string) { - if ll.LockConfig.EventRecorder == nil { - return - } - events := fmt.Sprintf("%v %v", ll.LockConfig.Identity, s) - ll.LockConfig.EventRecorder.Eventf(&coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}, corev1.EventTypeNormal, "LeaderElection", events) -} - -// Describe is used to convert details on current resource lock -// into a string -func (ll *LeaseLock) Describe() string { - return fmt.Sprintf("%v/%v", ll.LeaseMeta.Namespace, ll.LeaseMeta.Name) -} - -// Identity returns the Identity of the lock -func (ll *LeaseLock) Identity() string { - return ll.LockConfig.Identity -} - -func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord { - var r LeaderElectionRecord - if spec.HolderIdentity != nil { - r.HolderIdentity = *spec.HolderIdentity - } - if spec.LeaseDurationSeconds != nil { - r.LeaseDurationSeconds = int(*spec.LeaseDurationSeconds) - } - if spec.LeaseTransitions != nil { - r.LeaderTransitions = int(*spec.LeaseTransitions) - } - if spec.AcquireTime != nil { - r.AcquireTime = metav1.Time{spec.AcquireTime.Time} - } - if spec.RenewTime != nil { - r.RenewTime = metav1.Time{spec.RenewTime.Time} - } - return &r - -} - -func LeaderElectionRecordToLeaseSpec(ler *LeaderElectionRecord) coordinationv1.LeaseSpec { - leaseDurationSeconds := int32(ler.LeaseDurationSeconds) - leaseTransitions := int32(ler.LeaderTransitions) - return coordinationv1.LeaseSpec{ - HolderIdentity: &ler.HolderIdentity, - LeaseDurationSeconds: &leaseDurationSeconds, - AcquireTime: &metav1.MicroTime{ler.AcquireTime.Time}, - RenewTime: &metav1.MicroTime{ler.RenewTime.Time}, - LeaseTransitions: &leaseTransitions, - } -} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/multilock.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/multilock.go deleted file mode 100644 index 5ee1dcbb5..000000000 --- a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/multilock.go +++ /dev/null @@ -1,104 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resourcelock - -import ( - "bytes" - "context" - "encoding/json" - - apierrors "k8s.io/apimachinery/pkg/api/errors" -) - -const ( - UnknownLeader = "leaderelection.k8s.io/unknown" -) - -// MultiLock is used for lock's migration -type MultiLock struct { - Primary Interface - Secondary Interface -} - -// Get returns the older election record of the lock -func (ml *MultiLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) { - primary, primaryRaw, err := ml.Primary.Get(ctx) - if err != nil { - return nil, nil, err - } - - secondary, secondaryRaw, err := ml.Secondary.Get(ctx) - if err != nil { - // Lock is held by old client - if apierrors.IsNotFound(err) && primary.HolderIdentity != ml.Identity() { - return primary, primaryRaw, nil - } - return nil, nil, err - } - - if primary.HolderIdentity != secondary.HolderIdentity { - primary.HolderIdentity = UnknownLeader - primaryRaw, err = json.Marshal(primary) - if err != nil { - return nil, nil, err - } - } - return primary, ConcatRawRecord(primaryRaw, secondaryRaw), nil -} - -// Create attempts to create both primary lock and secondary lock -func (ml *MultiLock) Create(ctx context.Context, ler LeaderElectionRecord) error { - err := ml.Primary.Create(ctx, ler) - if err != nil && !apierrors.IsAlreadyExists(err) { - return err - } - return ml.Secondary.Create(ctx, ler) -} - -// Update will update and existing annotation on both two resources. -func (ml *MultiLock) Update(ctx context.Context, ler LeaderElectionRecord) error { - err := ml.Primary.Update(ctx, ler) - if err != nil { - return err - } - _, _, err = ml.Secondary.Get(ctx) - if err != nil && apierrors.IsNotFound(err) { - return ml.Secondary.Create(ctx, ler) - } - return ml.Secondary.Update(ctx, ler) -} - -// RecordEvent in leader election while adding meta-data -func (ml *MultiLock) RecordEvent(s string) { - ml.Primary.RecordEvent(s) - ml.Secondary.RecordEvent(s) -} - -// Describe is used to convert details on current resource lock -// into a string -func (ml *MultiLock) Describe() string { - return ml.Primary.Describe() -} - -// Identity returns the Identity of the lock -func (ml *MultiLock) Identity() string { - return ml.Primary.Identity() -} - -func ConcatRawRecord(primaryRaw, secondaryRaw []byte) []byte { - return bytes.Join([][]byte{primaryRaw, secondaryRaw}, []byte(",")) -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 8b9a7c731..46693aaa1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1293,8 +1293,6 @@ k8s.io/client-go/tools/clientcmd/api k8s.io/client-go/tools/clientcmd/api/latest k8s.io/client-go/tools/clientcmd/api/v1 k8s.io/client-go/tools/events -k8s.io/client-go/tools/leaderelection -k8s.io/client-go/tools/leaderelection/resourcelock k8s.io/client-go/tools/metrics k8s.io/client-go/tools/pager k8s.io/client-go/tools/record |
