diff options
| author | jiawei <jiawei.liu@daocloud.io> | 2024-06-01 22:34:52 +0800 |
|---|---|---|
| committer | jiawei <jiawei.liu@daocloud.io> | 2024-07-19 11:13:42 +0800 |
| commit | 7402bf781701a0f74156721d9cc0d879d2a97bc7 (patch) | |
| tree | 10abb4547ec6312fd15bea98afa1374a49a319c6 | |
| parent | Merge pull request #5544 from WillardHu/enhance-certs (diff) | |
| download | kubeedge-7402bf781701a0f74156721d9cc0d879d2a97bc7.tar.gz | |
cloud support device state
Signed-off-by: jiawei <jiawei.liu@daocloud.io>
| -rw-r--r-- | build/crds/devices/devices_v1beta1_device.yaml | 6 | ||||
| -rw-r--r-- | cloud/pkg/common/messagelayer/util.go | 4 | ||||
| -rw-r--r-- | cloud/pkg/devicecontroller/constants/service.go | 1 | ||||
| -rw-r--r-- | cloud/pkg/devicecontroller/controller/upstream.go | 89 | ||||
| -rw-r--r-- | cloud/pkg/devicecontroller/types/device.go | 20 | ||||
| -rw-r--r-- | common/constants/default.go | 3 | ||||
| -rw-r--r-- | manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml | 6 | ||||
| -rw-r--r-- | pkg/apis/componentconfig/cloudcore/v1alpha1/default.go | 3 | ||||
| -rw-r--r-- | pkg/apis/componentconfig/cloudcore/v1alpha1/types.go | 7 | ||||
| -rw-r--r-- | pkg/apis/devices/v1beta1/device_instance_types.go | 11 |
10 files changed, 129 insertions, 21 deletions
diff --git a/build/crds/devices/devices_v1beta1_device.yaml b/build/crds/devices/devices_v1beta1_device.yaml index 69640846a..2d7e28203 100644 --- a/build/crds/devices/devices_v1beta1_device.yaml +++ b/build/crds/devices/devices_v1beta1_device.yaml @@ -755,6 +755,12 @@ spec: description: DeviceStatus reports the device state and the desired/reported values of twin attributes. properties: + lastOnlineTime: + description: 'Optional: The last time the device was online.' + type: string + state: + description: 'Optional: The state of the device.' + type: string twins: description: 'A list of device twins containing desired/reported desired/reported values of twin properties. Optional: A passive device won''t have diff --git a/cloud/pkg/common/messagelayer/util.go b/cloud/pkg/common/messagelayer/util.go index 1677043e7..3b14c7c78 100644 --- a/cloud/pkg/common/messagelayer/util.go +++ b/cloud/pkg/common/messagelayer/util.go @@ -42,6 +42,7 @@ const ( ResourceDevice = "device" ResourceTypeTwinEdgeUpdated = "twin/edge_updated" ResourceTypeMembershipDetail = "membership/detail" + ResourceDeviceStateUpdated = "state/update" ) // BuildResource return a string as "beehive/pkg/core/model".Message.Router.Resource @@ -143,7 +144,8 @@ func GetResourceTypeForDevice(resource string) (string, error) { return ResourceTypeTwinEdgeUpdated, nil } else if strings.Contains(resource, ResourceTypeMembershipDetail) { return ResourceTypeMembershipDetail, nil + } else if strings.Contains(resource, ResourceDeviceStateUpdated) { + return ResourceDeviceStateUpdated, nil } - return "", fmt.Errorf("unknown resource, found: %s", resource) } diff --git a/cloud/pkg/devicecontroller/constants/service.go b/cloud/pkg/devicecontroller/constants/service.go index e0d601d7c..9089214e4 100644 --- a/cloud/pkg/devicecontroller/constants/service.go +++ b/cloud/pkg/devicecontroller/constants/service.go @@ -4,6 +4,7 @@ package constants const ( ResourceTypeTwinEdgeUpdated = "twin/edge_updated" ResourceTypeMembershipDetail = "membership/detail" + ResourceDeviceStateUpdated = "state/update" // Group GroupTwin = "twin" diff --git a/cloud/pkg/devicecontroller/controller/upstream.go b/cloud/pkg/devicecontroller/controller/upstream.go index 817abc352..a9a9ae4b3 100644 --- a/cloud/pkg/devicecontroller/controller/upstream.go +++ b/cloud/pkg/devicecontroller/controller/upstream.go @@ -53,9 +53,10 @@ const ( type UpstreamController struct { crdClient crdClientset.Interface messageLayer messagelayer.MessageLayer - // message channel - deviceStatusChan chan model.Message - + // deviceTwinsChan message channel + deviceTwinsChan chan model.Message + // deviceStates message channel + deviceStatesChan chan model.Message // downstream controller to update device status in cache dc *DownstreamController } @@ -64,7 +65,8 @@ type UpstreamController struct { func (uc *UpstreamController) Start() error { klog.Info("Start upstream devicecontroller") - uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus) + uc.deviceTwinsChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceTwins) + uc.deviceStatesChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStates) go uc.dispatchMessage() for i := 0; i < int(config.Config.Load.UpdateDeviceStatusWorkers); i++ { @@ -98,7 +100,9 @@ func (uc *UpstreamController) dispatchMessage() { switch resourceType { case constants.ResourceTypeTwinEdgeUpdated: - uc.deviceStatusChan <- msg + uc.deviceTwinsChan <- msg + case constants.ResourceDeviceStateUpdated: + uc.deviceStatesChan <- msg case constants.ResourceTypeMembershipDetail: default: klog.Warningf("Message: %s, with resource type: %s not intended for device controller", msg.GetID(), resourceType) @@ -112,7 +116,67 @@ func (uc *UpstreamController) updateDeviceStatus() { case <-beehiveContext.Done(): klog.Info("Stop updateDeviceStatus") return - case msg := <-uc.deviceStatusChan: + case msg := <-uc.deviceStatesChan: + klog.Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource()) + msgState, err := uc.unmarshalDeviceStatesMessage(msg) + if err != nil { + klog.Warningf("Unmarshall failed due to error %v", err) + continue + } + deviceID, err := messagelayer.GetDeviceID(msg.GetResource()) + if err != nil { + klog.Warning("Failed to get device id") + continue + } + device, ok := uc.dc.deviceManager.Device.Load(deviceID) + if !ok { + klog.Warningf("Device %s does not exist in upstream controller", deviceID) + continue + } + cacheDevice, ok := device.(*v1beta1.Device) + if !ok { + klog.Warning("Failed to assert to CacheDevice type") + continue + } + + // Store the status in cache so that when update is received by informer, it is not processed by downstream controller + cacheDevice.Status.State = msgState.Device.State + cacheDevice.Status.LastOnlineTime = msgState.Device.LastOnlineTime + uc.dc.deviceManager.Device.Store(deviceID, cacheDevice) + + body, err := json.Marshal(cacheDevice.Status) + if err != nil { + klog.Errorf("Failed to marshal device states %v", cacheDevice.Status) + continue + } + err = uc.crdClient.DevicesV1beta1().RESTClient().Patch(MergePatchType).Namespace(cacheDevice.Namespace).Resource(ResourceTypeDevices).Name(cacheDevice.Name).Body(body).Do(context.Background()).Error() + if err != nil { + klog.Errorf("Failed to patch device states %v of device %v in namespace %v, err: %v", cacheDevice, + deviceID, cacheDevice.Namespace, err) + continue + } + + //send confirm message to edge twin + resMsg := model.NewMessage(msg.GetID()) + nodeID, err := messagelayer.GetNodeID(msg) + if err != nil { + klog.Warningf("Message: %s process failure, get node id failed with error: %s", msg.GetID(), err) + continue + } + resource, err := messagelayer.BuildResourceForDevice(nodeID, "twin", "") + if err != nil { + klog.Warningf("Message: %s process failure, build message resource failed with error: %s", msg.GetID(), err) + continue + } + resMsg.BuildRouter(modules.DeviceControllerModuleName, constants.GroupTwin, resource, model.ResponseOperation) + resMsg.Content = commonconst.MessageSuccessfulContent + err = uc.messageLayer.Response(*resMsg) + if err != nil { + klog.Warningf("Message: %s process failure, response failed with error: %s", msg.GetID(), err) + continue + } + klog.Infof("Message: %s process successfully", msg.GetID()) + case msg := <-uc.deviceTwinsChan: klog.Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource()) msgTwin, err := uc.unmarshalDeviceStatusMessage(msg) if err != nil { @@ -217,6 +281,19 @@ func (uc *UpstreamController) unmarshalDeviceStatusMessage(msg model.Message) (* return twinUpdate, nil } +func (uc *UpstreamController) unmarshalDeviceStatesMessage(msg model.Message) (*types.DeviceStateUpdate, error) { + contentData, err := msg.GetContentData() + if err != nil { + return nil, err + } + + stateUpdate := &types.DeviceStateUpdate{} + if err := json.Unmarshal(contentData, stateUpdate); err != nil { + return nil, err + } + return stateUpdate, nil +} + // NewUpstreamController create UpstreamController from config func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) { uc := &UpstreamController{ diff --git a/cloud/pkg/devicecontroller/types/device.go b/cloud/pkg/devicecontroller/types/device.go index 6eb4a7fea..ea8f7158c 100644 --- a/cloud/pkg/devicecontroller/types/device.go +++ b/cloud/pkg/devicecontroller/types/device.go @@ -2,13 +2,13 @@ package types // Device the struct of device type Device struct { - ID string `json:"id,omitempty"` - Name string `json:"name,omitempty"` - Description string `json:"description,omitempty"` - State string `json:"state,omitempty"` - LastOnline string `json:"last_online,omitempty"` - Attributes map[string]*MsgAttr `json:"attributes,omitempty"` - Twin map[string]*MsgTwin `json:"twin,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Description string `json:"description,omitempty"` + State string `json:"state,omitempty"` + LastOnlineTime string `json:"lastOnlineTime,omitempty"` + Attributes map[string]*MsgAttr `json:"attributes,omitempty"` + Twin map[string]*MsgTwin `json:"twin,omitempty"` } // BaseMessage the base struct of event message @@ -106,3 +106,9 @@ type DeviceTwinUpdate struct { BaseMessage Twin map[string]*MsgTwin `json:"twin"` } + +// DeviceStateUpdate the struct of device state update +type DeviceStateUpdate struct { + BaseMessage + Device Device +} diff --git a/common/constants/default.go b/common/constants/default.go index db91af7b3..216e721ac 100644 --- a/common/constants/default.go +++ b/common/constants/default.go @@ -112,7 +112,8 @@ const ( DefaultRuleEndpointsEventBuffer = 1 // DeviceController - DefaultUpdateDeviceStatusBuffer = 1024 + DefaultUpdateDeviceTwinsBuffer = 1024 + DefaultUpdateDeviceStatesBuffer = 1024 DefaultDeviceEventBuffer = 1 DefaultDeviceModelEventBuffer = 1 DefaultUpdateDeviceStatusWorkers = 1 diff --git a/manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml b/manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml index 69640846a..2d7e28203 100644 --- a/manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml +++ b/manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml @@ -755,6 +755,12 @@ spec: description: DeviceStatus reports the device state and the desired/reported values of twin attributes. properties: + lastOnlineTime: + description: 'Optional: The last time the device was online.' + type: string + state: + description: 'Optional: The state of the device.' + type: string twins: description: 'A list of device twins containing desired/reported desired/reported values of twin properties. Optional: A passive device won''t have diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go index ed91a4e19..9aa5cde0c 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go @@ -101,7 +101,8 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig { DeviceController: &DeviceController{ Enable: true, Buffer: &DeviceControllerBuffer{ - UpdateDeviceStatus: constants.DefaultUpdateDeviceStatusBuffer, + UpdateDeviceTwins: constants.DefaultUpdateDeviceTwinsBuffer, + UpdateDeviceStates: constants.DefaultUpdateDeviceStatesBuffer, DeviceEvent: constants.DefaultDeviceEventBuffer, DeviceModelEvent: constants.DefaultDeviceModelEventBuffer, }, diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go index a1819fe96..95001e2cc 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go @@ -396,9 +396,12 @@ type DeviceController struct { // DeviceControllerBuffer indicates deviceController buffer type DeviceControllerBuffer struct { - // UpdateDeviceStatus indicates the buffer of update device status + // UpdateDeviceTwins indicates the buffer of update device twins // default 1024 - UpdateDeviceStatus int32 `json:"updateDeviceStatus,omitempty"` + UpdateDeviceTwins int32 `json:"updateDeviceTwins,omitempty"` + // UpdateDeviceStates indicates the buffer of update device states + // default 1024 + UpdateDeviceStates int32 `json:"updateDeviceStatus,omitempty"` // DeviceEvent indicates the buffer of device event // default 1 DeviceEvent int32 `json:"deviceEvent,omitempty"` diff --git a/pkg/apis/devices/v1beta1/device_instance_types.go b/pkg/apis/devices/v1beta1/device_instance_types.go index 20dbb1f8e..2ab13cbae 100644 --- a/pkg/apis/devices/v1beta1/device_instance_types.go +++ b/pkg/apis/devices/v1beta1/device_instance_types.go @@ -47,6 +47,12 @@ type DeviceStatus struct { // Optional: A passive device won't have twin properties and this list could be empty. // +optional Twins []Twin `json:"twins,omitempty"` + // Optional: The state of the device. + // +optional + State string `json:"state,omitempty"` + // Optional: The last time the device was online. + // +optional + LastOnlineTime string `json:"lastOnlineTime,omitempty"` } // Twin provides a logical representation of control properties (writable properties in the @@ -263,9 +269,8 @@ type VisitorConfig struct { type Device struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - - Spec DeviceSpec `json:"spec,omitempty"` - Status DeviceStatus `json:"status,omitempty"` + Spec DeviceSpec `json:"spec,omitempty"` + Status DeviceStatus `json:"status,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object |
