機械学習・AI【エッジAI】Raspi+SONY IMX500で30FPS物体検出チャレンジ!~オリジナルモデル、静止画・ログ保存編 vol.24

【エッジAI】Raspi+SONY IMX500で30FPS物体検出チャレンジ!~オリジナルモデル、静止画・ログ保存編 vol.24

vol.22 【エッジAI】Raspi5+SONY IMX500でリアルタイム物体検出チャンレンジ!~検出実行編
vol.23 エッジAI】Raspi5+SONY IMX500でリアルタイム物体検出チャンレンジ!~環境整備編

に引き続き、今回は「vol24【エッジAI】Raspi+SONY IMX500でリアルタイム物体検出チャレンジ!~オリジナルモデル、静止画・ログ保存編」です。

前回予告で示した

  • 検出結果を静止画保存する!
  • 静止画保存するクラスを限定したい
  • 検出ログを書き出したい
  • FPSを知りたい

を具現化しちゃいますよ。

更に、今回は特別に、

  • プログラムの自動起動
  • ランチャープログラム

を追加しちゃいました。

 

■今回作ったもの

まずは画面を見てください

 

・プログラムの起動 ~/start.sh

 

Shellをメニューに追加
01.png

 

・ランチャー edgeai/launcher.py

ランチャー
02.png

 

 

・リアルタイム検出プログラム edgeai/detector.py

リアルタイム検出プログラム
03.png

 

■プログラムの機能

 

・start.sh

 


これは、シンプルに、これまでTerminalでやっていたコマンドをshにしただけです。
一応起動のログを書くようにだけしました。
仮想環境に入り、ランチャープログラムを実行します。
※終了手段を作っていないので、start.shを起動したTerminalを閉じてください。

#!/bin/bash
echo "[Viewer] Detection started at $(date)" >> ~/edgeai/launch.log
source ~/edgeai/venv311/bin/activate
cd ~/edgeai/src
python3 launcher.py

 

OSが準備しているタスクランチャーに登録しておいてもGOODです。

※サービス化して自動起動という方法もあるかと思いますが、綺麗に終了できる手段を別途作る必要があります。(今はターミナル内で実行なので、ターミナルを殺せば綺麗に終われます)

 

・ランチャー launcher.py

 


tkinterを使ったシンプルな起動プログラムです。

  •  検出プログラムの実行
  •  OSの再起動
  •  OSの終了

 を実行します。

 検出タスクの数だけメニューを追加する構想です。

import tkinter as tk
import subprocess

import subprocess

def start_detection():
    print("YOLO11n Detection started")
    cmd = [
        "sudo", "python3", "detector.py",
        "--model", "../models/yolo11n_imx_model/yolo11n_imx_model.rpk",
        "--bbox-normalization",
        "--bbox-order", "xy",
        "--threshold", "0.5",
        "--labels", "../models/yolo11n_imx_model/labels.txt"
    ]
    subprocess.run(cmd)


def start_detection2():
    print("Makuragi Detection started")
    cmd = [
        "sudo", "python3", "detector.py",
        "--model", "../models/minamiaso_makuragi_imx_model/minamiaso_makuragi.rpk",
        "--bbox-normalization",
        "--bbox-order", "xy",
        "--threshold", "0.15",
        "--labels", "../models/minamiaso_makuragi_imx_model/labels.txt"
    ]
    subprocess.run(cmd)
    
def shutdown():
    subprocess.run(["sudo", "shutdown", "now"])

def reboot():
    subprocess.run(["sudo", "reboot"])

root = tk.Tk()
root.title("Viewer Detection UI")

tk.Button(root, text="YOLO11n Detection", command=start_detection).pack()
tk.Button(root, text="Makuragi Detection", command=start_detection2).pack()
tk.Button(root, text="Shutdown", command=shutdown).pack()
tk.Button(root, text="Reboot", command=reboot).pack()

root.mainloop()


 

・検出プログラム detector.py

 
当面欲しい機能を作りました。

  •  指定したモデル、ラベルを用いて検出を実行します
  •  ビューア画面上にFPSを表示します
  •  静止画保存させるクラス名を任意に複数定義できます
  •  実行した日付のフォルダを作成し静止画保存を保存します
  •  実行した日付のフォルダにログファイルを保存します。
import argparse
import sys
import signal
from functools import lru_cache
import time
import os
import cv2
import numpy as np
from datetime import datetime
from picamera2 import MappedArray, Picamera2
from picamera2.devices import IMX500
from picamera2.devices.imx500 import (NetworkIntrinsics,
                                      postprocess_nanodet_detection)

detector_proc = None

def cleanup():
    print("Viewer detection ending gracefully")
    # ログ保存、画像保存、GPIO解放など
    # Viewer文化的には「痕跡を残して器を閉じる儀式」
    stop_detection()

