Why can't I receive an RTP video stream produced by GStreamer's appsrc?

652 Views Asked by At

I'm trying to use the GStreamer's appsrc element with rtpbin and udpsink to create an RTP sender using the VP8 codec (vp8enc).

This program creates a fake video stream by switching black and white frames every 100 milliseconds. You can see the video by uncommenting the macro USE_AUTOVIDEOSINK.

When I run the receiver, I only get the first frame. It doesn't blink. What am I doing wrong?

PS: I'm using libgstreamer 1.20.3 from Ubuntu 22.04.

// broadcaster.c
//
// Compile with:
// c99 -Wall -O3 -std=c99  `pkg-config gstreamer-1.0 --cflags --libs-only-l` -O0 -g -c broadcaster.c -o broadcaster

#include <gst/gst.h>
#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <stdint.h>

//#define USE_AUTOVIDEOSINK
#define FRAME_WIDTH 320
#define FRAME_HEIGHT 240
#define FRAME_SIZE (FRAME_WIDTH * FRAME_HEIGHT)
#define FRAME_RATE_INTERVAL_MS 100
#define VIDEO_PT 101
#define VIDEO_SSRC 2222
#define VIDEO_TRANSPORT_HOST "localhost"
#define VIDEO_TRANSPORT_PORT 1234

typedef struct {
  GstElement *pipeline;
  GstElement *video_appsrc;
  GstElement *videoconvert;
#ifdef USE_AUTOVIDEOSINK
  GstElement *autovideosink;
#else
  GstElement *rtpbin;
  GstElement *vp8enc;
  GstElement *rtpvp8pay;
  GstElement *udpsink;  
#endif
  gboolean need_video_data;
  gboolean playing;
  gboolean use_black_buffer;
} CustomData;

static void need_video_data_callback(
  GstElement *appsrc,
  guint length,
  CustomData *data
);

static void enough_video_data_callback(
  GstElement *appsrc,
  CustomData *data
);

#ifndef USE_AUTOVIDEOSINK
static void pad_added_handler(
  GstElement *src,
  GstPad *new_pad,
  CustomData *data
);
#endif

static void* thread_proc(
  void *p_data
);

static int msleep(
  long msec
);

static gboolean terminate = FALSE;

static uint16_t black_buffer[FRAME_SIZE];
static uint16_t white_buffer[FRAME_SIZE];

