프로그램 사용/yolo_tensorflow

i.mx8mp gopoint 실행 경로

구차니 2024. 1. 2. 15:50

문서를 찾다가 지쳐서, 그냥 실행해 보고 프로세스 인자를 확인하는 걸로..

root        3019     925 72 06:38 ?        00:00:25 /usr/bin/python3 /home/root/.nxp-demo-experience/scripts/machine_learning/MLDemoLauncher.py detect


root@imx8mpevk:~/.nxp-demo-experience/scripts/machine_learning# cat MLDemoLauncher.py 
#!/usr/bin/env python3

"""
Copyright 2021-2023 NXP

SPDX-License-Identifier: BSD-2-Clause

This script launches the NNStreamer ML Demos using a UI to pick settings.
"""

import gi
import os
import sys
import glob
from gi.repository import Gtk, GLib, Gio

gi.require_version("Gtk", "3.0")

import utils

class MLLaunch(Gtk.Window):
    """The GUI window for the ML demo launcher"""

    def __init__(self, demo):
        """Creates the UI window

        demo -- name of the demo to launch; the code below compares it
                against "detect", "id", "pose", "brand" and "selfie_nn"
        """
        # NOTE(review): this listing seems to have lost lines when it was
        # pasted (empty loop bodies, unbalanced parentheses, branches without
        # an "else:"), so it does not parse as shown. The comments below only
        # describe what is visible; gaps are flagged rather than filled in.
        # NOTE(review): no call to the Gtk.Window initializer is visible
        # before the first use of self — TODO confirm against the original.
        # Initialization
        self.demo = demo
        self.set_default_size(450, 200)

        # Get platform
        # The host name (uname nodename) is used as the board identifier.
        self.platform = os.uname().nodename
        # OpenVX graph caching is not available on i.MX 8QuadMax platform.
        if self.platform != "imx8qmmek" :
            os.environ["VIV_VX_CACHE_BINARY_GRAPH_DIR"] = "/home/root/.cache/gopoint"
            os.environ["VIV_VX_ENABLE_CACHE_GRAPH_BINARY"] = "1"

        # Get widget properties
        # "Example Video" is offered as a source except for the brand and
        # selfie_nn demos and except on the imx93evk board.
        devices = []
        if self.demo != "brand" and self.demo != "selfie_nn":
            if self.platform != "imx93evk":
                devices.append("Example Video")

        # NOTE(review): the loop body is missing; presumably every
        # /dev/video* node is added to devices — TODO confirm.
        for device in glob.glob("/dev/video*"):

        # CPU is always available; GPU/NPU are added depending on the demo,
        # the board and which delegate libraries exist on disk.
        backends_available = ["CPU"]
        # NOTE(review): the first operand of this condition and its closing
        # "):" are missing from the listing.
        if (
            and self.demo != "pose"
            and self.demo != "selfie_nn"
            backends_available.insert(1, "GPU")
        # NPU is inserted at index 0, so it ends up first in the list.
        if os.path.exists("/usr/lib/libneuralnetworks.so") and self.demo != "brand" and self.platform != "imx8qmmek":
            backends_available.insert(0, "NPU")
        if os.path.exists("/usr/lib/libethosu_delegate.so"):
            backends_available.insert(0, "NPU")

        displays_available = ["Weston"]

        colors_available = ["Red", "Green", "Blue", "Black", "White"]

        demo_modes_available = ["Background Substitution", "Segmentation Mask"]

        # Create widgets
        main_grid = Gtk.Grid.new()
        device_label = Gtk.Label.new("Source")
        self.device_combo = Gtk.ComboBoxText()
        backend_label = Gtk.Label.new("Backend")
        self.backend_combo = Gtk.ComboBoxText()
        self.display_combo = Gtk.ComboBoxText()
        self.launch_button = Gtk.Button.new_with_label("Run")
        self.status_bar = Gtk.Label.new()
        header = Gtk.HeaderBar()
        quit_button = Gtk.Button()
        quit_icon = Gio.ThemedIcon(name="process-stop-symbolic")
        quit_image = Gtk.Image.new_from_gicon(quit_icon, Gtk.IconSize.BUTTON)
        separator = Gtk.Separator.new(0)
        time_title_label = Gtk.Label.new("Video Refresh")
        self.time_label = Gtk.Label.new("--.-- ms")
        self.fps_label = Gtk.Label.new("-- FPS")
        inference_title_label = Gtk.Label.new("Inference Time")
        self.inference_label = Gtk.Label.new("--.-- ms")
        self.ips_label = Gtk.Label.new("-- IPS")
        if self.demo != "selfie_nn":
            # NOTE(review): the closing parentheses of both Scale
            # constructors are missing. Both statements also bind
            # self.r_scale, so it would end up referring to the second scale.
            self.width_entry = self.r_scale = Gtk.Scale.new_with_range(
                Gtk.Orientation.HORIZONTAL, 300, 1920, 2
            self.height_entry = self.r_scale = Gtk.Scale.new_with_range(
                Gtk.Orientation.HORIZONTAL, 300, 1080, 2
            # NOTE(review): the label texts look swapped — width_label is
            # created with "Height" and height_label with "Width", while
            # width_label is placed next to width_entry in the grid below.
            self.width_label = Gtk.Label.new("Height")
            self.height_label = Gtk.Label.new("Width")
            self.color_label = Gtk.Label.new("Label Color")
            # NOTE(review): an "else:" (the selfie_nn case) is presumably
            # missing before the next line; as shown, color_label is
            # assigned twice in the same branch.
            self.color_label = Gtk.Label.new("Text Color")
            self.demo_mode = Gtk.Label.new("Demo Mode")
            self.mode_combo = Gtk.ComboBoxText()
        self.color_combo = Gtk.ComboBoxText()

        # Organize widgets
        # NOTE(review): header, quit_button and quit_image are created above
        # but never wired up in the visible code; the blank lines below
        # presumably replaced that setup.



        # Gtk.Grid.attach arguments are (child, left, top, width, height).
        # Labels go in columns 0-1, the matching controls in columns 2-3.
        main_grid.attach(device_label, 0, 1, 2, 1)
        main_grid.attach(backend_label, 0, 2, 2, 1)
        # main_grid.attach(display_label, 0, 3, 2, 1)
        if self.demo != "selfie_nn":
            main_grid.attach(self.width_label, 0, 4, 2, 1)
            main_grid.attach(self.height_label, 0, 5, 2, 1)
            main_grid.attach(self.color_label, 0, 6, 2, 1)
            # NOTE(review): rows 4 and 5 are attached a second time below;
            # an "else:" for the selfie_nn layout is presumably missing here.
            main_grid.attach(self.demo_mode, 0, 4, 2, 1)
            main_grid.attach(self.color_label, 0, 5, 2, 1)

        main_grid.attach(self.device_combo, 2, 1, 2, 1)
        main_grid.attach(self.backend_combo, 2, 2, 2, 1)
        # main_grid.attach(self.display_combo, 2, 3, 2, 1)
        if self.demo != "selfie_nn":
            main_grid.attach(self.width_entry, 2, 4, 2, 1)
            main_grid.attach(self.height_entry, 2, 5, 2, 1)
            main_grid.attach(self.color_combo, 2, 6, 2, 1)
            # NOTE(review): same pattern as above — an "else:" is presumably
            # missing before the selfie_nn controls.
            main_grid.attach(self.mode_combo, 2, 4, 2, 1)
            main_grid.attach(self.color_combo, 2, 5, 2, 1)

        main_grid.attach(self.launch_button, 0, 7, 4, 1)
        main_grid.attach(self.status_bar, 0, 8, 4, 1)

        main_grid.attach(separator, 0, 9, 4, 1)

        # Statistics area: video refresh on the left, inference on the right.
        main_grid.attach(time_title_label, 0, 10, 2, 1)
        main_grid.attach(self.time_label, 0, 11, 1, 1)
        main_grid.attach(self.fps_label, 1, 11, 1, 1)
        main_grid.attach(inference_title_label, 2, 10, 2, 1)
        main_grid.attach(self.inference_label, 2, 11, 1, 1)
        main_grid.attach(self.ips_label, 3, 11, 1, 1)

        # Configure widgets
        # NOTE(review): every loop and the "if" at the end of this group has
        # lost its body; presumably they fill the combo boxes with the lists
        # built above — TODO confirm.
        for device in devices:
        for backend in backends_available:
        for display in displays_available:
        for color in colors_available:
        if self.demo == "selfie_nn":
            for mode in demo_modes_available:

        if self.demo != "selfie_nn":
        # Signal wiring: source changes go to on_source_change, "Run" goes
        # to start, and the quit button calls the builtin exit.
        self.device_combo.connect("changed", self.on_source_change)
        self.launch_button.connect("clicked", self.start)
        quit_button.connect("clicked", exit)
        if self.demo == "detect":
            header.set_title("Detection Demo")
        elif self.demo == "id":
            header.set_title("Classification Demo")
        elif self.demo == "pose":
            header.set_title("Pose Demo")
        elif self.demo == "brand":
            header.set_title("Brand Demo")
        elif self.demo == "selfie_nn":
            header.set_title("Selfie Segmenter Demo")
            # NOTE(review): an "else:" is presumably missing before the
            # generic title; as shown it overwrites the selfie_nn title.
            header.set_title("NNStreamer Demo")
        header.set_subtitle("NNStreamer Examples")

    def start(self, button):
        """Starts the ML Demo with selected settings

        button -- the widget that emitted "clicked" (not referenced in the
                  visible code)
        """
        # NOTE(review): lines are missing from this listing (several
        # "else:" branches, closing parentheses and the argument lists of the
        # demo constructors), so it does not parse as shown. Gaps are flagged
        # below rather than filled in.
        self.update_time = GLib.get_monotonic_time()
        # Translate the selected color name into r/g/b components (0 or 1).
        if self.color_combo.get_active_text() == "Red":
            r = 1
            g = 0
            b = 0
        elif self.color_combo.get_active_text() == "Blue":
            r = 0
            g = 0
            b = 1
        elif self.color_combo.get_active_text() == "Green":
            r = 0
            g = 1
            b = 0
        elif self.color_combo.get_active_text() == "Black":
            r = 0
            g = 0
            b = 0
        elif self.color_combo.get_active_text() == "White":
            r = 1
            g = 1
            b = 1
            # NOTE(review): an "else:" (fallback to red) is presumably
            # missing here; as shown, "White" is overwritten with red.
            r = 1
            g = 0
            b = 0
        # For each demo: fetch model, labels and (optionally) the example
        # video through utils.download_file. The values -1, -2 and -3 are
        # treated as failure codes; `error` records the name of the first
        # file that failed so it can be shown in the dialogs further down.
        # NOTE(review): in every "imx93evk" branch below the second
        # assignment presumably belonged to a missing "else:"; the same
        # applies to the two `device = ...` assignments.
        if self.demo == "detect":
            if self.platform == "imx93evk":
                model = utils.download_file("mobilenet_ssd_v2_coco_quant_postprocess_vela.tflite")
                model = utils.download_file("mobilenet_ssd_v2_coco_quant_postprocess.tflite")
            labels = utils.download_file("coco_labels.txt")
            if self.device_combo.get_active_text() == "Example Video":
                device = utils.download_file("detect_example.mov")
                device = self.device_combo.get_active_text()
            if model == -1 or model == -2 or model == -3:
                if self.platform == "imx93evk":
                    error = "mobilenet_ssd_v2_coco_quant_postprocess_vela.tflite"
                    error = "mobilenet_ssd_v2_coco_quant_postprocess.tflite"
            elif labels == -1 or labels == -2 or labels == -3:
                error = "coco_labels.txt"
            elif device == -1 or device == -2 or device == -3:
                error = "detect_example.mov"
        if self.demo == "id":
            if self.platform == "imx93evk":
                model = utils.download_file("mobilenet_v1_1.0_224_quant_vela.tflite")
                model = utils.download_file("mobilenet_v1_1.0_224_quant.tflite")
            labels = utils.download_file("1_1.0_224_labels.txt")
            if self.device_combo.get_active_text() == "Example Video":
                device = utils.download_file("id_example.mov")
                device = self.device_combo.get_active_text()
            if model == -1 or model == -2 or model == -3:
                if self.platform == "imx93evk":
                    error = "mobilenet_v1_1.0_224_quant_vela.tflite"
                    error = "mobilenet_v1_1.0_224_quant.tflite"
            elif labels == -1 or labels == -2 or labels == -3:
                error = "1_1.0_224_labels.txt"
            elif device == -1 or device == -2 or device == -3:
                error = "id_example.mov"
        if self.demo == "pose":
            model = utils.download_file("posenet_resnet50_uint8_float32_quant.tflite")
            labels = utils.download_file("key_point_labels.txt")
            if self.device_combo.get_active_text() == "Example Video":
                device = utils.download_file("pose_example.mov")
                device = self.device_combo.get_active_text()
            if model == -1 or model == -2 or model == -3:
                error = "posenet_resnet50_uint8_float32_quant.tflite"
            elif labels == -1 or labels == -2 or labels == -3:
                error = "key_point_labels.txt"
            elif device == -1 or device == -2 or device == -3:
                error = "pose_example.mov"
        if self.demo == "brand":
            model = utils.download_file("brand_model.tflite")
            labels = utils.download_file("brand_labels.txt")
            if self.device_combo.get_active_text() == "Example Video":
                device = utils.download_file("brand_example.mov")
                device = self.device_combo.get_active_text()
            if model == -1 or model == -2 or model == -3:
                error = "brand_model.tflite"
            elif labels == -1 or labels == -2 or labels == -3:
                error = "brand_labels.txt"
            elif device == -1 or device == -2 or device == -3:
                error = "brand_example.mov"
        if self.demo == "selfie_nn":
            if self.platform == "imx93evk":
                # NOTE(review): the argument of this call is missing;
                # judging by the error name used below it is presumably
                # "selfie_segmenter_landscape_int8_vela.tflite".
                model = utils.download_file(
                model = utils.download_file("selfie_segmenter_int8.tflite")
            # Labels refer to background img
            if self.platform == "imx93evk":
                labels = utils.download_file("bg_image_landscape.jpg")
                labels = utils.download_file("bg_image.jpg")
            if self.device_combo.get_active_text() == "Example Video":
                device = utils.download_file("selfie_example.mov")
                device = self.device_combo.get_active_text()
            if model == -1 or model == -2 or model == -3:
                if self.platform == "imx93evk":
                    error = "selfie_segmenter_landscape_int8_vela.tflite"
                    error = "selfie_segmenter_int8.tflite"
            elif labels == -1 or labels == -2 or labels == -3:
                if self.platform == "imx93evk":
                    error = "bg_image_landscape.jpg"
                    error = "bg_image.jpg"
            elif device == -1 or device == -2 or device == -3:
                error = "selfie_example.mov"
            # NOTE(review): an "else:" is presumably missing between the two
            # set_mode assignments.
            if self.mode_combo.get_active_text() == "Background Substitution":
                set_mode = 0
                set_mode = 1

        # Failure code -1: no metadata for the requested file.
        # NOTE(review): the MessageDialog call is not closed and nothing
        # visible shows the dialog or returns early — lines are missing.
        if model == -1 or labels == -1 or device == -1:
            dialog = Gtk.MessageDialog(
                text="Cannot find files! The file that you requested" +
                " does not have any metadata that is related to it. " +
                "Please see /home/root/.nxp-demo-experience/downloads.txt" +
                " to see if the requested file exists! \n \n Cannot find:" +
            self.status_bar.set_text("Cannot find files!")
        # Failure code -2: the download URL could not be reached.
        if model == -2 or labels == -2 or device == -2:
            dialog = Gtk.MessageDialog(
                text="Cannot download files! The URL used to download the" +
                " file cannot be reached. If you are connected to the " +
                "internet, please check the /home/root/.nxp-demo-experience" +
                "/downloads.txt for the URL. For some regions, " +
                "these sites may be blocked. To install these manually," +
                " please go to the file listed above and provide the " +
                "path to the file in \"PATH\" \n \n Cannot download " + error)
            self.status_bar.set_text("Download failed!")
        # Failure code -3: SHA check of the downloaded file failed.
        # NOTE(review): device is compared with -4 here, although every
        # other check in this method uses -1/-2/-3 — this looks like a typo
        # for -3, so a bad-SHA example video would not be reported.
        if model == -3 or labels == -3 or device == -4:
            dialog = Gtk.MessageDialog(
                text="Invalid files! The files where not what we expected." +
                "If you are SURE that the files are correct, delete " +
                "the \"SHA\" value in /home/root/.nxp-demo-experience" +
                "/downloads.txt to bypass the SHA check. \n \n Bad SHA for " +
            self.status_bar.set_text("Downloaded bad file!")
        # The demo module is imported only for the selected demo.
        # NOTE(review): the constructor argument lists and whatever runs the
        # created `example` are missing from the listing.
        if self.demo == "detect":
            import nndetection

            example = nndetection.ObjectDetection(
        if self.demo == "id":
            import nnclassification

            example = nnclassification.NNStreamerExample(
        if self.demo == "pose":
            import nnpose

            example = nnpose.NNStreamerExample(
        if self.demo == "brand":
            import nnbrand

            example = nnbrand.NNStreamerExample(
        if self.demo == "selfie_nn":
            import selfie_segmenter

            example = selfie_segmenter.SelfieSegmenter(


    def update_stats(self, time):
        """Callback used the update stats in GUI

        time -- object exposing interval_time, tensor_filter and
                current_framerate (the attributes read below); presumably
                the running demo instance — TODO confirm against the caller

        Always returns True.
        """
        # NOTE(review): the set_text(...) calls that wrapped the format
        # expressions below are missing from this listing, along with at
        # least one "else:"; only the formatting expressions survived.
        # GLib monotonic time is in microseconds, so this is seconds.
        interval_time = (GLib.get_monotonic_time() - self.update_time) / 1000000
        # Refresh the labels at most about once per second.
        if interval_time > 1:
            refresh_time = time.interval_time
            inference_time = time.tensor_filter.get_property("latency")

            # Skip the update while either value is still 0; this also
            # avoids the divisions by zero below.
            if refresh_time != 0 and inference_time != 0:
                # Print pipeline information
                if self.demo == "selfie_nn" or self.demo == "id" or self.demo == "detect":
                        "{:12.2f} ms".format(1.0 / time.current_framerate * 1000.0)
                        "{:12.2f} FPS".format(time.current_framerate)
                    self.time_label.set_text("{:12.2f} ms".format(refresh_time / 1000))
                        "{:12.2f} FPS".format(1 / (refresh_time / 1000000))
                # Print inference information
                # NOTE(review): the second string says "FPS" although it is
                # in the inference section and the label it presumably
                # feeds is initialised to "-- IPS" — verify.
                    "{:12.2f} ms".format(inference_time / 1000)
                    "{:12.2f} FPS".format(1 / (inference_time / 1000000))
            self.update_time = GLib.get_monotonic_time()
        return True

    def on_source_change(self, widget):
        """Callback to lock sliders

        widget -- the widget that emitted "changed" (not referenced in the
                  visible code)
        """
        # NOTE(review): the body of the inner "if" (and any "else:") is
        # missing from this listing, so what happens to the sliders when
        # "Example Video" is selected cannot be seen here.
        if self.demo != "selfie_nn":
            if self.device_combo.get_active_text() == "Example Video":

if __name__ == "__main__":
    # Every demo name that MLLaunch has a dedicated branch for.
    known_demos = ("detect", "id", "pose", "brand", "selfie_nn")
    # The argument count is tested first and combined with "or": the
    # previous "and" chain could never reject an unknown demo name and
    # raised IndexError on sys.argv[1] when no argument was given.
    if len(sys.argv) != 2 or sys.argv[1] not in known_demos:
        print("Demos available: " + ", ".join(known_demos))
    else:
        win = MLLaunch(sys.argv[1])
        win.connect("destroy", Gtk.main_quit)
        # NOTE(review): the two calls below were not in the pasted listing
        # (it stops after connect); they are the usual GTK 3 way to show
        # the window and enter the main loop — confirm against the
        # original script.
        win.show_all()
        Gtk.main()


위 스크립트가 실제로 실행되는 런처인데, detect 데모에서는 nndetection을 import하고 있어서 그 파일을 따라가서 보는 중.

그나저나 LGPL이면 소스를 그냥 공개해도 되려나?

root@imx8mpevk:~/.nxp-demo-experience/scripts/machine_learning# find / -name nndetection.py

root@imx8mpevk:~/.nxp-demo-experience/scripts/machine_learning# cat /home/root/.nxp-demo-experience/scripts/machine_learning/nndetection.py
#!/usr/bin/env python3

"""
Copyright SSAFY Team 1 <jangjongha.sw@gmail.com>
Copyright 2021-2023 NXP

SPDX-License-Identifier: LGPL-2.1-only
Original Source: https://github.com/nnstreamer/nnstreamer-example

This demo shows how you can use the NNStreamer to detect objects.

From the original source, this was modified to work better with a
GUI and to get better performance on the i.MX 8M Plus and i.MX93.
"""

import os
import sys
import gi
import re
import logging
import numpy as np
import cairo

gi.require_version("Gst", "1.0")
from gi.repository import Gst, GObject, GLib

DEBUG = False

class ObjectDetection:
    """The class that manages the demo"""

    # NOTE(review): the parameter list of this constructor is missing from
    # the listing (the "def" line is never closed). The body reads platform,
    # device, backend, model, labels, display, callback, width, height, r, g
    # and b, so those are presumably the parameters — TODO confirm names,
    # order and defaults against the original file.
    def __init__(
        """Creates an instance of the demo

        device -- What camera or video file to use
        backend -- Whether to use NPU or CPU
        model -- the path to the model
        labels -- the path to the labels
        display -- Whether to use X11 or Weston
        callback -- Callback to pass stats to
        width -- Width of output
        height -- Height of output
        r -- Red value for labels
        g -- Green value for labels
        b -- Blue value for labels
        """
        self.loop = None
        self.pipeline = None
        self.running = False
        self.video_caps = None
        self.first_frame = True

        # Sizes used to validate and index the output tensors in new_data_cb
        # and get_detected_objects.
        self.BOX_SIZE = 4
        self.LABEL_SIZE = 91
        self.DETECTION_MAX = 20
        self.MAX_OBJECT_DETECTION = 20

        self.Y_SCALE = 10.0
        self.X_SCALE = 10.0
        self.H_SCALE = 5.0
        self.W_SCALE = 5.0

        # VIDEO_* is the displayed size; MODEL_* is the size frames are
        # scaled to before they reach the tensor filter (see run()).
        self.VIDEO_WIDTH = width
        self.VIDEO_HEIGHT = height
        self.MODEL_WIDTH = 300
        self.MODEL_HEIGHT = 300

        self.tflite_model = model
        self.label_path = labels
        self.device = device
        self.backend = backend
        self.display = display
        self.tflite_labels = []
        self.detected_objects = []
        self.callback = callback
        self.r = r
        self.b = b
        self.g = g
        self.platform = platform
        self.current_framerate = 1000

        # Define PXP or GPU2D converter
        # NOTE(review): an "else:" is presumably missing before the g2d
        # assignment; as shown it overwrites the pxp converter.
        if self.platform == "imx93evk":
            self.nxp_converter = "imxvideoconvert_pxp "
            self.nxp_converter = "imxvideoconvert_g2d "

        # Construction fails with a bare Exception when the model file or
        # the label file cannot be loaded.
        if not self.tflite_init():
            raise Exception


    def run(self):
        """Starts pipeline and run demo"""
        # NOTE(review): this listing has lost lines (several "else:"
        # branches, closing parentheses, and the statements that start,
        # run and stop the pipeline), so it does not parse as shown.

        # Pick the tensor_filter "accelerator" string for the backend.
        if self.backend == "CPU":
            if self.platform == "imx93evk":
                backend = "true:cpu custom=NumThreads:2"
                backend = "true:cpu custom=NumThreads:4"
        elif self.backend == "GPU":
            os.environ["USE_GPU_INFERENCE"] = "1"
            backend = (
                "true:gpu custom=Delegate:External," "ExtDelegateLib:libvx_delegate.so"
            # NOTE(review): the closing parenthesis above, the "else:" that
            # starts the NPU case, and the second half of both NPU strings
            # below are missing.
            if self.platform == "imx93evk":
                backend = (
                    "true:npu custom=Delegate:External,"
                os.environ["USE_GPU_INFERENCE"] = "0"
                backend = (
                    "true:npu custom=Delegate:External,"

        # Pick the sink element. print_time is only set for display "None",
        # where new_data_cb uses it to report the inference time.
        if self.display == "X11":
            display = "ximagesink name=img_tensor "
        elif self.display == "None":
            self.print_time = GLib.get_monotonic_time()
            display = "fakesink "
            # NOTE(review): an "else:" is presumably missing before the
            # fpsdisplaysink assignment.
            display = "fpsdisplaysink name=img_tensor text-overlay=false video-sink=waylandsink sync=false"

        # main loop
        self.loop = GLib.MainLoop()
        self.old_time = GLib.get_monotonic_time()
        self.update_time = GLib.get_monotonic_time()
        self.reload_time = -1
        self.interval_time = 999999

        # Create decoder for video file
        # NOTE(review): an "else:" is presumably missing before "vpudec ".
        if self.platform == "imx8qmmek":
            decoder = "h264parse ! v4l2h264dec "
            decoder = "vpudec "

        # Source part of the pipeline: a V4L2 camera when the device path
        # contains "/dev/video", otherwise a file source with qtdemux.
        # NOTE(review): the "else:" between the two variants is missing.
        if "/dev/video" in self.device:
            gst_launch_cmdline = "v4l2src name=cam_src device=" + self.device
            gst_launch_cmdline += " ! " + self.nxp_converter + "! video/x-raw,width="
            gst_launch_cmdline += str(int(self.VIDEO_WIDTH)) + ",height="
            gst_launch_cmdline += str(int(self.VIDEO_HEIGHT))
            gst_launch_cmdline += ",framerate=30/1,format=BGRx ! tee name=t"
            gst_launch_cmdline = "filesrc location=" + self.device
            gst_launch_cmdline += " ! qtdemux ! " + decoder + "! tee name=t"

        # First tee branch: scale to the model size, convert to RGB and run
        # the tflite model. Second tee branch: draw the overlay and display.
        gst_launch_cmdline += " t. ! " + self.nxp_converter + "!  video/x-raw,"
        gst_launch_cmdline += "width={:d},".format(self.MODEL_WIDTH)
        gst_launch_cmdline += "height={:d},".format(self.MODEL_HEIGHT)
        gst_launch_cmdline += " ! queue max-size-buffers=2 leaky=2 ! "
        gst_launch_cmdline += "videoconvert ! video/x-raw,format=RGB !"
        gst_launch_cmdline += " tensor_converter ! tensor_filter"
        gst_launch_cmdline += " framework=tensorflow-lite model="
        gst_launch_cmdline += self.tflite_model + " accelerator=" + backend
        gst_launch_cmdline += " silent=FALSE name=tensor_filter latency=1 ! "
        gst_launch_cmdline += "tensor_sink name=tensor_sink t. ! "
        gst_launch_cmdline += self.nxp_converter + "! "
        gst_launch_cmdline += "cairooverlay name=tensor_res ! "
        gst_launch_cmdline += "queue max-size-buffers=2 leaky=2 ! "
        gst_launch_cmdline += display

        self.pipeline = Gst.parse_launch(gst_launch_cmdline)

        # bus and message callback
        bus = self.pipeline.get_bus()
        bus.connect("message", self.on_bus_message)

        # Elements looked up by the names given in the launch string; both
        # are read later for latency and frame-rate statistics.
        self.tensor_filter = self.pipeline.get_by_name("tensor_filter")
        self.wayland_sink = self.pipeline.get_by_name("img_tensor")

        # tensor sink signal : new data callback
        tensor_sink = self.pipeline.get_by_name("tensor_sink")
        tensor_sink.connect("new-data", self.new_data_cb)

        tensor_res = self.pipeline.get_by_name("tensor_res")
        tensor_res.connect("draw", self.draw_overlay_cb)
        tensor_res.connect("caps-changed", self.prepare_overlay_cb)
        # The stats callback is invoked every 500 ms with this instance as
        # its argument.
        if self.callback is not None:
            GObject.timeout_add(500, self.callback, self)

        # start pipeline
        # NOTE(review): no statement that changes the pipeline state is
        # visible here or after the main loop.
        self.running = True

        self.set_window_title("img_tensor", "NNStreamer Object Detection Example")

        # run main loop
        # NOTE(review): the statement that runs self.loop is missing.

        # quit when received eos or error message
        self.running = False


    def tflite_init(self):
        """Checks that the model file exists and loads the label file.

        :return: True if successfully initialized
        """
        # NOTE(review): the docstring opener, the "try:" that belongs to the
        # "except FileNotFoundError" below, and the body of the "while" loop
        # are missing from this listing.

        if not os.path.exists(self.tflite_model):
            logging.error("cannot find tflite model [%s]", self.tflite_model)
            return False

        label_path = self.label_path
            with open(label_path, "r") as label_file:
                for line in label_file.readlines():
                    # Only lines that start with a digit are treated as
                    # label entries.
                    if line[0].isdigit():
                        # NOTE(review): loop body missing; presumably it
                        # pads tflite_labels until the list length matches
                        # the index on this line — TODO confirm.
                        while str(len(self.tflite_labels)) not in line:
                        # Keep the text after the first space; the trailing
                        # newline is kept as well.
                        self.tflite_labels.append(line[line.find(" ") + 1 :])
        except FileNotFoundError:
            logging.error("cannot find tflite label [%s]", label_path)
            return False

        logging.info("finished to load labels, total [%d]", len(self.tflite_labels))
        return True

    # @brief Callback for tensor sink signal.
    def new_data_cb(self, sink, buffer):
        """Callback for tensor sink signal.

        :param sink: tensor sink element
        :param buffer: buffer from element
        :return: None
        """
        # NOTE(review): the docstring terminator was missing in the listing,
        # and further down the call that consumes the decoded tensors and
        # the print(...) around the inference message are truncated.
        # NOTE(review): despite ":return: None", the early exit below
        # returns False.
        if self.running:
            # Time since the previous buffer; monotonic time is in
            # microseconds.
            new_time = GLib.get_monotonic_time()
            self.interval_time = new_time - self.old_time
            self.old_time = new_time
            # Four output tensors are expected (see the layout below).
            if buffer.n_memory() != 4:
                return False

            #  tensor type is float32.
            # 4:20:1:1\,20:1:1:1\,20:1:1:1\,1:1:1:1
            # [0] detection_boxes (default 4th tensor). BOX_SIZE :
            # #MaxDetection, ANY-TYPE
            # [1] detection_classes (default 2nd tensor).
            # #MaxDetection, ANY-TYPE
            # [2] detection_scores (default 3rd tensor)
            # #MaxDetection, ANY-TYPE
            # [3] num_detection (default 1st tensor). 1, ANY-TYPE

            # bytestrings that are based on float32 must be
            # decoded into float list.
            # The "* 4" in the size checks is the byte size of one float32.
            # NOTE(review): no unmap of the mapped memories is visible in
            # this listing — verify against the original.

            # boxes
            mem_boxes = buffer.peek_memory(0)
            ret, info_boxes = mem_boxes.map(Gst.MapFlags.READ)
            if ret:
                assert info_boxes.size == (
                    self.BOX_SIZE * self.DETECTION_MAX * 4
                ), "Invalid info_box size"
                decoded_boxes = list(
                    np.frombuffer(info_boxes.data, dtype=np.float32)
                )  # decode bytestrings to float list

            # detections
            mem_detections = buffer.peek_memory(1)
            ret, info_detections = mem_detections.map(Gst.MapFlags.READ)
            if ret:
                assert info_detections.size == (
                    self.DETECTION_MAX * 4
                ), "Invalid info_detection size"
                decoded_detections = list(
                    np.frombuffer(info_detections.data, dtype=np.float32)
                )  # decode bytestrings to float list

            # scores
            mem_scores = buffer.peek_memory(2)
            ret, info_scores = mem_scores.map(Gst.MapFlags.READ)
            if ret:
                assert info_scores.size == (
                    self.DETECTION_MAX * 4
                ), "Invalid info_score size"
                decoded_scores = list(
                    np.frombuffer(info_scores.data, dtype=np.float32)
                )  # decode bytestrings to float list

            # num detection
            mem_num = buffer.peek_memory(3)
            ret, info_num = mem_num.map(Gst.MapFlags.READ)
            if ret:
                assert info_num.size == 4, "Invalid info_num size"
                decoded_num = list(
                    np.frombuffer(info_num.data, dtype=np.float32)
                )  # decode bytestrings to float list

                # NOTE(review): this line looks like the argument list of a
                # call whose first and last lines are missing; the
                # arguments match get_detected_objects(boxes, detections,
                # scores, num) — TODO confirm.
                decoded_boxes, decoded_detections, decoded_scores, int(decoded_num[0])


            # Without a display, report the inference latency about once
            # per second instead.
            if self.display == "None":
                if (GLib.get_monotonic_time() - self.print_time) > 1000000:
                    inference = self.tensor_filter.get_property("latency")
                        "Inference time: "
                        + str(inference / 1000)
                        + " ms ("
                        + "{:5.2f}".format(1 / (inference / 1000000))
                        + " IPS)"
                    self.print_time = GLib.get_monotonic_time()

    def get_detected_objects(self, boxes, detections, scores, num):
        """Pairs boxes with detected objects

        boxes -- flat sequence holding BOX_SIZE values per detection, read
                 below in the order ymin, xmin, ymax, xmax
        detections -- per-detection class ids
        scores -- per-detection scores
        num -- number of detections to look at
        """
        # NOTE(review): lines are missing from this listing: the body of the
        # score check, the end of the dict literal, the statement that adds
        # `obj` to `detected`, and the "update result" statement.
        threshold_score = 0.5
        detected = list()

        for i in range(num):
            score = scores[i]
            # NOTE(review): body missing; presumably detections below the
            # threshold are skipped — TODO confirm.
            if score < threshold_score:

            c = detections[i]

            box_offset = self.BOX_SIZE * i
            ymin = boxes[box_offset + 0]
            xmin = boxes[box_offset + 1]
            ymax = boxes[box_offset + 2]
            xmax = boxes[box_offset + 3]

            # Scale the box to model-input pixels; draw_overlay_cb rescales
            # from model size to video size when drawing.
            x = xmin * self.MODEL_WIDTH
            y = ymin * self.MODEL_HEIGHT
            width = (xmax - xmin) * self.MODEL_WIDTH
            height = (ymax - ymin) * self.MODEL_HEIGHT

            obj = {
                "class_id": int(c),
                "x": x,
                "y": y,
                "width": width,
                "height": height,
                "prob": score,


        # update result
        # NOTE(review): draw_overlay_cb reads self.detected_objects, so the
        # missing statement presumably stores `detected` there.

        for d in detected:
            if DEBUG:
                print("LABEL           : {}".format(self.tflite_labels[d["class_id"]]))
                print("x               : {}".format(d["x"]))
                print("y               : {}".format(d["y"]))
                print("width           : {}".format(d["width"]))
                print("height          : {}".format(d["height"]))
                print("Confidence Score: {}".format(d["prob"]))

    def prepare_overlay_cb(self, overlay, caps):
        """Store the information from the caps that we are interested in."""
        self.video_caps = caps

    def draw_overlay_cb(self, overlay, context, timestamp, duration):
        """Callback to draw the overlay.

        overlay -- element that emitted the signal
        context -- drawing context; the methods called on it below
                   (rectangle, move_to, show_text, ...) are cairo context
                   methods
        timestamp, duration -- not referenced in the visible code
        """
        # NOTE(review): many lines are missing from this listing (the body
        # of the first guard, the calls that wrapped the bare argument
        # lines, an "else:" and a loop exit), so it does not parse as shown.
        # NOTE(review): guard body missing; presumably an early return.
        if self.video_caps is None or not self.running:
        # Font sizes and positions are authored for 1920x1080 and scaled to
        # the configured video size.
        scale_height = self.VIDEO_HEIGHT / 1080
        scale_width = self.VIDEO_WIDTH / 1920
        scale_text = max(scale_height, scale_width)

        # mutex_lock alternative required
        detected = self.detected_objects
        # mutex_unlock alternative needed

        drawed = 0
        # NOTE(review): the call these font arguments belong to is missing.
            "Sans", cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_BOLD
        context.set_font_size(int(50.0 * scale_text))
        context.set_source_rgb(self.r, self.g, self.b)

        for obj in detected:
            # [:-1] drops the last character of the stored label (labels
            # are stored with their trailing newline by tflite_init).
            label = self.tflite_labels[obj["class_id"]][:-1]
            # Rescale from model-input coordinates to video coordinates.
            x = obj["x"] * self.VIDEO_WIDTH // self.MODEL_WIDTH
            y = obj["y"] * self.VIDEO_HEIGHT // self.MODEL_HEIGHT
            width = obj["width"] * self.VIDEO_WIDTH // self.MODEL_WIDTH
            height = obj["height"] * self.VIDEO_HEIGHT // self.MODEL_HEIGHT

            # draw rectangle
            context.rectangle(x, y, width, height)

            # draw title
            # NOTE(review): `label` is computed above but the call that
            # draws it is not visible.
            context.move_to(x + 5, y + int(50.0 * scale_text))

            drawed += 1
            # NOTE(review): body missing; presumably the loop stops once
            # MAX_OBJECT_DETECTION objects were drawn.
            if drawed >= self.MAX_OBJECT_DETECTION:

        inference = self.tensor_filter.get_property("latency")
        # Get current framerate and avg. framerate
        output_wayland = self.wayland_sink.get_property("last-message")
        if output_wayland:
            # NOTE(review): [0] raises IndexError when the message contains
            # no "current: <number>" part — verify this cannot happen.
            current_text = re.findall(r"current:\s[\d]+[.\d]*", output_wayland)[0]
            self.current_framerate = float(re.findall(r"[\d]+[.\d]*", current_text)[0])

        context.set_font_size(int(25.0 * scale_text))
        # NOTE(review): from here on, every bare "int(...), int(...)" line
        # is a pair of coordinates whose enclosing call is missing, and the
        # format expressions have lost their enclosing call as well.
            int(50 * scale_width), int(self.VIDEO_HEIGHT - (100 * scale_height))
        context.show_text("i.MX NNStreamer Detection Demo")
        # A latency of 0 means there is nothing to report yet; this branch
        # also keeps the divisions below from dividing by zero.
        if inference == 0:
                int(50 * scale_width), int(self.VIDEO_HEIGHT - (75 * scale_height))
            context.show_text("FPS: ")
                int(50 * scale_width), int(self.VIDEO_HEIGHT - (50 * scale_height))
            context.show_text("IPS: ")
        elif (
            GLib.get_monotonic_time() - self.reload_time
        ) < 100000 and self.refresh_time != -1:
                int(50 * scale_width), int(self.VIDEO_HEIGHT - (75 * scale_height))
                "FPS: {:6.2f} ({:6.2f} ms)".format(
                    self.current_framerate, 1.0 / self.current_framerate * 1000
                int(50 * scale_width), int(self.VIDEO_HEIGHT - (50 * scale_height))
                "IPS: {:6.2f} ({:6.2f} ms)".format(
                    1 / (inference / 1000000), inference / 1000
            # NOTE(review): an "else:" is presumably missing here; the
            # statements below refresh reload_time / refresh_time.
            self.reload_time = GLib.get_monotonic_time()
            self.refresh_time = self.interval_time
            self.inference = self.tensor_filter.get_property("latency")
                int(50 * scale_width), int(self.VIDEO_HEIGHT - (75 * scale_height))
                "FPS: {:6.2f} ({:6.2f} ms)".format(
                    self.current_framerate, 1.0 / self.current_framerate * 1000
                int(50 * scale_width), int(self.VIDEO_HEIGHT - (50 * scale_height))
                "IPS: {:6.2f} ({:6.2f} ms)".format(
                    1 / (inference / 1000000), inference / 1000
        # Runs only for the first drawn frame.
        # NOTE(review): the text drawn here is not visible in the listing.
        if self.first_frame:
            context.move_to(int(400 * scale_width), int(600 * scale_height))
            context.set_font_size(int(200.0 * min(scale_width, scale_height)))
            self.first_frame = False

    def on_bus_message(self, bus, message):
        """Callback for message.

        :param bus: pipeline bus
        :param message: message from pipeline
        :return: None
        """
        # NOTE(review): the docstring terminator was missing in the listing.
        # The EOS and ERROR branches only log in the visible code; run()
        # has a comment about quitting on eos or error, so statements that
        # stop the main loop are presumably missing — TODO confirm.
        if message.type == Gst.MessageType.EOS:
            logging.info("received eos message")
        elif message.type == Gst.MessageType.ERROR:
            error, debug = message.parse_error()
            logging.warning("[error] %s : %s", error.message, debug)
        elif message.type == Gst.MessageType.WARNING:
            error, debug = message.parse_warning()
            logging.warning("[warning] %s : %s", error.message, debug)
        elif message.type == Gst.MessageType.STREAM_START:
            logging.info("received start message")
        elif message.type == Gst.MessageType.QOS:
            data_format, processed, dropped = message.parse_qos_stats()
            format_str = Gst.Format.get_name(data_format)
            # NOTE(review): the logging call around this format string and
            # its arguments are missing.
                "[qos] format[%s] processed[%d] dropped[%d]",

    def set_window_title(self, name, title):
        """Set window title for X11.

        :param name: GstXImageasink element name
        :param title: window title
        :return: None
        element = self.pipeline.get_by_name(name)
        if element is not None:
            pad = element.get_static_pad("sink")
            if pad is not None:
                tags = Gst.TagList.new_empty()
                tags.add_value(Gst.TagMergeMode.APPEND, "title", title)

# Script entry point: checks the number of command-line arguments and builds
# an ObjectDetection instance from them.
# NOTE(review): this excerpt is incomplete -- the end of the argument-count
# condition, the call that emits the usage text, and the argument lists /
# closing parentheses of several ObjectDetection(...) calls are not shown.
if __name__ == "__main__":
    # The condition holds only when len(sys.argv) is none of 5, 6, 7, 9, 12.
    if (
        len(sys.argv) != 7
        and len(sys.argv) != 5
        and len(sys.argv) != 9
        and len(sys.argv) != 12
        and len(sys.argv) != 6
            "Usage: python3 nndetection.py <dev/video*/video file>"
            + " <NPU/CPU> <model file> <label file>"
    # Get platform
    platform = os.uname().nodename
    # One branch per accepted argument count; the platform name is passed
    # first, followed by the command-line arguments (where visible).
    if len(sys.argv) == 7:
        example = ObjectDetection(
    if len(sys.argv) == 5:
        example = ObjectDetection(
            platform, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]
    if len(sys.argv) == 6:
        example = ObjectDetection(
            platform, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5]
    if len(sys.argv) == 9:
        example = ObjectDetection(
    if len(sys.argv) == 12:
        example = ObjectDetection(


        self.pipeline = Gst.parse_launch(
            'v4l2src name=cam_src ! videoconvert ! videoscale ! '
            'video/x-raw,width=640,height=480,format=RGB ! tee name=t_raw '
            't_raw. ! queue leaky=2 max-size-buffers=2 ! videoscale ! video/x-raw,width=300,height=300 ! tensor_converter ! '
            'tensor_transform mode=arithmetic option=typecast:float32,add:-127.5,div:127.5 ! '
            'tensor_filter framework=tensorflow-lite model=' + self.tflite_model + ' ! '
            'tensor_decoder mode=bounding_boxes option1=mobilenet-ssd option2='
            + self.tflite_label + ' option3=' + self.tflite_box_prior + ' option4=640:480 option5=300:300 !'
            'compositor name=mix sink_0::zorder=2 sink_1::zorder=1 ! videoconvert ! ximagesink '
            't_raw. ! queue leaky=2 max-size-buffers=10 ! mix. '

[링크 : https://github.com/nnstreamer/nnstreamer-example/blob/main/native/example_object_detection_tensorflow_lite/nnstreamer_example_object_detection_tflite.py]


gst_launch_cmdline을 출력해 보니 아래와 같은 GStreamer 파이프라인이 나온다.

v4l2src name=cam_src device=/dev/video3 ! imxvideoconvert_g2d ! video/x-raw,width=1920,height=1080,framerate=30/1,format=BGRx ! tee name=t t. ! imxvideoconvert_g2d !  video/x-raw,width=300,height=300, ! queue max-size-buffers=2 leaky=2 ! videoconvert ! video/x-raw,format=RGB ! tensor_converter ! tensor_filter framework=tensorflow-lite model=/home/root/.cache/gopoint/mobilenet_ssd_v2_coco_quant_postprocess.tflite accelerator=true:npu custom=Delegate:External,ExtDelegateLib:libvx_delegate.so silent=FALSE name=tensor_filter latency=1 ! tensor_sink name=tensor_sink t. ! imxvideoconvert_g2d ! cairooverlay name=tensor_res ! queue max-size-buffers=2 leaky=2 ! fpsdisplaysink name=img_tensor text-overlay=false video-sink=waylandsink sync=false


한 줄로는 보기 어려우니 줄바꿈(엔터)으로 구분

v4l2src name=cam_src device=/dev/video3 !
imxvideoconvert_g2d !
video/x-raw,width=1920,height=1080,framerate=30/1,format=BGRx !
tee name=t t. !
imxvideoconvert_g2d !
video/x-raw,width=300,height=300, !
queue max-size-buffers=2 leaky=2 !
videoconvert !
video/x-raw,format=RGB !
tensor_converter !
tensor_filter framework=tensorflow-lite model=/home/root/.cache/gopoint/mobilenet_ssd_v2_coco_quant_postprocess.tflite accelerator=true:npu custom=Delegate:External,ExtDelegateLib:libvx_delegate.so silent=FALSE name=tensor_filter latency=1 !
tensor_sink name=tensor_sink t. !
imxvideoconvert_g2d !
cairooverlay name=tensor_res !
queue max-size-buffers=2 leaky=2 !
fpsdisplaysink name=img_tensor text-overlay=false video-sink=waylandsink sync=false




# cd /home/root/.nxp-demo-experience/scripts/machine_learning
# python3 nndetection.py /dev/video3 NPU /home/root/.cache/gopoint/mobilenet_ssd_v2_coco_quant_postprocess.tflite /home/root/.cache/gopoint/coco_labels.txt


gst-launch로도 실행은 되지만, callback 처리가 안 돼서 overlay가 출력되지 않아 동일한 화면을 보여주지는 않는다.

gst-launch-1.0 v4l2src name=cam_src device=/dev/video3 ! imxvideoconvert_g2d ! video/x-raw,width=1920,height=1080,framerate=30/1,format=BGRx ! tee name=t t. ! imxvideoconvert_g2d ! video/x-raw,width=300,height=300, ! queue max-size-buffers=2 leaky=2 ! videoconvert ! video/x-raw,format=RGB ! tensor_converter ! tensor_filter framework=tensorflow-lite model=/home/root/.cache/gopoint/mobilenet_ssd_v2_coco_quant_postprocess.tflite accelerator=true:npu custom=Delegate:External,ExtDelegateLib:libvx_delegate.so silent=FALSE name=tensor_filter latency=1 !  tensor_sink name=tensor_sink t. ! imxvideoconvert_g2d ! cairooverlay name=tensor_res ! queue max-size-buffers=2 leaky=2 ! fpsdisplaysink name=img_tensor text-overlay=false video-sink=waylandsink sync=false