summaryrefslogtreecommitdiff
path: root/staging/src/github.com
diff options
context:
space:
mode:
authorcl2017 <chenlin.liu@daocloud.io>2023-12-06 17:00:42 +0800
committercl2017 <chenlin.liu@daocloud.io>2024-01-10 17:03:46 +0800
commit8837c1d63c6ef35d2d7c5310f3838413aa234146 (patch)
tree2df542dd385219f22c3523be8889634c6288211c /staging/src/github.com
parentMerge pull request #5303 from luomengY/deviceandmodelorder (diff)
downloadkubeedge-8837c1d63c6ef35d2d7c5310f3838413aa234146.tar.gz
add db handler
Signed-off-by: cl2017 <chenlin.liu@daocloud.io>
Diffstat (limited to 'staging/src/github.com')
-rw-r--r--staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/influxdb2/handler.go73
-rw-r--r--staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/redis/handler.go74
-rw-r--r--staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/tdengine/handler.go74
-rw-r--r--staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go139
4 files changed, 233 insertions, 127 deletions
diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/influxdb2/handler.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/influxdb2/handler.go
new file mode 100644
index 000000000..287bb5616
--- /dev/null
+++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/influxdb2/handler.go
@@ -0,0 +1,73 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package influxdb2
+
+import (
+ "context"
+ "time"
+
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/Template/driver"
+ "github.com/kubeedge/mapper-framework/pkg/common"
+)
+
+func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) {
+ dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2ClientConfig, twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2DataConfig)
+ if err != nil {
+ klog.Errorf("new database client error: %v", err)
+ return
+ }
+ dbClient := dbConfig.InitDbClient()
+ if err != nil {
+ klog.Errorf("init database client err: %v", err)
+ return
+ }
+ reportCycle := time.Duration(twin.Property.ReportCycle)
+ if reportCycle == 0 {
+ reportCycle = common.DefaultReportCycle
+ }
+ ticker := time.NewTicker(reportCycle)
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ deviceData, err := client.GetDeviceData(visitorConfig)
+ if err != nil {
+ klog.Errorf("publish error: %v", err)
+ continue
+ }
+ sData, err := common.ConvertToString(deviceData)
+ if err != nil {
+ klog.Errorf("Failed to convert publish method data : %v", err)
+ continue
+ }
+ dataModel.SetValue(sData)
+ dataModel.SetTimeStamp()
+
+ err = dbConfig.AddData(dataModel, dbClient)
+ if err != nil {
+ klog.Errorf("influx database add data error: %v", err)
+ return
+ }
+ case <-ctx.Done():
+ dbConfig.CloseSession(dbClient)
+ return
+ }
+ }
+ }()
+}
diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/redis/handler.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/redis/handler.go
new file mode 100644
index 000000000..9363f6f0e
--- /dev/null
+++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/redis/handler.go
@@ -0,0 +1,74 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package redis
+
+import (
+ "context"
+ "time"
+
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/Template/driver"
+ "github.com/kubeedge/mapper-framework/pkg/common"
+)
+
+func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) {
+ dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.RedisClientConfig)
+ if err != nil {
+ klog.Errorf("new database client error: %v", err)
+ return
+ }
+ err = dbConfig.InitDbClient()
+ if err != nil {
+ klog.Errorf("init redis database client err: %v", err)
+ return
+ }
+ reportCycle := time.Duration(twin.Property.ReportCycle)
+ if reportCycle == 0 {
+ reportCycle = common.DefaultReportCycle
+ }
+ ticker := time.NewTicker(reportCycle)
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ deviceData, err := client.GetDeviceData(visitorConfig)
+ if err != nil {
+ klog.Errorf("publish error: %v", err)
+ continue
+ }
+ sData, err := common.ConvertToString(deviceData)
+ if err != nil {
+ klog.Errorf("Failed to convert publish method data : %v", err)
+ continue
+ }
+ dataModel.SetValue(sData)
+ dataModel.SetTimeStamp()
+
+ err = dbConfig.AddData(dataModel)
+ if err != nil {
+ klog.Errorf("redis database add data error: %v", err)
+ return
+ }
+ case <-ctx.Done():
+ dbConfig.CloseSession()
+ return
+ }
+ }
+ }()
+
+}
diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/tdengine/handler.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/tdengine/handler.go
new file mode 100644
index 000000000..2840a6ed3
--- /dev/null
+++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/dbmethod/tdengine/handler.go
@@ -0,0 +1,74 @@
+/*
+Copyright 2023 The KubeEdge Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tdengine
+
+import (
+ "context"
+ "time"
+
+ "k8s.io/klog/v2"
+
+ "github.com/kubeedge/Template/driver"
+ "github.com/kubeedge/mapper-framework/pkg/common"
+)
+
+func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) {
+ dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.TDEngineClientConfig)
+ if err != nil {
+ klog.Errorf("new database client error: %v", err)
+ return
+ }
+ err = dbConfig.InitDbClient()
+ if err != nil {
+ klog.Errorf("init database client err: %v", err)
+ return
+ }
+ reportCycle := time.Duration(twin.Property.ReportCycle)
+ if reportCycle == 0 {
+ reportCycle = common.DefaultReportCycle
+ }
+ ticker := time.NewTicker(reportCycle)
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ deviceData, err := client.GetDeviceData(visitorConfig)
+ if err != nil {
+ klog.Errorf("publish error: %v", err)
+ continue
+ }
+ sData, err := common.ConvertToString(deviceData)
+ if err != nil {
+ klog.Errorf("Failed to convert publish method data : %v", err)
+ continue
+ }
+ dataModel.SetValue(sData)
+ dataModel.SetTimeStamp()
+
+ err = dbConfig.AddData(dataModel)
+ if err != nil {
+ klog.Errorf("tdengine database add data error: %v", err)
+ return
+ }
+ case <-ctx.Done():
+ dbConfig.CloseSessio()
+ return
+ }
+ }
+ }()
+
+}
diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go
index 2c780a4e6..e0260c256 100644
--- a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go
+++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go
@@ -141,6 +141,15 @@ func dataHandler(ctx context.Context, dev *driver.CustomizedDev) {
if twin.Property.PushMethod.DBMethod.DBMethodName != "" {
dataModel := common.NewDataModel(dev.Instance.Name, twin.Property.PropertyName, common.WithType(twin.ObservedDesired.Metadata.Type))
dbHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel)
+ switch twin.Property.PushMethod.DBMethod.DBMethodName {
+ // TODO add more database
+ case "influx":
+ dbInflux.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel)
+ case "redis":
+ dbRedis.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel)
+ case "tdengine":
+ dbTdengine.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel)
+ }
}
}
}
@@ -202,137 +211,13 @@ func dbHandler(ctx context.Context, twin *common.Twin, client *driver.Customized
switch twin.Property.PushMethod.DBMethod.DBMethodName {
// TODO add more database
case "influx":
- dbConfig, err := dbInflux.NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2ClientConfig, twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2DataConfig)
- if err != nil {
- klog.Errorf("new database client error: %v", err)
- return
- }
- dbClient := dbConfig.InitDbClient()
- if err != nil {
- klog.Errorf("init database client err: %v", err)
- return
- }
- reportCycle := time.Duration(twin.Property.ReportCycle)
- if reportCycle == 0 {
- reportCycle = common.DefaultReportCycle
- }
- ticker := time.NewTicker(reportCycle)
- go func() {
- for {
- select {
- case <-ticker.C:
- deviceData, err := client.GetDeviceData(visitorConfig)
- if err != nil {
- klog.Errorf("publish error: %v", err)
- continue
- }
- sData, err := common.ConvertToString(deviceData)
- if err != nil {
- klog.Errorf("Failed to convert publish method data : %v", err)
- continue
- }
- dataModel.SetValue(sData)
- dataModel.SetTimeStamp()
+ dbInflux.DataHandler(ctx, twin, client, visitorConfig, dataModel)
- err = dbConfig.AddData(dataModel, dbClient)
- if err != nil {
- klog.Errorf("influx database add data error: %v", err)
- return
- }
- case <-ctx.Done():
- dbConfig.CloseSession(dbClient)
- return
- }
- }
- }()
case "redis":
- dbConfig, err := dbRedis.NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.RedisClientConfig)
- if err != nil {
- klog.Errorf("new database client error: %v", err)
- return
- }
- err = dbConfig.InitDbClient()
- if err != nil {
- klog.Errorf("init redis database client err: %v", err)
- return
- }
- reportCycle := time.Duration(twin.Property.ReportCycle)
- if reportCycle == 0 {
- reportCycle = 1 * time.Second
- }
- ticker := time.NewTicker(reportCycle)
- go func() {
- for {
- select {
- case <-ticker.C:
- deviceData, err := client.GetDeviceData(visitorConfig)
- if err != nil {
- klog.Errorf("publish error: %v", err)
- continue
- }
- sData, err := common.ConvertToString(deviceData)
- if err != nil {
- klog.Errorf("Failed to convert publish method data : %v", err)
- continue
- }
- dataModel.SetValue(sData)
- dataModel.SetTimeStamp()
+ dbRedis.DataHandler(ctx, twin, client, visitorConfig, dataModel)
- err = dbConfig.AddData(dataModel)
- if err != nil {
- klog.Errorf("redis database add data error: %v", err)
- return
- }
- case <-ctx.Done():
- dbConfig.CloseSession()
- return
- }
- }
- }()
case "tdengine":
- dbConfig, err := dbTdengine.NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.TDEngineClientConfig)
- if err != nil {
- klog.Errorf("new database client error: %v", err)
- return
- }
- err = dbConfig.InitDbClient()
- if err != nil {
- klog.Errorf("init database client err: %v", err)
- return
- }
- reportCycle := time.Duration(twin.Property.ReportCycle)
- if reportCycle == 0 {
- reportCycle = 1 * time.Second
- }
- ticker := time.NewTicker(reportCycle)
- go func() {
- for {
- select {
- case <-ticker.C:
- deviceData, err := client.GetDeviceData(visitorConfig)
- if err != nil {
- klog.Errorf("publish error: %v", err)
- continue
- }
- sData, err := common.ConvertToString(deviceData)
- if err != nil {
- klog.Errorf("Failed to convert publish method data : %v", err)
- continue
- }
- dataModel.SetValue(sData)
- dataModel.SetTimeStamp()
-
- err = dbConfig.AddData(dataModel)
- if err != nil {
- klog.Errorf("tdengine database add data error: %v", err)
- return
- }
- case <-ctx.Done():
- dbConfig.CloseSessio()
- return
- }
- }
- }()
+ dbTdengine.DataHandler(ctx, twin, client, visitorConfig, dataModel)
}
}