diff options
| author | zhengxinwei <zhengxinwei@huawei.com> | 2023-07-22 14:55:12 +0800 |
|---|---|---|
| committer | zhengxinwei <zhengxinwei@huawei.com> | 2023-10-07 09:46:55 +0800 |
| commit | aa3ba72d383d125ec3d84783a539d5b0d4c5c62c (patch) | |
| tree | cac86dfb50fb6608e1b77c2e722d0661593fdabf /edge/pkg | |
| parent | Merge pull request #5040 from Onion-of-dreamed/fix/without-mqtt-tag (diff) | |
| download | kubeedge-aa3ba72d383d125ec3d84783a539d5b0d4c5c62c.tar.gz | |
metaserver support pass through API, open /version path
Signed-off-by: zhengxinwei <zhengxinwei@huawei.com>
Diffstat (limited to 'edge/pkg')
7 files changed, 307 insertions, 4 deletions
diff --git a/edge/pkg/metamanager/metaserver/handlerfactory/handler.go b/edge/pkg/metamanager/metaserver/handlerfactory/handler.go index 1a33ec65b..777c4102c 100644 --- a/edge/pkg/metamanager/metaserver/handlerfactory/handler.go +++ b/edge/pkg/metamanager/metaserver/handlerfactory/handler.go @@ -111,6 +111,23 @@ func (f *Factory) Update(req *request.RequestInfo) http.Handler { return h } +// PassThrough +// handel with the pass through request +func (f *Factory) PassThrough() http.Handler { + h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + options := metav1.GetOptions{} + result, err := f.storage.PassThrough(req.Context(), &options) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(result) + }) + return h +} + func (f *Factory) Patch(reqInfo *request.RequestInfo) http.Handler { scope := wrapScope{RequestScope: scope.NewRequestScope()} scope.Kind = schema.GroupVersionKind{ diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/client.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/client.go index 15c2f3116..5afc461f0 100644 --- a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/client.go +++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/client.go @@ -27,6 +27,8 @@ type Client interface { Inject(msg model.Message) InsertOrUpdateObj(ctx context.Context, obj runtime.Object) error DeleteObj(ctx context.Context, obj runtime.Object) error + InsertOrUpdatePassThroughObj(ctx context.Context, obj []byte, key string) error + GetPassThroughObj(ctx context.Context, key string) ([]byte, error) GetRevision() uint64 SetRevision(version interface{}) diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/fake/fake.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/fake/fake.go new file mode 100644 index 000000000..2d3d5c0c8 --- /dev/null +++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/fake/fake.go @@ -0,0 +1,75 @@ +package fake + +import ( + "context" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + + "github.com/kubeedge/beehive/pkg/core/model" + "github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator" +) + +// Client fake +type Client struct { + InjectF func(msg model.Message) + InsertOrUpdateObjF func(ctx context.Context, obj runtime.Object) error + DeleteObjF func(ctx context.Context, obj runtime.Object) error + InsertOrUpdatePassThroughObjF func(ctx context.Context, obj []byte, key string) error + GetPassThroughObjF func(ctx context.Context, key string) ([]byte, error) + GetRevisionF func() uint64 + SetRevisionF func(version interface{}) + ListF func(ctx context.Context, key string) (imitator.Resp, error) + GetF func(ctx context.Context, key string) (imitator.Resp, error) + WatchF func(ctx context.Context, key string, ResourceVersion uint64) <-chan watch.Event +} + +// Inject fake +func (c Client) Inject(msg model.Message) { + c.InjectF(msg) +} + +// InsertOrUpdateObj fake +func (c Client) InsertOrUpdateObj(ctx context.Context, obj runtime.Object) error { + return c.InsertOrUpdateObjF(ctx, obj) +} + +// DeleteObj fake +func (c Client) DeleteObj(ctx context.Context, obj runtime.Object) error { + return c.DeleteObjF(ctx, obj) +} + +// InsertOrUpdatePassThroughObj fake +func (c Client) InsertOrUpdatePassThroughObj(ctx context.Context, obj []byte, key string) error { + return c.InsertOrUpdatePassThroughObjF(ctx, obj, key) +} + +// GetPassThroughObj fake +func (c Client) GetPassThroughObj(ctx context.Context, key string) ([]byte, error) { + return c.GetPassThroughObjF(ctx, key) +} + +// GetRevision fake +func (c Client) GetRevision() uint64 { + return c.GetRevisionF() +} + +// SetRevision fake +func (c Client) SetRevision(version interface{}) { + c.SetRevisionF(version) +} + +// List fake +func (c Client) List(ctx context.Context, key string) (imitator.Resp, error) { + return c.ListF(ctx, key) +} + +// Get fake +func (c Client) Get(ctx context.Context, key string) (imitator.Resp, error) { + return c.GetF(ctx, key) +} + +// Watch fake +func (c Client) Watch(ctx context.Context, key string, ResourceVersion uint64) <-chan watch.Event { + return c.WatchF(ctx, key, ResourceVersion) +} diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/imitator.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/imitator.go index 04d1be044..75415d7b2 100644 --- a/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/imitator.go +++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/imitator.go @@ -83,9 +83,13 @@ func (s *imitator) InsertOrUpdateObj(ctx context.Context, obj runtime.Object) er ResourceVersion: objRv, Value: buf.String(), } + return s.insertOrReplaceMetaV2(m, objRv) +} + +func (s *imitator) insertOrReplaceMetaV2(m v2.MetaV2, objRv uint64) error { s.lock.Lock() defer s.lock.Unlock() - _, err = dbm.DBAccess.Raw("INSERT OR REPLACE INTO meta_v2 (key, groupversionresource, namespace,name,resourceversion,value) VALUES (?,?,?,?,?,?)", m.Key, m.GroupVersionResource, m.Namespace, m.Name, m.ResourceVersion, m.Value).Exec() + _, err := dbm.DBAccess.Raw("INSERT OR REPLACE INTO meta_v2 (key, groupversionresource, namespace,name,resourceversion,value) VALUES (?,?,?,?,?,?)", m.Key, m.GroupVersionResource, m.Namespace, m.Name, m.ResourceVersion, m.Value).Exec() var maxRetryTimes = 3 for i := 1; err != nil; i++ { klog.Errorf("failed to access database:%v", err) @@ -97,9 +101,36 @@ func (s *imitator) InsertOrUpdateObj(ctx context.Context, obj runtime.Object) er if objRv > s.GetRevision() { s.SetRevision(objRv) } - klog.V(4).Infof("[metaserver]successfully insert or update obj:%v", key) + klog.V(4).Infof("[metaserver]successfully insert or update obj:%v", m.Key) return nil } + +func (s *imitator) InsertOrUpdatePassThroughObj(ctx context.Context, obj []byte, key string) error { + m := v2.MetaV2{ + Key: key, + Value: string(obj), + } + return s.insertOrReplaceMetaV2(m, 0) +} + +func (s *imitator) GetPassThroughObj(ctx context.Context, key string) ([]byte, error) { + s.lock.Lock() + defer s.lock.Unlock() + results := new([]v2.MetaV2) + _, err := dbm.DBAccess.QueryTable(v2.NewMetaTableName).Filter(v2.KEY, key).All(results) + if err != nil { + return nil, err + } + + switch { + case len(*results) == 1: + klog.V(4).Infof("[metaserver]successfully insert or update obj:%v", key) + return []byte((*results)[0].Value), nil + default: + return nil, fmt.Errorf("the server could not find the requested resource") + } +} + func (s *imitator) Delete(ctx context.Context, key string) error { m := v2.MetaV2{ Key: key, diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/storage.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/storage.go index e82964112..f28a6307b 100644 --- a/edge/pkg/metamanager/metaserver/kubernetes/storage/storage.go +++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/storage.go @@ -114,6 +114,44 @@ func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) return obj, err } +// PassThrough +// The request is routed to the dynamic controller via the metaServer. +// If the request is approved, the response will be saved to local storage. +// It will be acquired from local data storage if it fails. +func (r *REST) PassThrough(ctx context.Context, options *metav1.GetOptions) ([]byte, error) { + info, _ := apirequest.RequestInfoFrom(ctx) + resp, err := func() ([]byte, error) { + app, err := r.Agent.Generate(ctx, metaserver.ApplicationVerb(info.Verb), *options, nil) + if err != nil { + klog.Errorf("[metaserver/passThrough] failed to generate application: %v", err) + return nil, err + } + err = r.Agent.Apply(app) + defer app.Close() + if err != nil { + klog.Errorf("[metaserver/passThrough] failed to request from cloud: %v", err) + return nil, err + } + + err = imitator.DefaultV2Client.InsertOrUpdatePassThroughObj(context.TODO(), app.RespBody, app.Key) + if err != nil { + klog.Warningf("[metaserver/passThrough] failed to insert version information into database: %v", err) + } + return app.RespBody, nil + }() + if err != nil { + resp, err = imitator.DefaultV2Client.GetPassThroughObj(ctx, info.Path) + if err != nil { + klog.Errorf("[metaserver/reststorage] failed to get req at local: %v", err) + return nil, errors.NewNotFound(schema.GroupResource{Group: info.APIGroup, Resource: info.Resource}, info.Name) + } + klog.Infof("[metaserver/reststorage] successfully process get req (%v) at local", info.Path) + } + + klog.Infof("[metaserver/passThrough] successfully process request (%v)", info.Path) + return resp, nil +} + func (r *REST) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { info, _ := apirequest.RequestInfoFrom(ctx) // First try to list the object from remote cloud diff --git a/edge/pkg/metamanager/metaserver/kubernetes/storage/storage_test.go b/edge/pkg/metamanager/metaserver/kubernetes/storage/storage_test.go new file mode 100644 index 000000000..298d734dd --- /dev/null +++ b/edge/pkg/metamanager/metaserver/kubernetes/storage/storage_test.go @@ -0,0 +1,128 @@ +package storage + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "sync" + "testing" + "time" + + "github.com/agiledragon/gomonkey" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/endpoints/request" + + beehiveContext "github.com/kubeedge/beehive/pkg/core/context" + "github.com/kubeedge/beehive/pkg/core/model" + connect "github.com/kubeedge/kubeedge/edge/pkg/common/cloudconnection" + "github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/agent" + "github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator" + fakeclient "github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator/fake" + "github.com/kubeedge/kubeedge/pkg/metaserver" +) + +func TestREST_PassThrough(t *testing.T) { + type testCase struct { + isConnectFailed bool + isSendSyncFailed bool + isLocalStored bool + isInsertLocalStorageFailed bool + } + cases := testCase{} + + fakeClient := fakeclient.Client{ + InsertOrUpdatePassThroughObjF: func(ctx context.Context, obj []byte, key string) error { + if cases.isInsertLocalStorageFailed { + return fmt.Errorf("insert local storage failed") + } + return nil + }, + GetPassThroughObjF: func(ctx context.Context, key string) ([]byte, error) { + if !cases.isLocalStored { + return nil, fmt.Errorf("local does not store it") + } + return []byte("test"), nil + }, + } + patch := gomonkey.NewPatches() + defer patch.Reset() + patch.ApplyFunc(connect.IsConnected, func() bool { + return !cases.isConnectFailed + }).ApplyFunc(beehiveContext.SendSync, func(string, model.Message, time.Duration) (model.Message, error) { + app := metaserver.Application{ + RespBody: []byte("test"), + Status: metaserver.Approved, + Reason: "ok", + } + if cases.isSendSyncFailed { + app.Status = metaserver.Failed + app.Reason = "isSendSyncFailed" + } + content, _ := json.Marshal(app) + return model.Message{ + Content: content, + }, nil + }).ApplyGlobalVar(&imitator.DefaultV2Client, fakeClient) + + var tests = []struct { + name string + rest *REST + info request.RequestInfo + cases testCase + want []byte + wantErr bool + }{ + { + name: "test isConnectFailed ", + info: request.RequestInfo{}, + cases: testCase{isConnectFailed: true}, + wantErr: true, + }, { + name: "test isSendSyncFailed ", + info: request.RequestInfo{}, + cases: testCase{isSendSyncFailed: true}, + wantErr: true, + }, { + name: "test get version from cloud failed, but local stored", + info: request.RequestInfo{ + Path: "/versions", + Verb: "get", + }, + cases: testCase{isSendSyncFailed: true, isLocalStored: true}, + want: []byte("test"), + }, { + name: "test successfully get the version from the cloud, but insert local storage failed ", + info: request.RequestInfo{ + Path: "/versions", + Verb: "get", + }, + cases: testCase{isInsertLocalStorageFailed: true}, + want: []byte("test"), + }, { + name: "test successfully get the version from the cloud ", + info: request.RequestInfo{ + Path: "/versions", + Verb: "get", + }, + want: []byte("test"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := request.WithRequestInfo(context.TODO(), &tt.info) + rest := &REST{ + Agent: &agent.Agent{Applications: sync.Map{}}, + } + cases = tt.cases + got, err := rest.PassThrough(ctx, &metav1.GetOptions{}) + if (err != nil) != tt.wantErr { + t.Errorf("PassThrough() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("PassThrough() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/edge/pkg/metamanager/metaserver/server.go b/edge/pkg/metamanager/metaserver/server.go index 66c3684c5..7bc1b9d49 100644 --- a/edge/pkg/metamanager/metaserver/server.go +++ b/edge/pkg/metamanager/metaserver/server.go @@ -40,6 +40,7 @@ import ( "github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/handlerfactory" "github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/serializer" kefeatures "github.com/kubeedge/kubeedge/pkg/features" + "github.com/kubeedge/kubeedge/pkg/util/pass-through" ) // MetaServer is simplification of server.GenericAPIServer @@ -214,7 +215,13 @@ func (ls *MetaServer) BuildBasicHandler() http.Handler { reqInfo, ok := apirequest.RequestInfoFrom(ctx) //klog.Infof("[metaserver]get a req(%v)(%v)", reqInfo.Path, reqInfo.Verb) //klog.Infof("[metaserver]get a req(\nPath:%v; \nVerb:%v; \nHeader:%+v)", reqInfo.Path, reqInfo.Verb, req.Header) - if ok && reqInfo.IsResourceRequest { + if !ok { + err := fmt.Errorf("invalid request") + responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req) + return + } + + if reqInfo.IsResourceRequest { switch { case reqInfo.Verb == "get": ls.Factory.Get().ServeHTTP(w, req) @@ -235,7 +242,12 @@ func (ls *MetaServer) BuildBasicHandler() http.Handler { return } - err := fmt.Errorf("not a resource req") + if passthrough.IsPassThroughPath(reqInfo.Path, reqInfo.Verb) { + ls.Factory.PassThrough().ServeHTTP(w, req) + return + } + + err := fmt.Errorf("request[%s::%s] isn't supported", reqInfo.Path, reqInfo.Verb) responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req) }) } |
