summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKubeEdge Bot <48982446+kubeedge-bot@users.noreply.github.com>2024-07-19 16:45:02 +0800
committerGitHub <noreply@github.com>2024-07-19 16:45:02 +0800
commit7358af215b7b3c785b9529f0a9ac25e5ed4b6632 (patch)
treed362883a27765f8edeb0aaf8f150a08546148bc8
parentMerge pull request #5742 from 1Shubham7/misspell (diff)
parentcloud support device state (diff)
downloadkubeedge-7358af215b7b3c785b9529f0a9ac25e5ed4b6632.tar.gz
Merge pull request #5649 from JiaweiGithub/feat/dmi_state_cloudv1.18.0-beta.0
cloud support device state
-rw-r--r--build/crds/devices/devices_v1beta1_device.yaml6
-rw-r--r--cloud/pkg/common/messagelayer/util.go4
-rw-r--r--cloud/pkg/devicecontroller/constants/service.go1
-rw-r--r--cloud/pkg/devicecontroller/controller/upstream.go89
-rw-r--r--cloud/pkg/devicecontroller/types/device.go20
-rw-r--r--common/constants/default.go3
-rw-r--r--manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml6
-rw-r--r--pkg/apis/componentconfig/cloudcore/v1alpha1/default.go3
-rw-r--r--pkg/apis/componentconfig/cloudcore/v1alpha1/types.go7
-rw-r--r--pkg/apis/devices/v1beta1/device_instance_types.go11
10 files changed, 129 insertions, 21 deletions
diff --git a/build/crds/devices/devices_v1beta1_device.yaml b/build/crds/devices/devices_v1beta1_device.yaml
index 69640846a..2d7e28203 100644
--- a/build/crds/devices/devices_v1beta1_device.yaml
+++ b/build/crds/devices/devices_v1beta1_device.yaml
@@ -755,6 +755,12 @@ spec:
description: DeviceStatus reports the device state and the desired/reported
values of twin attributes.
properties:
+ lastOnlineTime:
+ description: 'Optional: The last time the device was online.'
+ type: string
+ state:
+ description: 'Optional: The state of the device.'
+ type: string
twins:
description: 'A list of device twins containing desired/reported desired/reported
values of twin properties. Optional: A passive device won''t have
diff --git a/cloud/pkg/common/messagelayer/util.go b/cloud/pkg/common/messagelayer/util.go
index 1677043e7..3b14c7c78 100644
--- a/cloud/pkg/common/messagelayer/util.go
+++ b/cloud/pkg/common/messagelayer/util.go
@@ -42,6 +42,7 @@ const (
ResourceDevice = "device"
ResourceTypeTwinEdgeUpdated = "twin/edge_updated"
ResourceTypeMembershipDetail = "membership/detail"
+ ResourceDeviceStateUpdated = "state/update"
)
// BuildResource return a string as "beehive/pkg/core/model".Message.Router.Resource
@@ -143,7 +144,8 @@ func GetResourceTypeForDevice(resource string) (string, error) {
return ResourceTypeTwinEdgeUpdated, nil
} else if strings.Contains(resource, ResourceTypeMembershipDetail) {
return ResourceTypeMembershipDetail, nil
+ } else if strings.Contains(resource, ResourceDeviceStateUpdated) {
+ return ResourceDeviceStateUpdated, nil
}
-
return "", fmt.Errorf("unknown resource, found: %s", resource)
}
diff --git a/cloud/pkg/devicecontroller/constants/service.go b/cloud/pkg/devicecontroller/constants/service.go
index e0d601d7c..9089214e4 100644
--- a/cloud/pkg/devicecontroller/constants/service.go
+++ b/cloud/pkg/devicecontroller/constants/service.go
@@ -4,6 +4,7 @@ package constants
const (
ResourceTypeTwinEdgeUpdated = "twin/edge_updated"
ResourceTypeMembershipDetail = "membership/detail"
+ ResourceDeviceStateUpdated = "state/update"
// Group
GroupTwin = "twin"
diff --git a/cloud/pkg/devicecontroller/controller/upstream.go b/cloud/pkg/devicecontroller/controller/upstream.go
index 817abc352..a9a9ae4b3 100644
--- a/cloud/pkg/devicecontroller/controller/upstream.go
+++ b/cloud/pkg/devicecontroller/controller/upstream.go
@@ -53,9 +53,10 @@ const (
type UpstreamController struct {
crdClient crdClientset.Interface
messageLayer messagelayer.MessageLayer
- // message channel
- deviceStatusChan chan model.Message
-
+ // deviceTwinsChan message channel
+ deviceTwinsChan chan model.Message
+ // deviceStates message channel
+ deviceStatesChan chan model.Message
// downstream controller to update device status in cache
dc *DownstreamController
}
@@ -64,7 +65,8 @@ type UpstreamController struct {
func (uc *UpstreamController) Start() error {
klog.Info("Start upstream devicecontroller")
- uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus)
+ uc.deviceTwinsChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceTwins)
+ uc.deviceStatesChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStates)
go uc.dispatchMessage()
for i := 0; i < int(config.Config.Load.UpdateDeviceStatusWorkers); i++ {
@@ -98,7 +100,9 @@ func (uc *UpstreamController) dispatchMessage() {
switch resourceType {
case constants.ResourceTypeTwinEdgeUpdated:
- uc.deviceStatusChan <- msg
+ uc.deviceTwinsChan <- msg
+ case constants.ResourceDeviceStateUpdated:
+ uc.deviceStatesChan <- msg
case constants.ResourceTypeMembershipDetail:
default:
klog.Warningf("Message: %s, with resource type: %s not intended for device controller", msg.GetID(), resourceType)
@@ -112,7 +116,67 @@ func (uc *UpstreamController) updateDeviceStatus() {
case <-beehiveContext.Done():
klog.Info("Stop updateDeviceStatus")
return
- case msg := <-uc.deviceStatusChan:
+ case msg := <-uc.deviceStatesChan:
+ klog.Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource())
+ msgState, err := uc.unmarshalDeviceStatesMessage(msg)
+ if err != nil {
+ klog.Warningf("Unmarshall failed due to error %v", err)
+ continue
+ }
+ deviceID, err := messagelayer.GetDeviceID(msg.GetResource())
+ if err != nil {
+ klog.Warning("Failed to get device id")
+ continue
+ }
+ device, ok := uc.dc.deviceManager.Device.Load(deviceID)
+ if !ok {
+ klog.Warningf("Device %s does not exist in upstream controller", deviceID)
+ continue
+ }
+ cacheDevice, ok := device.(*v1beta1.Device)
+ if !ok {
+ klog.Warning("Failed to assert to CacheDevice type")
+ continue
+ }
+
+ // Store the status in cache so that when update is received by informer, it is not processed by downstream controller
+ cacheDevice.Status.State = msgState.Device.State
+ cacheDevice.Status.LastOnlineTime = msgState.Device.LastOnlineTime
+ uc.dc.deviceManager.Device.Store(deviceID, cacheDevice)
+
+ body, err := json.Marshal(cacheDevice.Status)
+ if err != nil {
+ klog.Errorf("Failed to marshal device states %v", cacheDevice.Status)
+ continue
+ }
+ err = uc.crdClient.DevicesV1beta1().RESTClient().Patch(MergePatchType).Namespace(cacheDevice.Namespace).Resource(ResourceTypeDevices).Name(cacheDevice.Name).Body(body).Do(context.Background()).Error()
+ if err != nil {
+ klog.Errorf("Failed to patch device states %v of device %v in namespace %v, err: %v", cacheDevice,
+ deviceID, cacheDevice.Namespace, err)
+ continue
+ }
+
+ //send confirm message to edge twin
+ resMsg := model.NewMessage(msg.GetID())
+ nodeID, err := messagelayer.GetNodeID(msg)
+ if err != nil {
+ klog.Warningf("Message: %s process failure, get node id failed with error: %s", msg.GetID(), err)
+ continue
+ }
+ resource, err := messagelayer.BuildResourceForDevice(nodeID, "twin", "")
+ if err != nil {
+ klog.Warningf("Message: %s process failure, build message resource failed with error: %s", msg.GetID(), err)
+ continue
+ }
+ resMsg.BuildRouter(modules.DeviceControllerModuleName, constants.GroupTwin, resource, model.ResponseOperation)
+ resMsg.Content = commonconst.MessageSuccessfulContent
+ err = uc.messageLayer.Response(*resMsg)
+ if err != nil {
+ klog.Warningf("Message: %s process failure, response failed with error: %s", msg.GetID(), err)
+ continue
+ }
+ klog.Infof("Message: %s process successfully", msg.GetID())
+ case msg := <-uc.deviceTwinsChan:
klog.Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource())
msgTwin, err := uc.unmarshalDeviceStatusMessage(msg)
if err != nil {
@@ -217,6 +281,19 @@ func (uc *UpstreamController) unmarshalDeviceStatusMessage(msg model.Message) (*
return twinUpdate, nil
}
+func (uc *UpstreamController) unmarshalDeviceStatesMessage(msg model.Message) (*types.DeviceStateUpdate, error) {
+ contentData, err := msg.GetContentData()
+ if err != nil {
+ return nil, err
+ }
+
+ stateUpdate := &types.DeviceStateUpdate{}
+ if err := json.Unmarshal(contentData, stateUpdate); err != nil {
+ return nil, err
+ }
+ return stateUpdate, nil
+}
+
// NewUpstreamController create UpstreamController from config
func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) {
uc := &UpstreamController{
diff --git a/cloud/pkg/devicecontroller/types/device.go b/cloud/pkg/devicecontroller/types/device.go
index 6eb4a7fea..ea8f7158c 100644
--- a/cloud/pkg/devicecontroller/types/device.go
+++ b/cloud/pkg/devicecontroller/types/device.go
@@ -2,13 +2,13 @@ package types
// Device the struct of device
type Device struct {
- ID string `json:"id,omitempty"`
- Name string `json:"name,omitempty"`
- Description string `json:"description,omitempty"`
- State string `json:"state,omitempty"`
- LastOnline string `json:"last_online,omitempty"`
- Attributes map[string]*MsgAttr `json:"attributes,omitempty"`
- Twin map[string]*MsgTwin `json:"twin,omitempty"`
+ ID string `json:"id,omitempty"`
+ Name string `json:"name,omitempty"`
+ Description string `json:"description,omitempty"`
+ State string `json:"state,omitempty"`
+ LastOnlineTime string `json:"lastOnlineTime,omitempty"`
+ Attributes map[string]*MsgAttr `json:"attributes,omitempty"`
+ Twin map[string]*MsgTwin `json:"twin,omitempty"`
}
// BaseMessage the base struct of event message
@@ -106,3 +106,9 @@ type DeviceTwinUpdate struct {
BaseMessage
Twin map[string]*MsgTwin `json:"twin"`
}
+
+// DeviceStateUpdate the struct of device state update
+type DeviceStateUpdate struct {
+ BaseMessage
+ Device Device
+}
diff --git a/common/constants/default.go b/common/constants/default.go
index db91af7b3..216e721ac 100644
--- a/common/constants/default.go
+++ b/common/constants/default.go
@@ -112,7 +112,8 @@ const (
DefaultRuleEndpointsEventBuffer = 1
// DeviceController
- DefaultUpdateDeviceStatusBuffer = 1024
+ DefaultUpdateDeviceTwinsBuffer = 1024
+ DefaultUpdateDeviceStatesBuffer = 1024
DefaultDeviceEventBuffer = 1
DefaultDeviceModelEventBuffer = 1
DefaultUpdateDeviceStatusWorkers = 1
diff --git a/manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml b/manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml
index 69640846a..2d7e28203 100644
--- a/manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml
+++ b/manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml
@@ -755,6 +755,12 @@ spec:
description: DeviceStatus reports the device state and the desired/reported
values of twin attributes.
properties:
+ lastOnlineTime:
+ description: 'Optional: The last time the device was online.'
+ type: string
+ state:
+ description: 'Optional: The state of the device.'
+ type: string
twins:
description: 'A list of device twins containing desired/reported desired/reported
values of twin properties. Optional: A passive device won''t have
diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go
index ed91a4e19..9aa5cde0c 100644
--- a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go
+++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go
@@ -101,7 +101,8 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig {
DeviceController: &DeviceController{
Enable: true,
Buffer: &DeviceControllerBuffer{
- UpdateDeviceStatus: constants.DefaultUpdateDeviceStatusBuffer,
+ UpdateDeviceTwins: constants.DefaultUpdateDeviceTwinsBuffer,
+ UpdateDeviceStates: constants.DefaultUpdateDeviceStatesBuffer,
DeviceEvent: constants.DefaultDeviceEventBuffer,
DeviceModelEvent: constants.DefaultDeviceModelEventBuffer,
},
diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go
index a1819fe96..95001e2cc 100644
--- a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go
+++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go
@@ -396,9 +396,12 @@ type DeviceController struct {
// DeviceControllerBuffer indicates deviceController buffer
type DeviceControllerBuffer struct {
- // UpdateDeviceStatus indicates the buffer of update device status
+ // UpdateDeviceTwins indicates the buffer of update device twins
// default 1024
- UpdateDeviceStatus int32 `json:"updateDeviceStatus,omitempty"`
+ UpdateDeviceTwins int32 `json:"updateDeviceTwins,omitempty"`
+ // UpdateDeviceStates indicates the buffer of update device states
+ // default 1024
+ UpdateDeviceStates int32 `json:"updateDeviceStatus,omitempty"`
// DeviceEvent indicates the buffer of device event
// default 1
DeviceEvent int32 `json:"deviceEvent,omitempty"`
diff --git a/pkg/apis/devices/v1beta1/device_instance_types.go b/pkg/apis/devices/v1beta1/device_instance_types.go
index 20dbb1f8e..2ab13cbae 100644
--- a/pkg/apis/devices/v1beta1/device_instance_types.go
+++ b/pkg/apis/devices/v1beta1/device_instance_types.go
@@ -47,6 +47,12 @@ type DeviceStatus struct {
// Optional: A passive device won't have twin properties and this list could be empty.
// +optional
Twins []Twin `json:"twins,omitempty"`
+ // Optional: The state of the device.
+ // +optional
+ State string `json:"state,omitempty"`
+ // Optional: The last time the device was online.
+ // +optional
+ LastOnlineTime string `json:"lastOnlineTime,omitempty"`
}
// Twin provides a logical representation of control properties (writable properties in the
@@ -263,9 +269,8 @@ type VisitorConfig struct {
type Device struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
-
- Spec DeviceSpec `json:"spec,omitempty"`
- Status DeviceStatus `json:"status,omitempty"`
+ Spec DeviceSpec `json:"spec,omitempty"`
+ Status DeviceStatus `json:"status,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object