My goal is to display and record video from a GStreamer pipeline within a Python environment, and I have built a Streamlit app for this. My code behaves as expected when the source (device=/dev/video0) is a webcam. However, with a USB-connected camera that has a higher resolution and frame rate, the call to bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.EOS) just hangs when I try to stop the recording. What is the correct way to handle EOS and implement the functionality I am after? I saw this helpful post regarding EOS, which led me to my current EOS handling, but I feel there must be something I am missing.
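In essence, my stop logic boils down to this (simplified from the full app below; this is the part that hangs with the USB camera):

bus = st.session_state.pipeline.get_bus()
st.session_state.pipeline.send_event(Gst.Event.new_eos())
# With the high-resolution USB camera this call never returns:
msg = bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.EOS)
st.session_state.pipeline.set_state(Gst.State.NULL)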
Thank you
import cv2
import streamlit as st
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
import numpy as np
import os
import datetime

# ------ Resources ------
INPUT = "v4l2src device=/dev/video0"
OUTPUT_DIR = "Resources/Outputs/"

st.title("Gstreamer App")


class GstreamerApp:
    def __init__(self):
        if "init" not in st.session_state:
            st.session_state.init = True
        col1, buff1, col2, buff2, col3 = st.columns([1, 0.5, 1, 0.5, 1])
        self.start_button_pressed = col1.button("Start Playing", on_click=self.start)
        self.stop_button_pressed = col2.button("Stop", on_click=self.stop)
        self.record_button_pressed = col3.button("Start Record", on_click=self.record)

    def start(self):
        # ---- Stop a running pipeline if one is already running -----
        if self.start_button_pressed:
            self.stop()
        else:
            self.start_button_pressed = True
        Gst.init(None)
        self.frame_placeholder = st.empty()
        if self.record_button_pressed:
            OUTPUT_FILE_NAME = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            pipeline_string = f"{INPUT} ! decodebin ! videoconvert ! tee name=t t. ! queue !\
                videoconvert ! video/x-raw,format=BGR ! appsink name=appsink t. !\
                queue max-size-buffers=20 ! x264enc tune=zerolatency ! mp4mux !\
                filesink location={OUTPUT_DIR}{OUTPUT_FILE_NAME}.mp4"
        else:
            pipeline_string = f"{INPUT} ! decodebin ! videoconvert ! videoconvert !\
                video/x-raw,format=BGR ! appsink max-buffers=20 name=appsink"
        st.session_state.pipeline = Gst.parse_launch(pipeline_string)
        appsink = st.session_state.pipeline.get_by_name("appsink")
        st.session_state.pipeline.set_state(Gst.State.PLAYING)
        # ----- Reset stop button on first pressing start ----
        self.stop_button_pressed = False
        while not self.stop_button_pressed:
            sample = appsink.emit("pull-sample")
            buf = sample.get_buffer()
            caps = sample.get_caps()
            width = caps.get_structure(0).get_value("width")
            height = caps.get_structure(0).get_value("height")
            numpy_array = np.ndarray(
                (height, width, 3),
                buffer=buf.extract_dup(0, buf.get_size()),
                dtype=np.uint8,
            )
            frame = cv2.cvtColor(numpy_array, cv2.COLOR_BGR2RGB)
            self.frame_placeholder.image(frame, channels="RGB")
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break

    def stop(self):
        self.stop_button_pressed = True
        if self.record_button_pressed == True:
            self.record_button_pressed = False
        if self.start_button_pressed == True:
            self.start_button_pressed = False
        # ----- Send and wait for EOS -----
        bus = st.session_state.pipeline.get_bus()
        st.session_state.pipeline.send_event(Gst.Event.new_eos())
        if bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.EOS) is None:
            raise RuntimeError("Pipeline did not receive EOS message within the timeout period")
        st.session_state.pipeline.set_state(Gst.State.NULL)
        cv2.destroyAllWindows()

    def record(self):
        self.record_button_pressed = True
        self.start()


def create_app():
    return GstreamerApp()


if __name__ == "__main__":
    gst_app = create_app()
Hi, you can take a look at this repo, Streamlit-x-Gstreamer, which shows how to integrate a GStreamer pipeline into a Streamlit app.
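Regarding the hang itself: with Gst.CLOCK_TIME_NONE the bus wait never returns, and the EOS message only reaches the bus after the EOS event has travelled through every branch to the sinks. One likely cause in your pipeline is that once your display loop stops pulling samples, the appsink branch's queue fills up and stalls the tee, so mp4mux never receives EOS. A minimal sketch of a more defensive stop routine (untested; stop_pipeline and the 5-second timeout are my own names/choices, and "try-pull-sample" assumes GStreamer >= 1.10) could look like this:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

def stop_pipeline(pipeline, appsink, timeout_s=5):
    """Send EOS, keep draining the appsink so the recording branch can
    finish, then wait for EOS/ERROR on the bus with a finite timeout."""
    pipeline.send_event(Gst.Event.new_eos())
    bus = pipeline.get_bus()

    # Keep pulling frames until the appsink itself reports EOS; otherwise
    # its queue can fill up and block the tee, and EOS never reaches mp4mux.
    while not appsink.get_property("eos"):
        sample = appsink.emit("try-pull-sample", Gst.SECOND)
        if sample is None:
            break  # no more data is coming

    # Wait with a bounded timeout instead of Gst.CLOCK_TIME_NONE, and also
    # accept ERROR so a failing element cannot hang the UI thread forever.
    msg = bus.timed_pop_filtered(
        timeout_s * Gst.SECOND,
        Gst.MessageType.EOS | Gst.MessageType.ERROR,
    )
    if msg is None:
        print("Timed out waiting for EOS; forcing shutdown")
    elif msg.type == Gst.MessageType.ERROR:
        err, debug = msg.parse_error()
        print(f"Pipeline error: {err} ({debug})")

    pipeline.set_state(Gst.State.NULL)

In your app this would be called with st.session_state.pipeline and the appsink obtained from get_by_name("appsink"); the repo above shows the Streamlit integration side of things.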