Displaying and recording Gstreamer Video in Python Streamlit, how to handle EOS?

168 Views Asked by At

My goal is to display and record a video from a GStreamer pipeline within a Python environment. I have made a Streamlit app for this. My code behaves as expected if the source device=/dev/video0 is a webcam. However, if it is a USB-connected camera with a larger resolution and frame rate, when I tried to stop the recording, the bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.EOS) function just hangs. What is the correct way to handle EOS and implement the functionality I want to achieve? I saw this helpful post regarding EOS that led me to implement my EOS handling, but I feel there must be something that I am missing.

Thank you

import cv2
import streamlit as st
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
import numpy as np
import os
import datetime

#------ Resources------
INPUT = "v4l2src device=/dev/video0" 
OUTPUT_DIR = "Resources/Outputs/"
st.title("Gstreamer App")

class GstreamerApp:
    def __init__(self):
        if "init" not in st.session_state:
            st.session_state.init = True
        col1, buff1, col2, buff2, col3 = st.columns([1, 0.5, 1, 0.5, 1])
        self.start_button_pressed = col1.button("Start Playing", on_click=self.start)
        self.stop_button_pressed = col2.button("Stop", on_click=self.stop)
        self.record_button_pressed = col3.button("Start Record", on_click=self.record)
   
    def start(self):
        #---- Stop a running pipeline if one is already running-----
        if self.start_button_pressed:
            self.stop()
        else:
            self.start_button_pressed = True
            
        Gst.init(None)
        self.frame_placeholder = st.empty()
        if self.record_button_pressed:
            OUTPUT_FILE_NAME = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            pipeline_string = f"{INPUT} ! decodebin ! videoconvert ! tee name=t t. ! queue !\
                              videoconvert ! video/x-raw,format=BGR ! appsink name=appsink t. !\
                              queue max-size-buffers=20 ! x264enc tune=zerolatency ! mp4mux !\
                              filesink location={OUTPUT_DIR}{OUTPUT_FILE_NAME}.mp4"
        else:
            pipeline_string = f"{INPUT} ! decodebin ! videoconvert ! videoconvert !\
                              video/x-raw,format=BGR ! appsink max-buffers=20 name=appsink"

        st.session_state.pipeline = Gst.parse_launch(pipeline_string)
        appsink = st.session_state.pipeline.get_by_name("appsink")
        st.session_state.pipeline.set_state(Gst.State.PLAYING)
        #----- Reset stop button on first pressing start ----
        self.stop_button_pressed = False
        while not self.stop_button_pressed:
            sample = appsink.emit("pull-sample")
            buf = sample.get_buffer()
            caps = sample.get_caps()
            width = caps.get_structure(0).get_value("width")
            height = caps.get_structure(0).get_value("height")
            numpy_array = np.ndarray(
                (height, width, 3),
                buffer=buf.extract_dup(0, buf.get_size()),
                dtype=np.uint8,
            )
            frame = cv2.cvtColor(numpy_array, cv2.COLOR_BGR2RGB)
            self.frame_placeholder.image(frame, channels="RGB")
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
                
    def stop(self):
        self.stop_button_pressed = True
        if self.record_button_pressed == True:
            self.record_button_pressed = False
        if self.start_button_pressed == True:
            self.start_button_pressed = False
        #-----Send and wait for EOS-----
        bus = st.session_state.pipeline.get_bus()
        st.session_state.pipeline.send_event(Gst.Event.new_eos())
        if bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.EOS) is None:
            raise RuntimeError("Pipeline did not receive EOS message within the timeout period")
        st.session_state.pipeline.set_state(Gst.State.NULL) 
        cv2.destroyAllWindows()

    def record(self):
        self.record_button_pressed = True
        self.start()

def create_app():
    return GstreamerApp()

if __name__=="__main__":
    gst_app = create_app()

1

There are 1 best solutions below

1
Vishal Kumar On

Hi you can look at this repo Streamlit-x-Gstreamer which shows how we can integrate the gstreamer pipeline in the streamlit app.