-rw-r--r--  cloud/cmd/cloudcore/app/options/options.go | 5
-rw-r--r--  cloud/cmd/cloudcore/app/server.go | 138
-rw-r--r--  cloud/pkg/cloudhub/config/config.go | 2
-rw-r--r--  cloud/pkg/cloudhub/handler/messagehandler.go | 12
-rw-r--r--  cloud/pkg/cloudhub/servers/httpserver/secretsutil.go | 15
-rw-r--r--  cloud/pkg/cloudhub/servers/httpserver/server.go | 33
-rw-r--r--  cloud/pkg/common/modules/modules.go | 5
-rw-r--r--  cloud/pkg/edgecontroller/controller/upstream.go | 5
-rw-r--r--  cloud/pkg/edgecontroller/edgecontroller.go | 8
-rw-r--r--  cloud/pkg/leaderelection/leaderelection.go | 227
-rw-r--r--  cloud/pkg/leaderelection/readyzadaptor.go | 59
-rw-r--r--  pkg/apis/componentconfig/cloudcore/v1alpha1/default.go | 17
-rw-r--r--  pkg/apis/componentconfig/cloudcore/v1alpha1/types.go | 12
-rw-r--r--  pkg/apis/componentconfig/cloudcore/v1alpha1/validation/validation.go | 1
-rw-r--r--  tests/integration/framework/util.go | 43
-rw-r--r--  vendor/k8s.io/client-go/tools/leaderelection/OWNERS | 13
-rw-r--r--  vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go | 69
-rw-r--r--  vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go | 394
-rw-r--r--  vendor/k8s.io/client-go/tools/leaderelection/metrics.go | 109
-rw-r--r--  vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go | 121
-rw-r--r--  vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go | 116
-rw-r--r--  vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go | 142
-rw-r--r--  vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go | 135
-rw-r--r--  vendor/k8s.io/client-go/tools/leaderelection/resourcelock/multilock.go | 104
-rw-r--r--  vendor/modules.txt | 2
25 files changed, 182 insertions, 1605 deletions
diff --git a/cloud/cmd/cloudcore/app/options/options.go b/cloud/cmd/cloudcore/app/options/options.go
index b23366dfb..b36900d64 100644
--- a/cloud/cmd/cloudcore/app/options/options.go
+++ b/cloud/cmd/cloudcore/app/options/options.go
@@ -32,6 +32,11 @@ type CloudCoreOptions struct {
ConfigFile string
}
+// TunnelPortRecord records, per local IP, the tunnel port claimed by each
+// cloudcore instance, together with the set of ports already in use.
+type TunnelPortRecord struct {
+ IPTunnelPort map[string]int `json:"ipTunnelPort"`
+ Port map[int]bool `json:"port"`
+}
+
func NewCloudCoreOptions() *CloudCoreOptions {
return &CloudCoreOptions{
ConfigFile: path.Join(constants.DefaultConfigDir, "cloudcore.yaml"),
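
The record above is persisted as JSON under the tunnelportrecord.kubeedge.io annotation of a shared ConfigMap (see the server.go changes below). A minimal, self-contained sketch of the wire format, using hypothetical IPs and ports:

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    // TunnelPortRecord mirrors the type added in options.go above.
    type TunnelPortRecord struct {
    	IPTunnelPort map[string]int `json:"ipTunnelPort"`
    	Port         map[int]bool   `json:"port"`
    }

    func main() {
    	// Hypothetical IPs and ports, for illustration only.
    	record := TunnelPortRecord{
    		IPTunnelPort: map[string]int{"172.17.0.2": 10350, "172.17.0.3": 10351},
    		Port:         map[int]bool{10350: true, 10351: true},
    	}
    	b, _ := json.Marshal(record)
    	fmt.Println(string(b))
    	// Prints (encoding/json sorts map keys; the int keys serialize as strings):
    	// {"ipTunnelPort":{"172.17.0.2":10350,"172.17.0.3":10351},"port":{"10350":true,"10351":true}}
    }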
diff --git a/cloud/cmd/cloudcore/app/server.go b/cloud/cmd/cloudcore/app/server.go
index 507c69a45..6009ffa47 100644
--- a/cloud/cmd/cloudcore/app/server.go
+++ b/cloud/cmd/cloudcore/app/server.go
@@ -1,10 +1,18 @@
package app
import (
+ "context"
+ "encoding/json"
+ "errors"
"fmt"
+ "math/rand"
+ "os"
"time"
"github.com/spf13/cobra"
+ v1 "k8s.io/api/core/v1"
+ apierror "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/term"
@@ -14,16 +22,17 @@ import (
beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/kubeedge/cloud/cmd/cloudcore/app/options"
"github.com/kubeedge/kubeedge/cloud/pkg/cloudhub"
- hubconfig "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/config"
+ "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/servers/httpserver"
"github.com/kubeedge/kubeedge/cloud/pkg/cloudstream"
"github.com/kubeedge/kubeedge/cloud/pkg/common/client"
"github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
"github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/dynamiccontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller"
- kele "github.com/kubeedge/kubeedge/cloud/pkg/leaderelection"
"github.com/kubeedge/kubeedge/cloud/pkg/router"
"github.com/kubeedge/kubeedge/cloud/pkg/synccontroller"
+ "github.com/kubeedge/kubeedge/common/constants"
"github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1"
"github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1/validation"
"github.com/kubeedge/kubeedge/pkg/util"
@@ -63,17 +72,21 @@ kubernetes controller which manages devices so that the device metadata/status d
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
client.InitKubeEdgeClient(config.KubeAPIConfig)
- gis := informers.GetInformersManager()
- registerModules(config)
- // If leader election is enabled, runCommand via LeaderElector until done and exit.
- if config.LeaderElection.LeaderElect {
- electionChecker := kele.NewLeaderReadyzAdaptor(time.Second * 20)
- hubconfig.Config.Checker = electionChecker
- kele.Run(config, electionChecker)
- return
+	// Negotiate the tunnel port among multiple cloudcore instances.
+	// Seed the RNG and sleep a random interval first so that instances
+	// starting concurrently are less likely to race on the ConfigMap update.
+	rand.Seed(time.Now().UnixNano())
+	waitTime := rand.Int31n(10)
+	time.Sleep(time.Duration(waitTime) * time.Second)
+ tunnelport, err := NegotiateTunnelPort()
+ if err != nil {
+ panic(err)
}
+ config.CommonConfig.TunnelPort = *tunnelport
+
+ gis := informers.GetInformersManager()
+
+ registerModules(config)
+
	// Start all modules
core.StartModules()
gis.Start(beehiveContext.Done())
@@ -107,10 +120,113 @@ kubernetes controller which manages devices so that the device metadata/status d
// registerModules register all the modules started in cloudcore
func registerModules(c *v1alpha1.CloudCoreConfig) {
cloudhub.Register(c.Modules.CloudHub)
- edgecontroller.Register(c.Modules.EdgeController)
+ edgecontroller.Register(c.Modules.EdgeController, c.CommonConfig)
devicecontroller.Register(c.Modules.DeviceController)
synccontroller.Register(c.Modules.SyncController)
cloudstream.Register(c.Modules.CloudStream)
router.Register(c.Modules.Router)
dynamiccontroller.Register(c.Modules.DynamicController)
}
+
+// NegotiateTunnelPort reserves a unique tunnel port for this cloudcore
+// instance by recording it in a shared ConfigMap in the kubeedge namespace.
+func NegotiateTunnelPort() (*int, error) {
+ kubeClient := client.GetKubeClient()
+ err := httpserver.CreateNamespaceIfNeeded(kubeClient, modules.NamespaceSystem)
+ if err != nil {
+		return nil, fmt.Errorf("failed to create namespace %s: %v", modules.NamespaceSystem, err)
+ }
+
+ tunnelPort, err := kubeClient.CoreV1().ConfigMaps(modules.NamespaceSystem).Get(context.TODO(), modules.TunnelPort, metav1.GetOptions{})
+
+ if err != nil && !apierror.IsNotFound(err) {
+ return nil, err
+ }
+
+ localIP := getLocalIP()
+
+ var record options.TunnelPortRecord
+ if err == nil {
+ recordStr, found := tunnelPort.Annotations[modules.TunnelPortRecordAnnotationKey]
+ recordBytes := []byte(recordStr)
+ if !found {
+ return nil, errors.New("failed to get tunnel port record")
+ }
+
+ if err := json.Unmarshal(recordBytes, &record); err != nil {
+ return nil, err
+ }
+
+ _, found = record.IPTunnelPort[localIP]
+ if !found {
+ port := negotiatePort(record.Port)
+
+ record.IPTunnelPort[localIP] = port
+ record.Port[port] = true
+
+ recordBytes, err := json.Marshal(record)
+ if err != nil {
+ return nil, err
+ }
+
+ tunnelPort.Annotations[modules.TunnelPortRecordAnnotationKey] = string(recordBytes)
+
+ _, err = kubeClient.CoreV1().ConfigMaps(modules.NamespaceSystem).Update(context.TODO(), tunnelPort, metav1.UpdateOptions{})
+ if err != nil {
+ return nil, err
+ }
+
+ return &port, nil
+		}
+
+		// the local IP already has a port recorded; reuse it
+		port := record.IPTunnelPort[localIP]
+		return &port, nil
+	}
+
+ if apierror.IsNotFound(err) {
+ record := options.TunnelPortRecord{
+ IPTunnelPort: map[string]int{
+ localIP: constants.ServerPort,
+ },
+ Port: map[int]bool{
+ constants.ServerPort: true,
+ },
+ }
+ recordBytes, err := json.Marshal(record)
+ if err != nil {
+ return nil, err
+ }
+
+ _, err = kubeClient.CoreV1().ConfigMaps(modules.NamespaceSystem).Create(context.TODO(), &v1.ConfigMap{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: modules.TunnelPort,
+ Namespace: modules.NamespaceSystem,
+ Annotations: map[string]string{
+ modules.TunnelPortRecordAnnotationKey: string(recordBytes),
+ },
+ },
+ }, metav1.CreateOptions{})
+
+ if err != nil {
+ return nil, err
+ }
+
+ port := constants.ServerPort
+ return &port, nil
+ }
+
+	return nil, errors.New("failed to negotiate the tunnel port")
+}
+
+// negotiatePort returns the first port, counting up from constants.ServerPort,
+// that no other cloudcore instance has recorded yet.
+func negotiatePort(portRecord map[int]bool) int {
+	for port := constants.ServerPort; ; port++ {
+		if !portRecord[port] {
+			return port
+		}
+	}
+}
+
+func getLocalIP() string {
+ hostnameOverride, err := os.Hostname()
+ if err != nil {
+ hostnameOverride = constants.DefaultHostnameOverride
+ }
+ localIP, _ := util.GetLocalIP(hostnameOverride)
+ return localIP
+}
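
To make the scan in negotiatePort concrete, here is a standalone sketch; it is adapted to take the base port as a parameter (the committed code starts from constants.ServerPort), and the port numbers are hypothetical:

    package main

    import "fmt"

    // negotiatePort scans upward from base until it finds a port that no
    // other cloudcore instance has claimed in the shared record.
    func negotiatePort(base int, portRecord map[int]bool) int {
    	for port := base; ; port++ {
    		if !portRecord[port] {
    			return port
    		}
    	}
    }

    func main() {
    	// Two instances already hold 10350 and 10351, so the next one gets 10352.
    	taken := map[int]bool{10350: true, 10351: true}
    	fmt.Println(negotiatePort(10350, taken)) // 10352
    }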
diff --git a/cloud/pkg/cloudhub/config/config.go b/cloud/pkg/cloudhub/config/config.go
index 3e45cf01d..714f701d0 100644
--- a/cloud/pkg/cloudhub/config/config.go
+++ b/cloud/pkg/cloudhub/config/config.go
@@ -11,7 +11,6 @@ import (
"github.com/kubeedge/kubeedge/cloud/pkg/client/clientset/versioned"
syncinformer "github.com/kubeedge/kubeedge/cloud/pkg/client/informers/externalversions/reliablesyncs/v1alpha1"
synclister "github.com/kubeedge/kubeedge/cloud/pkg/client/listers/reliablesyncs/v1alpha1"
- kele "github.com/kubeedge/kubeedge/cloud/pkg/leaderelection"
"github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1"
)
@@ -25,7 +24,6 @@ type Configure struct {
CaKey []byte
Cert []byte
Key []byte
- Checker *kele.ReadyzAdaptor
}
func InitConfigure(hub *v1alpha1.CloudHub) {
diff --git a/cloud/pkg/cloudhub/handler/messagehandler.go b/cloud/pkg/cloudhub/handler/messagehandler.go
index 9f450d986..e76f045ac 100644
--- a/cloud/pkg/cloudhub/handler/messagehandler.go
+++ b/cloud/pkg/cloudhub/handler/messagehandler.go
@@ -446,10 +446,20 @@ func (mh *MessageHandle) MessageWriteLoop(info *model.HubInfo, stopServe chan Ex
copyMsg := deepcopy(msg)
trimMessage(copyMsg)
+	// retry at most once when the node connection has not been established yet
+	var (
+		retry         = 0
+		retryInterval = 2 * time.Second
+	)
+
for {
conn, ok := mh.nodeConns.Load(info.NodeID)
if !ok {
- time.Sleep(time.Second * 2)
+ if retry == 1 {
+ break
+ }
+ retry++
+			time.Sleep(retryInterval)
continue
}
err := mh.sendMsg(conn.(hubio.CloudHubIO), info, copyMsg, msg, nodeStore)
diff --git a/cloud/pkg/cloudhub/servers/httpserver/secretsutil.go b/cloud/pkg/cloudhub/servers/httpserver/secretsutil.go
index 7b2e56e6a..cdca41411 100644
--- a/cloud/pkg/cloudhub/servers/httpserver/secretsutil.go
+++ b/cloud/pkg/cloudhub/servers/httpserver/secretsutil.go
@@ -10,11 +10,10 @@ import (
"k8s.io/client-go/kubernetes"
"github.com/kubeedge/kubeedge/cloud/pkg/common/client"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
)
const (
- NamespaceSystem string = "kubeedge"
-
TokenSecretName string = "tokensecret"
TokenDataName string = "tokendata"
CaSecretName string = "casecret"
@@ -53,7 +52,7 @@ func CreateTokenSecret(caHashAndToken []byte) error {
TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Name: TokenSecretName,
- Namespace: NamespaceSystem,
+ Namespace: modules.NamespaceSystem,
},
Data: map[string][]byte{
TokenDataName: caHashAndToken,
@@ -61,7 +60,7 @@ func CreateTokenSecret(caHashAndToken []byte) error {
StringData: map[string]string{},
Type: "Opaque",
}
- return CreateSecret(token, NamespaceSystem)
+ return CreateSecret(token, modules.NamespaceSystem)
}
func CreateCaSecret(certDER, key []byte) error {
@@ -69,7 +68,7 @@ func CreateCaSecret(certDER, key []byte) error {
TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Name: CaSecretName,
- Namespace: NamespaceSystem,
+ Namespace: modules.NamespaceSystem,
},
Data: map[string][]byte{
CaDataName: certDER,
@@ -78,7 +77,7 @@ func CreateCaSecret(certDER, key []byte) error {
StringData: map[string]string{},
Type: "Opaque",
}
- return CreateSecret(caSecret, NamespaceSystem)
+ return CreateSecret(caSecret, modules.NamespaceSystem)
}
func CreateCloudCoreSecret(certDER, key []byte) error {
@@ -86,7 +85,7 @@ func CreateCloudCoreSecret(certDER, key []byte) error {
TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Name: CloudCoreSecretName,
- Namespace: NamespaceSystem,
+ Namespace: modules.NamespaceSystem,
},
Data: map[string][]byte{
CloudCoreCertName: certDER,
@@ -95,7 +94,7 @@ func CreateCloudCoreSecret(certDER, key []byte) error {
StringData: map[string]string{},
Type: "Opaque",
}
- return CreateSecret(cloudCoreCert, NamespaceSystem)
+ return CreateSecret(cloudCoreCert, modules.NamespaceSystem)
}
func CreateNamespaceIfNeeded(cli kubernetes.Interface, ns string) error {
diff --git a/cloud/pkg/cloudhub/servers/httpserver/server.go b/cloud/pkg/cloudhub/servers/httpserver/server.go
index 832c3efdb..88b97a9d1 100644
--- a/cloud/pkg/cloudhub/servers/httpserver/server.go
+++ b/cloud/pkg/cloudhub/servers/httpserver/server.go
@@ -34,6 +34,7 @@ import (
"k8s.io/klog/v2"
hubconfig "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/config"
+ "github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
"github.com/kubeedge/kubeedge/common/constants"
)
@@ -42,7 +43,6 @@ func StartHTTPServer() {
router := mux.NewRouter()
router.HandleFunc(constants.DefaultCertURL, edgeCoreClientCert).Methods("GET")
router.HandleFunc(constants.DefaultCAURL, getCA).Methods("GET")
- router.HandleFunc(constants.DefaultCloudCoreReadyCheckURL, electionHandler).Methods("GET")
addr := fmt.Sprintf("%s:%d", hubconfig.Config.HTTPS.Address, hubconfig.Config.HTTPS.Port)
@@ -71,29 +71,6 @@ func getCA(w http.ResponseWriter, r *http.Request) {
}
}
-//electionHandler returns the status whether the cloudcore is ready
-func electionHandler(w http.ResponseWriter, r *http.Request) {
- checker := hubconfig.Config.Checker
- if checker == nil {
- w.WriteHeader(http.StatusOK)
- if _, err := w.Write([]byte("Cloudcore is ready with no leaderelection")); err != nil {
- klog.Errorf("failed to write http response, err: %v", err)
- }
- return
- }
- if checker.Check(r) != nil {
- w.WriteHeader(http.StatusNotFound)
- if _, err := w.Write([]byte("Cloudcore is not ready")); err != nil {
- klog.Errorf("failed to write http response, err: %v", err)
- }
- } else {
- w.WriteHeader(http.StatusOK)
- if _, err := w.Write([]byte("Cloudcore is ready")); err != nil {
- klog.Errorf("failed to write http response, err: %v", err)
- }
- }
-}
-
// EncodeCertPEM returns PEM-encoded certificate data
func EncodeCertPEM(cert *x509.Certificate) []byte {
block := pem.Block{
@@ -244,7 +221,7 @@ func signCerts(subInfo pkix.Name, pbKey crypto.PublicKey) ([]byte, error) {
// CheckCaExistsFromSecret checks ca from secret
func CheckCaExistsFromSecret() bool {
- if _, err := GetSecret(CaSecretName, NamespaceSystem); err != nil {
+ if _, err := GetSecret(CaSecretName, modules.NamespaceSystem); err != nil {
return false
}
return true
@@ -252,7 +229,7 @@ func CheckCaExistsFromSecret() bool {
// CheckCertExistsFromSecret checks CloudCore certificate from secret
func CheckCertExistsFromSecret() bool {
- if _, err := GetSecret(CloudCoreSecretName, NamespaceSystem); err != nil {
+ if _, err := GetSecret(CloudCoreSecretName, modules.NamespaceSystem); err != nil {
return false
}
return true
@@ -288,7 +265,7 @@ func PrepareAllCerts() error {
UpdateConfig(caDER, caKeyDER, nil, nil)
} else {
- s, err := GetSecret(CaSecretName, NamespaceSystem)
+ s, err := GetSecret(CaSecretName, modules.NamespaceSystem)
if err != nil {
klog.Errorf("failed to get CaSecret, error: %v", err)
return err
@@ -327,7 +304,7 @@ func PrepareAllCerts() error {
UpdateConfig(nil, nil, certDER, keyDER)
} else {
- s, err := GetSecret(CloudCoreSecretName, NamespaceSystem)
+ s, err := GetSecret(CloudCoreSecretName, modules.NamespaceSystem)
if err != nil {
klog.Errorf("failed to get CloudCore secret, error: %v", err)
return err
diff --git a/cloud/pkg/common/modules/modules.go b/cloud/pkg/common/modules/modules.go
index be45bfc4e..a59ffec38 100644
--- a/cloud/pkg/common/modules/modules.go
+++ b/cloud/pkg/common/modules/modules.go
@@ -23,4 +23,9 @@ const (
RouterGroupName = "router"
UserGroup = "user"
+
+ NamespaceSystem string = "kubeedge"
+ TunnelPort string = "tunnelport"
+
+ TunnelPortRecordAnnotationKey string = "tunnelportrecord.kubeedge.io"
)
diff --git a/cloud/pkg/edgecontroller/controller/upstream.go b/cloud/pkg/edgecontroller/controller/upstream.go
index edeec1c2b..93e56a64f 100644
--- a/cloud/pkg/edgecontroller/controller/upstream.go
+++ b/cloud/pkg/edgecontroller/controller/upstream.go
@@ -88,6 +88,7 @@ type UpstreamController struct {
kubeClient kubernetes.Interface
messageLayer messagelayer.MessageLayer
crdClient crdClientset.Interface
+ TunnelPort int
// message channel
nodeStatusChan chan model.Message
@@ -402,6 +403,7 @@ func (uc *UpstreamController) updatePodStatus() {
// createNode create new edge node to kubernetes
func (uc *UpstreamController) createNode(name string, node *v1.Node) (*v1.Node, error) {
node.Name = name
+ node.Status.DaemonEndpoints.KubeletEndpoint.Port = int32(uc.TunnelPort)
return uc.kubeClient.CoreV1().Nodes().Create(context.Background(), node, metaV1.CreateOptions{})
}
@@ -524,6 +526,9 @@ func (uc *UpstreamController) updateNodeStatus() {
nodeStatusRequest.Status.VolumesAttached = getNode.Status.VolumesAttached
getNode.Status = nodeStatusRequest.Status
+
+ getNode.Status.DaemonEndpoints.KubeletEndpoint.Port = int32(uc.TunnelPort)
+
node, err := uc.kubeClient.CoreV1().Nodes().UpdateStatus(context.Background(), getNode, metaV1.UpdateOptions{})
if err != nil {
klog.Warningf("message: %s process failure, update node failed with error: %s, namespace: %s, name: %s", msg.GetID(), err, getNode.Namespace, getNode.Name)
diff --git a/cloud/pkg/edgecontroller/edgecontroller.go b/cloud/pkg/edgecontroller/edgecontroller.go
index 6f83d46c0..3f6923b67 100644
--- a/cloud/pkg/edgecontroller/edgecontroller.go
+++ b/cloud/pkg/edgecontroller/edgecontroller.go
@@ -18,7 +18,7 @@ type EdgeController struct {
downstream *controller.DownstreamController
}
-func newEdgeController(enable bool) *EdgeController {
+func newEdgeController(enable bool, tunnelPort int) *EdgeController {
ec := &EdgeController{enable: enable}
if !enable {
return ec
@@ -28,6 +28,8 @@ func newEdgeController(enable bool) *EdgeController {
if err != nil {
klog.Fatalf("new upstream controller failed with error: %s", err)
}
+ ec.upstream.TunnelPort = tunnelPort
+
ec.downstream, err = controller.NewDownstreamController(informers.GetInformersManager().GetK8sInformerFactory(), informers.GetInformersManager(), informers.GetInformersManager().GetCRDInformerFactory())
if err != nil {
klog.Fatalf("new downstream controller failed with error: %s", err)
@@ -35,10 +37,10 @@ func newEdgeController(enable bool) *EdgeController {
return ec
}
-func Register(ec *v1alpha1.EdgeController) {
+func Register(ec *v1alpha1.EdgeController, commonConfig *v1alpha1.CommonConfig) {
// TODO move module config into EdgeController struct @kadisi
config.InitConfigure(ec)
- core.Register(newEdgeController(ec.Enable))
+ core.Register(newEdgeController(ec.Enable, commonConfig.TunnelPort))
}
// Name of controller
diff --git a/cloud/pkg/leaderelection/leaderelection.go b/cloud/pkg/leaderelection/leaderelection.go
deleted file mode 100644
index c3608ffb1..000000000
--- a/cloud/pkg/leaderelection/leaderelection.go
+++ /dev/null
@@ -1,227 +0,0 @@
-package leaderelection
-
-import (
- "context"
- "encoding/json"
- "fmt"
- "os"
- "syscall"
-
- corev1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/strategicpatch"
- "k8s.io/apimachinery/pkg/util/uuid"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/kubernetes/scheme"
- typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
- "k8s.io/client-go/tools/leaderelection"
- "k8s.io/client-go/tools/leaderelection/resourcelock"
- "k8s.io/client-go/tools/record"
- componentbaseconfig "k8s.io/component-base/config"
- "k8s.io/klog/v2"
- podutil "k8s.io/kubernetes/pkg/api/v1/pod"
-
- "github.com/kubeedge/beehive/pkg/core"
- beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/client"
- "github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
- config "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1"
-)
-
-func Run(cfg *config.CloudCoreConfig, readyzAdaptor *ReadyzAdaptor) {
- // To help debugging, immediately log config for LeaderElection
- klog.Infof("Config for LeaderElection : %v", *cfg.LeaderElection)
- // Init Context for leaderElection
- beehiveContext.InitContext(beehiveContext.MsgCtxTypeChannel)
- // Init podReadinessGate to false at the begin of Run
- if err := TryToPatchPodReadinessGate(corev1.ConditionFalse); err != nil {
- klog.Errorf("Error init pod readinessGate: %v", err)
- }
-
- cli := client.GetKubeClient()
- if err := CreateNamespaceIfNeeded(cli, "kubeedge"); err != nil {
- klog.Warningf("Create Namespace kubeedge failed with error: %s", err)
- return
- }
-
- coreBroadcaster := record.NewBroadcaster()
- coreBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: cli.CoreV1().Events("")})
- coreRecorder := coreBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "CloudCore"})
-
- leaderElectionConfig, err := makeLeaderElectionConfig(*cfg.LeaderElection, cli, coreRecorder)
- if err != nil {
- klog.Errorf("couldn't create leaderElectorConfig: %v", err)
- return
- }
-
- leaderElectionConfig.Callbacks = leaderelection.LeaderCallbacks{
- OnStartedLeading: func(ctx context.Context) {
- // Start all modules,
- core.StartModules()
- informers.GetInformersManager().Start(beehiveContext.Done())
-
- // Patch PodReadinessGate if program run in pod
- if err := TryToPatchPodReadinessGate(corev1.ConditionTrue); err != nil {
- // Terminate the program gracefully
- klog.Errorf("Error patching pod readinessGate: %v", err)
- if err := TriggerGracefulShutdown(); err != nil {
- klog.Fatalf("failed to gracefully terminate program: %v", err)
- }
- }
- },
- OnStoppedLeading: func() {
- klog.Errorf("leaderelection lost, gracefully terminate program")
-
- // Reset PodReadinessGate to false if cloudcore stop
- if err := TryToPatchPodReadinessGate(corev1.ConditionFalse); err != nil {
- klog.Errorf("Error reset pod readinessGate: %v", err)
- }
-
- // Trigger core.GracefulShutdown()
- if err := TriggerGracefulShutdown(); err != nil {
- klog.Fatalf("failed to gracefully terminate program: %v", err)
- }
- },
- }
-
- leaderElector, err := leaderelection.NewLeaderElector(*leaderElectionConfig)
- if err != nil {
- klog.Errorf("couldn't create leader elector: %v", err)
- return
- }
- readyzAdaptor.SetLeaderElection(leaderElector)
-
- // Start leaderElection until becoming leader, terminate program if leader lost or context.cancel
- go leaderElector.Run(beehiveContext.GetContext())
-
- // Monitor system signal and shutdown gracefully and it should be in main gorutine
- core.GracefulShutdown()
-}
-
-// makeLeaderElectionConfig builds a leader election configuration. It will
-// create a new resource lock associated with the configuration.
-func makeLeaderElectionConfig(config componentbaseconfig.LeaderElectionConfiguration, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {
- hostname, err := os.Hostname()
- if err != nil {
- return nil, fmt.Errorf("unable to get hostname: %v", err)
- }
- // add a uniquifier so that two processes on the same host don't accidentally both become active
- id := hostname + "_" + string(uuid.NewUUID())
-
- rl, err := resourcelock.New(config.ResourceLock,
- config.ResourceNamespace,
- config.ResourceName,
- client.CoreV1(),
- client.CoordinationV1(),
- resourcelock.ResourceLockConfig{
- Identity: id,
- EventRecorder: recorder,
- })
- if err != nil {
- return nil, fmt.Errorf("couldn't create resource lock: %v", err)
- }
-
- return &leaderelection.LeaderElectionConfig{
- Lock: rl,
- LeaseDuration: config.LeaseDuration.Duration,
- RenewDeadline: config.RenewDeadline.Duration,
- RetryPeriod: config.RetryPeriod.Duration,
- WatchDog: nil,
- Name: "cloudcore",
- }, nil
-}
-
-// Try to patch PodReadinessGate if program runs in pod
-func TryToPatchPodReadinessGate(status corev1.ConditionStatus) error {
- podname, isInPod := os.LookupEnv("CLOUDCORE_POD_NAME")
- if !isInPod {
- klog.Infoln("CloudCore is not running in pod")
- return nil
- }
-
- namespace := os.Getenv("CLOUDCORE_POD_NAMESPACE")
- klog.Infof("CloudCore is running in pod %s/%s, try to patch PodReadinessGate", namespace, podname)
- client := client.GetKubeClient()
-
- //Creat patchBytes
- getPod, err := client.CoreV1().Pods(namespace).Get(context.Background(), podname, metav1.GetOptions{})
- if err != nil {
- return fmt.Errorf("failed to get pod(%s/%s): %v", namespace, podname, err)
- }
- originalJSON, err := json.Marshal(getPod)
- if err != nil {
- return fmt.Errorf("failed to marshal original pod %q into JSON: %v", podname, err)
- }
-
- //Todo: Read PodReadinessGate from CloudCore configuration or env
- condition := corev1.PodCondition{Type: "kubeedge.io/CloudCoreIsLeader", Status: status}
- podutil.UpdatePodCondition(&getPod.Status, &condition)
- newJSON, err := json.Marshal(getPod)
- if err != nil {
- return fmt.Errorf("failed to marshal modified pod %q into JSON: %v", podname, err)
- }
- patchBytes, err := strategicpatch.CreateTwoWayMergePatch(originalJSON, newJSON, corev1.Pod{})
- if err != nil {
- return fmt.Errorf("failed to create two way merge patch: %v", err)
- }
-
- var maxRetries = 3
- for i := 1; i <= maxRetries; i++ {
- _, err = client.CoreV1().Pods(namespace).Patch(context.Background(), podname, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
- if err == nil {
- klog.Infof("Successfully patching podReadinessGate: kubeedge.io/CloudCoreIsLeader to pod %q through apiserver", podname)
- return nil
- }
- if !errors.IsConflict(err) {
- return err
- }
-
- // If the patch failure is due to update conflict, the necessary retransmission is performed
- if i >= maxRetries {
- klog.Errorf("updateMaxRetries(%d) has reached, failed to patching podReadinessGate: kubeedge.io/CloudCoreIsLeader because of update conflict", maxRetries)
- }
- continue
- }
-
- return err
-}
-
-// TriggerGracefulShutdown triggers core.GracefulShutdown()
-func TriggerGracefulShutdown() error {
- if beehiveContext.GetContext().Err() != nil {
- klog.Infoln("Program is in gracefully shutdown")
- return nil
- }
-
- klog.Infoln("Trigger graceful shutdown!")
- p, err := os.FindProcess(syscall.Getpid())
- if err != nil {
- return fmt.Errorf("Failed to find self process: %v", err)
- }
-
- if err := p.Signal(os.Interrupt); err != nil {
- return fmt.Errorf("Failed to trigger graceful shutdown: %v", err)
- }
- return nil
-}
-
-func CreateNamespaceIfNeeded(cli clientset.Interface, ns string) error {
- c := cli.CoreV1()
- if _, err := c.Namespaces().Get(context.Background(), ns, metav1.GetOptions{}); err == nil {
- // the namespace already exists
- return nil
- }
- newNs := &corev1.Namespace{
- ObjectMeta: metav1.ObjectMeta{
- Name: ns,
- Namespace: "",
- },
- }
- _, err := c.Namespaces().Create(context.Background(), newNs, metav1.CreateOptions{})
- if err != nil && errors.IsAlreadyExists(err) {
- err = nil
- }
- return err
-}
diff --git a/cloud/pkg/leaderelection/readyzadaptor.go b/cloud/pkg/leaderelection/readyzadaptor.go
deleted file mode 100644
index 6501817c3..000000000
--- a/cloud/pkg/leaderelection/readyzadaptor.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package leaderelection
-
-import (
- "fmt"
- "net/http"
- "sync"
- "time"
-
- "k8s.io/client-go/tools/leaderelection"
-)
-
-// ReadyzAdaptor associates the /readyz endpoint with the LeaderElection object.
-// It helps deal with the /readyz endpoint being set up prior to the LeaderElection.
-// This contains the code needed to act as an adaptor between the leader
-// election code the health check code. It allows us to provide readyz
-// status about the leader election. Most specifically about if the leader
-// has failed to renew without exiting the process. In that case we should
-// report not healthy and rely on the kubelet to take down the process.
-type ReadyzAdaptor struct {
- pointerLock sync.Mutex
- le *leaderelection.LeaderElector
- timeout time.Duration
-}
-
-// Name returns the name of the health check we are implementing.
-func (l *ReadyzAdaptor) Name() string {
- return "leaderElection"
-}
-
-// Check is called by the readyz endpoint handler.
-// It fails (returns an error) if we own the lease but had not been able to renew it.
-func (l *ReadyzAdaptor) Check(req *http.Request) error {
- l.pointerLock.Lock()
- defer l.pointerLock.Unlock()
- if l.le == nil {
- return fmt.Errorf("leaderElection is not setting")
- }
- if !l.le.IsLeader() {
- return fmt.Errorf("not yet a leader")
- }
- return l.le.Check(l.timeout)
-}
-
-// SetLeaderElection ties a leader election object to a ReadyzAdaptor
-func (l *ReadyzAdaptor) SetLeaderElection(le *leaderelection.LeaderElector) {
- l.pointerLock.Lock()
- defer l.pointerLock.Unlock()
- l.le = le
-}
-
-// NewLeaderReadyzAdaptor creates a basic healthz adaptor to monitor a leader election.
-// timeout determines the time beyond the lease expiry to be allowed for timeout.
-// checks within the timeout period after the lease expires will still return healthy.
-func NewLeaderReadyzAdaptor(timeout time.Duration) *ReadyzAdaptor {
- result := &ReadyzAdaptor{
- timeout: timeout,
- }
- return result
-}
diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go
index 26d384b6a..815de9cf9 100644
--- a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go
+++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go
@@ -18,11 +18,9 @@ package v1alpha1
import (
"path"
- "time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilnet "k8s.io/apimachinery/pkg/util/net"
- componentbaseconfig "k8s.io/component-base/config"
"github.com/kubeedge/kubeedge/common/constants"
metaconfig "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/meta/v1alpha1"
@@ -37,6 +35,9 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig {
Kind: Kind,
APIVersion: path.Join(GroupName, APIVersion),
},
+ CommonConfig: &CommonConfig{
+ TunnelPort: constants.ServerPort,
+ },
KubeAPIConfig: &KubeAPIConfig{
Master: "",
ContentType: constants.DefaultKubeContentType,
@@ -165,15 +166,6 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig {
RestTimeout: 60,
},
},
- LeaderElection: &componentbaseconfig.LeaderElectionConfiguration{
- LeaderElect: false,
- LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
- RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
- RetryPeriod: metav1.Duration{Duration: 2 * time.Second},
- ResourceLock: "endpointsleases",
- ResourceNamespace: constants.KubeEdgeNameSpace,
- ResourceName: "cloudcorelease",
- },
}
return c
}
@@ -221,8 +213,5 @@ func NewMinCloudCoreConfig() *CloudCoreConfig {
RestTimeout: 60,
},
},
- LeaderElection: &componentbaseconfig.LeaderElectionConfiguration{
- LeaderElect: false,
- },
}
}
diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go
index 462c4c01b..e2211ddec 100644
--- a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go
+++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go
@@ -20,7 +20,6 @@ import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- componentbaseconfig "k8s.io/component-base/config"
metaconfig "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/meta/v1alpha1"
)
@@ -28,14 +27,21 @@ import (
// CloudCoreConfig indicates the config of cloudCore which get from cloudCore config file
type CloudCoreConfig struct {
metav1.TypeMeta
+ // CommonConfig indicates common config for all modules
+ // +Required
+ CommonConfig *CommonConfig `json:"commonConfig,omitempty"`
// KubeAPIConfig indicates the kubernetes cluster info which cloudCore will connected
// +Required
KubeAPIConfig *KubeAPIConfig `json:"kubeAPIConfig,omitempty"`
// Modules indicates cloudCore modules config
// +Required
Modules *Modules `json:"modules,omitempty"`
- // Configuration for LeaderElection
- LeaderElection *componentbaseconfig.LeaderElectionConfiguration `json:"leaderelection,omitempty"`
+}
+
+// CommonConfig indicates the common configuration shared by cloudcore modules
+type CommonConfig struct {
+	// TunnelPort indicates the port on which the cloudcore tunnel listens
+	TunnelPort int `json:"tunnelPort,omitempty"`
}
// KubeAPIConfig indicates the configuration for interacting with k8s server
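
In cloudcore.yaml the new section surfaces roughly as below (the field name follows the json tag above; 10350 is only an illustrative value, the default comes from constants.ServerPort):

    commonConfig:
      tunnelPort: 10350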
diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/validation/validation.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/validation/validation.go
index 5088a94ff..5ac584168 100644
--- a/pkg/apis/componentconfig/cloudcore/v1alpha1/validation/validation.go
+++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/validation/validation.go
@@ -41,7 +41,6 @@ func ValidateCloudCoreConfiguration(c *v1alpha1.CloudCoreConfig) field.ErrorList
allErrs = append(allErrs, ValidateModuleDeviceController(*c.Modules.DeviceController)...)
allErrs = append(allErrs, ValidateModuleSyncController(*c.Modules.SyncController)...)
allErrs = append(allErrs, ValidateModuleDynamicController(*c.Modules.DynamicController)...)
- allErrs = append(allErrs, ValidateLeaderElectionConfiguration(*c.LeaderElection)...)
allErrs = append(allErrs, ValidateModuleCloudStream(*c.Modules.CloudStream)...)
return allErrs
}
diff --git a/tests/integration/framework/util.go b/tests/integration/framework/util.go
index c906064f5..6bb583800 100644
--- a/tests/integration/framework/util.go
+++ b/tests/integration/framework/util.go
@@ -3,53 +3,10 @@ package framework
import (
"k8s.io/klog/v2"
- "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub"
- "github.com/kubeedge/kubeedge/cloud/pkg/cloudstream"
- "github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller"
- "github.com/kubeedge/kubeedge/cloud/pkg/dynamiccontroller"
- "github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller"
- "github.com/kubeedge/kubeedge/cloud/pkg/router"
- "github.com/kubeedge/kubeedge/cloud/pkg/synccontroller"
- "github.com/kubeedge/kubeedge/edge/pkg/common/dbm"
- "github.com/kubeedge/kubeedge/edge/pkg/devicetwin"
- "github.com/kubeedge/kubeedge/edge/pkg/edgehub"
- "github.com/kubeedge/kubeedge/edge/pkg/edgestream"
- "github.com/kubeedge/kubeedge/edge/pkg/eventbus"
- "github.com/kubeedge/kubeedge/edge/pkg/metamanager"
- "github.com/kubeedge/kubeedge/edge/pkg/servicebus"
- "github.com/kubeedge/kubeedge/edge/test"
cloudconfig "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1"
edgeconfig "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/edgecore/v1alpha1"
)
-// registerModules register all the modules
-func registerModules(i interface{}) {
- switch c := i.(type) {
- case *cloudconfig.CloudCoreConfig:
- cloudhub.Register(c.Modules.CloudHub)
- edgecontroller.Register(c.Modules.EdgeController)
- devicecontroller.Register(c.Modules.DeviceController)
- synccontroller.Register(c.Modules.SyncController)
- cloudstream.Register(c.Modules.CloudStream)
- router.Register(c.Modules.Router)
- dynamiccontroller.Register(c.Modules.DynamicController)
- case *edgeconfig.EdgeCoreConfig:
- devicetwin.Register(c.Modules.DeviceTwin, c.Modules.Edged.HostnameOverride)
- //edged.Register(c.Modules.Edged)
- edgehub.Register(c.Modules.EdgeHub, c.Modules.Edged.HostnameOverride)
- eventbus.Register(c.Modules.EventBus, c.Modules.Edged.HostnameOverride)
- //edgemesh.Register(c.Modules.EdgeMesh)
- metamanager.Register(c.Modules.MetaManager)
- servicebus.Register(c.Modules.ServiceBus)
- edgestream.Register(c.Modules.EdgeStream, c.Modules.Edged.HostnameOverride, c.Modules.Edged.NodeIP)
- test.Register(c.Modules.DBTest)
- // Note: Need to put it to the end, and wait for all models to register before executing
- dbm.InitDBConfig(c.DataBase.DriverName, c.DataBase.AliasName, c.DataBase.DataSource)
- default:
- klog.Error("unsupport config type!")
- }
-}
-
func DisableAllModules(i interface{}) {
switch config := i.(type) {
case *cloudconfig.CloudCoreConfig:
diff --git a/vendor/k8s.io/client-go/tools/leaderelection/OWNERS b/vendor/k8s.io/client-go/tools/leaderelection/OWNERS
deleted file mode 100644
index 9ece5e1ea..000000000
--- a/vendor/k8s.io/client-go/tools/leaderelection/OWNERS
+++ /dev/null
@@ -1,13 +0,0 @@
-# See the OWNERS docs at https://go.k8s.io/owners
-
-approvers:
-- mikedanese
-- timothysc
-reviewers:
-- wojtek-t
-- deads2k
-- mikedanese
-- gmarek
-- timothysc
-- ingvagabund
-- resouer
diff --git a/vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go b/vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go
deleted file mode 100644
index b93537291..000000000
--- a/vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package leaderelection
-
-import (
- "net/http"
- "sync"
- "time"
-)
-
-// HealthzAdaptor associates the /healthz endpoint with the LeaderElection object.
-// It helps deal with the /healthz endpoint being set up prior to the LeaderElection.
-// This contains the code needed to act as an adaptor between the leader
-// election code the health check code. It allows us to provide health
-// status about the leader election. Most specifically about if the leader
-// has failed to renew without exiting the process. In that case we should
-// report not healthy and rely on the kubelet to take down the process.
-type HealthzAdaptor struct {
- pointerLock sync.Mutex
- le *LeaderElector
- timeout time.Duration
-}
-
-// Name returns the name of the health check we are implementing.
-func (l *HealthzAdaptor) Name() string {
- return "leaderElection"
-}
-
-// Check is called by the healthz endpoint handler.
-// It fails (returns an error) if we own the lease but had not been able to renew it.
-func (l *HealthzAdaptor) Check(req *http.Request) error {
- l.pointerLock.Lock()
- defer l.pointerLock.Unlock()
- if l.le == nil {
- return nil
- }
- return l.le.Check(l.timeout)
-}
-
-// SetLeaderElection ties a leader election object to a HealthzAdaptor
-func (l *HealthzAdaptor) SetLeaderElection(le *LeaderElector) {
- l.pointerLock.Lock()
- defer l.pointerLock.Unlock()
- l.le = le
-}
-
-// NewLeaderHealthzAdaptor creates a basic healthz adaptor to monitor a leader election.
-// timeout determines the time beyond the lease expiry to be allowed for timeout.
-// checks within the timeout period after the lease expires will still return healthy.
-func NewLeaderHealthzAdaptor(timeout time.Duration) *HealthzAdaptor {
- result := &HealthzAdaptor{
- timeout: timeout,
- }
- return result
-}
diff --git a/vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go b/vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go
deleted file mode 100644
index 483f0cba4..000000000
--- a/vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// Package leaderelection implements leader election of a set of endpoints.
-// It uses an annotation in the endpoints object to store the record of the
-// election state. This implementation does not guarantee that only one
-// client is acting as a leader (a.k.a. fencing).
-//
-// A client only acts on timestamps captured locally to infer the state of the
-// leader election. The client does not consider timestamps in the leader
-// election record to be accurate because these timestamps may not have been
-// produced by a local clock. The implemention does not depend on their
-// accuracy and only uses their change to indicate that another client has
-// renewed the leader lease. Thus the implementation is tolerant to arbitrary
-// clock skew, but is not tolerant to arbitrary clock skew rate.
-//
-// However the level of tolerance to skew rate can be configured by setting
-// RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a
-// maximum tolerated ratio of time passed on the fastest node to time passed on
-// the slowest node can be approximately achieved with a configuration that sets
-// the same ratio of LeaseDuration to RenewDeadline. For example if a user wanted
-// to tolerate some nodes progressing forward in time twice as fast as other nodes,
-// the user could set LeaseDuration to 60 seconds and RenewDeadline to 30 seconds.
-//
-// While not required, some method of clock synchronization between nodes in the
-// cluster is highly recommended. It's important to keep in mind when configuring
-// this client that the tolerance to skew rate varies inversely to master
-// availability.
-//
-// Larger clusters often have a more lenient SLA for API latency. This should be
-// taken into account when configuring the client. The rate of leader transitions
-// should be monitored and RetryPeriod and LeaseDuration should be increased
-// until the rate is stable and acceptably low. It's important to keep in mind
-// when configuring this client that the tolerance to API latency varies inversely
-// to master availability.
-//
-// DISCLAIMER: this is an alpha API. This library will likely change significantly
-// or even be removed entirely in subsequent releases. Depend on this API at
-// your own risk.
-package leaderelection
-
-import (
- "bytes"
- "context"
- "fmt"
- "time"
-
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/clock"
- "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- rl "k8s.io/client-go/tools/leaderelection/resourcelock"
-
- "k8s.io/klog/v2"
-)
-
-const (
- JitterFactor = 1.2
-)
-
-// NewLeaderElector creates a LeaderElector from a LeaderElectionConfig
-func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
- if lec.LeaseDuration <= lec.RenewDeadline {
- return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
- }
- if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
- return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
- }
- if lec.LeaseDuration < 1 {
- return nil, fmt.Errorf("leaseDuration must be greater than zero")
- }
- if lec.RenewDeadline < 1 {
- return nil, fmt.Errorf("renewDeadline must be greater than zero")
- }
- if lec.RetryPeriod < 1 {
- return nil, fmt.Errorf("retryPeriod must be greater than zero")
- }
- if lec.Callbacks.OnStartedLeading == nil {
- return nil, fmt.Errorf("OnStartedLeading callback must not be nil")
- }
- if lec.Callbacks.OnStoppedLeading == nil {
- return nil, fmt.Errorf("OnStoppedLeading callback must not be nil")
- }
-
- if lec.Lock == nil {
- return nil, fmt.Errorf("Lock must not be nil.")
- }
- le := LeaderElector{
- config: lec,
- clock: clock.RealClock{},
- metrics: globalMetricsFactory.newLeaderMetrics(),
- }
- le.metrics.leaderOff(le.config.Name)
- return &le, nil
-}
-
-type LeaderElectionConfig struct {
- // Lock is the resource that will be used for locking
- Lock rl.Interface
-
- // LeaseDuration is the duration that non-leader candidates will
- // wait to force acquire leadership. This is measured against time of
- // last observed ack.
- //
- // A client needs to wait a full LeaseDuration without observing a change to
- // the record before it can attempt to take over. When all clients are
- // shutdown and a new set of clients are started with different names against
- // the same leader record, they must wait the full LeaseDuration before
- // attempting to acquire the lease. Thus LeaseDuration should be as short as
- // possible (within your tolerance for clock skew rate) to avoid a possible
- // long waits in the scenario.
- //
- // Core clients default this value to 15 seconds.
- LeaseDuration time.Duration
- // RenewDeadline is the duration that the acting master will retry
- // refreshing leadership before giving up.
- //
- // Core clients default this value to 10 seconds.
- RenewDeadline time.Duration
- // RetryPeriod is the duration the LeaderElector clients should wait
- // between tries of actions.
- //
- // Core clients default this value to 2 seconds.
- RetryPeriod time.Duration
-
- // Callbacks are callbacks that are triggered during certain lifecycle
- // events of the LeaderElector
- Callbacks LeaderCallbacks
-
- // WatchDog is the associated health checker
- // WatchDog may be null if its not needed/configured.
- WatchDog *HealthzAdaptor
-
- // ReleaseOnCancel should be set true if the lock should be released
- // when the run context is cancelled. If you set this to true, you must
- // ensure all code guarded by this lease has successfully completed
- // prior to cancelling the context, or you may have two processes
- // simultaneously acting on the critical path.
- ReleaseOnCancel bool
-
- // Name is the name of the resource lock for debugging
- Name string
-}
-
-// LeaderCallbacks are callbacks that are triggered during certain
-// lifecycle events of the LeaderElector. These are invoked asynchronously.
-//
-// possible future callbacks:
-// * OnChallenge()
-type LeaderCallbacks struct {
- // OnStartedLeading is called when a LeaderElector client starts leading
- OnStartedLeading func(context.Context)
- // OnStoppedLeading is called when a LeaderElector client stops leading
- OnStoppedLeading func()
- // OnNewLeader is called when the client observes a leader that is
- // not the previously observed leader. This includes the first observed
- // leader when the client starts.
- OnNewLeader func(identity string)
-}
-
-// LeaderElector is a leader election client.
-type LeaderElector struct {
- config LeaderElectionConfig
- // internal bookkeeping
- observedRecord rl.LeaderElectionRecord
- observedRawRecord []byte
- observedTime time.Time
- // used to implement OnNewLeader(), may lag slightly from the
- // value observedRecord.HolderIdentity if the transition has
- // not yet been reported.
- reportedLeader string
-
- // clock is wrapper around time to allow for less flaky testing
- clock clock.Clock
-
- metrics leaderMetricsAdapter
-
- // name is the name of the resource lock for debugging
- name string
-}
-
-// Run starts the leader election loop
-func (le *LeaderElector) Run(ctx context.Context) {
- defer runtime.HandleCrash()
- defer func() {
- le.config.Callbacks.OnStoppedLeading()
- }()
-
- if !le.acquire(ctx) {
- return // ctx signalled done
- }
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
- go le.config.Callbacks.OnStartedLeading(ctx)
- le.renew(ctx)
-}
-
-// RunOrDie starts a client with the provided config or panics if the config
-// fails to validate.
-func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
- le, err := NewLeaderElector(lec)
- if err != nil {
- panic(err)
- }
- if lec.WatchDog != nil {
- lec.WatchDog.SetLeaderElection(le)
- }
- le.Run(ctx)
-}
-
-// GetLeader returns the identity of the last observed leader or returns the empty string if
-// no leader has yet been observed.
-func (le *LeaderElector) GetLeader() string {
- return le.observedRecord.HolderIdentity
-}
-
-// IsLeader returns true if the last observed leader was this client else returns false.
-func (le *LeaderElector) IsLeader() bool {
- return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
-}
-
-// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
-// Returns false if ctx signals done.
-func (le *LeaderElector) acquire(ctx context.Context) bool {
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
- succeeded := false
- desc := le.config.Lock.Describe()
- klog.Infof("attempting to acquire leader lease %v...", desc)
- wait.JitterUntil(func() {
- succeeded = le.tryAcquireOrRenew(ctx)
- le.maybeReportTransition()
- if !succeeded {
- klog.V(4).Infof("failed to acquire lease %v", desc)
- return
- }
- le.config.Lock.RecordEvent("became leader")
- le.metrics.leaderOn(le.config.Name)
- klog.Infof("successfully acquired lease %v", desc)
- cancel()
- }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
- return succeeded
-}
-
-// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
-func (le *LeaderElector) renew(ctx context.Context) {
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
- wait.Until(func() {
- timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
- defer timeoutCancel()
- err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
- return le.tryAcquireOrRenew(timeoutCtx), nil
- }, timeoutCtx.Done())
-
- le.maybeReportTransition()
- desc := le.config.Lock.Describe()
- if err == nil {
- klog.V(5).Infof("successfully renewed lease %v", desc)
- return
- }
- le.config.Lock.RecordEvent("stopped leading")
- le.metrics.leaderOff(le.config.Name)
- klog.Infof("failed to renew lease %v: %v", desc, err)
- cancel()
- }, le.config.RetryPeriod, ctx.Done())
-
- // if we hold the lease, give it up
- if le.config.ReleaseOnCancel {
- le.release()
- }
-}
-
-// release attempts to release the leader lease if we have acquired it.
-func (le *LeaderElector) release() bool {
- if !le.IsLeader() {
- return true
- }
- now := metav1.Now()
- leaderElectionRecord := rl.LeaderElectionRecord{
- LeaderTransitions: le.observedRecord.LeaderTransitions,
- LeaseDurationSeconds: 1,
- RenewTime: now,
- AcquireTime: now,
- }
- if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
- klog.Errorf("Failed to release lock: %v", err)
- return false
- }
- le.observedRecord = leaderElectionRecord
- le.observedTime = le.clock.Now()
- return true
-}
-
-// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
-// else it tries to renew the lease if it has already been acquired. Returns true
-// on success else returns false.
-func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
- now := metav1.Now()
- leaderElectionRecord := rl.LeaderElectionRecord{
- HolderIdentity: le.config.Lock.Identity(),
- LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
- RenewTime: now,
- AcquireTime: now,
- }
-
- // 1. obtain or create the ElectionRecord
- oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
- if err != nil {
- if !errors.IsNotFound(err) {
- klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
- return false
- }
- if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
- klog.Errorf("error initially creating leader election record: %v", err)
- return false
- }
- le.observedRecord = leaderElectionRecord
- le.observedTime = le.clock.Now()
- return true
- }
-
- // 2. Record obtained, check the Identity & Time
- if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
- le.observedRecord = *oldLeaderElectionRecord
- le.observedRawRecord = oldLeaderElectionRawRecord
- le.observedTime = le.clock.Now()
- }
- if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
- le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
- !le.IsLeader() {
- klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
- return false
- }
-
- // 3. We're going to try to update. The leaderElectionRecord is set to it's default
- // here. Let's correct it before updating.
- if le.IsLeader() {
- leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
- leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
- } else {
- leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
- }
-
- // update the lock itself
- if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
- klog.Errorf("Failed to update lock: %v", err)
- return false
- }
-
- le.observedRecord = leaderElectionRecord
- le.observedTime = le.clock.Now()
- return true
-}
-
-func (le *LeaderElector) maybeReportTransition() {
- if le.observedRecord.HolderIdentity == le.reportedLeader {
- return
- }
- le.reportedLeader = le.observedRecord.HolderIdentity
- if le.config.Callbacks.OnNewLeader != nil {
- go le.config.Callbacks.OnNewLeader(le.reportedLeader)
- }
-}
-
-// Check will determine if the current lease is expired by more than timeout.
-func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
- if !le.IsLeader() {
- // Currently not concerned with the case that we are hot standby
- return nil
- }
- // If we are more than timeout seconds after the lease duration that is past the timeout
- // on the lease renew. Time to start reporting ourselves as unhealthy. We should have
- // died but conditions like deadlock can prevent this. (See #70819)
- if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease {
- return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name)
- }
-
- return nil
-}
diff --git a/vendor/k8s.io/client-go/tools/leaderelection/metrics.go b/vendor/k8s.io/client-go/tools/leaderelection/metrics.go
deleted file mode 100644
index 65917bf88..000000000
--- a/vendor/k8s.io/client-go/tools/leaderelection/metrics.go
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
-Copyright 2018 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package leaderelection
-
-import (
- "sync"
-)
-
-// This file provides abstractions for setting the provider (e.g., prometheus)
-// of metrics.
-
-type leaderMetricsAdapter interface {
- leaderOn(name string)
- leaderOff(name string)
-}
-
-// GaugeMetric represents a single numerical value that can arbitrarily go up
-// and down.
-type SwitchMetric interface {
- On(name string)
- Off(name string)
-}
-
-type noopMetric struct{}
-
-func (noopMetric) On(name string) {}
-func (noopMetric) Off(name string) {}
-
-// defaultLeaderMetrics expects the caller to lock before setting any metrics.
-type defaultLeaderMetrics struct {
- // leader's value indicates if the current process is the owner of name lease
- leader SwitchMetric
-}
-
-func (m *defaultLeaderMetrics) leaderOn(name string) {
- if m == nil {
- return
- }
- m.leader.On(name)
-}
-
-func (m *defaultLeaderMetrics) leaderOff(name string) {
- if m == nil {
- return
- }
- m.leader.Off(name)
-}
-
-type noMetrics struct{}
-
-func (noMetrics) leaderOn(name string) {}
-func (noMetrics) leaderOff(name string) {}
-
-// MetricsProvider generates various metrics used by the leader election.
-type MetricsProvider interface {
- NewLeaderMetric() SwitchMetric
-}
-
-type noopMetricsProvider struct{}
-
-func (_ noopMetricsProvider) NewLeaderMetric() SwitchMetric {
- return noopMetric{}
-}
-
-var globalMetricsFactory = leaderMetricsFactory{
- metricsProvider: noopMetricsProvider{},
-}
-
-type leaderMetricsFactory struct {
- metricsProvider MetricsProvider
-
- onlyOnce sync.Once
-}
-
-func (f *leaderMetricsFactory) setProvider(mp MetricsProvider) {
- f.onlyOnce.Do(func() {
- f.metricsProvider = mp
- })
-}
-
-func (f *leaderMetricsFactory) newLeaderMetrics() leaderMetricsAdapter {
- mp := f.metricsProvider
- if mp == (noopMetricsProvider{}) {
- return noMetrics{}
- }
- return &defaultLeaderMetrics{
- leader: mp.NewLeaderMetric(),
- }
-}
-
-// SetProvider sets the metrics provider for all subsequently created work
-// queues. Only the first call has an effect.
-func SetProvider(metricsProvider MetricsProvider) {
- globalMetricsFactory.setProvider(metricsProvider)
-}
diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go
deleted file mode 100644
index 8c28026ba..000000000
--- a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package resourcelock
-
-import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
-
- "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
-)
-
-// TODO: This is almost an exact replica of the Endpoints lock.
-// Going forward, as we self-host more and more components
-// and use ConfigMaps as the means to pass configuration
-// data, we will likely deprecate the Endpoints lock.
-
-type ConfigMapLock struct {
-	// ConfigMapMeta should contain a Name and a Namespace of a
-	// ConfigMap object that the LeaderElector will attempt to lead.
- ConfigMapMeta metav1.ObjectMeta
- Client corev1client.ConfigMapsGetter
- LockConfig ResourceLockConfig
- cm *v1.ConfigMap
-}
-
-// Get returns the election record from a ConfigMap Annotation
-func (cml *ConfigMapLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
- var record LeaderElectionRecord
- var err error
- cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{})
- if err != nil {
- return nil, nil, err
- }
- if cml.cm.Annotations == nil {
- cml.cm.Annotations = make(map[string]string)
- }
- recordBytes, found := cml.cm.Annotations[LeaderElectionRecordAnnotationKey]
- if found {
- if err := json.Unmarshal([]byte(recordBytes), &record); err != nil {
- return nil, nil, err
- }
- }
- return &record, []byte(recordBytes), nil
-}
-
-// Create attempts to create a LeaderElectionRecord annotation
-func (cml *ConfigMapLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
- recordBytes, err := json.Marshal(ler)
- if err != nil {
- return err
- }
- cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(ctx, &v1.ConfigMap{
- ObjectMeta: metav1.ObjectMeta{
- Name: cml.ConfigMapMeta.Name,
- Namespace: cml.ConfigMapMeta.Namespace,
- Annotations: map[string]string{
- LeaderElectionRecordAnnotationKey: string(recordBytes),
- },
- },
- }, metav1.CreateOptions{})
- return err
-}
-
-// Update will update an existing annotation on a given resource.
-func (cml *ConfigMapLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
- if cml.cm == nil {
- return errors.New("configmap not initialized, call get or create first")
- }
- recordBytes, err := json.Marshal(ler)
- if err != nil {
- return err
- }
- if cml.cm.Annotations == nil {
- cml.cm.Annotations = make(map[string]string)
- }
- cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
- cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
- if err != nil {
- return err
- }
- cml.cm = cm
- return nil
-}
-
-// RecordEvent in leader election while adding meta-data
-func (cml *ConfigMapLock) RecordEvent(s string) {
- if cml.LockConfig.EventRecorder == nil {
- return
- }
- events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s)
- cml.LockConfig.EventRecorder.Eventf(&v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events)
-}
-
-// Describe is used to convert details on current resource lock
-// into a string
-func (cml *ConfigMapLock) Describe() string {
- return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name)
-}
-
-// Identity returns the Identity of the lock
-func (cml *ConfigMapLock) Identity() string {
- return cml.LockConfig.Identity
-}
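
For context, constructing the removed ConfigMapLock looked roughly like the sketch below; the namespace, lock name, and identity are placeholders, not values from this commit:

package main

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	lock := &resourcelock.ConfigMapLock{
		ConfigMapMeta: metav1.ObjectMeta{
			Namespace: "kubeedge",       // placeholder namespace
			Name:      "cloudcore-lock", // placeholder lock name
		},
		Client: client.CoreV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: "cloudcore-1", // must be unique per participant
		},
	}

	// Get must be called before Update, per the guard above.
	if _, _, err := lock.Get(context.TODO()); err != nil {
		panic(err)
	}
}
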
diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go
deleted file mode 100644
index d11e43e33..000000000
--- a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
-Copyright 2016 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package resourcelock
-
-import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
-
- "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
-)
-
-type EndpointsLock struct {
- // EndpointsMeta should contain a Name and a Namespace of an
- // Endpoints object that the LeaderElector will attempt to lead.
- EndpointsMeta metav1.ObjectMeta
- Client corev1client.EndpointsGetter
- LockConfig ResourceLockConfig
- e *v1.Endpoints
-}
-
-// Get returns the election record from an Endpoints Annotation
-func (el *EndpointsLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
- var record LeaderElectionRecord
- var err error
- el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(ctx, el.EndpointsMeta.Name, metav1.GetOptions{})
- if err != nil {
- return nil, nil, err
- }
- if el.e.Annotations == nil {
- el.e.Annotations = make(map[string]string)
- }
- recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]
- if found {
- if err := json.Unmarshal([]byte(recordBytes), &record); err != nil {
- return nil, nil, err
- }
- }
- return &record, []byte(recordBytes), nil
-}
-
-// Create attempts to create a LeaderElectionRecord annotation
-func (el *EndpointsLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
- recordBytes, err := json.Marshal(ler)
- if err != nil {
- return err
- }
- el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(ctx, &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: el.EndpointsMeta.Name,
- Namespace: el.EndpointsMeta.Namespace,
- Annotations: map[string]string{
- LeaderElectionRecordAnnotationKey: string(recordBytes),
- },
- },
- }, metav1.CreateOptions{})
- return err
-}
-
-// Update will update an existing annotation on a given resource.
-func (el *EndpointsLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
- if el.e == nil {
- return errors.New("endpoint not initialized, call get or create first")
- }
- recordBytes, err := json.Marshal(ler)
- if err != nil {
- return err
- }
- if el.e.Annotations == nil {
- el.e.Annotations = make(map[string]string)
- }
- el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
- e, err := el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{})
- if err != nil {
- return err
- }
- el.e = e
- return nil
-}
-
-// RecordEvent in leader election while adding meta-data
-func (el *EndpointsLock) RecordEvent(s string) {
- if el.LockConfig.EventRecorder == nil {
- return
- }
- events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s)
- el.LockConfig.EventRecorder.Eventf(&v1.Endpoints{ObjectMeta: el.e.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events)
-}
-
-// Describe is used to convert details on current resource lock
-// into a string
-func (el *EndpointsLock) Describe() string {
- return fmt.Sprintf("%v/%v", el.EndpointsMeta.Namespace, el.EndpointsMeta.Name)
-}
-
-// Identity returns the Identity of the lock
-func (el *EndpointsLock) Identity() string {
- return el.LockConfig.Identity
-}
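
EndpointsLock is structurally identical to ConfigMapLock; both serialize a LeaderElectionRecord as JSON under the control-plane.alpha.kubernetes.io/leader annotation key. A sketch of what Get/Create/Update read and write (field values illustrative):

package main

import (
	"encoding/json"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	now := metav1.Time{Time: time.Now()}
	rec := resourcelock.LeaderElectionRecord{
		HolderIdentity:       "cloudcore-1", // illustrative holder
		LeaseDurationSeconds: 15,
		AcquireTime:          now,
		RenewTime:            now,
		LeaderTransitions:    0,
	}

	// This JSON is the annotation value both locks store on their resource.
	b, err := json.Marshal(rec)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
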
diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go
deleted file mode 100644
index 74630a31f..000000000
--- a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
-Copyright 2016 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package resourcelock
-
-import (
- "context"
- "fmt"
-
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
- corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
-)
-
-const (
- LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"
- EndpointsResourceLock = "endpoints"
- ConfigMapsResourceLock = "configmaps"
- LeasesResourceLock = "leases"
- EndpointsLeasesResourceLock = "endpointsleases"
- ConfigMapsLeasesResourceLock = "configmapsleases"
-)
-
-// LeaderElectionRecord is the record that is stored in the leader election annotation.
-// This information should be used for observational purposes only and could be replaced
-// with a random string (e.g. UUID) with only slight modification of this code.
-// TODO(mikedanese): this should potentially be versioned
-type LeaderElectionRecord struct {
- // HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and
- // all callers may acquire. Versions of this library prior to Kubernetes 1.14 will not
- // attempt to acquire leases with empty identities and will wait for the full lease
- // interval to expire before attempting to reacquire. This value is set to empty when
- // a client voluntarily steps down.
- HolderIdentity string `json:"holderIdentity"`
- LeaseDurationSeconds int `json:"leaseDurationSeconds"`
- AcquireTime metav1.Time `json:"acquireTime"`
- RenewTime metav1.Time `json:"renewTime"`
- LeaderTransitions int `json:"leaderTransitions"`
-}
-
-// EventRecorder records a change in the ResourceLock.
-type EventRecorder interface {
- Eventf(obj runtime.Object, eventType, reason, message string, args ...interface{})
-}
-
-// ResourceLockConfig common data that exists across different
-// resource locks
-type ResourceLockConfig struct {
- // Identity is the unique string identifying a lease holder across
- // all participants in an election.
- Identity string
- // EventRecorder is optional.
- EventRecorder EventRecorder
-}
-
-// Interface offers a common interface for locking on arbitrary
-// resources used in leader election. The Interface is used
-// to hide the details on specific implementations in order to allow
-// them to change over time. This interface is strictly for use
-// by the leaderelection code.
-type Interface interface {
- // Get returns the LeaderElectionRecord
- Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
-
- // Create attempts to create a LeaderElectionRecord
- Create(ctx context.Context, ler LeaderElectionRecord) error
-
-	// Update will update an existing LeaderElectionRecord
- Update(ctx context.Context, ler LeaderElectionRecord) error
-
- // RecordEvent is used to record events
- RecordEvent(string)
-
- // Identity will return the locks Identity
- Identity() string
-
- // Describe is used to convert details on current resource lock
- // into a string
- Describe() string
-}
-
-// New creates a lock of a given type according to the input parameters
-func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
- endpointsLock := &EndpointsLock{
- EndpointsMeta: metav1.ObjectMeta{
- Namespace: ns,
- Name: name,
- },
- Client: coreClient,
- LockConfig: rlc,
- }
- configmapLock := &ConfigMapLock{
- ConfigMapMeta: metav1.ObjectMeta{
- Namespace: ns,
- Name: name,
- },
- Client: coreClient,
- LockConfig: rlc,
- }
- leaseLock := &LeaseLock{
- LeaseMeta: metav1.ObjectMeta{
- Namespace: ns,
- Name: name,
- },
- Client: coordinationClient,
- LockConfig: rlc,
- }
- switch lockType {
- case EndpointsResourceLock:
- return endpointsLock, nil
- case ConfigMapsResourceLock:
- return configmapLock, nil
- case LeasesResourceLock:
- return leaseLock, nil
- case EndpointsLeasesResourceLock:
- return &MultiLock{
- Primary: endpointsLock,
- Secondary: leaseLock,
- }, nil
- case ConfigMapsLeasesResourceLock:
- return &MultiLock{
- Primary: configmapLock,
- Secondary: leaseLock,
- }, nil
- default:
- return nil, fmt.Errorf("Invalid lock-type %s", lockType)
- }
-}
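
Callers normally went through this New factory rather than composing locks by hand; a sketch with placeholder namespace, name, and identity. The "endpointsleases" type yields the MultiLock defined below, which eases migration from annotations to Lease objects:

package main

import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	lock, err := resourcelock.New(
		resourcelock.EndpointsLeasesResourceLock, // migration lock type
		"kubeedge",                               // placeholder namespace
		"cloudcore-lock",                         // placeholder name
		client.CoreV1(),
		client.CoordinationV1(),
		resourcelock.ResourceLockConfig{Identity: "cloudcore-1"},
	)
	if err != nil {
		panic(err)
	}
	_ = lock
}
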
diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go
deleted file mode 100644
index a40349727..000000000
--- a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
-Copyright 2018 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package resourcelock
-
-import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
-
- coordinationv1 "k8s.io/api/coordination/v1"
- corev1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
-)
-
-type LeaseLock struct {
-	// LeaseMeta should contain a Name and a Namespace of a
-	// Lease object that the LeaderElector will attempt to lead.
- LeaseMeta metav1.ObjectMeta
- Client coordinationv1client.LeasesGetter
- LockConfig ResourceLockConfig
- lease *coordinationv1.Lease
-}
-
-// Get returns the election record from a Lease spec
-func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
- var err error
- ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
- if err != nil {
- return nil, nil, err
- }
- record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
- recordByte, err := json.Marshal(*record)
- if err != nil {
- return nil, nil, err
- }
- return record, recordByte, nil
-}
-
-// Create attempts to create a Lease
-func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
- var err error
- ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
- ObjectMeta: metav1.ObjectMeta{
- Name: ll.LeaseMeta.Name,
- Namespace: ll.LeaseMeta.Namespace,
- },
- Spec: LeaderElectionRecordToLeaseSpec(&ler),
- }, metav1.CreateOptions{})
- return err
-}
-
-// Update will update an existing Lease spec.
-func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
- if ll.lease == nil {
- return errors.New("lease not initialized, call get or create first")
- }
- ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)
-
- lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
- if err != nil {
- return err
- }
-
- ll.lease = lease
- return nil
-}
-
-// RecordEvent in leader election while adding meta-data
-func (ll *LeaseLock) RecordEvent(s string) {
- if ll.LockConfig.EventRecorder == nil {
- return
- }
- events := fmt.Sprintf("%v %v", ll.LockConfig.Identity, s)
- ll.LockConfig.EventRecorder.Eventf(&coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}, corev1.EventTypeNormal, "LeaderElection", events)
-}
-
-// Describe is used to convert details on current resource lock
-// into a string
-func (ll *LeaseLock) Describe() string {
- return fmt.Sprintf("%v/%v", ll.LeaseMeta.Namespace, ll.LeaseMeta.Name)
-}
-
-// Identity returns the Identity of the lock
-func (ll *LeaseLock) Identity() string {
- return ll.LockConfig.Identity
-}
-
-func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord {
- var r LeaderElectionRecord
- if spec.HolderIdentity != nil {
- r.HolderIdentity = *spec.HolderIdentity
- }
- if spec.LeaseDurationSeconds != nil {
- r.LeaseDurationSeconds = int(*spec.LeaseDurationSeconds)
- }
- if spec.LeaseTransitions != nil {
- r.LeaderTransitions = int(*spec.LeaseTransitions)
- }
- if spec.AcquireTime != nil {
-		r.AcquireTime = metav1.Time{Time: spec.AcquireTime.Time}
- }
- if spec.RenewTime != nil {
-		r.RenewTime = metav1.Time{Time: spec.RenewTime.Time}
- }
- return &r
-
-}
-
-func LeaderElectionRecordToLeaseSpec(ler *LeaderElectionRecord) coordinationv1.LeaseSpec {
- leaseDurationSeconds := int32(ler.LeaseDurationSeconds)
- leaseTransitions := int32(ler.LeaderTransitions)
- return coordinationv1.LeaseSpec{
- HolderIdentity: &ler.HolderIdentity,
- LeaseDurationSeconds: &leaseDurationSeconds,
-		AcquireTime:          &metav1.MicroTime{Time: ler.AcquireTime.Time},
-		RenewTime:            &metav1.MicroTime{Time: ler.RenewTime.Time},
- LeaseTransitions: &leaseTransitions,
- }
-}
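
LeaseLock is the lock type that remains once the annotation-based locks are deprecated; the usual entry point is leaderelection.RunOrDie. A minimal sketch with placeholder names and typical timing values (not KubeEdge's configuration):

package main

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Namespace: "kubeedge", Name: "cloudcore-lock"}, // placeholders
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: "cloudcore-1"},
	}

	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second, // how long a lease is valid
		RenewDeadline: 10 * time.Second, // leader must renew before this elapses
		RetryPeriod:   2 * time.Second,  // wait between acquire attempts
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { /* start the active modules */ },
			OnStoppedLeading: func() { /* step down, usually exit */ },
		},
	})
}
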
diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/multilock.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/multilock.go
deleted file mode 100644
index 5ee1dcbb5..000000000
--- a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/multilock.go
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
-Copyright 2019 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package resourcelock
-
-import (
- "bytes"
- "context"
- "encoding/json"
-
- apierrors "k8s.io/apimachinery/pkg/api/errors"
-)
-
-const (
- UnknownLeader = "leaderelection.k8s.io/unknown"
-)
-
-// MultiLock is used for lock migration
-type MultiLock struct {
- Primary Interface
- Secondary Interface
-}
-
-// Get returns the election record of the primary (older) lock
-func (ml *MultiLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
- primary, primaryRaw, err := ml.Primary.Get(ctx)
- if err != nil {
- return nil, nil, err
- }
-
- secondary, secondaryRaw, err := ml.Secondary.Get(ctx)
- if err != nil {
- // Lock is held by old client
- if apierrors.IsNotFound(err) && primary.HolderIdentity != ml.Identity() {
- return primary, primaryRaw, nil
- }
- return nil, nil, err
- }
-
- if primary.HolderIdentity != secondary.HolderIdentity {
- primary.HolderIdentity = UnknownLeader
- primaryRaw, err = json.Marshal(primary)
- if err != nil {
- return nil, nil, err
- }
- }
- return primary, ConcatRawRecord(primaryRaw, secondaryRaw), nil
-}
-
-// Create attempts to create both primary lock and secondary lock
-func (ml *MultiLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
- err := ml.Primary.Create(ctx, ler)
- if err != nil && !apierrors.IsAlreadyExists(err) {
- return err
- }
- return ml.Secondary.Create(ctx, ler)
-}
-
-// Update will update an existing annotation on both resources.
-func (ml *MultiLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
- err := ml.Primary.Update(ctx, ler)
- if err != nil {
- return err
- }
- _, _, err = ml.Secondary.Get(ctx)
- if err != nil && apierrors.IsNotFound(err) {
- return ml.Secondary.Create(ctx, ler)
- }
- return ml.Secondary.Update(ctx, ler)
-}
-
-// RecordEvent in leader election while adding meta-data
-func (ml *MultiLock) RecordEvent(s string) {
- ml.Primary.RecordEvent(s)
- ml.Secondary.RecordEvent(s)
-}
-
-// Describe is used to convert details on current resource lock
-// into a string
-func (ml *MultiLock) Describe() string {
- return ml.Primary.Describe()
-}
-
-// Identity returns the Identity of the lock
-func (ml *MultiLock) Identity() string {
- return ml.Primary.Identity()
-}
-
-func ConcatRawRecord(primaryRaw, secondaryRaw []byte) []byte {
- return bytes.Join([][]byte{primaryRaw, secondaryRaw}, []byte(","))
-}
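
Composing a MultiLock directly mirrors what New does for the *leases lock types; a sketch of the migration pattern with placeholder names. Reads prefer the primary, writes go to both, so old and new binaries can contend safely during a rollout:

package main

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	meta := metav1.ObjectMeta{Namespace: "kubeedge", Name: "cloudcore-lock"} // placeholders
	rlc := resourcelock.ResourceLockConfig{Identity: "cloudcore-1"}

	// Primary: the legacy annotation-based lock old participants understand.
	// Secondary: the Lease object new participants will eventually use alone.
	lock := &resourcelock.MultiLock{
		Primary: &resourcelock.ConfigMapLock{
			ConfigMapMeta: meta,
			Client:        client.CoreV1(),
			LockConfig:    rlc,
		},
		Secondary: &resourcelock.LeaseLock{
			LeaseMeta:  meta,
			Client:     client.CoordinationV1(),
			LockConfig: rlc,
		},
	}
	_ = lock
}
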
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 8b9a7c731..46693aaa1 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1293,8 +1293,6 @@ k8s.io/client-go/tools/clientcmd/api
k8s.io/client-go/tools/clientcmd/api/latest
k8s.io/client-go/tools/clientcmd/api/v1
k8s.io/client-go/tools/events
-k8s.io/client-go/tools/leaderelection
-k8s.io/client-go/tools/leaderelection/resourcelock
k8s.io/client-go/tools/metrics
k8s.io/client-go/tools/pager
k8s.io/client-go/tools/record