Skip to content

Commit

Permalink
[FEATURE] Support putFragmentMetadata from kvssink (#1122)
Browse files Browse the repository at this point in the history
* use custom event for putFragmentMetadata from kvssink

* kvssink-fragment-metadata

* clwanup

* rename to kvs-add-metadata

* limit metadata when max count reached

* cleanup send_custom_event

* max fragment count

* cleanup

* revert log4cplus cmake

* use MAX_FRAGMENT_METADATA_TAGS

* abstract event trigger

* cleanup: rename

* update failure case check

* remove logic maintaining the count

* cleanup

* cleanup-2

* cleanup-3

* address comments

* fix new lines and indentation

* fix new lines and indentation

* fix new lines and indentation

* clean up

* cleanup

* cleanup -3

* remove gstkvssink.h inmport from samples

* remove unused unique_ptr<Credentials>

* fix the log-line
  • Loading branch information
niyatim23 committed May 16, 2024
1 parent edfde9d commit 0b5c45b
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 48 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
.github/github_deploy_key.enc
.idea
build
media
cmake-build-debug/
cmake-build-release/
doc/
Expand Down
5 changes: 5 additions & 0 deletions samples/include.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
#ifndef _KVS_SAMPLE_INCLUDE_H
#define _KVS_SAMPLE_INCLUDE_H

#define KVS_ADD_METADATA_G_STRUCT_NAME "kvs-add-metadata"
#define KVS_ADD_METADATA_NAME "name"
#define KVS_ADD_METADATA_VALUE "value"
#define KVS_ADD_METADATA_PERSISTENT "persist"

#define STATUS_KVS_GSTREAMER_SAMPLE_BASE 0x00080000
#define STATUS_KVS_GSTREAMER_SAMPLE_ERROR STATUS_KVS_GSTREAMER_SAMPLE_BASE + 0x00000001
#define STATUS_KVS_GSTREAMER_SAMPLE_INTERRUPTED STATUS_KVS_GSTREAMER_SAMPLE_BASE + 0x00000002
Expand Down
154 changes: 117 additions & 37 deletions samples/kvssink_gstreamer_sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
#include <vector>
#include <stdlib.h>
#include <mutex>
#include <IotCertCredentialProvider.h>
#include "gstreamer/gstkvssink.h"
#include <thread>
#include <com/amazonaws/kinesis/video/client/Include.h>
#include "include.h"

using namespace std;
Expand Down Expand Up @@ -50,6 +49,8 @@ typedef struct _CustomData {
synthetic_dts(0),
last_unpersisted_file_idx(0),
stream_status(STATUS_SUCCESS),
put_fragment_metadata_frequency_seconds(2),
fragment_metadata_timer_id(0),
base_pts(0),
max_frame_pts(0),
key_frame_pts(0),
Expand All @@ -65,6 +66,10 @@ typedef struct _CustomData {
bool h264_stream_supported;
char *stream_name;
mutex file_list_mtx;
int put_fragment_metadata_frequency_seconds;
int fragment_metadata_timer_id;
int metadata_counter = 0;
bool persist_flag = true;

// list of files to upload.
vector<FileInfo> file_list;
Expand Down Expand Up @@ -102,9 +107,7 @@ typedef struct _CustomData {
volatile StreamSource streamSource;

string rtsp_url;

unique_ptr<Credentials> credential;


uint64_t synthetic_dts;

bool use_absolute_fragment_times;
Expand Down Expand Up @@ -155,6 +158,11 @@ static bool resolution_supported(GstCaps *src_caps, GstCaps *query_caps_raw, Gst

/* callback when eos (End of Stream) is posted on bus */
static void eos_cb(GstElement *sink, GstMessage *message, CustomData *data) {
if (data->fragment_metadata_timer_id != 0) {
g_source_remove(data->fragment_metadata_timer_id);
data->fragment_metadata_timer_id = 0;
LOG_TRACE("Removing the put_metadata timer");
}
if (data->streamSource == FILE_SOURCE) {
// bookkeeping base_pts. add 1ms to avoid overlap.
data->base_pts += +data->max_frame_pts + duration_cast<nanoseconds>(milliseconds(1)).count();
Expand All @@ -175,6 +183,12 @@ static void error_cb(GstBus *bus, GstMessage *msg, CustomData *data) {
GError *err;
gchar *debug_info;

if (data->fragment_metadata_timer_id != 0) {
g_source_remove(data->fragment_metadata_timer_id);
data->fragment_metadata_timer_id = 0;
LOG_TRACE("Removing the put_metadata timer");
}

/* Print error details on the screen */
gst_message_parse_error(msg, &err, &debug_info);
g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
Expand Down Expand Up @@ -215,8 +229,14 @@ void timer(CustomData *data) {

/* Function handles sigint signal */
void sigint_handler(int sigint){
LOG_DEBUG("SIGINT received. Exiting graceully");
LOG_DEBUG("SIGINT received. Exiting gracefully");

if (data_global.fragment_metadata_timer_id != 0) {
g_source_remove(data_global.fragment_metadata_timer_id);
data_global.fragment_metadata_timer_id = 0;
LOG_TRACE("Removing the put_metadata timer");
}

if(data_global.main_loop != NULL){
g_main_loop_quit(data_global.main_loop);
}
Expand All @@ -236,17 +256,17 @@ void determine_credentials(GstElement *kvssink, CustomData *data) {
nullptr != (private_key_path = getenv("PRIVATE_KEY_PATH")) &&
nullptr != (role_alias = getenv("ROLE_ALIAS")) &&
nullptr != (ca_cert_path = getenv("CA_CERT_PATH"))) {
// set the IoT Credentials if provided in envvar
GstStructure *iot_credentials = gst_structure_new(
"iot-certificate",
"iot-thing-name", G_TYPE_STRING, data->stream_name,
"endpoint", G_TYPE_STRING, iot_credential_endpoint,
"cert-path", G_TYPE_STRING, cert_path,
"key-path", G_TYPE_STRING, private_key_path,
"ca-path", G_TYPE_STRING, ca_cert_path,
"role-aliases", G_TYPE_STRING, role_alias, NULL);
g_object_set(G_OBJECT (kvssink), "iot-certificate", iot_credentials, NULL);
// set the IoT Credentials if provided in envvar
GstStructure *iot_credentials = gst_structure_new(
"iot-certificate",
"iot-thing-name", G_TYPE_STRING, data->stream_name,
"endpoint", G_TYPE_STRING, iot_credential_endpoint,
"cert-path", G_TYPE_STRING, cert_path,
"key-path", G_TYPE_STRING, private_key_path,
"ca-path", G_TYPE_STRING, ca_cert_path,
"role-aliases", G_TYPE_STRING, role_alias, NULL);
g_object_set(G_OBJECT (kvssink), "iot-certificate", iot_credentials, NULL);
gst_structure_free(iot_credentials);
// kvssink will search for long term credentials in envvar automatically so no need to include here
// if no long credentials or IoT credentials provided will look for credential file as last resort
Expand All @@ -255,7 +275,65 @@ void determine_credentials(GstElement *kvssink, CustomData *data) {
}
}

int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElement *pipeline) {
/*
This function creates a GstStructure and uses it to trigger the GST_EVENT_CUSTOM_DOWNSTREAM for put_fragment_metadata
*/
bool put_fragment_metadata(GstElement* element, const std::string name, const std::string value, bool persistent) {
GstStructure *metadata = gst_structure_new_empty(KVS_ADD_METADATA_G_STRUCT_NAME);
gst_structure_set(metadata, KVS_ADD_METADATA_NAME, G_TYPE_STRING, name.c_str(),
KVS_ADD_METADATA_VALUE, G_TYPE_STRING, value.c_str(),
KVS_ADD_METADATA_PERSISTENT, G_TYPE_BOOLEAN, persistent, NULL);
GstEvent* event = gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM, metadata);
LOG_TRACE("Emit the put_fragment_metadata event with structure: " << std::string(gst_structure_to_string (metadata)));
return gst_element_send_event(element, event);
}

/*
Function to put fragment metadata: name, value, and persist values
This is a sample function. This function alternates between putting persistent and non-persistent metadata
until it puts maximum number (10) of metadata in a fragment. After that, it removes the timer that fires this function
Customers can write their own put_metadata and trigger it either using the timer or some other logic. This function can contain custom logic to
generate the metadata. To trigger the downstream event that calls the API putKinesisVideoFragmentMetadata,
put_fragment_metadata must be called as shown below.
Example:
<metadata_name, metadata_value, is_persistent>
"metadata_name_1", "metadata_value_1", 0
"metadata_name_2", "metadata_value_2", 1
"metadata_name_3", "metadata_value_3", 0
"metadata_name_4", "metadata_value_4", 1
"metadata_name_1", "metadata_value_5", 0
"metadata_name_2", "metadata_value_6", 1
"metadata_name_3", "metadata_value_7", 0
"metadata_name_4", "metadata_value_8", 1
To remove a persistent metadata entry, call the same function with empty value
"metadata_name_2", "", 1
*/
static void put_metadata(GstElement* element) {
std::ostringstream metadata_name_stream, metadata_value_stream;

++data_global.metadata_counter;
data_global.persist_flag = !data_global.persist_flag;

metadata_name_stream << "metadata_name_" << data_global.metadata_counter;
metadata_value_stream << "metadata_value_" << data_global.metadata_counter;

// All even metadata_value_n are persistent, all odd ones are non-persistent.
if (data_global.metadata_counter == 2 * MAX_FRAGMENT_METADATA_COUNT) {
if (data_global.fragment_metadata_timer_id != 0) {
g_source_remove(data_global.fragment_metadata_timer_id);
data_global.fragment_metadata_timer_id = 0;
LOG_WARN("Removing the put_metadata timer as the the max capacity for metadata in a fragment is reached");
}
}

if (!put_fragment_metadata(element, metadata_name_stream.str(), metadata_value_stream.str(), data_global.persist_flag)) {
LOG_WARN("Failed to put fragment metadata with name:" << metadata_name_stream.str() << " and value:" << metadata_value_stream.str());
}
}

int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElement *pipeline, GstElement *kvssink) {

bool vtenc = false, isOnRpi = false;

Expand Down Expand Up @@ -328,7 +406,7 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem
LOG_DEBUG("Streaming with live source and width: " << width << ", height: " << height << ", fps: " << framerate
<< ", bitrateInKBPS" << bitrateInKBPS);

GstElement *source_filter, *filter, *kvssink, *h264parse, *encoder, *source, *video_convert;
GstElement *source_filter, *filter, *h264parse, *encoder, *source, *video_convert;

/* create the elements */
source_filter = gst_element_factory_make("capsfilter", "source_filter");
Expand All @@ -341,11 +419,6 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem
LOG_ERROR("Failed to create capsfilter (2)");
return 1;
}
kvssink = gst_element_factory_make("kvssink", "kvssink");
if (!kvssink) {
LOG_ERROR("Failed to create kvssink");
return 1;
}
h264parse = gst_element_factory_make("h264parse", "h264parse"); // needed to enforce avc stream format
if (!h264parse) {
LOG_ERROR("Failed to create h264parse");
Expand Down Expand Up @@ -540,7 +613,7 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem
return 0;
}

int gstreamer_rtsp_source_init(int argc, char *argv[], CustomData *data, GstElement *pipeline) {
int gstreamer_rtsp_source_init(int argc, char *argv[], CustomData *data, GstElement *pipeline, GstElement *kvssink) {
// process runtime if provided
if (argc == 5){
if ((0 == STRCMPI(argv[3], "-runtime")) ||
Expand All @@ -552,10 +625,9 @@ int gstreamer_rtsp_source_init(int argc, char *argv[], CustomData *data, GstElem
}
}
}
GstElement *filter, *kvssink, *depay, *source, *h264parse;
GstElement *filter, *depay, *source, *h264parse;

filter = gst_element_factory_make("capsfilter", "filter");
kvssink = gst_element_factory_make("kvssink", "kvssink");
depay = gst_element_factory_make("rtph264depay", "depay");
source = gst_element_factory_make("rtspsrc", "source");
h264parse = gst_element_factory_make("h264parse", "h264parse");
Expand Down Expand Up @@ -603,14 +675,13 @@ int gstreamer_rtsp_source_init(int argc, char *argv[], CustomData *data, GstElem
return 0;
}

int gstreamer_file_source_init(CustomData *data, GstElement *pipeline) {
int gstreamer_file_source_init(CustomData *data, GstElement *pipeline, GstElement *kvssink) {

GstElement *demux, *kvssink, *filesrc, *h264parse, *filter, *queue;
GstElement *demux, *filesrc, *h264parse, *filter, *queue;
string file_suffix;
string file_path = data->file_list.at(data->current_file_idx).path;

filter = gst_element_factory_make("capsfilter", "filter");
kvssink = gst_element_factory_make("kvssink", "kvssink");
filesrc = gst_element_factory_make("filesrc", "filesrc");
h264parse = gst_element_factory_make("h264parse", "h264parse");
queue = gst_element_factory_make("queue", "queue");
Expand Down Expand Up @@ -680,28 +751,34 @@ int gstreamer_init(int argc, char *argv[], CustomData *data) {
/* init GStreamer */
gst_init(&argc, &argv);

GstElement *pipeline;
GstElement *pipeline, *kvssink;
int ret;
GstStateChangeReturn gst_ret;

// Reset first frame pts
data->first_pts = GST_CLOCK_TIME_NONE;

kvssink = gst_element_factory_make("kvssink", "kvssink");
if (!kvssink) {
LOG_ERROR("Failed to create kvssink");
return 1;
}

switch (data->streamSource) {
case LIVE_SOURCE:
LOG_INFO("Streaming from live source");
pipeline = gst_pipeline_new("live-kinesis-pipeline");
ret = gstreamer_live_source_init(argc, argv, data, pipeline);
ret = gstreamer_live_source_init(argc, argv, data, pipeline, kvssink);
break;
case RTSP_SOURCE:
LOG_INFO("Streaming from rtsp source");
pipeline = gst_pipeline_new("rtsp-kinesis-pipeline");
ret = gstreamer_rtsp_source_init(argc, argv, data, pipeline);
ret = gstreamer_rtsp_source_init(argc, argv, data, pipeline, kvssink);
break;
case FILE_SOURCE:
LOG_INFO("Streaming from file source");
pipeline = gst_pipeline_new("file-kinesis-pipeline");
ret = gstreamer_file_source_init(data, pipeline);
ret = gstreamer_file_source_init(data, pipeline, kvssink);
break;
}

Expand All @@ -715,12 +792,16 @@ int gstreamer_init(int argc, char *argv[], CustomData *data) {
g_signal_connect(G_OBJECT(bus), "message::error", (GCallback) error_cb, data);
g_signal_connect(G_OBJECT(bus), "message::eos", G_CALLBACK(eos_cb), data);
gst_object_unref(bus);

// Create a GStreamer timer to generate and put fragment metadata tags every 2 seconds
data->fragment_metadata_timer_id = g_timeout_add_seconds(data->put_fragment_metadata_frequency_seconds, reinterpret_cast<GSourceFunc>(put_metadata), kvssink);

/* start streaming */
gst_ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
if (gst_ret == GST_STATE_CHANGE_FAILURE) {
g_printerr("Unable to set the pipeline to the playing state.\n");
gst_object_unref(pipeline);
data->stream_status = STATUS_KVS_GSTREAMER_SAMPLE_ERROR;
data->stream_status = STATUS_KVS_GSTREAMER_SAMPLE_ERROR;
return 1;
}
// set timer if valid runtime provided (non-positive values are ignored)
Expand Down Expand Up @@ -828,7 +909,7 @@ int main(int argc, char *argv[]) {
} else if(stream_status == STATUS_KVS_GSTREAMER_SAMPLE_INTERRUPTED){
LOG_ERROR("File upload interrupted. Terminating.");
continue_uploading = false;
}else { // non fatal case. retry upload
} else { // non fatal case. retry upload
LOG_ERROR("stream error occurred: " << stream_status << ". Terminating.");
do_retry = true;
}
Expand Down Expand Up @@ -874,4 +955,3 @@ int main(int argc, char *argv[]) {

return 0;
}

24 changes: 13 additions & 11 deletions src/gstreamer/gstkvssink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ GST_DEBUG_CATEGORY_STATIC (gst_kvs_sink_debug);
#define KVS_ADD_METADATA_NAME "name"
#define KVS_ADD_METADATA_VALUE "value"
#define KVS_ADD_METADATA_PERSISTENT "persist"

#define KVS_CLIENT_USER_AGENT_NAME "AWS-SDK-KVS-CPP-CLIENT"

#define DEFAULT_AUDIO_TRACK_NAME "audio"
Expand Down Expand Up @@ -1161,29 +1162,30 @@ gst_kvs_sink_handle_sink_event (GstCollectPads *pads,
gboolean persistent;
bool is_persist;

if (!gst_structure_has_name(structure, KVS_ADD_METADATA_G_STRUCT_NAME)) {
goto CleanUp;
}

LOG_INFO("received kvs-add-metadata event for " << kvssink->stream_name);
if (NULL == gst_structure_get_string(structure, KVS_ADD_METADATA_NAME) ||
if (!gst_structure_has_name(structure, KVS_ADD_METADATA_G_STRUCT_NAME) ||
NULL == gst_structure_get_string(structure, KVS_ADD_METADATA_NAME) ||
NULL == gst_structure_get_string(structure, KVS_ADD_METADATA_VALUE) ||
!gst_structure_get_boolean(structure, KVS_ADD_METADATA_PERSISTENT, &persistent)) {

LOG_WARN("Event structure contains invalid field: " << std::string(gst_structure_to_string (structure)) << " for " << kvssink->stream_name);
ret = FALSE;
LOG_WARN("Event structure is invalid or it contains an invalid field(s): " << std::string(gst_structure_to_string (structure)) << " for " << kvssink->stream_name);
goto CleanUp;
}
LOG_TRACE("Received kvs-add-metadata event for " << kvssink->stream_name);

metadata_name = std::string(gst_structure_get_string(structure, KVS_ADD_METADATA_NAME));
metadata_value = std::string(gst_structure_get_string(structure, KVS_ADD_METADATA_VALUE));
is_persist = persistent;

bool result = data->kinesis_video_stream->putFragmentMetadata(metadata_name, metadata_value, is_persist);
if (!result) {
LOG_WARN("Failed to putFragmentMetadata. name: " << metadata_name << ", value: " << metadata_value << ", persistent: " << is_persist << " for " << kvssink->stream_name);
}

gst_event_unref (event);
event = NULL;

if (!result) {
ret = FALSE;
LOG_WARN("Failed to putFragmentMetadata for name: " << metadata_name << ", value: " << metadata_value << ", persistent: " << is_persist << " for " << kvssink->stream_name);
}

break;
}
case GST_EVENT_EOS: {
Expand Down

0 comments on commit 0b5c45b

Please sign in to comment.