def signal_handler(sig, frame):
    print("Signal received:", sig)
    cleanup()
    sys.exit(0)

def start_detection():
    global detector_proc
    detector_proc = subprocess.Popen([...])

def stop_detection():
    global detector_proc
    if detector_proc:
        detector_proc.terminate()
        detector_proc.wait()
        detector_proc = None

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# グローバル変数として初期化(ファイルの先頭で)
frame_count = 0
fps_start_time = time.time()
current_fps = 0.0

last_detections = []
TARGET_CLASSES = {"cat", "person", "car",}  # 検出したいクラス名(必要に応じて調整)

now = datetime.now()
date_str=now.strftime("%Y-%m-%d")
folder = f"../results/{date_str}"
os.makedirs(folder, exist_ok=True)
IMAGE_DIR = f"{folder}/images/"
os.makedirs(IMAGE_DIR, exist_ok=True)

image_path = os.path.join(IMAGE_DIR, now.strftime("%H-%M-%S.jpg"))
log_path = os.path.join(folder, now.strftime("%Y-%m-%d.log"))


class Detection:
    def __init__(self, coords, category, conf, metadata):
        """Create a Detection object, recording the bounding box, category and confidence."""
        self.category = category
        self.conf = conf
        self.box = imx500.convert_inference_coords(coords, metadata, picam2)


def parse_detections(metadata: dict):
    """Parse the output tensor into a number of detected objects, scaled to the ISP output."""
    global last_detections
    bbox_normalization = intrinsics.bbox_normalization
    bbox_order = intrinsics.bbox_order
    threshold = args.threshold
    iou = args.iou
    max_detections = args.max_detections

    np_outputs = imx500.get_outputs(metadata, add_batch=True)
    input_w, input_h = imx500.get_input_size()
    if np_outputs is None:
        return last_detections
    if intrinsics.postprocess == "nanodet":
        boxes, scores, classes = \
            postprocess_nanodet_detection(outputs=np_outputs[0], conf=threshold, iou_thres=iou,
                                          max_out_dets=max_detections)[0]
        from picamera2.devices.imx500.postprocess import scale_boxes
        boxes = scale_boxes(boxes, 1, 1, input_h, input_w, False, False)
    else:
        boxes, scores, classes = np_outputs[0][0], np_outputs[1][0], np_outputs[2][0]
        if bbox_normalization:
            boxes = boxes / input_h

        if bbox_order == "xy":
            boxes = boxes[:, [1, 0, 3, 2]]
        boxes = np.array_split(boxes, 4, axis=1)
        boxes = zip(*boxes)

    last_detections = [
        Detection(box, category, score, metadata)
        for box, score, category in zip(boxes, scores, classes)
        if score > threshold
    ]
    return last_detections


@lru_cache
def get_labels():
    labels = intrinsics.labels

    if intrinsics.ignore_dash_labels:
        labels = [label for label in labels if label and label != "-"]
    return labels