int main(int argc, char *argv[]) {
  CustomData data;
  GstBus *bus;
  GstMessage *msg;
  pthread_t thread;
#ifndef USE_AUTOVIDEOSINK
  GstPad *rtpvp8pay_src_pad, *rtpbin_send_rtp_sink_0_pad;
#endif

  for (int i = 0; i < FRAME_SIZE; ++i) { 
    black_buffer[i] = 0;
    white_buffer[i] = 0xFFFF;
  }

  gst_init(&argc, &argv);

  data.pipeline = gst_pipeline_new("broadcaster-pipeline");

  data.video_appsrc = gst_element_factory_make(
    "appsrc",
    "video_appsrc"
  );

  data.videoconvert = gst_element_factory_make(
    "videoconvert",
    "videoconvert"
  );

#ifdef USE_AUTOVIDEOSINK
  data.autovideosink = gst_element_factory_make(
    "autovideosink",
    "autovideosink"
  );
#else
  data.rtpbin = gst_element_factory_make(
    "rtpbin",
    "rtpbin"
  );

  data.vp8enc = gst_element_factory_make(
    "vp8enc",
    "vp8enc"
  );

  data.rtpvp8pay = gst_element_factory_make(
    "rtpvp8pay",
    "rtpvp8pay"
  );

  data.udpsink = gst_element_factory_make(
    "udpsink",
    "udpsink"
  );
#endif

  data.need_video_data = FALSE;
  data.playing = FALSE;
  data.use_black_buffer = FALSE;

  if (
    !data.pipeline ||
    !data.video_appsrc ||
    !data.videoconvert ||
#ifdef USE_AUTOVIDEOSINK
    !data.autovideosink
#else
    !data.rtpbin ||
    !data.vp8enc ||
    !data.rtpvp8pay ||
    !data.udpsink
#endif
  ) {
    g_error("Error: Not all gstreamer elements could be created.\n");
    return -1;
  }

  gst_bin_add_many(
    GST_BIN(data.pipeline),
    data.video_appsrc,
    data.videoconvert,
#ifdef USE_AUTOVIDEOSINK
    data.autovideosink,
#else
    data.rtpbin,
    data.vp8enc,
    data.rtpvp8pay,
    data.udpsink,
#endif
    NULL
  );

  if (
    !gst_element_link_many(
      data.video_appsrc,
      data.videoconvert,
#ifdef USE_AUTOVIDEOSINK
      data.autovideosink,
#else
      data.vp8enc,
      data.rtpvp8pay,
#endif
      NULL
    )
  ) {
    g_printerr("Error: Not all video elements could be linked.\n");
    gst_object_unref(data.pipeline);
    return -1;
  }

  g_object_set(
    data.video_appsrc,
    "is-live", TRUE,
    "stream-type", 0,
    "format", GST_FORMAT_TIME,
    NULL
  );

#ifndef USE_AUTOVIDEOSINK
  g_object_set(
    data.vp8enc,
    "target-bitrate", 1000000,
    "deadline", 1,
    "cpu-used", 4,
    NULL
  );

  g_object_set(
    data.rtpvp8pay,
    "pt", VIDEO_PT,
    "ssrc", VIDEO_SSRC,
    "picture-id-mode", 2,
    NULL
  );

  g_object_set(
    data.udpsink,
    "host", VIDEO_TRANSPORT_HOST,
    "port", VIDEO_TRANSPORT_PORT,
    NULL
  );  
#endif

  g_signal_connect(
    data.video_appsrc, 
    "need-data",
    G_CALLBACK(need_video_data_callback), 
    &data
  );

  g_signal_connect(
    data.video_appsrc, 
    "enough-data",
    G_CALLBACK(enough_video_data_callback), 
    &data
  );

#ifndef USE_AUTOVIDEOSINK
  g_signal_connect(
    data.rtpbin,
    "pad-added",
    G_CALLBACK(pad_added_handler),
    &data
  );

  rtpvp8pay_src_pad = gst_element_get_static_pad(
    data.rtpvp8pay,
    "src"
  );

  rtpbin_send_rtp_sink_0_pad = gst_element_request_pad_simple(
    data.rtpbin,
    "send_rtp_sink_%u"
  );

  if (
    gst_pad_link(
      rtpvp8pay_src_pad, 
      rtpbin_send_rtp_sink_0_pad
    ) != GST_PAD_LINK_OK
  ) {
    g_printerr("Error: rtpvp8pay could not be linked.\n");
    gst_object_unref(data.pipeline);
    return -1; 
  }  
#endif

  if (pthread_create(&thread, NULL, thread_proc, &data)) {
    g_printerr("Could not create thread.\n");
    return -1;
  }

  bus = gst_element_get_bus(data.pipeline);
  do {
    msg = gst_bus_timed_pop_filtered(
      bus,
      GST_CLOCK_TIME_NONE,
      (GstMessageType)(
        GST_MESSAGE_STATE_CHANGED |
        GST_MESSAGE_ERROR |
        GST_MESSAGE_EOS
      )
    );

    if (msg != NULL) {
      GError *err;
      gchar *debug_info;

      switch (GST_MESSAGE_TYPE(msg)) {
        case GST_MESSAGE_ERROR:
          gst_message_parse_error(msg, &err, &debug_info);
          g_printerr("Error received from element %s: %s\n", 
            GST_OBJECT_NAME(msg->src), err->message);
          g_printerr("Debugging information: %s\n", 
            debug_info ? debug_info : "none");
          g_clear_error(&err);
          g_free(debug_info);
          terminate = TRUE;
          break;
        case GST_MESSAGE_EOS:
          g_print("End-Of-Stream (EOS) reached.\n");
          terminate = TRUE;
          break;
        case GST_MESSAGE_STATE_CHANGED:
          if (GST_MESSAGE_SRC(msg) == GST_OBJECT(data.pipeline)) {
            GstState old_state, new_state, pending_state;
            gst_message_parse_state_changed(
              msg, 
              &old_state, 
              &new_state, 
              &pending_state
            );
            g_print(
              "Pipeline state changed from %s to %s: \n",
              gst_element_state_get_name(old_state),
              gst_element_state_get_name(new_state)
            );
            if (
              g_str_equal(
                gst_element_state_get_name(new_state),
                "PLAYING"
              )
            ) {
              GST_DEBUG_BIN_TO_DOT_FILE(
                GST_BIN(data.pipeline),
                GST_DEBUG_GRAPH_SHOW_ALL,
                "pipeline"
              );
            }
          }
          break;
        default:
          g_printerr("Unexpected message received.\n");
          break;
      }
      gst_message_unref(msg);
    }
  } while (!terminate);

  gst_object_unref(bus);
  gst_element_set_state(data.pipeline, GST_STATE_NULL);
  gst_object_unref(data.pipeline);
  g_print("Goodbye.\n");

  return 0;
}

