문서를 찾다가 지쳐서 걍 실행하고 인자를 보는걸로..
root 3019 925 72 06:38 ? 00:00:25 /usr/bin/python3 /home/root/.nxp-demo-experience/scripts/machine_learning/MLDemoLauncher.py detect |
root@imx8mpevk:~/.nxp-demo-experience/scripts/machine_learning# cat MLDemoLauncher.py #!/usr/bin/env python3 """ Copyright 2021-2023 NXP SPDX-License-Identifier: BSD-2-Clause This script launches the NNStreamer ML Demos using a UI to pick settings. """ import gi import os import sys import glob from gi.repository import Gtk, GLib, Gio gi.require_version("Gtk", "3.0") sys.path.append("/home/root/.nxp-demo-experience/scripts/") import utils class MLLaunch(Gtk.Window): """The GUI window for the ML demo launcher""" def __init__(self, demo): """Creates the UI window""" # Initialization self.demo = demo super().__init__(title=demo) self.set_default_size(450, 200) self.set_resizable(False) # Get platform self.platform = os.uname().nodename # OpenVX graph caching is not available on i.MX 8QuadMax platform. if self.platform != "imx8qmmek" : os.environ["VIV_VX_CACHE_BINARY_GRAPH_DIR"] = "/home/root/.cache/gopoint" os.environ["VIV_VX_ENABLE_CACHE_GRAPH_BINARY"] = "1" # Get widget properties devices = [] if self.demo != "brand" and self.demo != "selfie_nn": if self.platform != "imx93evk": devices.append("Example Video") for device in glob.glob("/dev/video*"): devices.append(device) backends_available = ["CPU"] if ( os.path.exists("/usr/lib/libvx_delegate.so") and self.demo != "pose" and self.demo != "selfie_nn" ): backends_available.insert(1, "GPU") if os.path.exists("/usr/lib/libneuralnetworks.so") and self.demo != "brand" and self.platform != "imx8qmmek": backends_available.insert(0, "NPU") if os.path.exists("/usr/lib/libethosu_delegate.so"): backends_available.insert(0, "NPU") backends_available.pop() displays_available = ["Weston"] colors_available = ["Red", "Green", "Blue", "Black", "White"] demo_modes_available = ["Background Substitution", "Segmentation Mask"] # Create widgets main_grid = Gtk.Grid.new() device_label = Gtk.Label.new("Source") self.device_combo = Gtk.ComboBoxText() backend_label = Gtk.Label.new("Backend") self.backend_combo = Gtk.ComboBoxText() self.display_combo = Gtk.ComboBoxText() self.launch_button = Gtk.Button.new_with_label("Run") self.status_bar = Gtk.Label.new() header = Gtk.HeaderBar() quit_button = Gtk.Button() quit_icon = Gio.ThemedIcon(name="process-stop-symbolic") quit_image = Gtk.Image.new_from_gicon(quit_icon, Gtk.IconSize.BUTTON) separator = Gtk.Separator.new(0) time_title_label = Gtk.Label.new("Video Refresh") self.time_label = Gtk.Label.new("--.-- ms") self.fps_label = Gtk.Label.new("-- FPS") inference_title_label = Gtk.Label.new("Inference Time") self.inference_label = Gtk.Label.new("--.-- ms") self.ips_label = Gtk.Label.new("-- IPS") if self.demo != "selfie_nn": self.width_entry = self.r_scale = Gtk.Scale.new_with_range( Gtk.Orientation.HORIZONTAL, 300, 1920, 2 ) self.height_entry = self.r_scale = Gtk.Scale.new_with_range( Gtk.Orientation.HORIZONTAL, 300, 1080, 2 ) self.width_label = Gtk.Label.new("Height") self.height_label = Gtk.Label.new("Width") self.color_label = Gtk.Label.new("Label Color") else: self.color_label = Gtk.Label.new("Text Color") self.demo_mode = Gtk.Label.new("Demo Mode") self.mode_combo = Gtk.ComboBoxText() self.color_combo = Gtk.ComboBoxText() # Organize widgets self.add(main_grid) self.set_titlebar(header) quit_button.add(quit_image) header.pack_end(quit_button) main_grid.set_row_spacing(10) main_grid.set_border_width(10) main_grid.attach(device_label, 0, 1, 2, 1) device_label.set_hexpand(True) main_grid.attach(backend_label, 0, 2, 2, 1) # main_grid.attach(display_label, 0, 3, 2, 1) if self.demo != "selfie_nn": main_grid.attach(self.width_label, 0, 4, 2, 1) main_grid.attach(self.height_label, 0, 5, 2, 1) main_grid.attach(self.color_label, 0, 6, 2, 1) else: main_grid.attach(self.demo_mode, 0, 4, 2, 1) main_grid.attach(self.color_label, 0, 5, 2, 1) main_grid.attach(self.device_combo, 2, 1, 2, 1) self.device_combo.set_hexpand(True) main_grid.attach(self.backend_combo, 2, 2, 2, 1) # main_grid.attach(self.display_combo, 2, 3, 2, 1) if self.demo != "selfie_nn": main_grid.attach(self.width_entry, 2, 4, 2, 1) main_grid.attach(self.height_entry, 2, 5, 2, 1) main_grid.attach(self.color_combo, 2, 6, 2, 1) else: main_grid.attach(self.mode_combo, 2, 4, 2, 1) main_grid.attach(self.color_combo, 2, 5, 2, 1) main_grid.attach(self.launch_button, 0, 7, 4, 1) main_grid.attach(self.status_bar, 0, 8, 4, 1) main_grid.attach(separator, 0, 9, 4, 1) main_grid.attach(time_title_label, 0, 10, 2, 1) main_grid.attach(self.time_label, 0, 11, 1, 1) main_grid.attach(self.fps_label, 1, 11, 1, 1) main_grid.attach(inference_title_label, 2, 10, 2, 1) main_grid.attach(self.inference_label, 2, 11, 1, 1) main_grid.attach(self.ips_label, 3, 11, 1, 1) # Configure widgets for device in devices: self.device_combo.append_text(device) for backend in backends_available: self.backend_combo.append_text(backend) for display in displays_available: self.display_combo.append_text(display) for color in colors_available: self.color_combo.append_text(color) if self.demo == "selfie_nn": for mode in demo_modes_available: self.mode_combo.append_text(mode) self.device_combo.set_active(0) self.backend_combo.set_active(0) self.display_combo.set_active(0) self.color_combo.set_active(0) if self.demo != "selfie_nn": self.width_entry.set_value(1920) self.height_entry.set_value(1080) self.width_entry.set_sensitive(False) self.height_entry.set_sensitive(False) else: self.mode_combo.set_active(0) self.device_combo.connect("changed", self.on_source_change) self.launch_button.connect("clicked", self.start) quit_button.connect("clicked", exit) if self.demo == "detect": header.set_title("Detection Demo") elif self.demo == "id": header.set_title("Classification Demo") elif self.demo == "pose": header.set_title("Pose Demo") elif self.demo == "brand": header.set_title("Brand Demo") elif self.demo == "selfie_nn": header.set_title("Selfie Segmenter Demo") else: header.set_title("NNStreamer Demo") header.set_subtitle("NNStreamer Examples") def start(self, button): """Starts the ML Demo with selected settings""" self.update_time = GLib.get_monotonic_time() self.launch_button.set_sensitive(False) if self.color_combo.get_active_text() == "Red": r = 1 g = 0 b = 0 elif self.color_combo.get_active_text() == "Blue": r = 0 g = 0 b = 1 elif self.color_combo.get_active_text() == "Green": r = 0 g = 1 b = 0 elif self.color_combo.get_active_text() == "Black": r = 0 g = 0 b = 0 elif self.color_combo.get_active_text() == "White": r = 1 g = 1 b = 1 else: r = 1 g = 0 b = 0 if self.demo == "detect": if self.platform == "imx93evk": model = utils.download_file("mobilenet_ssd_v2_coco_quant_postprocess_vela.tflite") else: model = utils.download_file("mobilenet_ssd_v2_coco_quant_postprocess.tflite") labels = utils.download_file("coco_labels.txt") if self.device_combo.get_active_text() == "Example Video": device = utils.download_file("detect_example.mov") else: device = self.device_combo.get_active_text() if model == -1 or model == -2 or model == -3: if self.platform == "imx93evk": error = "mobilenet_ssd_v2_coco_quant_postprocess_vela.tflite" else: error = "mobilenet_ssd_v2_coco_quant_postprocess.tflite" elif labels == -1 or labels == -2 or labels == -3: error = "coco_labels.txt" elif device == -1 or device == -2 or device == -3: error = "detect_example.mov" if self.demo == "id": if self.platform == "imx93evk": model = utils.download_file("mobilenet_v1_1.0_224_quant_vela.tflite") else: model = utils.download_file("mobilenet_v1_1.0_224_quant.tflite") labels = utils.download_file("1_1.0_224_labels.txt") if self.device_combo.get_active_text() == "Example Video": device = utils.download_file("id_example.mov") else: device = self.device_combo.get_active_text() if model == -1 or model == -2 or model == -3: if self.platform == "imx93evk": error = "mobilenet_v1_1.0_224_quant_vela.tflite" else: error = "mobilenet_v1_1.0_224_quant.tflite" elif labels == -1 or labels == -2 or labels == -3: error = "1_1.0_224_labels.txt" elif device == -1 or device == -2 or device == -3: error = "id_example.mov" if self.demo == "pose": model = utils.download_file("posenet_resnet50_uint8_float32_quant.tflite") labels = utils.download_file("key_point_labels.txt") if self.device_combo.get_active_text() == "Example Video": device = utils.download_file("pose_example.mov") else: device = self.device_combo.get_active_text() if model == -1 or model == -2 or model == -3: error = "posenet_resnet50_uint8_float32_quant.tflite" elif labels == -1 or labels == -2 or labels == -3: error = "key_point_labels.txt" elif device == -1 or device == -2 or device == -3: error = "pose_example.mov" if self.demo == "brand": model = utils.download_file("brand_model.tflite") labels = utils.download_file("brand_labels.txt") if self.device_combo.get_active_text() == "Example Video": device = utils.download_file("brand_example.mov") else: device = self.device_combo.get_active_text() if model == -1 or model == -2 or model == -3: error = "brand_model.tflite" elif labels == -1 or labels == -2 or labels == -3: error = "brand_labels.txt" elif device == -1 or device == -2 or device == -3: error = "brand_example.mov" if self.demo == "selfie_nn": if self.platform == "imx93evk": model = utils.download_file( "selfie_segmenter_landscape_int8_vela.tflite" ) else: model = utils.download_file("selfie_segmenter_int8.tflite") # Labels refer to background img if self.platform == "imx93evk": labels = utils.download_file("bg_image_landscape.jpg") else: labels = utils.download_file("bg_image.jpg") if self.device_combo.get_active_text() == "Example Video": device = utils.download_file("selfie_example.mov") else: device = self.device_combo.get_active_text() if model == -1 or model == -2 or model == -3: if self.platform == "imx93evk": error = "selfie_segmenter_landscape_int8_vela.tflite" else: error = "selfie_segmenter_int8.tflite" elif labels == -1 or labels == -2 or labels == -3: if self.platform == "imx93evk": error = "bg_image_landscape.jpg" else: error = "bg_image.jpg" elif device == -1 or device == -2 or device == -3: error = "selfie_example.mov" if self.mode_combo.get_active_text() == "Background Substitution": set_mode = 0 else: set_mode = 1 if model == -1 or labels == -1 or device == -1: """ dialog = Gtk.MessageDialog( transient_for=self, flags=0, message_type=Gtk.MessageType.ERROR, buttons=Gtk.ButtonsType.CANCEL, text="Cannot find files! The file that you requested" + " does not have any metadata that is related to it. " + "Please see /home/root/.nxp-demo-experience/downloads.txt" + " to see if the requested file exists! \n \n Cannot find:" + error) dialog.run() dialog.destroy() """ self.status_bar.set_text("Cannot find files!") self.launch_button.set_sensitive(True) return if model == -2 or labels == -2 or device == -2: """ dialog = Gtk.MessageDialog( transient_for=self, flags=0, message_type=Gtk.MessageType.ERROR, buttons=Gtk.ButtonsType.CANCEL, text="Cannot download files! The URL used to download the" + " file cannot be reached. If you are connected to the " + "internet, please check the /home/root/.nxp-demo-experience" + "/downloads.txt for the URL. For some regions, " + "these sites may be blocked. To install these manually," + " please go to the file listed above and provide the " + "path to the file in \"PATH\" \n \n Cannot download " + error) dialog.run() dialog.destroy() """ self.status_bar.set_text("Download failed!") self.launch_button.set_sensitive(True) return if model == -3 or labels == -3 or device == -4: """ dialog = Gtk.MessageDialog( transient_for=self, flags=0, message_type=Gtk.MessageType.ERROR, buttons=Gtk.ButtonsType.CANCEL, text="Invalid files! The files where not what we expected." + "If you are SURE that the files are correct, delete " + "the \"SHA\" value in /home/root/.nxp-demo-experience" + "/downloads.txt to bypass the SHA check. \n \n Bad SHA for " + error) dialog.run() dialog.destroy() """ self.status_bar.set_text("Downloaded bad file!") self.launch_button.set_sensitive(True) return if self.demo == "detect": import nndetection example = nndetection.ObjectDetection( self.platform, device, self.backend_combo.get_active_text(), model, labels, self.display_combo.get_active_text(), self.update_stats, self.width_entry.get_value(), self.height_entry.get_value(), r, g, b, ) example.run() if self.demo == "id": import nnclassification example = nnclassification.NNStreamerExample( self.platform, device, self.backend_combo.get_active_text(), model, labels, self.display_combo.get_active_text(), self.update_stats, self.width_entry.get_value(), self.height_entry.get_value(), r, g, b, ) example.run_example() if self.demo == "pose": import nnpose example = nnpose.NNStreamerExample( self.platform, device, self.backend_combo.get_active_text(), model, labels, self.display_combo.get_active_text(), self.update_stats, self.width_entry.get_value(), self.height_entry.get_value(), r, g, b, ) example.run_example() if self.demo == "brand": import nnbrand example = nnbrand.NNStreamerExample( self.platform, device, self.backend_combo.get_active_text(), model, labels, self.display_combo.get_active_text(), self.update_stats, self.width_entry.get_value(), self.height_entry.get_value(), r, g, b, ) example.run_example() if self.demo == "selfie_nn": import selfie_segmenter example = selfie_segmenter.SelfieSegmenter( self.platform, device, self.backend_combo.get_active_text(), model, labels, self.update_stats, set_mode, r, g, b, ) example.run() self.launch_button.set_sensitive(True) def update_stats(self, time): """Callback used the update stats in GUI""" interval_time = (GLib.get_monotonic_time() - self.update_time) / 1000000 if interval_time > 1: refresh_time = time.interval_time inference_time = time.tensor_filter.get_property("latency") if refresh_time != 0 and inference_time != 0: # Print pipeline information if self.demo == "selfie_nn" or self.demo == "id" or self.demo == "detect": self.time_label.set_text( "{:12.2f} ms".format(1.0 / time.current_framerate * 1000.0) ) self.fps_label.set_text( "{:12.2f} FPS".format(time.current_framerate) ) else: self.time_label.set_text("{:12.2f} ms".format(refresh_time / 1000)) self.fps_label.set_text( "{:12.2f} FPS".format(1 / (refresh_time / 1000000)) ) # Print inference information self.inference_label.set_text( "{:12.2f} ms".format(inference_time / 1000) ) self.ips_label.set_text( "{:12.2f} FPS".format(1 / (inference_time / 1000000)) ) self.update_time = GLib.get_monotonic_time() return True def on_source_change(self, widget): """Callback to lock sliders""" if self.demo != "selfie_nn": if self.device_combo.get_active_text() == "Example Video": self.width_entry.set_value(1920) self.height_entry.set_value(1080) self.width_entry.set_sensitive(False) self.height_entry.set_sensitive(False) else: self.width_entry.set_sensitive(True) self.height_entry.set_sensitive(True) if __name__ == "__main__": if ( len(sys.argv) != 2 and sys.argv[1] != "detect" and sys.argv[1] != "id" and sys.argv[1] != "pose" and sys.argv[1] != "selfie_nn" ): print("Demos available: detect, id, pose, selfie_nn") else: win = MLLaunch(sys.argv[1]) win.connect("destroy", Gtk.main_quit) win.show_all() Gtk.main() |
아래가 실행되는 녀석인데 nndetection을 import 하니까 그걸 따라가서 보는 중.
그나저나 LGPL 이면 그냥 공개해도 되려나?
root@imx8mpevk:~/.nxp-demo-experience/scripts/machine_learning# find / -name nndetection.py /run/media/root-mmcblk2p2/home/root/.nxp-demo-experience/scripts/machine_learning/nndetection.py /home/root/.nxp-demo-experience/scripts/machine_learning/nndetection.py root@imx8mpevk:~/.nxp-demo-experience/scripts/machine_learning# cat /home/root/.nxp-demo-experience/scripts/machine_learning/nndetection.py #!/usr/bin/env python3 """ Copyright SSAFY Team 1 <jangjongha.sw@gmail.com> Copyright 2021-2023 NXP SPDX-License-Identifier: LGPL-2.1-only Original Source: https://github.com/nnstreamer/nnstreamer-example This demo shows how you can use the NNStreamer to detect objects. From the original source, this was modified to better work with the a GUI and to get better performance on the i.MX 8M Plus and i.MX93. """ import os import sys import gi import re import logging import numpy as np import cairo gi.require_version("Gst", "1.0") gi.require_foreign("cairo") from gi.repository import Gst, GObject, GLib DEBUG = False class ObjectDetection: """The class that manages the demo""" def __init__( self, platform, device, backend, model, labels, display="Weston", callback=None, width=1920, height=1080, r=1, g=0, b=0, ): """Creates an instance of the demo Arguments: device -- What camera or video file to use backend -- Whether to use NPU or CPU model -- the path to the model labels -- the path to the labels display -- Whether to use X11 or Weston callback -- Callback to pass stats to width -- Width of output height -- Height of output r -- Red value for labels g -- Green value for labels b -- Blue value for labels """ self.loop = None self.pipeline = None self.running = False self.video_caps = None self.first_frame = True self.BOX_SIZE = 4 self.LABEL_SIZE = 91 self.DETECTION_MAX = 20 self.MAX_OBJECT_DETECTION = 20 self.Y_SCALE = 10.0 self.X_SCALE = 10.0 self.H_SCALE = 5.0 self.W_SCALE = 5.0 self.VIDEO_WIDTH = width self.VIDEO_HEIGHT = height self.MODEL_WIDTH = 300 self.MODEL_HEIGHT = 300 self.tflite_model = model self.label_path = labels self.device = device self.backend = backend self.display = display self.tflite_labels = [] self.detected_objects = [] self.callback = callback self.r = r self.b = b self.g = g self.platform = platform self.current_framerate = 1000 # Define PXP or GPU2D converter if self.platform == "imx93evk": self.nxp_converter = "imxvideoconvert_pxp " else: self.nxp_converter = "imxvideoconvert_g2d " if not self.tflite_init(): raise Exception Gst.init(None) def run(self): """Starts pipeline and run demo""" if self.backend == "CPU": if self.platform == "imx93evk": backend = "true:cpu custom=NumThreads:2" else: backend = "true:cpu custom=NumThreads:4" elif self.backend == "GPU": os.environ["USE_GPU_INFERENCE"] = "1" backend = ( "true:gpu custom=Delegate:External," "ExtDelegateLib:libvx_delegate.so" ) else: if self.platform == "imx93evk": backend = ( "true:npu custom=Delegate:External," "ExtDelegateLib:libethosu_delegate.so" ) else: os.environ["USE_GPU_INFERENCE"] = "0" backend = ( "true:npu custom=Delegate:External," "ExtDelegateLib:libvx_delegate.so" ) if self.display == "X11": display = "ximagesink name=img_tensor " elif self.display == "None": self.print_time = GLib.get_monotonic_time() display = "fakesink " else: display = "fpsdisplaysink name=img_tensor text-overlay=false video-sink=waylandsink sync=false" # main loop self.loop = GLib.MainLoop() self.old_time = GLib.get_monotonic_time() self.update_time = GLib.get_monotonic_time() self.reload_time = -1 self.interval_time = 999999 # Create decoder for video file if self.platform == "imx8qmmek": decoder = "h264parse ! v4l2h264dec " else: decoder = "vpudec " if "/dev/video" in self.device: gst_launch_cmdline = "v4l2src name=cam_src device=" + self.device gst_launch_cmdline += " ! " + self.nxp_converter + "! video/x-raw,width=" gst_launch_cmdline += str(int(self.VIDEO_WIDTH)) + ",height=" gst_launch_cmdline += str(int(self.VIDEO_HEIGHT)) gst_launch_cmdline += ",framerate=30/1,format=BGRx ! tee name=t" else: gst_launch_cmdline = "filesrc location=" + self.device gst_launch_cmdline += " ! qtdemux ! " + decoder + "! tee name=t" gst_launch_cmdline += " t. ! " + self.nxp_converter + "! video/x-raw," gst_launch_cmdline += "width={:d},".format(self.MODEL_WIDTH) gst_launch_cmdline += "height={:d},".format(self.MODEL_HEIGHT) gst_launch_cmdline += " ! queue max-size-buffers=2 leaky=2 ! " gst_launch_cmdline += "videoconvert ! video/x-raw,format=RGB !" gst_launch_cmdline += " tensor_converter ! tensor_filter" gst_launch_cmdline += " framework=tensorflow-lite model=" gst_launch_cmdline += self.tflite_model + " accelerator=" + backend gst_launch_cmdline += " silent=FALSE name=tensor_filter latency=1 ! " gst_launch_cmdline += "tensor_sink name=tensor_sink t. ! " gst_launch_cmdline += self.nxp_converter + "! " gst_launch_cmdline += "cairooverlay name=tensor_res ! " gst_launch_cmdline += "queue max-size-buffers=2 leaky=2 ! " gst_launch_cmdline += display self.pipeline = Gst.parse_launch(gst_launch_cmdline) # bus and message callback bus = self.pipeline.get_bus() bus.add_signal_watch() bus.connect("message", self.on_bus_message) self.tensor_filter = self.pipeline.get_by_name("tensor_filter") self.wayland_sink = self.pipeline.get_by_name("img_tensor") # tensor sink signal : new data callback tensor_sink = self.pipeline.get_by_name("tensor_sink") tensor_sink.connect("new-data", self.new_data_cb) tensor_res = self.pipeline.get_by_name("tensor_res") tensor_res.connect("draw", self.draw_overlay_cb) tensor_res.connect("caps-changed", self.prepare_overlay_cb) if self.callback is not None: GObject.timeout_add(500, self.callback, self) # start pipeline self.pipeline.set_state(Gst.State.PLAYING) self.running = True self.set_window_title("img_tensor", "NNStreamer Object Detection Example") # run main loop self.loop.run() # quit when received eos or error message self.running = False self.pipeline.set_state(Gst.State.NULL) bus.remove_signal_watch() def tflite_init(self): """ :return: True if successfully initialized """ if not os.path.exists(self.tflite_model): logging.error("cannot find tflite model [%s]", self.tflite_model) return False label_path = self.label_path try: with open(label_path, "r") as label_file: for line in label_file.readlines(): if line[0].isdigit(): while str(len(self.tflite_labels)) not in line: self.tflite_labels.append("Invalid") self.tflite_labels.append(line[line.find(" ") + 1 :]) else: self.tflite_labels.append(line) except FileNotFoundError: logging.error("cannot find tflite label [%s]", label_path) return False logging.info("finished to load labels, total [%d]", len(self.tflite_labels)) return True # @brief Callback for tensor sink signal. def new_data_cb(self, sink, buffer): """Callback for tensor sink signal. :param sink: tensor sink element :param buffer: buffer from element :return: None """ if self.running: new_time = GLib.get_monotonic_time() self.interval_time = new_time - self.old_time self.old_time = new_time if buffer.n_memory() != 4: return False # tensor type is float32. # LOCATIONS_IDX:CLASSES_IDX:SCORES_IDX:NUM_DETECTION_IDX # 4:20:1:1\,20:1:1:1\,20:1:1:1\,1:1:1:1 # [0] detection_boxes (default 4th tensor). BOX_SIZE : # #MaxDetection, ANY-TYPE # [1] detection_classes (default 2nd tensor). # #MaxDetection, ANY-TYPE # [2] detection_scores (default 3rd tensor) # #MaxDetection, ANY-TYPE # [3] num_detection (default 1st tensor). 1, ANY-TYPE # bytestrings that are based on float32 must be # decoded into float list. # boxes mem_boxes = buffer.peek_memory(0) ret, info_boxes = mem_boxes.map(Gst.MapFlags.READ) if ret: assert info_boxes.size == ( self.BOX_SIZE * self.DETECTION_MAX * 4 ), "Invalid info_box size" decoded_boxes = list( np.frombuffer(info_boxes.data, dtype=np.float32) ) # decode bytestrings to float list # detections mem_detections = buffer.peek_memory(1) ret, info_detections = mem_detections.map(Gst.MapFlags.READ) if ret: assert info_detections.size == ( self.DETECTION_MAX * 4 ), "Invalid info_detection size" decoded_detections = list( np.frombuffer(info_detections.data, dtype=np.float32) ) # decode bytestrings to float list # scores mem_scores = buffer.peek_memory(2) ret, info_scores = mem_scores.map(Gst.MapFlags.READ) if ret: assert info_scores.size == ( self.DETECTION_MAX * 4 ), "Invalid info_score size" decoded_scores = list( np.frombuffer(info_scores.data, dtype=np.float32) ) # decode bytestrings to float list # num detection mem_num = buffer.peek_memory(3) ret, info_num = mem_num.map(Gst.MapFlags.READ) if ret: assert info_num.size == 4, "Invalid info_num size" decoded_num = list( np.frombuffer(info_num.data, dtype=np.float32) ) # decode bytestrings to float list self.get_detected_objects( decoded_boxes, decoded_detections, decoded_scores, int(decoded_num[0]) ) mem_boxes.unmap(info_boxes) mem_detections.unmap(info_detections) mem_scores.unmap(info_scores) mem_num.unmap(info_num) if self.display == "None": if (GLib.get_monotonic_time() - self.print_time) > 1000000: inference = self.tensor_filter.get_property("latency") print( "Inference time: " + str(inference / 1000) + " ms (" + "{:5.2f}".format(1 / (inference / 1000000)) + " IPS)" ) self.print_time = GLib.get_monotonic_time() def get_detected_objects(self, boxes, detections, scores, num): """Pairs boxes with dectected objects""" threshold_score = 0.5 detected = list() for i in range(num): score = scores[i] if score < threshold_score: continue c = detections[i] box_offset = self.BOX_SIZE * i ymin = boxes[box_offset + 0] xmin = boxes[box_offset + 1] ymax = boxes[box_offset + 2] xmax = boxes[box_offset + 3] x = xmin * self.MODEL_WIDTH y = ymin * self.MODEL_HEIGHT width = (xmax - xmin) * self.MODEL_WIDTH height = (ymax - ymin) * self.MODEL_HEIGHT obj = { "class_id": int(c), "x": x, "y": y, "width": width, "height": height, "prob": score, } detected.append(obj) # update result self.detected_objects.clear() for d in detected: self.detected_objects.append(d) if DEBUG: print("==============================") print("LABEL : {}".format(self.tflite_labels[d["class_id"]])) print("x : {}".format(d["x"])) print("y : {}".format(d["y"])) print("width : {}".format(d["width"])) print("height : {}".format(d["height"])) print("Confidence Score: {}".format(d["prob"])) def prepare_overlay_cb(self, overlay, caps): """Store the information from the caps that we are interested in.""" self.video_caps = caps def draw_overlay_cb(self, overlay, context, timestamp, duration): """Callback to draw the overlay.""" if self.video_caps is None or not self.running: return scale_height = self.VIDEO_HEIGHT / 1080 scale_width = self.VIDEO_WIDTH / 1920 scale_text = max(scale_height, scale_width) # mutex_lock alternative required detected = self.detected_objects # mutex_unlock alternative needed drawed = 0 context.select_font_face( "Sans", cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_BOLD ) context.set_font_size(int(50.0 * scale_text)) context.set_source_rgb(self.r, self.g, self.b) for obj in detected: label = self.tflite_labels[obj["class_id"]][:-1] x = obj["x"] * self.VIDEO_WIDTH // self.MODEL_WIDTH y = obj["y"] * self.VIDEO_HEIGHT // self.MODEL_HEIGHT width = obj["width"] * self.VIDEO_WIDTH // self.MODEL_WIDTH height = obj["height"] * self.VIDEO_HEIGHT // self.MODEL_HEIGHT # draw rectangle context.rectangle(x, y, width, height) context.set_line_width(3) context.stroke() # draw title context.move_to(x + 5, y + int(50.0 * scale_text)) context.show_text(label) drawed += 1 if drawed >= self.MAX_OBJECT_DETECTION: break inference = self.tensor_filter.get_property("latency") # Get current framerate and avg. framerate output_wayland = self.wayland_sink.get_property("last-message") if output_wayland: current_text = re.findall(r"current:\s[\d]+[.\d]*", output_wayland)[0] self.current_framerate = float(re.findall(r"[\d]+[.\d]*", current_text)[0]) context.set_font_size(int(25.0 * scale_text)) context.move_to( int(50 * scale_width), int(self.VIDEO_HEIGHT - (100 * scale_height)) ) context.show_text("i.MX NNStreamer Detection Demo") if inference == 0: context.move_to( int(50 * scale_width), int(self.VIDEO_HEIGHT - (75 * scale_height)) ) context.show_text("FPS: ") context.move_to( int(50 * scale_width), int(self.VIDEO_HEIGHT - (50 * scale_height)) ) context.show_text("IPS: ") elif ( GLib.get_monotonic_time() - self.reload_time ) < 100000 and self.refresh_time != -1: context.move_to( int(50 * scale_width), int(self.VIDEO_HEIGHT - (75 * scale_height)) ) context.show_text( "FPS: {:6.2f} ({:6.2f} ms)".format( self.current_framerate, 1.0 / self.current_framerate * 1000 ) ) context.move_to( int(50 * scale_width), int(self.VIDEO_HEIGHT - (50 * scale_height)) ) context.show_text( "IPS: {:6.2f} ({:6.2f} ms)".format( 1 / (inference / 1000000), inference / 1000 ) ) else: self.reload_time = GLib.get_monotonic_time() self.refresh_time = self.interval_time self.inference = self.tensor_filter.get_property("latency") context.move_to( int(50 * scale_width), int(self.VIDEO_HEIGHT - (75 * scale_height)) ) context.show_text( "FPS: {:6.2f} ({:6.2f} ms)".format( self.current_framerate, 1.0 / self.current_framerate * 1000 ) ) context.move_to( int(50 * scale_width), int(self.VIDEO_HEIGHT - (50 * scale_height)) ) context.show_text( "IPS: {:6.2f} ({:6.2f} ms)".format( 1 / (inference / 1000000), inference / 1000 ) ) if self.first_frame: context.move_to(int(400 * scale_width), int(600 * scale_height)) context.set_font_size(int(200.0 * min(scale_width, scale_height))) context.show_text("Loading...") self.first_frame = False context.fill() def on_bus_message(self, bus, message): """Callback for message. :param bus: pipeline bus :param message: message from pipeline :return: None """ if message.type == Gst.MessageType.EOS: logging.info("received eos message") self.loop.quit() elif message.type == Gst.MessageType.ERROR: error, debug = message.parse_error() logging.warning("[error] %s : %s", error.message, debug) self.loop.quit() elif message.type == Gst.MessageType.WARNING: error, debug = message.parse_warning() logging.warning("[warning] %s : %s", error.message, debug) elif message.type == Gst.MessageType.STREAM_START: logging.info("received start message") elif message.type == Gst.MessageType.QOS: data_format, processed, dropped = message.parse_qos_stats() format_str = Gst.Format.get_name(data_format) logging.debug( "[qos] format[%s] processed[%d] dropped[%d]", format_str, processed, dropped, ) def set_window_title(self, name, title): """Set window title for X11. :param name: GstXImageasink element name :param title: window title :return: None """ element = self.pipeline.get_by_name(name) if element is not None: pad = element.get_static_pad("sink") if pad is not None: tags = Gst.TagList.new_empty() tags.add_value(Gst.TagMergeMode.APPEND, "title", title) pad.send_event(Gst.Event.new_tag(tags)) if __name__ == "__main__": if ( len(sys.argv) != 7 and len(sys.argv) != 5 and len(sys.argv) != 9 and len(sys.argv) != 12 and len(sys.argv) != 6 ): print( "Usage: python3 nndetection.py <dev/video*/video file>" + " <NPU/CPU> <model file> <label file>" ) exit() # Get platform platform = os.uname().nodename if len(sys.argv) == 7: example = ObjectDetection( platform, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5], sys.argv[6], ) if len(sys.argv) == 5: example = ObjectDetection( platform, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] ) if len(sys.argv) == 6: example = ObjectDetection( platform, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5] ) if len(sys.argv) == 9: example = ObjectDetection( platform, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5], sys.argv[6], int(sys.argv[7]), int(sys.argv[8]), ) if len(sys.argv) == 12: example = ObjectDetection( platform, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5], sys.argv[6], int(sys.argv[7]), int(sys.argv[8]), int(sys.argv[9]), int(sys.argv[10]), int(sys.argv[11]), ) example.run() |
self.pipeline = Gst.parse_launch( 'v4l2src name=cam_src ! videoconvert ! videoscale ! ' 'video/x-raw,width=640,height=480,format=RGB ! tee name=t_raw ' 't_raw. ! queue leaky=2 max-size-buffers=2 ! videoscale ! video/x-raw,width=300,height=300 ! tensor_converter ! ' 'tensor_transform mode=arithmetic option=typecast:float32,add:-127.5,div:127.5 ! ' 'tensor_filter framework=tensorflow-lite model=' + self.tflite_model + ' ! ' 'tensor_decoder mode=bounding_boxes option1=mobilenet-ssd option2=' + self.tflite_label + ' option3=' + self.tflite_box_prior + ' option4=640:480 option5=300:300 !' 'compositor name=mix sink_0::zorder=2 sink_1::zorder=1 ! videoconvert ! ximagesink ' 't_raw. ! queue leaky=2 max-size-buffers=10 ! mix. ' ) |
gst_launch_cmdline 를 출력해보니 아래와 같이 gstreamer 파이프라인이 나온다.
v4l2src name=cam_src device=/dev/video3 ! imxvideoconvert_g2d ! video/x-raw,width=1920,height=1080,framerate=30/1,format=BGRx ! tee name=t t. ! imxvideoconvert_g2d ! video/x-raw,width=300,height=300, ! queue max-size-buffers=2 leaky=2 ! videoconvert ! video/x-raw,format=RGB ! tensor_converter ! tensor_filter framework=tensorflow-lite model=/home/root/.cache/gopoint/mobilenet_ssd_v2_coco_quant_postprocess.tflite accelerator=true:npu custom=Delegate:External,ExtDelegateLib:libvx_delegate.so silent=FALSE name=tensor_filter latency=1 ! tensor_sink name=tensor_sink t. ! imxvideoconvert_g2d ! cairooverlay name=tensor_res ! queue max-size-buffers=2 leaky=2 ! fpsdisplaysink name=img_tensor text-overlay=false video-sink=waylandsink sync=false |
보기어려우니 엔터로 구분
v4l2src name=cam_src device=/dev/video3 ! imxvideoconvert_g2d ! video/x-raw,width=1920,height=1080,framerate=30/1,format=BGRx ! tee name=t t. ! imxvideoconvert_g2d ! video/x-raw,width=300,height=300, ! queue max-size-buffers=2 leaky=2 ! videoconvert ! video/x-raw,format=RGB ! tensor_converter ! tensor_filter framework=tensorflow-lite model=/home/root/.cache/gopoint/mobilenet_ssd_v2_coco_quant_postprocess.tflite accelerator=true:npu custom=Delegate:External,ExtDelegateLib:libvx_delegate.so silent=FALSE name=tensor_filter latency=1 ! tensor_sink name=tensor_sink t. ! imxvideoconvert_g2d ! cairooverlay name=tensor_res ! queue max-size-buffers=2 leaky=2 ! fpsdisplaysink name=img_tensor text-overlay=false video-sink=waylandsink sync=false |
+
2024.01.03
# cd /home/root/.nxp-demo-experience/scripts/machine_learning # python3 nndetection.py /dev/video3 NPU /home/root/.cache/gopoint/mobilenet_ssd_v2_coco_quant_postprocess.tflite /home/root/.cache/gopoint/coco_labels.txt |
gst-launch 로도 실행은 되는데 callback 처리가 안되서 overlay가 출력이 안되어 동일한 화면을 보여주진 않는다.
gst-launch-1.0 v4l2src name=cam_src device=/dev/video3 ! imxvideoconvert_g2d ! video/x-raw,width=1920,height=1080,framerate=30/1,format=BGRx ! tee name=t t. ! imxvideoconvert_g2d ! video/x-raw,width=300,height=300, ! queue max-size-buffers=2 leaky=2 ! videoconvert ! video/x-raw,format=RGB ! tensor_converter ! tensor_filter framework=tensorflow-lite model=/home/root/.cache/gopoint/mobilenet_ssd_v2_coco_quant_postprocess.tflite accelerator=true:npu custom=Delegate:External,ExtDelegateLib:libvx_delegate.so silent=FALSE name=tensor_filter latency=1 ! tensor_sink name=tensor_sink t. ! imxvideoconvert_g2d ! cairooverlay name=tensor_res ! queue max-size-buffers=2 leaky=2 ! fpsdisplaysink name=img_tensor text-overlay=false video-sink=waylandsink sync=false |
'프로그램 사용 > yolo_tensorflow' 카테고리의 다른 글
우분투에 jupyter notebook 설치 및 실행하기 (0) | 2024.01.02 |
---|---|
주피터 노트북 프로젝트(?) 실행하기 (0) | 2024.01.02 |
tensorflow keras dataset (0) | 2024.01.02 |
tensorflow lite / mnist 학습 (0) | 2024.01.02 |
yolo-label (0) | 2022.03.22 |