summaryrefslogtreecommitdiff
path: root/edge/pkg
diff options
context:
space:
mode:
authorzhengxinwei <zhengxinwei@huawei.com>2023-07-22 14:55:12 +0800
committerzhengxinwei <zhengxinwei@huawei.com>2023-10-07 09:46:55 +0800
commitaa3ba72d383d125ec3d84783a539d5b0d4c5c62c (patch)
treecac86dfb50fb6608e1b77c2e722d0661593fdabf /edge/pkg
parentMerge pull request #5040 from Onion-of-dreamed/fix/without-mqtt-tag (diff)
downloadkubeedge-aa3ba72d383d125ec3d84783a539d5b0d4c5c62c.tar.gz
metaserver support pass through API, open /version path
Signed-off-by: zhengxinwei <zhengxinwei@huawei.com>
Diffstat (limited to 'edge/pkg')
-rw-r--r--edge/pkg/metamanager/metaserver/handlerfactory/handler.go17
-rw-r--r--edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/client.go2
-rw-r--r--edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/fake/fake.go75
-rw-r--r--edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/imitator.go35
-rw-r--r--edge/pkg/metamanager/metaserver/kubernetes/storage/storage.go38
-rw-r--r--edge/pkg/metamanager/metaserver/kubernetes/storage/storage_test.go128
-rw-r--r--edge/pkg/metamanager/metaserver/server.go16
7 files changed, 307 insertions, 4 deletions
diff --git a/edge/pkg/metamanager/metaserver/handlerfactory/handler.go b/edge/pkg/metamanager/metaserver/handlerfactory/handler.go
index 1a33ec65b..777c4102c 100644
--- a/edge/pkg/metamanager/metaserver/handlerfactory/handler.go
+++ b/edge/pkg/metamanager/metaserver/handlerfactory/handler.go
@@ -111,6 +111,23 @@ func (f *Factory) Update(req *request.RequestInfo) http.Handler {
return h
}
+// PassThrough
+// handel with the pass through request
+func (f *Factory) PassThrough() http.Handler {
+ h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ options := metav1.GetOptions{}
+ result, err := f.storage.PassThrough(req.Context(), &options)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write(result)
+ })
+ return h
+}
+
func (f *Factory) Patch(reqInfo *request.RequestInfo) http.Handler {
scope := wrapScope{RequestScope: scope.NewRequestScope()}
scope.Kind = schema.GroupVersionKind{
diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/client.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/client.go
index 15c2f3116..5afc461f0 100644
--- a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/client.go
+++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/client.go
@@ -27,6 +27,8 @@ type Client interface {
Inject(msg model.Message)
InsertOrUpdateObj(ctx context.Context, obj runtime.Object) error
DeleteObj(ctx context.Context, obj runtime.Object) error
+ InsertOrUpdatePassThroughObj(ctx context.Context, obj []byte, key string) error
+ GetPassThroughObj(ctx context.Context, key string) ([]byte, error)
GetRevision() uint64
SetRevision(version interface{})
diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/fake/fake.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/fake/fake.go
new file mode 100644
index 000000000..2d3d5c0c8
--- /dev/null
+++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/fake/fake.go
@@ -0,0 +1,75 @@
+package fake
+
+import (
+ "context"
+
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+
+ "github.com/kubeedge/beehive/pkg/core/model"
+ "github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator"
+)
+
+// Client fake
+type Client struct {
+ InjectF func(msg model.Message)
+ InsertOrUpdateObjF func(ctx context.Context, obj runtime.Object) error
+ DeleteObjF func(ctx context.Context, obj runtime.Object) error
+ InsertOrUpdatePassThroughObjF func(ctx context.Context, obj []byte, key string) error
+ GetPassThroughObjF func(ctx context.Context, key string) ([]byte, error)
+ GetRevisionF func() uint64
+ SetRevisionF func(version interface{})
+ ListF func(ctx context.Context, key string) (imitator.Resp, error)
+ GetF func(ctx context.Context, key string) (imitator.Resp, error)
+ WatchF func(ctx context.Context, key string, ResourceVersion uint64) <-chan watch.Event
+}
+
+// Inject fake
+func (c Client) Inject(msg model.Message) {
+ c.InjectF(msg)
+}
+
+// InsertOrUpdateObj fake
+func (c Client) InsertOrUpdateObj(ctx context.Context, obj runtime.Object) error {
+ return c.InsertOrUpdateObjF(ctx, obj)
+}
+
+// DeleteObj fake
+func (c Client) DeleteObj(ctx context.Context, obj runtime.Object) error {
+ return c.DeleteObjF(ctx, obj)
+}
+
+// InsertOrUpdatePassThroughObj fake
+func (c Client) InsertOrUpdatePassThroughObj(ctx context.Context, obj []byte, key string) error {
+ return c.InsertOrUpdatePassThroughObjF(ctx, obj, key)
+}
+
+// GetPassThroughObj fake
+func (c Client) GetPassThroughObj(ctx context.Context, key string) ([]byte, error) {
+ return c.GetPassThroughObjF(ctx, key)
+}
+
+// GetRevision fake
+func (c Client) GetRevision() uint64 {
+ return c.GetRevisionF()
+}
+
+// SetRevision fake
+func (c Client) SetRevision(version interface{}) {
+ c.SetRevisionF(version)
+}
+
+// List fake
+func (c Client) List(ctx context.Context, key string) (imitator.Resp, error) {
+ return c.ListF(ctx, key)
+}
+
+// Get fake
+func (c Client) Get(ctx context.Context, key string) (imitator.Resp, error) {
+ return c.GetF(ctx, key)
+}
+
+// Watch fake
+func (c Client) Watch(ctx context.Context, key string, ResourceVersion uint64) <-chan watch.Event {
+ return c.WatchF(ctx, key, ResourceVersion)
+}
diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/imitator.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/imitator.go
index 04d1be044..75415d7b2 100644
--- a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/imitator.go
+++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/imitator.go
@@ -83,9 +83,13 @@ func (s *imitator) InsertOrUpdateObj(ctx context.Context, obj runtime.Object) er
ResourceVersion: objRv,
Value: buf.String(),
}
+ return s.insertOrReplaceMetaV2(m, objRv)
+}
+
+func (s *imitator) insertOrReplaceMetaV2(m v2.MetaV2, objRv uint64) error {
s.lock.Lock()
defer s.lock.Unlock()
- _, err = dbm.DBAccess.Raw("INSERT OR REPLACE INTO meta_v2 (key, groupversionresource, namespace,name,resourceversion,value) VALUES (?,?,?,?,?,?)", m.Key, m.GroupVersionResource, m.Namespace, m.Name, m.ResourceVersion, m.Value).Exec()
+ _, err := dbm.DBAccess.Raw("INSERT OR REPLACE INTO meta_v2 (key, groupversionresource, namespace,name,resourceversion,value) VALUES (?,?,?,?,?,?)", m.Key, m.GroupVersionResource, m.Namespace, m.Name, m.ResourceVersion, m.Value).Exec()
var maxRetryTimes = 3
for i := 1; err != nil; i++ {
klog.Errorf("failed to access database:%v", err)
@@ -97,9 +101,36 @@ func (s *imitator) InsertOrUpdateObj(ctx context.Context, obj runtime.Object) er
if objRv > s.GetRevision() {
s.SetRevision(objRv)
}
- klog.V(4).Infof("[metaserver]successfully insert or update obj:%v", key)
+ klog.V(4).Infof("[metaserver]successfully insert or update obj:%v", m.Key)
return nil
}
+
+func (s *imitator) InsertOrUpdatePassThroughObj(ctx context.Context, obj []byte, key string) error {
+ m := v2.MetaV2{
+ Key: key,
+ Value: string(obj),
+ }
+ return s.insertOrReplaceMetaV2(m, 0)
+}
+
+func (s *imitator) GetPassThroughObj(ctx context.Context, key string) ([]byte, error) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ results := new([]v2.MetaV2)
+ _, err := dbm.DBAccess.QueryTable(v2.NewMetaTableName).Filter(v2.KEY, key).All(results)
+ if err != nil {
+ return nil, err
+ }
+
+ switch {
+ case len(*results) == 1:
+ klog.V(4).Infof("[metaserver]successfully insert or update obj:%v", key)
+ return []byte((*results)[0].Value), nil
+ default:
+ return nil, fmt.Errorf("the server could not find the requested resource")
+ }
+}
+
func (s *imitator) Delete(ctx context.Context, key string) error {
m := v2.MetaV2{
Key: key,
diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/storage.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/storage.go
index e82964112..f28a6307b 100644
--- a/edge/pkg/metamanager/metaserver/kubernetes/storage/storage.go
+++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/storage.go
@@ -114,6 +114,44 @@ func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions)
return obj, err
}
+// PassThrough
+// The request is routed to the dynamic controller via the metaServer.
+// If the request is approved, the response will be saved to local storage.
+// It will be acquired from local data storage if it fails.
+func (r *REST) PassThrough(ctx context.Context, options *metav1.GetOptions) ([]byte, error) {
+ info, _ := apirequest.RequestInfoFrom(ctx)
+ resp, err := func() ([]byte, error) {
+ app, err := r.Agent.Generate(ctx, metaserver.ApplicationVerb(info.Verb), *options, nil)
+ if err != nil {
+ klog.Errorf("[metaserver/passThrough] failed to generate application: %v", err)
+ return nil, err
+ }
+ err = r.Agent.Apply(app)
+ defer app.Close()
+ if err != nil {
+ klog.Errorf("[metaserver/passThrough] failed to request from cloud: %v", err)
+ return nil, err
+ }
+
+ err = imitator.DefaultV2Client.InsertOrUpdatePassThroughObj(context.TODO(), app.RespBody, app.Key)
+ if err != nil {
+ klog.Warningf("[metaserver/passThrough] failed to insert version information into database: %v", err)
+ }
+ return app.RespBody, nil
+ }()
+ if err != nil {
+ resp, err = imitator.DefaultV2Client.GetPassThroughObj(ctx, info.Path)
+ if err != nil {
+ klog.Errorf("[metaserver/reststorage] failed to get req at local: %v", err)
+ return nil, errors.NewNotFound(schema.GroupResource{Group: info.APIGroup, Resource: info.Resource}, info.Name)
+ }
+ klog.Infof("[metaserver/reststorage] successfully process get req (%v) at local", info.Path)
+ }
+
+ klog.Infof("[metaserver/passThrough] successfully process request (%v)", info.Path)
+ return resp, nil
+}
+
func (r *REST) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
info, _ := apirequest.RequestInfoFrom(ctx)
// First try to list the object from remote cloud
diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/storage_test.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/storage_test.go
new file mode 100644
index 000000000..298d734dd
--- /dev/null
+++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/storage_test.go
@@ -0,0 +1,128 @@
+package storage
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "reflect"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/agiledragon/gomonkey"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apiserver/pkg/endpoints/request"
+
+ beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
+ "github.com/kubeedge/beehive/pkg/core/model"
+ connect "github.com/kubeedge/kubeedge/edge/pkg/common/cloudconnection"
+ "github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/agent"
+ "github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator"
+ fakeclient "github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/fake"
+ "github.com/kubeedge/kubeedge/pkg/metaserver"
+)
+
+func TestREST_PassThrough(t *testing.T) {
+ type testCase struct {
+ isConnectFailed bool
+ isSendSyncFailed bool
+ isLocalStored bool
+ isInsertLocalStorageFailed bool
+ }
+ cases := testCase{}
+
+ fakeClient := fakeclient.Client{
+ InsertOrUpdatePassThroughObjF: func(ctx context.Context, obj []byte, key string) error {
+ if cases.isInsertLocalStorageFailed {
+ return fmt.Errorf("insert local storage failed")
+ }
+ return nil
+ },
+ GetPassThroughObjF: func(ctx context.Context, key string) ([]byte, error) {
+ if !cases.isLocalStored {
+ return nil, fmt.Errorf("local does not store it")
+ }
+ return []byte("test"), nil
+ },
+ }
+ patch := gomonkey.NewPatches()
+ defer patch.Reset()
+ patch.ApplyFunc(connect.IsConnected, func() bool {
+ return !cases.isConnectFailed
+ }).ApplyFunc(beehiveContext.SendSync, func(string, model.Message, time.Duration) (model.Message, error) {
+ app := metaserver.Application{
+ RespBody: []byte("test"),
+ Status: metaserver.Approved,
+ Reason: "ok",
+ }
+ if cases.isSendSyncFailed {
+ app.Status = metaserver.Failed
+ app.Reason = "isSendSyncFailed"
+ }
+ content, _ := json.Marshal(app)
+ return model.Message{
+ Content: content,
+ }, nil
+ }).ApplyGlobalVar(&imitator.DefaultV2Client, fakeClient)
+
+ var tests = []struct {
+ name string
+ rest *REST
+ info request.RequestInfo
+ cases testCase
+ want []byte
+ wantErr bool
+ }{
+ {
+ name: "test isConnectFailed ",
+ info: request.RequestInfo{},
+ cases: testCase{isConnectFailed: true},
+ wantErr: true,
+ }, {
+ name: "test isSendSyncFailed ",
+ info: request.RequestInfo{},
+ cases: testCase{isSendSyncFailed: true},
+ wantErr: true,
+ }, {
+ name: "test get version from cloud failed, but local stored",
+ info: request.RequestInfo{
+ Path: "/versions",
+ Verb: "get",
+ },
+ cases: testCase{isSendSyncFailed: true, isLocalStored: true},
+ want: []byte("test"),
+ }, {
+ name: "test successfully get the version from the cloud, but insert local storage failed ",
+ info: request.RequestInfo{
+ Path: "/versions",
+ Verb: "get",
+ },
+ cases: testCase{isInsertLocalStorageFailed: true},
+ want: []byte("test"),
+ }, {
+ name: "test successfully get the version from the cloud ",
+ info: request.RequestInfo{
+ Path: "/versions",
+ Verb: "get",
+ },
+ want: []byte("test"),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ctx := request.WithRequestInfo(context.TODO(), &tt.info)
+ rest := &REST{
+ Agent: &agent.Agent{Applications: sync.Map{}},
+ }
+ cases = tt.cases
+ got, err := rest.PassThrough(ctx, &metav1.GetOptions{})
+ if (err != nil) != tt.wantErr {
+ t.Errorf("PassThrough() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("PassThrough() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/edge/pkg/metamanager/metaserver/server.go b/edge/pkg/metamanager/metaserver/server.go
index 66c3684c5..7bc1b9d49 100644
--- a/edge/pkg/metamanager/metaserver/server.go
+++ b/edge/pkg/metamanager/metaserver/server.go
@@ -40,6 +40,7 @@ import (
"github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/handlerfactory"
"github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/serializer"
kefeatures "github.com/kubeedge/kubeedge/pkg/features"
+ "github.com/kubeedge/kubeedge/pkg/util/pass-through"
)
// MetaServer is simplification of server.GenericAPIServer
@@ -214,7 +215,13 @@ func (ls *MetaServer) BuildBasicHandler() http.Handler {
reqInfo, ok := apirequest.RequestInfoFrom(ctx)
//klog.Infof("[metaserver]get a req(%v)(%v)", reqInfo.Path, reqInfo.Verb)
//klog.Infof("[metaserver]get a req(\nPath:%v; \nVerb:%v; \nHeader:%+v)", reqInfo.Path, reqInfo.Verb, req.Header)
- if ok && reqInfo.IsResourceRequest {
+ if !ok {
+ err := fmt.Errorf("invalid request")
+ responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
+ return
+ }
+
+ if reqInfo.IsResourceRequest {
switch {
case reqInfo.Verb == "get":
ls.Factory.Get().ServeHTTP(w, req)
@@ -235,7 +242,12 @@ func (ls *MetaServer) BuildBasicHandler() http.Handler {
return
}
- err := fmt.Errorf("not a resource req")
+ if passthrough.IsPassThroughPath(reqInfo.Path, reqInfo.Verb) {
+ ls.Factory.PassThrough().ServeHTTP(w, req)
+ return
+ }
+
+ err := fmt.Errorf("request[%s::%s] isn't supported", reqInfo.Path, reqInfo.Verb)
responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
})
}