static void need_video_data_callback(
  GstElement *appsrc,
  guint length,
  CustomData *data
) {

  g_print(
    "Video data is needed on '%s':\n",
    GST_ELEMENT_NAME(appsrc)
  );

  data->need_video_data = TRUE;
}

static void enough_video_data_callback(
  GstElement *appsrc,
  CustomData *data
) {

  g_print(
    "Video data is enough on '%s':\n",
    GST_ELEMENT_NAME(appsrc)
  );

  data->need_video_data = FALSE;
}

#ifndef USE_AUTOVIDEOSINK
static void pad_added_handler(
  GstElement *src,
  GstPad *new_pad,
  CustomData *data
) {
  GstPad *sink_pad = NULL;

  g_print(
    "Received new pad '%s'.'%s':\n",
    GST_ELEMENT_NAME(src),
    GST_PAD_NAME(new_pad)
  );

  if (src != data->rtpbin) {
    return;
  }

  if (g_str_equal(GST_PAD_NAME(new_pad), "send_rtp_src_0")) {
    sink_pad = gst_element_get_static_pad(
      data->udpsink,
      "sink"
    );
  }

  if (
    sink_pad &&
    gst_pad_link(
      new_pad,
      sink_pad
    ) != GST_PAD_LINK_OK
  ) {
    g_printerr(
      "Error: %s.%s could not be linked to %s.\n",
      GST_ELEMENT_NAME(src),
      GST_PAD_NAME(new_pad),
      GST_PAD_NAME(sink_pad)
    );
  }

  if (sink_pad) {
    gst_object_unref(sink_pad);
  }      
}
#endif

static void* thread_proc(
  void *p_data
) {
  CustomData *data = (CustomData *)p_data;
  GstCaps *video_caps = NULL;

  g_print("Generating frames...\n");

  while (!terminate) {
    if (!video_caps) {
      video_caps = gst_caps_new_simple(
        "video/x-raw",
        "format", G_TYPE_STRING, "RGB16",
        "width", G_TYPE_INT, FRAME_WIDTH,
        "height", G_TYPE_INT, FRAME_HEIGHT,
        NULL
      );

      g_object_set(
        data->video_appsrc,
        "caps", video_caps,
        NULL
      );      
    }

    if (video_caps && !data->playing) {
      GstStateChangeReturn ret;

      ret = gst_element_set_state(
        data->pipeline,
        GST_STATE_PLAYING
      );

      if (ret == GST_STATE_CHANGE_FAILURE) {
        g_printerr("Unable to set the pipeline to the playing state.\n");
        terminate = TRUE;
        continue;
      }
      else {
        data->playing = TRUE;
        g_print("Playing the pipeline.\n");
      }
    }

    if (data->playing && data->need_video_data) {
      GstFlowReturn ret;
      guint size = FRAME_SIZE * 2;
      GstBuffer *buffer = gst_buffer_new_memdup(
        data->use_black_buffer ? black_buffer : white_buffer,
        size
      );
      g_signal_emit_by_name(
        data->video_appsrc,
        "push-buffer",
        buffer,
        &ret
      );
      gst_buffer_unref(buffer);

      data->use_black_buffer = !data->use_black_buffer;

      if (ret != GST_FLOW_OK) {
        terminate = TRUE;
      }   
    }

    msleep(FRAME_RATE_INTERVAL_MS);
  }

  return NULL;
}

static int msleep(
  long msec
) {
  struct timespec ts;
  int res;

  if (msec < 0) {
    errno = EINVAL;
    return -1;
  }

  ts.tv_sec = msec / 1000;
  ts.tv_nsec = (msec % 1000) * 1000000;

  do {
    res = nanosleep(&ts, &ts);
  } while (res && errno == EINTR);

  return res;
}
#!/bin/bash
# receiver.sh

HOST=localhost
VIDEO_RTP_PORT=1234

VIDEO_CAPS="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)VP8"
VIDEO_DEC="rtpvp8depay ! vp8dec"
VIDEO_SINK="videoconvert ! autovideosink"

gst-launch-1.0 \
  rtpbin name=rtpbin \
  udpsrc caps=$VIDEO_CAPS address=$HOST port=$VIDEO_RTP_PORT \
  ! rtpbin.recv_rtp_sink_0 rtpbin. \
  ! $VIDEO_DEC \
  ! $VIDEO_SINK
1

There are 1 best solutions below

0
Daniel Koch On

I have asked in the gstreamer-devel mailing list, and it turns out my buffers are missing timestamps.

So I fixed the RTP sender by setting the do-timestamp property TRUE for the appsrc.

  g_object_set(
    data.video_appsrc,
    "is-live", TRUE,
    "stream-type", 0,
    "format", GST_FORMAT_TIME,
    "do-timestamp", TRUE,
    NULL
  );