diff options
| author | KubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com> | 2021-08-13 09:20:03 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-08-13 09:20:03 +0800 |
| commit | 9307dc7d3557eb1646261fb0366ce60be1babe68 (patch) | |
| tree | a642f6ce5cbdcd8728b999f6a82ad1125e9138c2 | |
| parent | Merge pull request #2885 from gy95/automated-cherry-pick-of-#2876-upstream-re... (diff) | |
| parent | fix cherry-pick #3021 lint errors (diff) | |
| download | kubeedge-origin/release-1.7.tar.gz | |
Merge pull request #3027 from gy95/automated-cherry-pick-of-#3021-upstream-release-1.7v1.7.2origin/release-1.7
Automated cherry pick of #3021: support k8s v1.21.X
| -rw-r--r-- | .github/workflows/main.yaml | 4 | ||||
| -rw-r--r-- | cloud/pkg/cloudhub/channelq/channelq.go | 3 | ||||
| -rw-r--r-- | cloud/pkg/edgecontroller/controller/upstream.go | 49 | ||||
| -rw-r--r-- | common/constants/default.go | 2 | ||||
| -rw-r--r-- | edge/pkg/edged/volume_host.go | 4 | ||||
| -rw-r--r-- | edge/pkg/metamanager/client/metaclient.go | 5 | ||||
| -rw-r--r-- | edge/pkg/metamanager/client/serviceaccount.go | 83 | ||||
| -rw-r--r-- | edge/pkg/metamanager/process.go | 5 | ||||
| -rw-r--r-- | edgesite/README.md | 8 | ||||
| -rwxr-xr-x | hack/lib/install.sh | 2 | ||||
| -rw-r--r-- | pkg/apis/componentconfig/cloudcore/v1alpha1/default.go | 2 | ||||
| -rw-r--r-- | pkg/apis/componentconfig/cloudcore/v1alpha1/types.go | 6 | ||||
| -rw-r--r-- | pkg/apis/componentconfig/edgesite/v1alpha1/default.go | 2 | ||||
| -rw-r--r-- | staging/src/github.com/kubeedge/beehive/pkg/core/model/message.go | 21 |
14 files changed, 173 insertions, 23 deletions
diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 2cf3526a2..3fd860b9a 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -123,7 +123,7 @@ jobs: - name: Install dependences run: | command -v ginkgo || go get github.com/onsi/ginkgo/ginkgo - go get sigs.k8s.io/kind@v0.9.0 + go get sigs.k8s.io/kind@v0.11.1 curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.19.3/bin/linux/amd64/kubectl && sudo install kubectl /usr/local/bin/kubectl - name: Checkout code @@ -153,7 +153,7 @@ jobs: - name: Install dependences run: | command -v ginkgo || go get github.com/onsi/ginkgo/ginkgo - go get sigs.k8s.io/kind@v0.9.0 + go get sigs.k8s.io/kind@v0.11.1 curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.19.3/bin/linux/amd64/kubectl && sudo install kubectl /usr/local/bin/kubectl export RELEASE_VERSION=$(wget -qO - https://kubeedge.io/latestversion | cat) sudo wget -qP /etc/kubeedge https://github.com/kubeedge/kubeedge/releases/download/${RELEASE_VERSION}/checksum_kubeedge-${RELEASE_VERSION}-linux-amd64.tar.gz.txt diff --git a/cloud/pkg/cloudhub/channelq/channelq.go b/cloud/pkg/cloudhub/channelq/channelq.go index 4a20118ef..e3c6e4d3f 100644 --- a/cloud/pkg/cloudhub/channelq/channelq.go +++ b/cloud/pkg/cloudhub/channelq/channelq.go @@ -159,7 +159,8 @@ func isListResource(msg *beehiveModel.Message) bool { strings.Contains(msgResource, commonconst.ResourceTypeServiceList) || strings.Contains(msgResource, commonconst.ResourceTypeEndpointsList) || strings.Contains(msgResource, "membership") || - strings.Contains(msgResource, "twin/cloud_updated") { + strings.Contains(msgResource, "twin/cloud_updated") || + strings.Contains(msgResource, beehiveModel.ResourceTypeServiceAccountToken) { return true } diff --git a/cloud/pkg/edgecontroller/controller/upstream.go b/cloud/pkg/edgecontroller/controller/upstream.go index 93e56a64f..89b820f62 100644 --- a/cloud/pkg/edgecontroller/controller/upstream.go +++ b/cloud/pkg/edgecontroller/controller/upstream.go @@ -32,6 +32,7 @@ import ( "sort" "time" + authenticationv1 "k8s.io/api/authentication/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -94,6 +95,7 @@ type UpstreamController struct { nodeStatusChan chan model.Message podStatusChan chan model.Message secretChan chan model.Message + serviceAccountTokenChan chan model.Message configMapChan chan model.Message serviceChan chan model.Message endpointsChan chan model.Message @@ -131,6 +133,7 @@ func (uc *UpstreamController) Start() error { uc.updateNodeChan = make(chan model.Message, config.Config.Buffer.UpdateNode) uc.podDeleteChan = make(chan model.Message, config.Config.Buffer.DeletePod) uc.ruleStatusChan = make(chan model.Message, config.Config.Buffer.UpdateNodeStatus) + uc.serviceAccountTokenChan = make(chan model.Message, config.Config.Buffer.ServiceAccountToken) go uc.dispatchMessage() @@ -152,6 +155,9 @@ func (uc *UpstreamController) Start() error { for i := 0; i < int(config.Config.Load.QueryEndpointsWorkers); i++ { go uc.queryEndpoints() } + for i := 0; i < int(config.Config.Load.ServiceAccountTokenWorkers); i++ { + go uc.processServiceAccountToken() + } for i := 0; i < int(config.Config.Load.QueryPersistentVolumeWorkers); i++ { go uc.queryPersistentVolume() } @@ -215,6 +221,8 @@ func (uc *UpstreamController) dispatchMessage() { uc.serviceChan <- msg case common.ResourceTypeEndpoints: uc.endpointsChan <- msg + case model.ResourceTypeServiceAccountToken: + uc.serviceAccountTokenChan <- msg case common.ResourceTypePersistentVolume: uc.persistentVolumeChan <- msg case common.ResourceTypePersistentVolumeClaim: @@ -565,7 +573,7 @@ func (uc *UpstreamController) updateNodeStatus() { } } -func kubeClientGet(uc *UpstreamController, namespace string, name string, queryType string) (metaV1.Object, error) { +func kubeClientGet(uc *UpstreamController, namespace string, name string, queryType string, msg model.Message) (metaV1.Object, error) { var obj metaV1.Object var err error switch queryType { @@ -585,6 +593,8 @@ func kubeClientGet(uc *UpstreamController, namespace string, name string, queryT obj, err = uc.kubeClient.StorageV1().VolumeAttachments().Get(context.Background(), name, metaV1.GetOptions{}) case model.ResourceTypeNode: obj, err = uc.nodeLister.Get(name) + case model.ResourceTypeServiceAccountToken: + obj, err = uc.getServiceAccountToken(namespace, name, msg) default: err := stderrors.New("Wrong query type") klog.Error(err) @@ -608,7 +618,7 @@ func queryInner(uc *UpstreamController, msg model.Message, queryType string) { switch msg.GetOperation() { case model.QueryOperation: - object, err := kubeClientGet(uc, namespace, name, queryType) + object, err := kubeClientGet(uc, namespace, name, queryType, msg) if errors.IsNotFound(err) { klog.Warningf("message: %s process failure, resource not found, namespace: %s, name: %s", msg.GetID(), namespace, name) return @@ -676,6 +686,18 @@ func (uc *UpstreamController) queryService() { } } +func (uc *UpstreamController) processServiceAccountToken() { + for { + select { + case <-beehiveContext.Done(): + klog.Warning("stop process service account token") + return + case msg := <-uc.serviceAccountTokenChan: + queryInner(uc, msg, model.ResourceTypeServiceAccountToken) + } + } +} + func (uc *UpstreamController) queryEndpoints() { for { select { @@ -688,6 +710,28 @@ func (uc *UpstreamController) queryEndpoints() { } } +func (uc *UpstreamController) getServiceAccountToken(namespace string, name string, msg model.Message) (metaV1.Object, error) { + data, err := msg.GetContentData() + if err != nil { + klog.Errorf("get message body failed err %v", err) + return nil, err + } + + tr := authenticationv1.TokenRequest{} + if err := json.Unmarshal(data, &tr); err != nil { + klog.Errorf("unmarshal token request failed err %v", err) + return nil, err + } + + tokenRequest, err := uc.kubeClient.CoreV1().ServiceAccounts(namespace).CreateToken(context.TODO(), name, &tr, metaV1.CreateOptions{}) + if err != nil { + klog.Errorf("apiserver get service account token failed: err %v", err) + return nil, err + } + + return tokenRequest, nil +} + func (uc *UpstreamController) queryPersistentVolume() { for { select { @@ -1004,5 +1048,6 @@ func NewUpstreamController(factory k8sinformer.SharedInformerFactory) (*Upstream uc.podLister = factory.Core().V1().Pods().Lister() uc.configMapLister = factory.Core().V1().ConfigMaps().Lister() uc.secretLister = factory.Core().V1().Secrets().Lister() + return uc, nil } diff --git a/common/constants/default.go b/common/constants/default.go index 495f72ec5..22c14bb64 100644 --- a/common/constants/default.go +++ b/common/constants/default.go @@ -95,6 +95,7 @@ const ( DefaultUpdateNodeWorkers = 4 DefaultDeletePodWorkers = 4 DefaultUpdateRuleStatusWorkers = 4 + DefaultServiceAccountTokenWorkers = 4 DefaultUpdatePodStatusBuffer = 1024 DefaultUpdateNodeStatusBuffer = 1024 @@ -108,6 +109,7 @@ const ( DefaultQueryNodeBuffer = 1024 DefaultUpdateNodeBuffer = 1024 DefaultDeletePodBuffer = 1024 + DefaultServiceAccountTokenBuffer = 1024 DefaultPodEventBuffer = 1 DefaultConfigMapEventBuffer = 1 diff --git a/edge/pkg/edged/volume_host.go b/edge/pkg/edged/volume_host.go index 84c422894..cfcb86205 100644 --- a/edge/pkg/edged/volume_host.go +++ b/edge/pkg/edged/volume_host.go @@ -169,8 +169,8 @@ func (evh *edgedVolumeHost) GetPodsDir() string { } func (evh *edgedVolumeHost) GetServiceAccountTokenFunc() func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) { - return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) { - return nil, fmt.Errorf("GetServiceAccountToken unsupported") + return func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) { + return evh.edge.metaClient.ServiceAccountToken().GetServiceAccountToken(namespace, name, tr) } } diff --git a/edge/pkg/metamanager/client/metaclient.go b/edge/pkg/metamanager/client/metaclient.go index e65241d77..c20642611 100644 --- a/edge/pkg/metamanager/client/metaclient.go +++ b/edge/pkg/metamanager/client/metaclient.go @@ -26,6 +26,7 @@ type CoreInterface interface { SecretsGetter EndpointsGetter ServiceGetter + ServiceAccountTokenGetter PersistentVolumesGetter PersistentVolumeClaimsGetter VolumeAttachmentsGetter @@ -56,6 +57,10 @@ func (m *metaClient) Secrets(namespace string) SecretsInterface { return newSecrets(namespace, m.send) } +func (m *metaClient) ServiceAccountToken() ServiceAccountTokenInterface { + return newServiceAccountToken(m.send) +} + func (m *metaClient) PodStatus(namespace string) PodStatusInterface { return newPodStatus(namespace, m.send) } diff --git a/edge/pkg/metamanager/client/serviceaccount.go b/edge/pkg/metamanager/client/serviceaccount.go new file mode 100644 index 000000000..eac9d5317 --- /dev/null +++ b/edge/pkg/metamanager/client/serviceaccount.go @@ -0,0 +1,83 @@ +package client + +import ( + "encoding/json" + "fmt" + + authenticationv1 "k8s.io/api/authentication/v1" + "k8s.io/klog/v2" + + "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/edge/pkg/common/message" + "github.com/kubeedge/kubeedge/edge/pkg/common/modules" + "github.com/kubeedge/kubeedge/edge/pkg/metamanager" +) + +// ServiceAccountTokenGetter is interface to get client service account token +type ServiceAccountTokenGetter interface { + ServiceAccountToken() ServiceAccountTokenInterface +} + +// ServiceAccountTokenInterface is interface for client service account token +type ServiceAccountTokenInterface interface { + GetServiceAccountToken(namespace string, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) +} + +type serviceAccountToken struct { + send SendInterface +} + +func newServiceAccountToken(s SendInterface) *serviceAccountToken { + return &serviceAccountToken{ + send: s, + } +} + +func (c *serviceAccountToken) GetServiceAccountToken(namespace string, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) { + resource := fmt.Sprintf("%s/%s/%s", namespace, model.ResourceTypeServiceAccountToken, name) + tokenMsg := message.BuildMsg(modules.MetaGroup, "", modules.EdgedModuleName, resource, model.QueryOperation, tr) + msg, err := c.send.SendSync(tokenMsg) + if err != nil { + klog.Errorf("get service account token from metaManager failed, err: %v", err) + return nil, fmt.Errorf("get service account token from metaManager failed, err: %v", err) + } + + content, err := msg.GetContentData() + if err != nil { + klog.Errorf("marshal message to serviceaccount token failed, err: %v", err) + return nil, fmt.Errorf("marshal message to serviceaccount token failed, err: %v", err) + } + + if msg.GetOperation() == model.ResponseOperation && msg.GetSource() == metamanager.MetaManagerModuleName { + return handleServiceAccountTokenFromMetaDB(content) + } + return handleServiceAccountTokenFromMetaManager(content) +} + +func handleServiceAccountTokenFromMetaDB(content []byte) (*authenticationv1.TokenRequest, error) { + var lists []string + err := json.Unmarshal([]byte(content), &lists) + if err != nil { + return nil, fmt.Errorf("unmarshal message to serviceaccount list from db failed, err: %v", err) + } + + if len(lists) != 1 { + return nil, fmt.Errorf("serviceaccount length from meta db is %d", len(lists)) + } + + var tokenRequest authenticationv1.TokenRequest + err = json.Unmarshal([]byte(lists[0]), &tokenRequest) + if err != nil { + return nil, fmt.Errorf("unmarshal message to serviceaccount token from db failed, err: %v", err) + } + return &tokenRequest, nil +} + +func handleServiceAccountTokenFromMetaManager(content []byte) (*authenticationv1.TokenRequest, error) { + var serviceAccount authenticationv1.TokenRequest + err := json.Unmarshal(content, &serviceAccount) + if err != nil { + return nil, fmt.Errorf("unmarshal message to service account failed, err: %v", err) + } + return &serviceAccount, nil +} diff --git a/edge/pkg/metamanager/process.go b/edge/pkg/metamanager/process.go index 9add319f7..03e477f77 100644 --- a/edge/pkg/metamanager/process.go +++ b/edge/pkg/metamanager/process.go @@ -101,7 +101,8 @@ func requireRemoteQuery(resType string) bool { resType == constants.ResourceTypePersistentVolume || resType == constants.ResourceTypePersistentVolumeClaim || resType == constants.ResourceTypeVolumeAttachment || - resType == model.ResourceTypeNode + resType == model.ResourceTypeNode || + resType == model.ResourceTypeServiceAccountToken } // if resource type is EdgeMesh related @@ -663,6 +664,8 @@ func (m *metaManager) process(message model.Message) { constants.CSIOperationTypeControllerPublishVolume, constants.CSIOperationTypeControllerUnpublishVolume: m.processVolume(message) + default: + klog.Errorf("metamanager not supported operation: %v", operation) } } diff --git a/edgesite/README.md b/edgesite/README.md index 44307c9c7..31ac1737e 100644 --- a/edgesite/README.md +++ b/edgesite/README.md @@ -19,12 +19,12 @@ install edgesite-server and edgesite-agent to access kube-apiserver in other sub ```bash bash build/tools/certgen.sh edgesiteAgent ``` - -2. copy **rootCA.crt** file and **edgesite-agent.key、edgesite-agent.crt** files generated in step 1 to your edgesite-agent host. + +2. copy **rootCA.crt** file and **edgesite-agent.key、edgesite-agent.crt** files generated in step 1 to your edgesite-agent host. Make sure that the /etc/kubeedge/ca/ and /etc/kubeedge/certs directories exist on the edgesite-agent host. For example, ```bash - # precondition: create /etc/kubeedge/ca directory and /etc/kubeedge/certs directory on the host which will install edgesite-agent. + # precondition: create /etc/kubeedge/ca directory and /etc/kubeedge/certs directory on the host which will install edgesite-agent. scp /etc/kubeedge/ca/rootCA.crt <account>@<edgesite_agent_ip>:/etc/kubeedge/ca/; \ scp /etc/kubeedge/certs/edgesite-agent.key /etc/kubeedge/certs/edgesite-agent.crt <account>@<edgesite_agent_ip>:/etc/kubeedge/certs/ ``` @@ -34,6 +34,6 @@ Make sure that the /etc/kubeedge/ca/ and /etc/kubeedge/certs directories exist o ```bash EDGESITE_SERVER_IP=<edgesite_server_ip> KUBE_APISERVER_IP=<kube-apiserver_ip> envsubst < build/edgesite/edgesite-agent.yaml | kubectl apply -f - ``` - + diff --git a/hack/lib/install.sh b/hack/lib/install.sh index a6aa87adb..494d7bc0f 100755 --- a/hack/lib/install.sh +++ b/hack/lib/install.sh @@ -32,7 +32,7 @@ function check_kind { command -v kind >/dev/null 2>&1 if [[ $? -ne 0 ]]; then echo "installing kind ." - GO111MODULE="on" go get sigs.k8s.io/kind@v0.9.0 + GO111MODULE="on" go get sigs.k8s.io/kind@v0.11.1 if [[ $? -ne 0 ]]; then echo "kind installed failed, exiting." exit 1 diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go index 815de9cf9..336a5cfa4 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go @@ -103,6 +103,7 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig { QueryNode: constants.DefaultQueryNodeBuffer, UpdateNode: constants.DefaultUpdateNodeBuffer, DeletePod: constants.DefaultDeletePodBuffer, + ServiceAccountToken: constants.DefaultServiceAccountTokenBuffer, }, Context: &ControllerContext{ SendModule: metaconfig.ModuleNameCloudHub, @@ -124,6 +125,7 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig { UpdateNodeWorkers: constants.DefaultUpdateNodeWorkers, DeletePodWorkers: constants.DefaultDeletePodWorkers, UpdateRuleStatusWorkers: constants.DefaultUpdateRuleStatusWorkers, + ServiceAccountTokenWorkers: constants.DefaultServiceAccountTokenWorkers, }, }, DeviceController: &DeviceController{ diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go index e2211ddec..bd1b92690 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go @@ -263,6 +263,9 @@ type EdgeControllerBuffer struct { // DeletePod indicates the buffer of delete pod message from edge // default 1024 DeletePod int32 `json:"deletePod,omitempty"` + // ServiceAccount indicates the buffer of service account token + // default 1024 + ServiceAccountToken int32 `json:"serviceAccountToken,omitempty"` } // ControllerContext indicates the message layer context for all controllers @@ -318,6 +321,9 @@ type EdgeControllerLoad struct { // UpdateRuleStatusWorkers indicates the load of update rule status // default 4 UpdateRuleStatusWorkers int32 `json:"UpdateRuleStatusWorkers,omitempty"` + // ServiceAccountTokenWorkers indicates the load of service account token + // default 4 + ServiceAccountTokenWorkers int32 `json:"ServiceAccountTokenWorkers,omitempty"` } // DeviceController indicates the device controller diff --git a/pkg/apis/componentconfig/edgesite/v1alpha1/default.go b/pkg/apis/componentconfig/edgesite/v1alpha1/default.go index 1745a5528..af5301fe6 100644 --- a/pkg/apis/componentconfig/edgesite/v1alpha1/default.go +++ b/pkg/apis/componentconfig/edgesite/v1alpha1/default.go @@ -75,6 +75,7 @@ func NewDefaultEdgeSiteConfig() *EdgeSiteConfig { QueryNode: constants.DefaultQueryNodeBuffer, UpdateNode: constants.DefaultUpdateNodeBuffer, DeletePod: constants.DefaultDeletePodBuffer, + ServiceAccountToken: constants.DefaultServiceAccountTokenBuffer, }, Context: &cloudcoreconfig.ControllerContext{ SendModule: metaconfig.ModuleNameMetaManager, @@ -95,6 +96,7 @@ func NewDefaultEdgeSiteConfig() *EdgeSiteConfig { UpdateNodeWorkers: constants.DefaultUpdateNodeWorkers, DeletePodWorkers: constants.DefaultDeletePodWorkers, UpdateRuleStatusWorkers: constants.DefaultUpdateRuleStatusWorkers, + ServiceAccountTokenWorkers: constants.DefaultServiceAccountTokenWorkers, }, }, Edged: &edgecoreconfig.Edged{ diff --git a/staging/src/github.com/kubeedge/beehive/pkg/core/model/message.go b/staging/src/github.com/kubeedge/beehive/pkg/core/model/message.go index 4df3b076b..b1fa4c216 100644 --- a/staging/src/github.com/kubeedge/beehive/pkg/core/model/message.go +++ b/staging/src/github.com/kubeedge/beehive/pkg/core/model/message.go @@ -17,16 +17,17 @@ const ( ResponseOperation = "response" ResponseErrorOperation = "error" - ResourceTypePod = "pod" - ResourceTypeConfigmap = "configmap" - ResourceTypeSecret = "secret" - ResourceTypeNode = "node" - ResourceTypePodlist = "podlist" - ResourceTypePodStatus = "podstatus" - ResourceTypeNodeStatus = "nodestatus" - ResourceTypeRule = "rule" - ResourceTypeRuleEndpoint = "ruleendpoint" - ResourceTypeRuleStatus = "rulestatus" + ResourceTypePod = "pod" + ResourceTypeConfigmap = "configmap" + ResourceTypeServiceAccountToken = "serviceaccounttoken" + ResourceTypeSecret = "secret" + ResourceTypeNode = "node" + ResourceTypePodlist = "podlist" + ResourceTypePodStatus = "podstatus" + ResourceTypeNodeStatus = "nodestatus" + ResourceTypeRule = "rule" + ResourceTypeRuleEndpoint = "ruleendpoint" + ResourceTypeRuleStatus = "rulestatus" ) // Message struct |
