diff options
| author | KubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com> | 2024-01-15 16:18:25 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-01-15 16:18:25 +0800 |
| commit | 395e97716d2c5b24ffbc5d625bd83cba19879418 (patch) | |
| tree | da8eeee323be3f57ef5c2e472a734950fcfb99ed /staging | |
| parent | Merge pull request #5143 from Windrow14/feature/add_more_metaserver_non_resou... (diff) | |
| parent | add db handler (diff) | |
| download | kubeedge-395e97716d2c5b24ffbc5d625bd83cba19879418.tar.gz | |
Merge pull request #5277 from cl2017/dbhandler
Add DB handler
Diffstat (limited to 'staging')
4 files changed, 233 insertions, 127 deletions
diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/influxdb2/handler.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/influxdb2/handler.go new file mode 100644 index 000000000..287bb5616 --- /dev/null +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/influxdb2/handler.go @@ -0,0 +1,73 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package influxdb2 + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/Template/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2ClientConfig, twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2DataConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + dbClient := dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init database client err: %v", err) + return + } + reportCycle := time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel, dbClient) + if err != nil { + klog.Errorf("influx database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSession(dbClient) + return + } + } + }() +} diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/redis/handler.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/redis/handler.go new file mode 100644 index 000000000..9363f6f0e --- /dev/null +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/redis/handler.go @@ -0,0 +1,74 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package redis + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/Template/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.RedisClientConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + err = dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init redis database client err: %v", err) + return + } + reportCycle := time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel) + if err != nil { + klog.Errorf("redis database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSession() + return + } + } + }() + +} diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/tdengine/handler.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/tdengine/handler.go new file mode 100644 index 000000000..2840a6ed3 --- /dev/null +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/tdengine/handler.go @@ -0,0 +1,74 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tdengine + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/Template/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.TDEngineClientConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + err = dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init database client err: %v", err) + return + } + reportCycle := time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel) + if err != nil { + klog.Errorf("tdengine database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSessio() + return + } + } + }() + +} diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go index e003aa9ea..081b48185 100644 --- a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go @@ -143,6 +143,15 @@ func dataHandler(ctx context.Context, dev *driver.CustomizedDev) { if twin.Property.PushMethod.DBMethod.DBMethodName != "" { dataModel := common.NewDataModel(dev.Instance.Name, twin.Property.PropertyName, common.WithType(twin.ObservedDesired.Metadata.Type)) dbHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + switch twin.Property.PushMethod.DBMethod.DBMethodName { + // TODO add more database + case "influx": + dbInflux.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + case "redis": + dbRedis.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + case "tdengine": + dbTdengine.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + } } } } @@ -204,137 +213,13 @@ func dbHandler(ctx context.Context, twin *common.Twin, client *driver.Customized switch twin.Property.PushMethod.DBMethod.DBMethodName { // TODO add more database case "influx": - dbConfig, err := dbInflux.NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2ClientConfig, twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2DataConfig) - if err != nil { - klog.Errorf("new database client error: %v", err) - return - } - dbClient := dbConfig.InitDbClient() - if err != nil { - klog.Errorf("init database client err: %v", err) - return - } - reportCycle := time.Duration(twin.Property.ReportCycle) - if reportCycle == 0 { - reportCycle = common.DefaultReportCycle - } - ticker := time.NewTicker(reportCycle) - go func() { - for { - select { - case <-ticker.C: - deviceData, err := client.GetDeviceData(visitorConfig) - if err != nil { - klog.Errorf("publish error: %v", err) - continue - } - sData, err := common.ConvertToString(deviceData) - if err != nil { - klog.Errorf("Failed to convert publish method data : %v", err) - continue - } - dataModel.SetValue(sData) - dataModel.SetTimeStamp() + dbInflux.DataHandler(ctx, twin, client, visitorConfig, dataModel) - err = dbConfig.AddData(dataModel, dbClient) - if err != nil { - klog.Errorf("influx database add data error: %v", err) - return - } - case <-ctx.Done(): - dbConfig.CloseSession(dbClient) - return - } - } - }() case "redis": - dbConfig, err := dbRedis.NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.RedisClientConfig) - if err != nil { - klog.Errorf("new database client error: %v", err) - return - } - err = dbConfig.InitDbClient() - if err != nil { - klog.Errorf("init redis database client err: %v", err) - return - } - reportCycle := time.Duration(twin.Property.ReportCycle) - if reportCycle == 0 { - reportCycle = 1 * time.Second - } - ticker := time.NewTicker(reportCycle) - go func() { - for { - select { - case <-ticker.C: - deviceData, err := client.GetDeviceData(visitorConfig) - if err != nil { - klog.Errorf("publish error: %v", err) - continue - } - sData, err := common.ConvertToString(deviceData) - if err != nil { - klog.Errorf("Failed to convert publish method data : %v", err) - continue - } - dataModel.SetValue(sData) - dataModel.SetTimeStamp() + dbRedis.DataHandler(ctx, twin, client, visitorConfig, dataModel) - err = dbConfig.AddData(dataModel) - if err != nil { - klog.Errorf("redis database add data error: %v", err) - return - } - case <-ctx.Done(): - dbConfig.CloseSession() - return - } - } - }() case "tdengine": - dbConfig, err := dbTdengine.NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.TDEngineClientConfig) - if err != nil { - klog.Errorf("new database client error: %v", err) - return - } - err = dbConfig.InitDbClient() - if err != nil { - klog.Errorf("init database client err: %v", err) - return - } - reportCycle := time.Duration(twin.Property.ReportCycle) - if reportCycle == 0 { - reportCycle = 1 * time.Second - } - ticker := time.NewTicker(reportCycle) - go func() { - for { - select { - case <-ticker.C: - deviceData, err := client.GetDeviceData(visitorConfig) - if err != nil { - klog.Errorf("publish error: %v", err) - continue - } - sData, err := common.ConvertToString(deviceData) - if err != nil { - klog.Errorf("Failed to convert publish method data : %v", err) - continue - } - dataModel.SetValue(sData) - dataModel.SetTimeStamp() - - err = dbConfig.AddData(dataModel) - if err != nil { - klog.Errorf("tdengine database add data error: %v", err) - return - } - case <-ctx.Done(): - dbConfig.CloseSessio() - return - } - } - }() + dbTdengine.DataHandler(ctx, twin, client, visitorConfig, dataModel) } } |
