From 4cd0563346f56d10a735bfc9d7ea67a561491179 Mon Sep 17 00:00:00 2001 From: rufael Date: Wed, 1 Dec 2021 12:07:21 +0100 Subject: [PATCH 1/3] added ingest mpd and ingest mpd push thread --- ingest-tools/fmp4ingest.cpp | 121 ++++++++++++++++++- ingest-tools/test_files/ingest.mpd | 180 +++++++++++++++++++++++++++++ 2 files changed, 296 insertions(+), 5 deletions(-) create mode 100644 ingest-tools/test_files/ingest.mpd diff --git a/ingest-tools/fmp4ingest.cpp b/ingest-tools/fmp4ingest.cpp index 50164dc..94c310e 100644 --- a/ingest-tools/fmp4ingest.cpp +++ b/ingest-tools/fmp4ingest.cpp @@ -219,7 +219,7 @@ struct ingest_post_state_t // generate segment name (init or media) // init shall not contain $Number$ or $Time$ // media shall contain $Number$ or $Time$ - +// $RepresentationID$ is filename minus the file extension string get_path_from_template( string &template_string, string &file_name, @@ -287,6 +287,109 @@ string get_path_from_template( return out_string; } +int push_mpd_thread( + push_options_t opt, + string post_url_string, + std::string file_name) +{ + try + { + /* open the mpd file and store the bytes*/ + // mpd_bytes = ...... + string mpdbytes; + try { + ifstream mpdfile = ifstream(file_name); + char c = 'i'; + + while (mpdfile.good() && (c != EOF)) + { + mpdfile.get(c); + if (c != EOF) + mpdbytes.push_back(c); + } + + // add optional validation step ? + + mpdfile.close(); + } + catch (...) + { + std::cout << "error loading mpd, mpd thread not started" << std::endl; + return 1; + } + + // todo add a validation step + if (mpdbytes.size() < 2) + { + std::cout << "error loading mpd, mpd thread not started" << std::endl; + return 1; + } + + while (!stop_all) + { + // setup curl + CURL* curl; + CURLcode res; + curl = curl_easy_init(); + + curl_easy_setopt(curl, CURLOPT_URL, post_url_string.c_str()); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char*)&mpdbytes[0]); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)mpdbytes.size()); + + curl_easy_setopt(curl, CURLOPT_POST, 1); + curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); + + curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0L); + + if (opt.basic_auth_.size()) + { + curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); + curl_easy_setopt(curl, CURLOPT_USERNAME, opt.basic_auth_name_.c_str()); + curl_easy_setopt(curl, CURLOPT_USERPWD, opt.basic_auth_.c_str()); + } + + if (opt.ssl_cert_.size()) + curl_easy_setopt(curl, CURLOPT_SSLCERT, opt.ssl_cert_.c_str()); + + if (opt.ssl_key_.size()) + curl_easy_setopt(curl, CURLOPT_SSLKEY, opt.ssl_key_.c_str()); + + if (opt.ssl_key_pass_.size()) + curl_easy_setopt(curl, CURLOPT_KEYPASSWD, opt.ssl_key_pass_.c_str()); + + if (!opt.dry_run_) + { + std::cout << "======= posting mpd =====" << std::endl; + std::cout << mpdbytes << std::endl; + std::cout << "======= end mpd =====" << std::endl; + + res = curl_easy_perform(curl); + + /* Check for errors */ + if (res != CURLE_OK) + { + fprintf(stderr, "---- connection with server failed in mpd push thread %s\n", + curl_easy_strerror(res)); + curl_easy_cleanup(curl); + } + else + { + fprintf(stderr, "---- connection with server sucessfull %s\n", + curl_easy_strerror(res)); + } + } + // send the mpd every 5 seconds + std::this_thread::sleep_for(std::chrono::milliseconds(5000)); + if (stop_all == true) + return 1; + } + } + catch(...) + {} + return 1; +} + int push_thread(ingest_stream l_ingest_stream, push_options_t opt, string post_url_string, std::string file_name) { @@ -548,7 +651,7 @@ int main(int argc, char * argv[]) if (!input.good()) { - std::cout << "failed loading input file: [cmf[tavm]]" << string(*it) << endl; + std::cout << "failed loading input file: [cmf[tavm]]" << *it << endl; push_options_t::print_options(); return 0; } @@ -563,7 +666,8 @@ int main(int argc, char * argv[]) double l_duration = (double) l_ingest_stream.get_duration() / (double) l_ingest_stream.init_fragment_.get_time_scale(); - if (l_duration > opts.cmaf_presentation_duration_) { + if (l_duration > opts.cmaf_presentation_duration_) + { opts.cmaf_presentation_duration_ = l_duration; std::cout << "CMAF presentation duration updated to: " << l_duration << " seconds " << std::endl; } @@ -599,13 +703,20 @@ int main(int argc, char * argv[]) if(it->substr(it->find_last_of(".") + 1) == "cmfm") { cout << "push thread: " << post_url_string << endl; - thread_ptr thread_n(new thread(push_thread, l_istreams[l_index], opts, post_url_string, (string) *it)); + thread_ptr thread_n(new thread(push_thread, l_istreams[l_index], opts, post_url_string, *it)); threads.push_back(thread_n); } + else if (it->substr(it->find_last_of(".") + 1) == "mpd") + { + cout << "push thread: " << post_url_string << endl; + string l_url = opts.url_ + '/' + *it; + thread_ptr thread_n(new thread(push_mpd_thread, opts, l_url, *it)); + threads.push_back(thread_n); + } else { cout << "push thread: " << post_url_string << endl; - thread_ptr thread_n(new thread(push_thread, l_istreams[l_index], opts, post_url_string, (string) *it)); + thread_ptr thread_n(new thread(push_thread, l_istreams[l_index], opts, post_url_string, *it)); threads.push_back(thread_n); } l_index++; diff --git a/ingest-tools/test_files/ingest.mpd b/ingest-tools/test_files/ingest.mpd new file mode 100644 index 0000000..5c7b214 --- /dev/null +++ b/ingest-tools/test_files/ingest.mpd @@ -0,0 +1,180 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + From 1cdaa91f7a493a26ef8b727300ff7829534a8d8e Mon Sep 17 00:00:00 2001 From: rufael Date: Wed, 1 Dec 2021 13:17:12 +0100 Subject: [PATCH 2/3] some additional cosmetic fixes --- ingest-tools/fmp4ingest.cpp | 95 ++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 44 deletions(-) diff --git a/ingest-tools/fmp4ingest.cpp b/ingest-tools/fmp4ingest.cpp index 94c310e..b862e74 100644 --- a/ingest-tools/fmp4ingest.cpp +++ b/ingest-tools/fmp4ingest.cpp @@ -69,7 +69,7 @@ int get_remote_sync_epoch(uint64_t *res_time, string &wc_uri_) struct push_options_t { push_options_t() - : url_("http://localhost/live/video.isml/video.ism") + : url_("http://localhost/live/video.isml/") , realtime_(false) , daemon_(false) , loop_(0) @@ -88,7 +88,7 @@ struct push_options_t , avail_(0) , avail_dur_(0) , announce_(60.0) - ,anchor_scale_(1) + , anchor_scale_(1) { } @@ -200,6 +200,7 @@ struct push_options_t uint32_t anchor_scale_; }; +// is this still used ? this was used for the long running post struct ingest_post_state_t { bool init_done_; // flag if init fragment was sent @@ -287,6 +288,7 @@ string get_path_from_template( return out_string; } +// thread to push mpd every 5 seconds, not the mpd needs to be formatted correctly! int push_mpd_thread( push_options_t opt, string post_url_string, @@ -399,7 +401,7 @@ int push_thread(ingest_stream l_ingest_stream, l_ingest_stream.get_init_segment_data(init_seg_dat); std:string out_file = "o_" + file_name; - ofstream outf = std::ofstream(out_file, std::ios::binary); + ofstream outf = ofstream(out_file, std::ios::binary); if (outf.good() && opt.dry_run_) outf.write((char *)&init_seg_dat[0], init_seg_dat.size()); @@ -646,34 +648,38 @@ int main(int argc, char * argv[]) int l_index = 0; for (auto it = opts.input_files_.begin(); it != opts.input_files_.end(); ++it) - { - ifstream input(*it, ifstream::binary); + { + // only consider the cmaf input files + if (! (it->substr(it->find_last_of(".") + 1) == "mpd")) { - if (!input.good()) - { - std::cout << "failed loading input file: [cmf[tavm]]" << *it << endl; - push_options_t::print_options(); - return 0; - } + ifstream input(*it, ifstream::binary); - ingest_stream &l_ingest_stream = l_istreams[l_index]; - l_ingest_stream.load_from_file(input); + if (!input.good()) + { + std::cout << "failed loading input file: [cmf[tavm]]" << *it << endl; + push_options_t::print_options(); + return 0; + } - // patch the tfdt values with an offset time - if (opts.wc_off_) - l_ingest_stream.patch_tfdt(opts.wc_time_start_, true, opts.anchor_scale_); + ingest_stream& l_ingest_stream = l_istreams[l_index]; + l_ingest_stream.load_from_file(input); - - double l_duration = (double) l_ingest_stream.get_duration() / (double) l_ingest_stream.init_fragment_.get_time_scale(); + // patch the tfdt values with an offset time + if (opts.wc_off_) + l_ingest_stream.patch_tfdt(opts.wc_time_start_, true, opts.anchor_scale_); - if (l_duration > opts.cmaf_presentation_duration_) - { - opts.cmaf_presentation_duration_ = l_duration; - std::cout << "CMAF presentation duration updated to: " << l_duration << " seconds " << std::endl; - } - l_index++; - input.close(); + double l_duration = (double)l_ingest_stream.get_duration() / (double)l_ingest_stream.init_fragment_.get_time_scale(); + + if (l_duration > opts.cmaf_presentation_duration_) + { + opts.cmaf_presentation_duration_ = l_duration; + std::cout << "CMAF presentation duration updated to: " << l_duration << " seconds " << std::endl; + } + + l_index++; + input.close(); + } } l_index = 0; @@ -683,8 +689,6 @@ int main(int argc, char * argv[]) string post_url_string = opts.url_ + "/Streams(" + "out_avail_track.cmfm" + ")"; event_track::gen_avail_files((uint32_t ) (opts.cmaf_presentation_duration_ * 1000), 2000, opts.avail_dur_, opts.avail_, opts.wc_time_start_); - - ifstream input_file_meta(avail_track, ifstream::binary); meta_ingest_stream.load_from_file(input_file_meta); @@ -698,31 +702,34 @@ int main(int argc, char * argv[]) for (auto it = opts.input_files_.begin(); it != opts.input_files_.end(); ++it) { - string post_url_string = opts.url_ + "/Streams(" + *it + ")"; - - if(it->substr(it->find_last_of(".") + 1) == "cmfm") - { - cout << "push thread: " << post_url_string << endl; - thread_ptr thread_n(new thread(push_thread, l_istreams[l_index], opts, post_url_string, *it)); - threads.push_back(thread_n); - } - else if (it->substr(it->find_last_of(".") + 1) == "mpd") + // only consider the cmaf input files + if (it->substr(it->find_last_of(".") + 1) == "mpd") { - cout << "push thread: " << post_url_string << endl; + cout << "push thread for mpd pushing " << endl; string l_url = opts.url_ + '/' + *it; thread_ptr thread_n(new thread(push_mpd_thread, opts, l_url, *it)); threads.push_back(thread_n); } - else - { - cout << "push thread: " << post_url_string << endl; - thread_ptr thread_n(new thread(push_thread, l_istreams[l_index], opts, post_url_string, *it)); - threads.push_back(thread_n); + else { + + string post_url_string = opts.url_ + "/Streams(" + *it + ")"; + + if (it->substr(it->find_last_of(".") + 1) == "cmfm") + { + cout << "push thread: " << post_url_string << endl; + thread_ptr thread_n(new thread(push_thread, l_istreams[l_index], opts, post_url_string, *it)); + threads.push_back(thread_n); + } + else + { + cout << "push thread: " << post_url_string << endl; + thread_ptr thread_n(new thread(push_thread, l_istreams[l_index], opts, post_url_string, *it)); + threads.push_back(thread_n); + } + l_index++; } - l_index++; } - for (auto& th : threads) th->join(); } From 284f02dfdeee6f5736a66bbd5cc27ae6d07d179c Mon Sep 17 00:00:00 2001 From: rufael Date: Wed, 1 Dec 2021 13:41:53 +0100 Subject: [PATCH 3/3] some additional cosmetic fixes --- ingest-tools/fmp4ingest.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ingest-tools/fmp4ingest.cpp b/ingest-tools/fmp4ingest.cpp index b862e74..65b5ba8 100644 --- a/ingest-tools/fmp4ingest.cpp +++ b/ingest-tools/fmp4ingest.cpp @@ -87,7 +87,6 @@ struct push_options_t , cmaf_presentation_duration_(0) , avail_(0) , avail_dur_(0) - , announce_(60.0) , anchor_scale_(1) { } @@ -108,7 +107,6 @@ struct push_options_t // " [--chunked] Use chunked Transfer-Encoding for POST (long running post) otherwise short running per fragment post \n" " [--avail] signal an advertisment slot every arg1 ms with duration of arg2 ms \n" " [--dry_run] Do a dry run and write the output files to disk directly for checking file and box integrity\n" - " [--announce] specify the number of seconds in advance to presenation time to send an avail" " [--auth] Basic Auth Password \n" " [--aname] Basic Auth User Name \n" " [--sslcert] TLS 1.2 client certificate \n" @@ -139,7 +137,6 @@ struct push_options_t if (t.compare("--wc_uri") == 0) { wc_uri_ = string(argv[++i]); continue; } if (t.compare("--auth") == 0) { basic_auth_ = string(argv[++i]); continue; } if (t.compare("--avail") == 0) { avail_ = atoi(argv[++i]); avail_dur_= atoi(argv[++i]); continue; } - if (t.compare("--announce") == 0) { announce_ = atof(argv[++i]); continue; } if (t.compare("--aname") == 0) { basic_auth_name_ = string(argv[++i]); continue; } if (t.compare("--sslcert") == 0) { ssl_cert_ = string(argv[++i]); continue; } if (t.compare("--sslkey") == 0) { ssl_key_ = string(argv[++i]); continue; } @@ -189,7 +186,6 @@ struct push_options_t string basic_auth_name_; // user name with basic auth string basic_auth_; // password with basic auth - double announce_; // number of seconds to send an announce in advance double cmaf_presentation_duration_; vector input_files_;