def draw_detections(request, stream="main"):
    global frame_count, fps_start_time,current_fps

    """Draw the detections for this request onto the ISP output."""
    detections = last_results
    if detections is None:
        return
    labels = get_labels()
    with MappedArray(request, stream) as m:
        #検出があっても無くても描画する
        cv2.putText(m.array, f"FPS: {current_fps:.2f}", (10, 20),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        for detection in detections:
            x, y, w, h = detection.box
            label = f"{labels[int(detection.category)]} ({detection.conf:.2f})"

            # Calculate text size and position
            (text_width, text_height), baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            text_x = x + 5
            text_y = y + 15

            # Create a copy of the array to draw the background with opacity
            overlay = m.array.copy()

            # Draw the background rectangle on the overlay
            cv2.rectangle(overlay,
                          (text_x, text_y - text_height),
                          (text_x + text_width, text_y + baseline),
                          (255, 255, 255),  # Background color (white)
                          cv2.FILLED)

            alpha = 0.30
            cv2.addWeighted(overlay, alpha, m.array, 1 - alpha, 0, m.array)

            # Draw text on top of the background
            cv2.putText(m.array, label, (text_x, text_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)

            # Draw detection box
            cv2.rectangle(m.array, (x, y), (x + w, y + h), (0, 255, 0, 0), thickness=2)

        if intrinsics.preserve_aspect_ratio:
            b_x, b_y, b_w, b_h = imx500.get_roi_scaled(request)
            color = (255, 0, 0)  # red
            cv2.putText(m.array, "ROI", (b_x + 5, b_y + 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
            cv2.rectangle(m.array, (b_x, b_y), (b_x + b_w, b_y + b_h), (255, 0, 0, 0))

        # FPS計測
        frame_count += 1
        elapsed = time.time() - fps_start_time
        if elapsed >= 1.0:
            current_fps = frame_count / elapsed
            #print(f"[Viewer] FPS: {fps:.2f}")
            frame_count = 0
            fps_start_time = time.time()


        #静止画保存&ログ書き出し
        check_and_save(m.array,detections,labels,current_fps)



def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", type=str, help="Path of the model",
                        default="../models/minamiaso_makuragi_imx_model/minamiaso_makuragi.rpk")
    parser.add_argument("--fps", type=int, help="Frames per second")
    parser.add_argument("--bbox-normalization", action=argparse.BooleanOptionalAction, help="Normalize bbox")
    parser.add_argument("--bbox-order", choices=["yx", "xy"], default="yx",
                        help="Set bbox order yx -> (y0, x0, y1, x1) xy -> (x0, y0, x1, y1)")
    parser.add_argument("--threshold", type=float, default=0.55, help="Detection threshold")
    parser.add_argument("--iou", type=float, default=0.65, help="Set iou threshold")
    parser.add_argument("--max-detections", type=int, default=10, help="Set max detections")
    parser.add_argument("--ignore-dash-labels", action=argparse.BooleanOptionalAction, help="Remove '-' labels ")
    parser.add_argument("--postprocess", choices=["", "nanodet"],
                        default=None, help="Run post process of type")
    parser.add_argument("-r", "--preserve-aspect-ratio", action=argparse.BooleanOptionalAction,
                        help="preserve the pixel aspect ratio of the input tensor")
    parser.add_argument("--labels", type=str,
                        help="Path to the labels file")
    parser.add_argument("--print-intrinsics", action="store_true",
                        help="Print JSON network_intrinsics then exit")
    return parser.parse_args()

def save(image, detections, labels,fps):
    
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{IMAGE_DIR}/detected_{timestamp}.jpg"
    cv2.imwrite(filename, image)

    with open(log_path, "a") as f:
        f.write(f"[{timestamp}],{fps}")
        for det in detections:
            x, y, w, h = det.box
            label = labels[int(det.category)]
            conf = det.conf
            f.write(f"  - {label} ({conf:.2f}) at ({x}, {y}, {w}, {h}),")
        f.write("\n")

def check_and_save(image, detections, labels,fps):
    for det in detections:
        label = labels[int(det.category)]
        if label in TARGET_CLASSES:
            save(image, detections, labels,fps)
            break  # 一度保存したら終了(複数保存したい場合はこの行を削除)


if __name__ == "__main__":
    args = get_args()

    # This must be called before instantiation of Picamera2
    imx500 = IMX500(args.model)
    intrinsics = imx500.network_intrinsics
    if not intrinsics:
        intrinsics = NetworkIntrinsics()
        intrinsics.task = "object detection"
    elif intrinsics.task != "object detection":
        print("Network is not an object detection task", file=sys.stderr)
        exit()

    # Override intrinsics from args
    for key, value in vars(args).items():
        if key == 'labels' and value is not None:
            with open(value, 'r') as f:
                intrinsics.labels = f.read().splitlines()
        elif hasattr(intrinsics, key) and value is not None:
            setattr(intrinsics, key, value)

    # Defaults
    if intrinsics.labels is None:
        with open("../models/minamiaso_makuragi_imx_model/labels.txt", "r") as f:
            intrinsics.labels = f.read().splitlines()
    intrinsics.update_with_defaults()

    if args.print_intrinsics:
        print(intrinsics)
        exit()

    picam2 = Picamera2(imx500.camera_num)
    config = picam2.create_preview_configuration(controls={"FrameRate": intrinsics.inference_rate}, buffer_count=12)

    imx500.show_network_fw_progress_bar()
    picam2.start(config, show_preview=True)
    print("Detection started!")

    if intrinsics.preserve_aspect_ratio:
        imx500.set_auto_aspect_ratio()

    last_results = None
    picam2.pre_callback = draw_detections
    try:
        while True:
            last_results = parse_detections(picam2.capture_metadata())
    except KeyboardInterrupt:
        print("Detection stopped gracefully")
        # ログ保存や後処理
    finally:    
        cleanup()


 

■ディレクトリ構成

 

edgeaiディレクトリの構造
04.png

 

~
|- edgeai
|- data
|- models
| |- model1
| |- model2
|- results
| |- 2025-10-29
| | |- images
| | | |- ****.jpg
| | |- 2025-10-29.log
| |
| |- 2025-10-30
|
|- src
|- tools
|- venv311
|- launch.log

 

 

■検出実行時のターミナル画面例

detector.py実行時のターミナル例です。

カメラに対して指定したモデルファイルを転送して、検出動作を開始します。

 

/home/rdcenter/Desktop/start.sh: 1: source: not found
YOLO11n Detection started
[2:45:26.787593982] [6732] INFO Camera camera_manager.cpp:330 libcamera v0.5.2+99-bfd68f78
[2:45:26.796906698] [6739] INFO RPI pisp.cpp:720 libpisp version v1.2.1 981977ff21f3 29-04-2025 (14:17:26)
[2:45:26.803141139] [6739] INFO IPAProxy ipa_proxy.cpp:180 Using tuning file /usr/share/libcamera/ipa/rpi/pisp/imx500.json
[2:45:26.812891303] [6739] INFO Camera camera_manager.cpp:220 Adding camera '/base/axi/pcie@1000120000/rp1/i2c@80000/imx500@1a' for pipeline handler rpi/pisp
[2:45:26.812966285] [6739] INFO RPI pisp.cpp:1179 Registered camera /base/axi/pcie@1000120000/rp1/i2c@80000/imx500@1a to CFE device /dev/media0 and ISP device /dev/media2 using PiSP variant BCM2712_C0

----------------------------------------------------------------------------------NOTE: Loading network firmware onto the IMX500 can take several minutes, please do not close down the application.
----------------------------------------------------------------------------------

[2:45:26.848253104] [6732] INFO Camera camera.cpp:1215 configuring streams: (0) 640x480-XBGR8888/sRGB (1) 2028x1520-RGGB_PISP_COMP1/RAW
[2:45:26.849948206] [6739] INFO RPI pisp.cpp:1483 Sensor: /base/axi/pcie@1000120000/rp1/i2c@80000/imx500@1a - Selected sensor format: 2028x1520-SRGGB10_1X10/RAW - Selected CFE format: 2028x1520-PC1R/RAW
Network Firmware Upload: 0.00bytes [00:00, ?bytes/s]QStandardPaths: XDG_RUNTIME_DIR not set, defaulting to '/tmp/runtime-root'
Detection started!

 

 

■検出時のシステム負荷

検出実行時のシステム負荷やリソースの消費状況を確認します。

htopとシステムモニタで負荷状況やリソース消費を確認
05.png

detector.pyを実行している間のCPU負荷はわずかに35%(30%以下)
メモリはOSと合わせても1GBも使っていません。
大変に負荷が軽い、地球にもお財布にも優しい仕組みです。※

※30FPSをYOLOモデルでモリモリ動かそうとしたら、GPUは必須ですし、ノートPCでもJetson系列のエッジデバイスでも30万円程度の出費は間違いないところです。

  

■連続稼働

 

24時間の連続稼働を確認済です。

2025/10/29 10:17 - 2025/10/30 10:30 24時間

※どうでも良い話ですが、我が家の猫が6:30に外猫の見張り行動をしていることが判明しました。

 

日次の有人による撮影であれば勤務時間動作すればOKでしょうが、監視カメラとして24時間365日稼働させ続けるとどうなるか?というのは別の課題ですね。

 

■更に欲望が湧いてきます

今回は、picamera2のサンプルを拝借して、「とりあえず静止画保存、ログ保存したい」を実現しました。

更に欲を言えば、

  • 静止画保存するクラスを外部(引数やjson)から指定したい
  • 結果だけを通信でサーバに送ってリアルタイムな情報を集約したい
  • マルチスレッドで効率的・高速に動作させたい
  • 位置情報を付与してEXIFとして保存したい
  • リモートから起動/停止できるようにして集中監視したい

・・とかあると思います。(私の妄想は少なくともそうです)

スマホをコントローラとして、複数のRaspiをコントロールしたいなぁとか、保存した結果を非同期通信のジョブ管理でクラウドにアップロードしたいなぁとか。

業務で使おうとすると、現場に置いたエッジカメラから、どのように情報を取り出すか(通信するか?)
またはエージェント的なものを連動させて、もっとインテリジェントな仕組みを構築したいだとか、エッジAIの世界観に没入していってしまいますね。

 

■まとめ

 

エッジAIはあらぬ妄想を喚起する危ない代物でした。
・IMX500でYOLOからコンバートしたモデルを実行できます。
・IMX500が出力する画像→静止画に、検出結果→ログに取り出せます。
・30FPSをCPU負荷30%程度で実行します
・ビューア画面へのテキストオーバーレイもCV2で自由自在です

 

 

▼この記事を書いたひと

001@2x.png

R&Dセンター 松井 良行

R&Dセンター 技術戦略担当部長。コンピュータと共に35年。そしてこれからも!

 

 

おすすめの関連記事


機械学習・AIの最新記事


お問い合わせ

ご意見・ご質問などお気軽にお問い合わせ下さい。
ナカシャクリエイテブ株式会社

●富士見事務所 TEL : 052-228-8733 FAX : 052-323-3337
〒460-0014 愛知県名古屋市中区富士見町13−22 ファミール富士見711  地図
交通部 R&Dセンター

Email:メールでのお問い合わせ