Source code for pyvar.multimedia.helper

# Copyright 2021 Variscite LTD
# SPDX-License-Identifier: BSD-3-Clause

"""
:platform: Unix/Yocto
:synopsis: Classes to handle Multimedia capabilities.

.. moduleauthor:: Diego Dorta <diego.d@variscite.com>
"""

import os
import socket
import sys

import cv2
import gi

gi.require_version('Gst', '1.0')
from gi.repository import Gst

from pyvar.multimedia.config import FULL_HD_RESOLUTION
from pyvar.multimedia.config import HD_RESOLUTION
from pyvar.multimedia.config import VGA_RESOLUTION
from pyvar.multimedia.config import LEAKY


[docs]class Multimedia: """ Class to handle the Multimedia resources. :ivar video_src: video source; :ivar resolution: video source resolution; :ivar devices: video devices; :ivar sink: video capture. """ def __init__(self, source=None, resolution=None): self.video_src = source self.resolution = resolution self.devices = Devices() self.devices.get_video_devices() self.sink = None if not os.path.isfile(self.video_src): self.dev = self.devices.search_device(self.video_src) self.dev_caps = self.get_caps()
[docs] def get_caps(self): """ Get the caps from device. Returns: The valid resolution. """ if self.resolution == FULL_HD_RESOLUTION: return self.validate_caps(self.dev.full_hd_caps) elif self.resolution == VGA_RESOLUTION: return self.validate_caps(self.dev.vga_caps) else: if self.resolution != HD_RESOLUTION: print(f"Invalid resolution: {self.resolution}. " f"Trying to use HD resolution instead.") return self.validate_caps(self.dev.hd_caps)
[docs] def validate_caps(self, caps): """ Check if the resolution is valid. Returns: The valid resolution or default one. """ if caps: return caps else: print(f"Resolution not supported. Using " f"{self.dev.default_caps.width}x" f"{self.dev.default_caps.height} instead.") return self.dev.default_caps
[docs] def set_v4l2_config(self): """ Set the v4l2 configuration. """ if self.video_src and os.path.isfile(self.video_src): pipeline = self.v4l2_video_pipeline(self.video_src) host_name = socket.gethostname() if host_name.startswith("imx93"): pipeline = self.v4l2_video_pipeline_imx93(self.video_src) else: pipeline = self.v4l2_camera_pipeline( width=self.dev_caps.width, height=self.dev_caps.height, device=self.dev.name, framerate=self.dev_caps.framerate) self.sink = cv2.VideoCapture(pipeline)
[docs] @staticmethod def v4l2_video_pipeline_imx93(filesrc): """ Set the v4l2 configuration for i.MX 93. """ return (f"filesrc location={filesrc} ! qtdemux name=d d.video_0 ! " \ f"avdec_h264 ! queue ! videoconvert ! appsink")
[docs] @staticmethod def v4l2_video_pipeline(filesrc): """ Set the v4l2 configuration for video source file. """ return (f"filesrc location={filesrc} ! qtdemux name=d d.video_0 ! " f"decodebin ! queue {LEAKY} ! queue ! imxvideoconvert_g2d ! " f"videoconvert ! appsink")
[docs] @staticmethod def v4l2_camera_pipeline(width, height, device, framerate): """ Set the v4l2 configuration for camera device. """ return (f"v4l2src device={device} ! video/x-raw,width={width}," f"height={height},framerate={framerate} ! queue {LEAKY} ! " f"videoconvert ! appsink")
[docs] def get_frame(self): """ Get the frame from video source. Returns: The current frame. """ check, frame = self.sink.read() if check is not True: self.destroy() sys.exit("Your video device could not capture any image.") return frame
[docs] def loop(self): """ Check if the video source still have frames or not. """ if (not self.sink) or (not self.sink.isOpened()): sys.exit("Your video device could not be initialized. Exiting...") return self.sink.isOpened()
[docs] @staticmethod def save(name=None, output_frame=None): """ Save any frame as an output file. """ cv2.imwrite(name, output_frame)
[docs] def show(self, name=None, output_frame=None): """ Show any frame. """ cv2.imshow(name, output_frame) if (cv2.waitKey(1) & 0xFF) == ord('q'): self.destroy()
[docs] @staticmethod def show_image(title=None, image=None): """ Show any image. """ cv2.imshow(title, image) cv2.waitKey() cv2.destroyAllWindows()
[docs] def destroy(self): """ Release and destroy the video capture from video source. """ self.sink.release() cv2.destroyAllWindows()
[docs]class VideoDevice: """ Class to handle the Video device features and resolutions. :ivar name: video name; :ivar caps: caps resolution; :ivar default_caps: default caps resolution; :ivar full_hd_caps: full hd caps resolution; :ivar hd_caps: hd caps resolution; :ivar vga_caps: vga caps resolution. """ def __init__(self): self.name = None self.caps = None self.default_caps = None self.full_hd_caps = None self.hd_caps = None self.vga_caps = None
[docs]class Caps: """ Class to handle the video caps device. :ivar name: video name; :ivar format: caps format; :ivar width: caps width; :ivar height: caps height; :ivar framerate: caps framerate. """ def __init__(self): self.name = None self.format = None self.width = None self.height = None self.framerate = None
[docs]class Devices: """ Class to handle the devices. :ivar devices: devices from device monitor. """ def __init__(self): self.devices = []
[docs] def get_video_devices(self): """ Get the available devices from device monitor. """ Gst.init() dev_monitor = Gst.DeviceMonitor() dev_monitor.add_filter("Video/Source") dev_monitor.start() for dev in dev_monitor.get_devices(): video_dev = VideoDevice() dev_props = dev.get_properties() dev_caps = dev.get_caps() name = dev_props.get_string("device.path") caps = self.get_device_caps(dev_caps.normalize()) full_hd_caps, hd_caps, vga_caps = self.get_std_caps(caps) default_caps = hd_caps if (not default_caps) and caps: default_caps = caps[0] video_dev.name = name video_dev.caps = caps video_dev.default_caps = default_caps video_dev.full_hd_caps = full_hd_caps video_dev.hd_caps = hd_caps video_dev.vga_caps = vga_caps self.devices.append(video_dev) dev_monitor.stop()
[docs] @staticmethod def get_device_caps(dev_caps): caps_list = [] for i in range(dev_caps.get_size()): if dev_caps.get_structure(i).get_name() != "video/x-raw": continue caps = Caps() caps_struct = dev_caps.get_structure(i) caps.name = caps_struct.get_name() caps.format = caps_struct.get_string("format") caps.width = caps_struct.get_int("width")[1] caps.height = caps_struct.get_int("height")[1] framerate = ("{}/{}".format(*caps_struct.get_fraction( "framerate")[1:])) caps.framerate = framerate caps_list.append(caps) return caps_list
[docs] @staticmethod def get_std_caps(dev_caps): full_hd_caps = Caps() full_hd_caps.name = "video/x-raw" full_hd_caps.width = 1920 full_hd_caps.height = 1080 full_hd_caps.framerate = "60/1" hd_caps = Caps() hd_caps.name = "video/x-raw" hd_caps.width = 1280 hd_caps.height = 720 hd_caps.framerate = "60/1" vga_caps = Caps() vga_caps.name = "video/x-raw" vga_caps.width = 640 vga_caps.height = 480 vga_caps.framerate = "60/1" for caps in dev_caps: if (caps.width == 1920) and (caps.height == 1080): full_hd_caps = caps elif (caps.width == 1280) and (caps.height == 720): hd_caps = caps elif (caps.width == 640) and (caps.height == 480): vga_caps = caps return full_hd_caps, hd_caps, vga_caps
[docs] def search_device(self, dev_name): """ Check if the device is valid or not. """ dev = None if dev_name.startswith("/dev/video"): for device in self.devices: if device.name == dev_name: dev = device if not dev: print("The specified video_src was not found. " "Searching for default video device...") if not dev and self.devices: dev = self.devices[0] elif not dev: sys.exit("No video device found. Exiting...") print(f"Using {dev.name} as video device") return dev