Write custom metadata to a saved TS segment in GStreamer

225 Views Asked by At

I am using gstreamer-rs to pull a stream from an RTSP camera. I would like to add some custom metadata to each TS segment, such as the camera source and some other information, every time a new TS segment is created. The custom metadata can change, so I cannot set it statically.

I use the get-fragment-stream signal callback to find out when a TS segment is written.

  • I added a callback to get the stream and convert it into a gst::Stream
  • Then I add the custom tags, merge them, and push the stream back to the caller of get-fragment-stream

But I am running into an issue: gst::Stream does not implement io::Write.

What is the right way to add a custom tag to every ts segment generated by hlssink?

Here is a simple example program; it runs if the hlssink.connect call is removed.

use gst::prelude::*;
use gst::{DebugGraphDetails};
use anyhow::{Error, format_err};
use curl::easy::Easy;
use std::process::exit;

/// Calls `main` directly and returns whatever it produces.
///
/// Only compiled on non-macOS targets (see the `cfg` attribute below).
#[cfg(not(target_os = "macos"))]
pub fn run<T, F>(main: F) -> T
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    main()
}



fn k_main () {

    gst::init().unwrap();
    let main_loop = glib::MainLoop::new(None, false);

    let pipeline = gst::Pipeline::new(None);

    let rtspsrc = gst::ElementFactory::make("rtspsrc", None).unwrap();
    let rtph264depay = gst::ElementFactory::make("rtph264depay", None).unwrap();
    let h264parse = gst::ElementFactory::make("h264parse", None).unwrap();
    let hlssink2 = gst::ElementFactory::make("hlssink2", None).unwrap();

    rtspsrc.set_property_from_str("location", "rtsp://<rtspurl>");

    hlssink2.set_property("playlist-location", "output.m3u8");
    hlssink2.set_property("location", "output%05d.ts");
    hlssink2.set_property("max-files", 10 as u32);
    hlssink2.set_property("target-duration", 10 as u32);



    pipeline.add_many(&[
        &rtspsrc,
        &rtph264depay,
        &h264parse,
        &hlssink2,
    ]).unwrap();


    gst::Element::link_many(&[
        &rtph264depay,
        &h264parse,
        &hlssink2,
        ]
        ).unwrap();

    rtspsrc.connect_pad_added(move |src, src_pad| {
                    println!("inside connect_pad_added src_pad={:#?}", src_pad.current_caps());
                    let is_video = if src_pad.name().ends_with("_96") {
                            true
                        } else {
                            false
                    };

                    let connect_remote_stream = || -> Result<(), Error> {
                        let video_sink_pad = rtph264depay.static_pad("sink").expect("could not get sink pad from rtph264depay");
                        src_pad.link(&video_sink_pad).expect("failed to link rtspsrc.video->rtph264depay.sink");
                        println!("linked rtspsrc->rtph264depay");
                        Ok(())
                    };

                    if is_video {
                         match connect_remote_stream() {
                            Ok(_) => println!("rtsp stream is setup!"),
                            Err(e) => log::error!("could not setup rtspsrc->rtph264depay")
                        }
                   }
    });

    // connect to the hlssink to know when ts segment is written
    let hlssink_clone = hlssink2.clone();

    hlssink_clone.connect("get-fragment-stream", false, move |args| {
        let fragment_stream = args[0].get::<gst::Stream>().expect("Failed to extract fragment stream from get-fragment-stream signal");
        let segment_number = args[1].get::<u32>().expect("failed to extract segment number from gst-fragment-stream");

        //println!("segment-number={}", segment_number);

        let mut custom_tags = gst::tags::TagList::new();
        custom_tags.add::<gst::tags::Title>(&format!("Custom Title {}", segment_number).as_str(), gst::TagMergeMode::Replace);
        custom_tags.add::<gst::tags::Artist>(&format!("Custom artist {}", "blahblah").as_str(), gst::TagMergeMode::Replace);


        fragment_stream.set_tags(Some(&custom_tags));


        Some(
                    gio::WriteOutputStream::new(fragment_stream)
                )

    });

    let bus = pipeline.bus().unwrap();
    let main_loop_clone = main_loop.clone();

    bus.add_watch(move |_, msg| {
        use gst::MessageView;

        match msg.view() {
            MessageView::Eos(..) => {
                println!("eos received");
                main_loop_clone.quit()
            }
            MessageView::Error(err) => {
                println!("error from {:#?}: {} ({:#?})",
                err.src().map(|s| s.path_string()),
                err.error(),
                err.debug());
            }
            MessageView::StateChanged(state_changed) => {
                println!("{} state changed from {:?} to {:?}", msg.src().unwrap(), state_changed.old(), state_changed.current());
            }
            MessageView::StreamStart(stream_status) => {
                println!("stream start={:?}", stream_status.seqnum());
            }
            _=> (),
        }
        glib::Continue(true)
    }).expect("failed to add bus watch");

    match pipeline.set_state(gst::State::Playing) {
        Ok(result) => println!("pipeline set to playing"),
        Err(e) => {
            println!("unable to set pipeline to playing e: {}", e);
            exit(0);
        }
    }
    main_loop.run();
    pipeline.set_state(gst::State::Null).unwrap();
}

/// Program entry point: prints a greeting, then hands `k_main` to `run`.
fn main() {
    println!("Hello, world!");
    // `run` yields `k_main`'s unit result, which doubles as `main`'s return value.
    run(k_main)
}
0

There are 0 best solutions below