diff options
18 files changed, 611 insertions, 25 deletions
diff --git a/build/crds/devices/devices_v1beta1_device.yaml b/build/crds/devices/devices_v1beta1_device.yaml index 9602a5bbc..69640846a 100644 --- a/build/crds/devices/devices_v1beta1_device.yaml +++ b/build/crds/devices/devices_v1beta1_device.yaml @@ -588,7 +588,9 @@ spec: type: object name: description: 'Required: The device property name to be accessed. - It must be unique.' + It must be unique. Note: If you need to use the built-in stream + data processing function, you need to define Name as saveFrame + or saveVideo' type: string pushMethod: description: PushMethod represents the protocol used to push diff --git a/build/crds/devices/devices_v1beta1_devicemodel.yaml b/build/crds/devices/devices_v1beta1_devicemodel.yaml index 044d6194e..0022c6d00 100644 --- a/build/crds/devices/devices_v1beta1_devicemodel.yaml +++ b/build/crds/devices/devices_v1beta1_devicemodel.yaml @@ -201,10 +201,12 @@ spec: minimum: type: string name: - description: 'Required: The device property name.' + description: 'Required: The device property name. Note: If you + need to use the built-in stream data processing function, + you need to define Name as saveFrame or saveVideo' type: string type: - description: 'Required: Type of device property, ENUM: INT,FLOAT,DOUBLE,STRING,BOOLEAN,BYTES' + description: 'Required: Type of device property, ENUM: INT,FLOAT,DOUBLE,STRING,BOOLEAN,BYTES,STREAM' enum: - INT - FLOAT @@ -212,6 +214,7 @@ spec: - STRING - BOOLEAN - BYTES + - STREAM type: string unit: description: The unit of the property diff --git a/manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml b/manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml index 9602a5bbc..69640846a 100644 --- a/manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml +++ b/manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml @@ -588,7 +588,9 @@ spec: type: object name: description: 'Required: The device property name to be accessed. - It must be unique.' + It must be unique. Note: If you need to use the built-in stream + data processing function, you need to define Name as saveFrame + or saveVideo' type: string pushMethod: description: PushMethod represents the protocol used to push diff --git a/manifests/charts/cloudcore/crds/devices_v1beta1_devicemodel.yaml b/manifests/charts/cloudcore/crds/devices_v1beta1_devicemodel.yaml index 044d6194e..0022c6d00 100644 --- a/manifests/charts/cloudcore/crds/devices_v1beta1_devicemodel.yaml +++ b/manifests/charts/cloudcore/crds/devices_v1beta1_devicemodel.yaml @@ -201,10 +201,12 @@ spec: minimum: type: string name: - description: 'Required: The device property name.' + description: 'Required: The device property name. Note: If you + need to use the built-in stream data processing function, + you need to define Name as saveFrame or saveVideo' type: string type: - description: 'Required: Type of device property, ENUM: INT,FLOAT,DOUBLE,STRING,BOOLEAN,BYTES' + description: 'Required: Type of device property, ENUM: INT,FLOAT,DOUBLE,STRING,BOOLEAN,BYTES,STREAM' enum: - INT - FLOAT @@ -212,6 +214,7 @@ spec: - STRING - BOOLEAN - BYTES + - STREAM type: string unit: description: The unit of the property diff --git a/pkg/apis/devices/v1beta1/device_instance_types.go b/pkg/apis/devices/v1beta1/device_instance_types.go index f00af8227..20dbb1f8e 100644 --- a/pkg/apis/devices/v1beta1/device_instance_types.go +++ b/pkg/apis/devices/v1beta1/device_instance_types.go @@ -92,6 +92,7 @@ type ProtocolConfig struct { // DeviceProperty describes the specifics all the properties of the device. type DeviceProperty struct { // Required: The device property name to be accessed. It must be unique. + // Note: If you need to use the built-in stream data processing function, you need to define Name as saveFrame or saveVideo Name string `json:"name,omitempty"` // The desired property value. Desired TwinProperty `json:"desired,omitempty"` diff --git a/pkg/apis/devices/v1beta1/device_model_types.go b/pkg/apis/devices/v1beta1/device_model_types.go index 601d7410f..c4964ad1c 100644 --- a/pkg/apis/devices/v1beta1/device_model_types.go +++ b/pkg/apis/devices/v1beta1/device_model_types.go @@ -32,11 +32,12 @@ type DeviceModelSpec struct { // ModelProperty describes an individual device property / attribute like temperature / humidity etc. type ModelProperty struct { // Required: The device property name. + // Note: If you need to use the built-in stream data processing function, you need to define Name as saveFrame or saveVideo Name string `json:"name,omitempty"` // The device property description. // +optional Description string `json:"description,omitempty"` - // Required: Type of device property, ENUM: INT,FLOAT,DOUBLE,STRING,BOOLEAN,BYTES + // Required: Type of device property, ENUM: INT,FLOAT,DOUBLE,STRING,BOOLEAN,BYTES,STREAM Type PropertyType `json:"type,omitempty"` // Required: Access mode of property, ReadWrite or ReadOnly. AccessMode PropertyAccessMode `json:"accessMode,omitempty"` @@ -50,7 +51,7 @@ type ModelProperty struct { } // The type of device property. -// +kubebuilder:validation:Enum=INT;FLOAT;DOUBLE;STRING;BOOLEAN;BYTES +// +kubebuilder:validation:Enum=INT;FLOAT;DOUBLE;STRING;BOOLEAN;BYTES;STREAM type PropertyType string const ( @@ -60,6 +61,7 @@ const ( STRING PropertyType = "STRING" BOOLEAN PropertyType = "BOOLEAN" BYTES PropertyType = "BYTES" + STREAM PropertyType = "STREAM" ) // The access mode for a device property. diff --git a/staging/src/github.com/kubeedge/mapper-framework/README.md b/staging/src/github.com/kubeedge/mapper-framework/README.md index ce3384b74..0828e06d4 100644 --- a/staging/src/github.com/kubeedge/mapper-framework/README.md +++ b/staging/src/github.com/kubeedge/mapper-framework/README.md @@ -11,6 +11,7 @@ The command below will generate a framework for the customized mapper. Run the c ```shell make generate Please input the mapper name (like 'Bluetooth', 'BLE'): foo +Please input the build method (like 'stream', 'nostream'): nostream ``` A project named as your input will be generated. The file tree is as below: ``` @@ -42,5 +43,18 @@ mapper └── Makefile ``` +## 2. Generate the mapper project +After generating the mapper project and filling driver folder, users can make their own mapper image +based on the Dockerfile file and deploy the mapper in the cluster through deployment and other methods. +If your mapper is aimed to processing streaming data +```shell + docker build -f Dockerfile_stream -t [YOUR MAPPER IMAGE NAME] . +``` +If not, Use the following command: +```shell + docker build -f Dockerfile_nostream -t [YOUR MAPPER IMAGE NAME] . +``` + # Where does it come from? -mapper-framework is synced from https://github.com/kubeedge/kubeedge/tree/master/staging/src/github.com/kubeedge/mapper-framework. Code changes are made in that location, merged into kubeedge and later synced here. +mapper-framework is synced from https://github.com/kubeedge/kubeedge/tree/master/staging/src/github.com/kubeedge/mapper-framework. +Code changes are made in that location, merged into kubeedge and later synced here.
\ No newline at end of file diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/Dockerfile b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/Dockerfile_nostream index 8feae0900..89bbbbdfc 100644 --- a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/Dockerfile +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/Dockerfile_nostream @@ -17,4 +17,4 @@ RUN mkdir -p kubeedge COPY --from=builder /build/main kubeedge/ COPY ./config.yaml kubeedge/ -WORKDIR kubeedge +WORKDIR kubeedge
\ No newline at end of file diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/Dockerfile_stream b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/Dockerfile_stream new file mode 100644 index 000000000..a5dfc2bdf --- /dev/null +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/Dockerfile_stream @@ -0,0 +1,35 @@ +FROM golang:1.20.10-bullseye AS builder + +WORKDIR /build + +ENV GO111MODULE=on \ + GOPROXY=https://goproxy.cn,direct + +COPY . . + +RUN apt-get update && \ + apt-get install -y bzip2 curl upx-ucl gcc-aarch64-linux-gnu libc6-dev-arm64-cross gcc-arm-linux-gnueabi libc6-dev-armel-cross libva-dev libva-drm2 libx11-dev libvdpau-dev libxext-dev libsdl1.2-dev libxcb1-dev libxau-dev libxdmcp-dev yasm + +RUN curl -sLO https://ffmpeg.org/releases/ffmpeg-4.1.6.tar.bz2 && \ + tar -jx --strip-components=1 -f ffmpeg-4.1.6.tar.bz2 && \ + ./configure && make && \ + make install + +RUN GOOS=linux go build -o main cmd/main.go + +FROM ubuntu:18.04 + +RUN mkdir -p kubeedge + +RUN apt-get update && \ + apt-get install -y bzip2 curl upx-ucl gcc-aarch64-linux-gnu libc6-dev-arm64-cross gcc-arm-linux-gnueabi libc6-dev-armel-cross libva-dev libva-drm2 libx11-dev libvdpau-dev libxext-dev libsdl1.2-dev libxcb1-dev libxau-dev libxdmcp-dev yasm + +RUN curl -sLO https://ffmpeg.org/releases/ffmpeg-4.1.6.tar.bz2 && \ + tar -jx --strip-components=1 -f ffmpeg-4.1.6.tar.bz2 && \ + ./configure && make && \ + make install + +COPY --from=builder /build/main kubeedge/ +COPY ./config.yaml kubeedge/ + +WORKDIR kubeedge
\ No newline at end of file diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/stream/handler.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/stream/handler.go new file mode 100644 index 000000000..548fa4b59 --- /dev/null +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/stream/handler.go @@ -0,0 +1,67 @@ +/* +Copyright 2024 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stream + +import ( + "encoding/json" + "fmt" + + "k8s.io/klog/v2" + + "github.com/kubeedge/Template/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +type StreamConfig struct { + Format string `json:"format"` + OutputDir string `json:"outputDir"` + FrameCount int `json:"frameCount"` + FrameInterval int `json:"frameInterval"` + VideoNum int `json:"videoNum"` +} + +func StreamHandler(twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig) error { + // Get RTSP URI from camera device + streamURI, err := client.GetDeviceData(visitorConfig) + if err != nil { + return err + } + + // parse streamConfig data from device visitorConfig + var streamConfig StreamConfig + visitorConfigData, err := json.Marshal(visitorConfig.VisitorConfigData) + err = json.Unmarshal(visitorConfigData, &streamConfig) + if err != nil { + return fmt.Errorf("Unmarshal streamConfigs error: %v", err) + } + + switch twin.PropertyName { + // Currently, the function of saving frames and saving videos is built-in according to the configuration. + // Other functions can be expanded here. + case common.SaveFrame: + err = SaveFrame(streamURI.(string), streamConfig.OutputDir, streamConfig.Format, streamConfig.FrameCount, streamConfig.FrameInterval) + case common.SaveVideo: + err = SaveVideo(streamURI.(string), streamConfig.OutputDir, streamConfig.Format, streamConfig.FrameCount, streamConfig.VideoNum) + default: + err = fmt.Errorf("cannot find the processing method for the corresponding Property %s of the stream data", twin.PropertyName) + } + if err != nil { + return err + } + klog.V(2).Infof("Successfully processed streaming data by %s", twin.PropertyName) + return nil +} diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/stream/handler_nostream.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/stream/handler_nostream.go new file mode 100644 index 000000000..4edabd536 --- /dev/null +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/stream/handler_nostream.go @@ -0,0 +1,33 @@ +/* +Copyright 2024 The KubeEdge Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stream + +import ( + "errors" + + "github.com/kubeedge/Template/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +type StreamConfig struct { + Format string `json:"format"` + OutputDir string `json:"outputDir"` + FrameCount int `json:"frameCount"` + FrameInterval int `json:"frameInterval"` + VideoNum int `json:"videoNum"` +} + +func StreamHandler(twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig) error { + return errors.New("need to add the stream flag when make generate if you want to enable stream data processing.") +} diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/stream/img.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/stream/img.go new file mode 100644 index 000000000..d7d05d002 --- /dev/null +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/stream/img.go @@ -0,0 +1,243 @@ +/* +Copyright 2024 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stream + +import ( + "errors" + "fmt" + "time" + "unsafe" + + "github.com/sailorvii/goav/avcodec" + "github.com/sailorvii/goav/avformat" + "github.com/sailorvii/goav/avutil" + "github.com/sailorvii/goav/swscale" + "k8s.io/klog/v2" +) + +// GenFileName generate file name with current time. Formate f<year><month><day><hour><minute><second><millisecond>.<format> +func GenFileName(dir string, format string) string { + return fmt.Sprintf("%s/f%s.%s", dir, time.Now().Format(time.RFC3339Nano), format) +} + +func save(frame *avutil.Frame, width int, height int, dir string, format string) error { + // Save video frames to picture file + outputFile := GenFileName(dir, format) + var outputFmtCtx *avformat.Context + avformat.AvAllocOutputContext2(&outputFmtCtx, nil, nil, &outputFile) + if outputFmtCtx == nil { + return errors.New("Could not create output context") + } + defer outputFmtCtx.AvformatFreeContext() + + ofmt := avformat.AvGuessFormat("", outputFile, "") + outputFmtCtx.SetOformat(ofmt) + + avIOContext, err := avformat.AvIOOpen(outputFile, avformat.AVIO_FLAG_WRITE) + if err != nil { + return fmt.Errorf("Could not open output file '%s'", outputFile) + } + outputFmtCtx.SetPb(avIOContext) + + outStream := outputFmtCtx.AvformatNewStream(nil) + if outStream == nil { + return errors.New("Failed allocating output stream") + } + + // Set the frame format + pCodecCtx := outStream.Codec() + pCodecCtx.SetCodecId(ofmt.GetVideoCodec()) + pCodecCtx.SetCodecType(avformat.AVMEDIA_TYPE_VIDEO) + pCodecCtx.SetPixelFormat(avcodec.AV_PIX_FMT_YUVJ420P) + pCodecCtx.SetWidth(width) + pCodecCtx.SetHeight(height) + pCodecCtx.SetTimeBase(1, 25) + outputFmtCtx.AvDumpFormat(0, outputFile, 1) + + // Get video codec + pCodec := avcodec.AvcodecFindEncoder(pCodecCtx.CodecId()) + if pCodec == nil { + return errors.New("Codec not found.") + } + defer pCodecCtx.AvcodecClose() + + // open video codec + cctx := avcodec.Context(*pCodecCtx) + defer cctx.AvcodecClose() + if cctx.AvcodecOpen2(pCodec, nil) < 0 { + return errors.New("Could not open codec.") + } + + outputFmtCtx.AvformatWriteHeader(nil) + ySize := width * height + + // Write media data to media files + var packet avcodec.Packet + packet.AvNewPacket(ySize * 3) + defer packet.AvPacketUnref() + var gotPicture int + if cctx.AvcodecEncodeVideo2(&packet, frame, &gotPicture) < 0 { + return errors.New("Encode Error") + } + if gotPicture == 1 { + packet.SetStreamIndex(outStream.Index()) + outputFmtCtx.AvWriteFrame(&packet) + } + + outputFmtCtx.AvWriteTrailer() + if outputFmtCtx.Oformat().GetFlags()&avformat.AVFMT_NOFILE == 0 { + if err = outputFmtCtx.Pb().Close(); err != nil { + return fmt.Errorf("close output fmt context failed: %v", err) + } + } + return nil +} + +// SaveFrame save frame. +func SaveFrame(input string, outDir string, format string, frameCount int, frameInterval int) error { + // Open video file + avformat.AvDictSet(&avformat.Dict, "rtsp_transport", "tcp", 0) + avformat.AvDictSet(&avformat.Dict, "max_delay", "5000000", 0) + + pFormatContext := avformat.AvformatAllocContext() + if avformat.AvformatOpenInput(&pFormatContext, input, nil, &avformat.Dict) != 0 { + return fmt.Errorf("Unable to open file %s", input) + } + // Retrieve stream information + if pFormatContext.AvformatFindStreamInfo(nil) < 0 { + return errors.New("Couldn't find stream information") + } + // Dump information about file onto standard error + pFormatContext.AvDumpFormat(0, input, 0) + // Find the first video stream + streamIndex := -1 + for i := 0; i < int(pFormatContext.NbStreams()); i++ { + if pFormatContext.Streams()[i].CodecParameters().AvCodecGetType() == avformat.AVMEDIA_TYPE_VIDEO { + streamIndex = i + break + } + } + if streamIndex == -1 { + return errors.New("couldn't find video stream") + } + // Get a pointer to the codec context for the video stream + pCodecCtxOrig := pFormatContext.Streams()[streamIndex].Codec() + // Find the decoder for the video stream + pCodec := avcodec.AvcodecFindDecoder(pCodecCtxOrig.CodecId()) + if pCodec == nil { + return errors.New("unsupported codec") + } + // Copy context + pCodecCtx := pCodec.AvcodecAllocContext3() + if pCodecCtx.AvcodecCopyContext((*avcodec.Context)(unsafe.Pointer(pCodecCtxOrig))) != 0 { + return errors.New("couldn't copy codec context") + } + + // Open codec + if pCodecCtx.AvcodecOpen2(pCodec, nil) < 0 { + return errors.New("could not open codec") + } + + // Allocate video frame + pFrame := avutil.AvFrameAlloc() + + // Allocate an AVFrame structure + pFrameRGB := avutil.AvFrameAlloc() + if pFrameRGB == nil { + return errors.New("unable to allocate RGB Frame") + } + // Determine required buffer size and allocate buffer + numBytes := uintptr(avcodec.AvpictureGetSize(avcodec.AV_PIX_FMT_YUVJ420P, pCodecCtx.Width(), + pCodecCtx.Height())) + buffer := avutil.AvMalloc(numBytes) + + // Assign appropriate parts of buffer to image planes in pFrameRGB + // Note that pFrameRGB is an AVFrame, but AVFrame is a superset + // of AVPicture + avp := (*avcodec.Picture)(unsafe.Pointer(pFrameRGB)) + avp.AvpictureFill((*uint8)(buffer), avcodec.AV_PIX_FMT_YUVJ420P, pCodecCtx.Width(), pCodecCtx.Height()) + + // initialize SWS context for software scaling + swsCtx := swscale.SwsGetcontext( + pCodecCtx.Width(), + pCodecCtx.Height(), + (swscale.PixelFormat)(pCodecCtx.PixFmt()), + pCodecCtx.Width(), + pCodecCtx.Height(), + avcodec.AV_PIX_FMT_YUVJ420P, + avcodec.SWS_BICUBIC, + nil, + nil, + nil, + ) + frameNum := 0 + failureNum := 0 + failureCount := 5 * frameCount + packet := avcodec.AvPacketAlloc() + // Start capturing and saving video frames + for { + if failureNum >= failureCount { + klog.Error("the number of failed attempts to save frames has reached the upper limit") + return errors.New("the number of failed attempts to save frames has reached the upper limit") + } + + if pFormatContext.AvReadFrame(packet) < 0 { + klog.Error("Read frame failed") + time.Sleep(time.Second) + continue + } + + // Is this a packet from the video stream? + if packet.StreamIndex() != streamIndex { + failureNum++ + continue + } + + // Decode video frame + response := pCodecCtx.AvcodecSendPacket(packet) + if response < 0 { + klog.Errorf("Error while sending a packet to the decoder: %s", avutil.ErrorFromCode(response)) + failureNum++ + continue + } + response = pCodecCtx.AvcodecReceiveFrame((*avutil.Frame)(unsafe.Pointer(pFrame))) + if response == avutil.AvErrorEAGAIN || response == avutil.AvErrorEOF { + failureNum++ + continue + } else if response < 0 { + klog.Errorf("Error while receiving a frame from the decoder: %s", avutil.ErrorFromCode(response)) + failureNum++ + continue + } + // Convert the image from its native format to RGB + swscale.SwsScale2(swsCtx, avutil.Data(pFrame), + avutil.Linesize(pFrame), 0, pCodecCtx.Height(), + avutil.Data(pFrameRGB), avutil.Linesize(pFrameRGB)) + + // Save the frame to disk + err := save(pFrameRGB, pCodecCtx.Width(), pCodecCtx.Height(), outDir, format) + if err != nil { + klog.Error(err) + continue + } + frameNum++ + if frameNum >= frameCount { + return nil + } + time.Sleep(time.Nanosecond * time.Duration(frameInterval)) + } +} diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/stream/video.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/stream/video.go new file mode 100644 index 000000000..7954e215d --- /dev/null +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/data/stream/video.go @@ -0,0 +1,142 @@ +/* +Copyright 2024 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stream + +import ( + "errors" + "fmt" + + "github.com/sailorvii/goav/avcodec" + "github.com/sailorvii/goav/avformat" + "github.com/sailorvii/goav/avutil" + "k8s.io/klog/v2" +) + +// SaveVideo save video. +func SaveVideo(inputFile string, outDir string, format string, frameCount int, videoNum int) error { + var fragmentedMp4Options int + //initialize input file with Context + var inputFmtCtx *avformat.Context + + avformat.AvDictSet(&avformat.Dict, "rtsp_transport", "tcp", 0) + avformat.AvDictSet(&avformat.Dict, "max_delay", "5000000", 0) + + if avformat.AvformatOpenInput(&inputFmtCtx, inputFile, nil, &avformat.Dict) < 0 { + return fmt.Errorf("could not open input file '%s", inputFile) + } + defer inputFmtCtx.AvformatFreeContext() + //read stream information + + if inputFmtCtx.AvformatFindStreamInfo(nil) < 0 { + return errors.New("failed to retrieve input stream information") + } + + //initialize streamMapping + streamMappingSize := int(inputFmtCtx.NbStreams()) + streamMapping := make([]int, streamMappingSize) + var streamIndex int + + validTypeMap := map[avcodec.MediaType]int{ + avformat.AVMEDIA_TYPE_VIDEO: 1, + avformat.AVMEDIA_TYPE_AUDIO: 1, + avformat.AVMEDIA_TYPE_SUBTITLE: 1, + } + var inCodecParam *avcodec.AvCodecParameters + defer inCodecParam.AvCodecParametersFree() + + var outputFmtCtx *avformat.Context + outputFile := GenFileName(outDir, format) + avformat.AvAllocOutputContext2(&outputFmtCtx, nil, nil, &outputFile) + if outputFmtCtx == nil { + return errors.New("Could not create output context") + } + defer outputFmtCtx.AvformatFreeContext() + + for index, inStream := range inputFmtCtx.Streams() { + inCodecParam = inStream.CodecParameters() + inCodecType := inCodecParam.AvCodecGetType() + + if validTypeMap[inCodecType] == 0 { + streamMapping[index] = -1 + continue + } + streamMapping[index] = streamIndex + streamIndex++ + outStream := outputFmtCtx.AvformatNewStream(nil) + if outStream == nil { + return errors.New("Failed allocating output stream") + } + if inCodecParam.AvCodecParametersCopyTo(outStream.CodecParameters()) < 0 { + return errors.New("Failed to copy codec parameters") + } + } + + // initialize opts + var opts *avutil.Dictionary + defer opts.AvDictFree() + if fragmentedMp4Options != 0 { + opts.AvDictSet("movflags", "frag_keyframe+empty_moov+default_base_moof", 0) + } + var packet avcodec.Packet + defer packet.AvPacketUnref() + + // Capture a set number of video segments + for idx := 0; idx < videoNum; idx++ { + outputFile = GenFileName(outDir, format) + // initialize output file with Context + outputFmtCtx.AvDumpFormat(0, outputFile, 1) + if outputFmtCtx.Oformat().GetFlags()&avformat.AVFMT_NOFILE == 0 { + avIOContext, err := avformat.AvIOOpen(outputFile, avformat.AVIO_FLAG_WRITE) + if err != nil { + return fmt.Errorf("could not open output file '%s'", outputFile) + } + outputFmtCtx.SetPb(avIOContext) + } + + if outputFmtCtx.AvformatWriteHeader(&opts) < 0 { + return errors.New("Error occurred when opening output file") + } + // Capture and generate video according to the set number of frames + for i := 1; i < frameCount; i++ { + if inputFmtCtx.AvReadFrame(&packet) < 0 { + return errors.New("read frame failed") + } + index := packet.StreamIndex() + inputStream := inputFmtCtx.Streams()[index] + if index >= streamMappingSize || streamMapping[index] < 0 { + continue + } + packet.SetStreamIndex(streamMapping[index]) + outputStream := outputFmtCtx.Streams()[index] + packet.AvPacketRescaleTs(inputStream.TimeBase(), outputStream.TimeBase()) + packet.SetPos(-1) + if outputFmtCtx.AvInterleavedWriteFrame(&packet) < 0 { + klog.Error("Error muxing packet") + continue + } + } + + outputFmtCtx.AvWriteTrailer() + if outputFmtCtx.Oformat().GetFlags()&avformat.AVFMT_NOFILE == 0 { + if outputFmtCtx.Pb().Close() != nil { + klog.Error("Error close output context") + return errors.New("error close output context") + } + } + } + return nil +} diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go index be274770c..f7afe8d37 100644 --- a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/device/device.go @@ -19,6 +19,7 @@ import ( dbTdengine "github.com/kubeedge/Template/data/dbmethod/tdengine" httpMethod "github.com/kubeedge/Template/data/publish/http" mqttMethod "github.com/kubeedge/Template/data/publish/mqtt" + "github.com/kubeedge/Template/data/stream" "github.com/kubeedge/Template/driver" dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1beta1" "github.com/kubeedge/mapper-framework/pkg/common" @@ -122,6 +123,18 @@ func dataHandler(ctx context.Context, dev *driver.CustomizedDev) { klog.Error(err) continue } + + // If the device property type is streaming, it will directly enter the streaming data processing function, + // such as saving frames or saving videos, and will no longer push it to the user database and application. + // If there are other needs for stream data processing, users can add functions in the mapper/data/stream directory. + if twin.Property.PProperty.DataType == "stream" { + err = stream.StreamHandler(&twin, dev.CustomizedClient, &visitorConfig) + if err != nil { + klog.Errorf("processed streaming data by %s Error: %v", twin.PropertyName, err) + } + continue + } + // handle twin twinData := &TwinData{ DeviceName: dev.Instance.Name, diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/go.mod b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/go.mod index 9f988757a..2b6f77841 100644 --- a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/go.mod +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/go.mod @@ -11,6 +11,7 @@ require ( github.com/influxdata/influxdb-client-go/v2 v2.13.0 github.com/kubeedge/kubeedge v0.0.0 github.com/kubeedge/mapper-framework v0.0.0 + github.com/sailorvii/goav v0.1.4 github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace // indirect github.com/taosdata/driver-go/v3 v3.5.1 golang.org/x/net v0.17.0 // indirect @@ -30,7 +31,7 @@ require ( github.com/gorilla/websocket v1.5.0 // indirect github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/kr/text v0.1.0 // indirect + github.com/kr/text v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/oapi-codegen/runtime v1.0.0 // indirect diff --git a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/go.sum b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/go.sum index 7ce489094..ab00f7702 100644 --- a/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/go.sum +++ b/staging/src/github.com/kubeedge/mapper-framework/_template/mapper/go.sum @@ -6,6 +6,7 @@ github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevB github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -34,6 +35,8 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gosuri/uilive v0.0.0-20170323041506-ac356e6e42cd/go.mod h1:qkLSc0A5EXSP6B04TrN4oQoxqFI7A8XvoXSlJi8cwk8= +github.com/gosuri/uiprogress v0.0.0-20170224063937-d0567a9d84a1/go.mod h1:C1RTYn4Sc7iEyf6j8ft5dyoZ4212h8G1ol9QQluh5+0= github.com/influxdata/influxdb-client-go/v2 v2.13.0 h1:ioBbLmR5NMbAjP4UVA5r9b5xGjpABD7j65pI8kFphDM= github.com/influxdata/influxdb-client-go/v2 v2.13.0/go.mod h1:k+spCbt9hcvqvUiz0sr5D8LolXHqAAOfPw9v/RIRHl4= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= @@ -42,9 +45,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -57,6 +59,8 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.27.7 h1:fVih9JD6ogIiHUN6ePK7HJidyEDpWGVB5mzM7cWNXoU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sailorvii/goav v0.1.4 h1:4FwbikqIxx26dcHlZ8195WSPQSWbNnvRvTSgRTPgh2w= +github.com/sailorvii/goav v0.1.4/go.mod h1:upppsyLr1RLWDZ0+U3RYYGTv9NVwCjz14j/zzxRM018= github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace h1:9PNP1jnUjRhfmGMlkXHjYPishpcw4jpSt/V/xYY3FMA= github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= @@ -66,17 +70,17 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/taosdata/driver-go/v3 v3.5.1 h1:ln8gLJ6HR6gHU6dodmOa9utUjPUpAcdIplh6arFO26Q= github.com/taosdata/driver-go/v3 v3.5.1/go.mod h1:H2vo/At+rOPY1aMzUV9P49SVX7NlXb3LAbKw+MCLrmU= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA= -google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= -google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc= -google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc= +google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= diff --git a/staging/src/github.com/kubeedge/mapper-framework/hack/make-rules/generate.sh b/staging/src/github.com/kubeedge/mapper-framework/hack/make-rules/generate.sh index 523317f5a..e94cdcbf8 100755 --- a/staging/src/github.com/kubeedge/mapper-framework/hack/make-rules/generate.sh +++ b/staging/src/github.com/kubeedge/mapper-framework/hack/make-rules/generate.sh @@ -11,14 +11,20 @@ MAPPER_DIR="$(cd "$(dirname "$ROOT_DIR")" && pwd -P)" function entry() { # copy template - if [ $# -eq 0 ] ;then + if [ $# -ne 2 ] ;then read -p "Please input the mapper name (like 'Bluetooth', 'BLE'): " -r mapperName if [[ -z "${mapperName}" ]]; then echo "the mapper name is required" exit 1 fi + read -p "Please input the build method (like 'stream', 'nostream'): " -r buildMethod + if [[ -z "${buildMethod}" ]]; then + echo "the build method is required" + exit 1 + fi else mapperName=$1 + buildMethod=$2 fi mapperNameLowercase=$(echo -n "${mapperName}" | tr '[:upper:]' '[:lower:]') mapperPath="${MAPPER_DIR}/${mapperNameLowercase}" @@ -27,6 +33,16 @@ function entry() { exit 1 fi cp -r "${ROOT_DIR}/_template/mapper" "${mapperPath}" + if [ "${buildMethod}" = "stream" ]; then + rm "${mapperPath}/data/stream/handler_nostream.go" + fi + + if [ "${buildMethod}" = "nostream" ]; then + cd "${mapperPath}/data/stream" + ls |grep -v handler_nostream.go |xargs rm -rf + mv handler_nostream.go handler.go + cd - + fi mapperVar=$(echo "${mapperName}" | sed -e "s/\b\(.\)/\\u\1/g") diff --git a/staging/src/github.com/kubeedge/mapper-framework/pkg/common/const.go b/staging/src/github.com/kubeedge/mapper-framework/pkg/common/const.go index 004d47069..6352e2120 100644 --- a/staging/src/github.com/kubeedge/mapper-framework/pkg/common/const.go +++ b/staging/src/github.com/kubeedge/mapper-framework/pkg/common/const.go @@ -42,3 +42,8 @@ const ( DevInitModeRegister = "register" DevInitModeConfigmap = "configmap" ) + +const ( + SaveFrame = "saveFrame" + SaveVideo = "saveVideo" +) |
