diff options
Diffstat (limited to 'staging/src/github.com')
4 files changed, 233 insertions, 127 deletions
diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/influxdb2/handler.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/influxdb2/handler.go new file mode 100644 index 000000000..287bb5616 --- /dev/null +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/influxdb2/handler.go @@ -0,0 +1,73 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package influxdb2 + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/Template/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2ClientConfig, twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2DataConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + dbClient := dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init database client err: %v", err) + return + } + reportCycle := time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel, dbClient) + if err != nil { + klog.Errorf("influx database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSession(dbClient) + return + } + } + }() +} diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/redis/handler.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/redis/handler.go new file mode 100644 index 000000000..9363f6f0e --- /dev/null +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/redis/handler.go @@ -0,0 +1,74 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package redis + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/Template/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.RedisClientConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + err = dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init redis database client err: %v", err) + return + } + reportCycle := time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel) + if err != nil { + klog.Errorf("redis database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSession() + return + } + } + }() + +} diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/tdengine/handler.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/tdengine/handler.go new file mode 100644 index 000000000..2840a6ed3 --- /dev/null +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/tdengine/handler.go @@ -0,0 +1,74 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tdengine + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/Template/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.TDEngineClientConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + err = dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init database client err: %v", err) + return + } + reportCycle := time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel) + if err != nil { + klog.Errorf("tdengine database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSessio() + return + } + } + }() + +} diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go index 2c780a4e6..e0260c256 100644 --- a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go @@ -141,6 +141,15 @@ func dataHandler(ctx context.Context, dev *driver.CustomizedDev) { if twin.Property.PushMethod.DBMethod.DBMethodName != "" { dataModel := common.NewDataModel(dev.Instance.Name, twin.Property.PropertyName, common.WithType(twin.ObservedDesired.Metadata.Type)) dbHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + switch twin.Property.PushMethod.DBMethod.DBMethodName { + // TODO add more database + case "influx": + dbInflux.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + case "redis": + dbRedis.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + case "tdengine": + dbTdengine.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + } } } } @@ -202,137 +211,13 @@ func dbHandler(ctx context.Context, twin *common.Twin, client *driver.Customized switch twin.Property.PushMethod.DBMethod.DBMethodName { // TODO add more database case "influx": - dbConfig, err := dbInflux.NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2ClientConfig, twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2DataConfig) - if err != nil { - klog.Errorf("new database client error: %v", err) - return - } - dbClient := dbConfig.InitDbClient() - if err != nil { - klog.Errorf("init database client err: %v", err) - return - } - reportCycle := time.Duration(twin.Property.ReportCycle) - if reportCycle == 0 { - reportCycle = common.DefaultReportCycle - } - ticker := time.NewTicker(reportCycle) - go func() { - for { - select { - case <-ticker.C: - deviceData, err := client.GetDeviceData(visitorConfig) - if err != nil { - klog.Errorf("publish error: %v", err) - continue - } - sData, err := common.ConvertToString(deviceData) - if err != nil { - klog.Errorf("Failed to convert publish method data : %v", err) - continue - } - dataModel.SetValue(sData) - dataModel.SetTimeStamp() + dbInflux.DataHandler(ctx, twin, client, visitorConfig, dataModel) - err = dbConfig.AddData(dataModel, dbClient) - if err != nil { - klog.Errorf("influx database add data error: %v", err) - return - } - case <-ctx.Done(): - dbConfig.CloseSession(dbClient) - return - } - } - }() case "redis": - dbConfig, err := dbRedis.NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.RedisClientConfig) - if err != nil { - klog.Errorf("new database client error: %v", err) - return - } - err = dbConfig.InitDbClient() - if err != nil { - klog.Errorf("init redis database client err: %v", err) - return - } - reportCycle := time.Duration(twin.Property.ReportCycle) - if reportCycle == 0 { - reportCycle = 1 * time.Second - } - ticker := time.NewTicker(reportCycle) - go func() { - for { - select { - case <-ticker.C: - deviceData, err := client.GetDeviceData(visitorConfig) - if err != nil { - klog.Errorf("publish error: %v", err) - continue - } - sData, err := common.ConvertToString(deviceData) - if err != nil { - klog.Errorf("Failed to convert publish method data : %v", err) - continue - } - dataModel.SetValue(sData) - dataModel.SetTimeStamp() + dbRedis.DataHandler(ctx, twin, client, visitorConfig, dataModel) - err = dbConfig.AddData(dataModel) - if err != nil { - klog.Errorf("redis database add data error: %v", err) - return - } - case <-ctx.Done(): - dbConfig.CloseSession() - return - } - } - }() case "tdengine": - dbConfig, err := dbTdengine.NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.TDEngineClientConfig) - if err != nil { - klog.Errorf("new database client error: %v", err) - return - } - err = dbConfig.InitDbClient() - if err != nil { - klog.Errorf("init database client err: %v", err) - return - } - reportCycle := time.Duration(twin.Property.ReportCycle) - if reportCycle == 0 { - reportCycle = 1 * time.Second - } - ticker := time.NewTicker(reportCycle) - go func() { - for { - select { - case <-ticker.C: - deviceData, err := client.GetDeviceData(visitorConfig) - if err != nil { - klog.Errorf("publish error: %v", err) - continue - } - sData, err := common.ConvertToString(deviceData) - if err != nil { - klog.Errorf("Failed to convert publish method data : %v", err) - continue - } - dataModel.SetValue(sData) - dataModel.SetTimeStamp() - - err = dbConfig.AddData(dataModel) - if err != nil { - klog.Errorf("tdengine database add data error: %v", err) - return - } - case <-ctx.Done(): - dbConfig.CloseSessio() - return - } - } - }() + dbTdengine.DataHandler(ctx, twin, client, visitorConfig, dataModel) } } |
