Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingest mpd push #22

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 158 additions & 44 deletions ingest-tools/fmp4ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ int get_remote_sync_epoch(uint64_t *res_time, string &wc_uri_)
struct push_options_t
{
push_options_t()
: url_("http://localhost/live/video.isml/video.ism")
: url_("http://localhost/live/video.isml/")
, realtime_(false)
, daemon_(false)
, loop_(0)
Expand All @@ -87,8 +87,7 @@ struct push_options_t
, cmaf_presentation_duration_(0)
, avail_(0)
, avail_dur_(0)
, announce_(60.0)
,anchor_scale_(1)
, anchor_scale_(1)
{
}

Expand All @@ -108,7 +107,6 @@ struct push_options_t
// " [--chunked] Use chunked Transfer-Encoding for POST (long running post) otherwise short running per fragment post \n"
" [--avail] signal an advertisment slot every arg1 ms with duration of arg2 ms \n"
" [--dry_run] Do a dry run and write the output files to disk directly for checking file and box integrity\n"
" [--announce] specify the number of seconds in advance to presenation time to send an avail"
" [--auth] Basic Auth Password \n"
" [--aname] Basic Auth User Name \n"
" [--sslcert] TLS 1.2 client certificate \n"
Expand Down Expand Up @@ -139,7 +137,6 @@ struct push_options_t
if (t.compare("--wc_uri") == 0) { wc_uri_ = string(argv[++i]); continue; }
if (t.compare("--auth") == 0) { basic_auth_ = string(argv[++i]); continue; }
if (t.compare("--avail") == 0) { avail_ = atoi(argv[++i]); avail_dur_= atoi(argv[++i]); continue; }
if (t.compare("--announce") == 0) { announce_ = atof(argv[++i]); continue; }
if (t.compare("--aname") == 0) { basic_auth_name_ = string(argv[++i]); continue; }
if (t.compare("--sslcert") == 0) { ssl_cert_ = string(argv[++i]); continue; }
if (t.compare("--sslkey") == 0) { ssl_key_ = string(argv[++i]); continue; }
Expand Down Expand Up @@ -189,7 +186,6 @@ struct push_options_t
string basic_auth_name_; // user name with basic auth
string basic_auth_; // password with basic auth

double announce_; // number of seconds to send an announce in advance
double cmaf_presentation_duration_;
vector<string> input_files_;

Expand All @@ -200,6 +196,7 @@ struct push_options_t
uint32_t anchor_scale_;
};

// is this still used ? this was used for the long running post
struct ingest_post_state_t
{
bool init_done_; // flag if init fragment was sent
Expand All @@ -219,7 +216,7 @@ struct ingest_post_state_t
// generate segment name (init or media)
// init shall not contain $Number$ or $Time$
// media shall contain $Number$ or $Time$

// $RepresentationID$ is filename minus the file extension
string get_path_from_template(
string &template_string,
string &file_name,
Expand Down Expand Up @@ -287,6 +284,110 @@ string get_path_from_template(
return out_string;
}

// thread to push mpd every 5 seconds, not the mpd needs to be formatted correctly!
int push_mpd_thread(
push_options_t opt,
string post_url_string,
std::string file_name)
{
try
{
/* open the mpd file and store the bytes*/
// mpd_bytes = ......
string mpdbytes;
try {
ifstream mpdfile = ifstream(file_name);
char c = 'i';

while (mpdfile.good() && (c != EOF))
{
mpdfile.get(c);
if (c != EOF)
mpdbytes.push_back(c);
}

// add optional validation step ?

mpdfile.close();
}
catch (...)
{
std::cout << "error loading mpd, mpd thread not started" << std::endl;
return 1;
}

// todo add a validation step
if (mpdbytes.size() < 2)
{
std::cout << "error loading mpd, mpd thread not started" << std::endl;
return 1;
}

while (!stop_all)
{
// setup curl
CURL* curl;
CURLcode res;
curl = curl_easy_init();

curl_easy_setopt(curl, CURLOPT_URL, post_url_string.c_str());
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char*)&mpdbytes[0]);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)mpdbytes.size());

curl_easy_setopt(curl, CURLOPT_POST, 1);
curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);

curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0L);

if (opt.basic_auth_.size())
{
curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
curl_easy_setopt(curl, CURLOPT_USERNAME, opt.basic_auth_name_.c_str());
curl_easy_setopt(curl, CURLOPT_USERPWD, opt.basic_auth_.c_str());
}

if (opt.ssl_cert_.size())
curl_easy_setopt(curl, CURLOPT_SSLCERT, opt.ssl_cert_.c_str());

if (opt.ssl_key_.size())
curl_easy_setopt(curl, CURLOPT_SSLKEY, opt.ssl_key_.c_str());

if (opt.ssl_key_pass_.size())
curl_easy_setopt(curl, CURLOPT_KEYPASSWD, opt.ssl_key_pass_.c_str());

if (!opt.dry_run_)
{
std::cout << "======= posting mpd =====" << std::endl;
std::cout << mpdbytes << std::endl;
std::cout << "======= end mpd =====" << std::endl;

res = curl_easy_perform(curl);

/* Check for errors */
if (res != CURLE_OK)
{
fprintf(stderr, "---- connection with server failed in mpd push thread %s\n",
curl_easy_strerror(res));
curl_easy_cleanup(curl);
}
else
{
fprintf(stderr, "---- connection with server sucessfull %s\n",
curl_easy_strerror(res));
}
}
// send the mpd every 5 seconds
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
if (stop_all == true)
return 1;
}
}
catch(...)
{}
return 1;
}

int push_thread(ingest_stream l_ingest_stream,
push_options_t opt, string post_url_string, std::string file_name)
{
Expand All @@ -296,7 +397,7 @@ int push_thread(ingest_stream l_ingest_stream,
l_ingest_stream.get_init_segment_data(init_seg_dat);

std:string out_file = "o_" + file_name;
ofstream outf = std::ofstream(out_file, std::ios::binary);
ofstream outf = ofstream(out_file, std::ios::binary);
if (outf.good() && opt.dry_run_)
outf.write((char *)&init_seg_dat[0], init_seg_dat.size());

Expand Down Expand Up @@ -543,33 +644,38 @@ int main(int argc, char * argv[])
int l_index = 0;

for (auto it = opts.input_files_.begin(); it != opts.input_files_.end(); ++it)
{
ifstream input(*it, ifstream::binary);
{
// only consider the cmaf input files
if (! (it->substr(it->find_last_of(".") + 1) == "mpd")) {

if (!input.good())
{
std::cout << "failed loading input file: [cmf[tavm]]" << string(*it) << endl;
push_options_t::print_options();
return 0;
}
ifstream input(*it, ifstream::binary);

ingest_stream &l_ingest_stream = l_istreams[l_index];
l_ingest_stream.load_from_file(input);
if (!input.good())
{
std::cout << "failed loading input file: [cmf[tavm]]" << *it << endl;
push_options_t::print_options();
return 0;
}

// patch the tfdt values with an offset time
if (opts.wc_off_)
l_ingest_stream.patch_tfdt(opts.wc_time_start_, true, opts.anchor_scale_);
ingest_stream& l_ingest_stream = l_istreams[l_index];
l_ingest_stream.load_from_file(input);


double l_duration = (double) l_ingest_stream.get_duration() / (double) l_ingest_stream.init_fragment_.get_time_scale();
// patch the tfdt values with an offset time
if (opts.wc_off_)
l_ingest_stream.patch_tfdt(opts.wc_time_start_, true, opts.anchor_scale_);

if (l_duration > opts.cmaf_presentation_duration_) {
opts.cmaf_presentation_duration_ = l_duration;
std::cout << "CMAF presentation duration updated to: " << l_duration << " seconds " << std::endl;
}

l_index++;
input.close();
double l_duration = (double)l_ingest_stream.get_duration() / (double)l_ingest_stream.init_fragment_.get_time_scale();

if (l_duration > opts.cmaf_presentation_duration_)
{
opts.cmaf_presentation_duration_ = l_duration;
std::cout << "CMAF presentation duration updated to: " << l_duration << " seconds " << std::endl;
}

l_index++;
input.close();
}
}
l_index = 0;

Expand All @@ -579,8 +685,6 @@ int main(int argc, char * argv[])
string post_url_string = opts.url_ + "/Streams(" + "out_avail_track.cmfm" + ")";

event_track::gen_avail_files((uint32_t ) (opts.cmaf_presentation_duration_ * 1000), 2000, opts.avail_dur_, opts.avail_, opts.wc_time_start_);


ifstream input_file_meta(avail_track, ifstream::binary);
meta_ingest_stream.load_from_file(input_file_meta);

Expand All @@ -594,24 +698,34 @@ int main(int argc, char * argv[])

for (auto it = opts.input_files_.begin(); it != opts.input_files_.end(); ++it)
{
string post_url_string = opts.url_ + "/Streams(" + *it + ")";

if(it->substr(it->find_last_of(".") + 1) == "cmfm")
{
cout << "push thread: " << post_url_string << endl;
thread_ptr thread_n(new thread(push_thread, l_istreams[l_index], opts, post_url_string, (string) *it));
threads.push_back(thread_n);
}
else
// only consider the cmaf input files
if (it->substr(it->find_last_of(".") + 1) == "mpd")
{
cout << "push thread: " << post_url_string << endl;
thread_ptr thread_n(new thread(push_thread, l_istreams[l_index], opts, post_url_string, (string) *it));
cout << "push thread for mpd pushing " << endl;
string l_url = opts.url_ + '/' + *it;
thread_ptr thread_n(new thread(push_mpd_thread, opts, l_url, *it));
threads.push_back(thread_n);
}
else {

string post_url_string = opts.url_ + "/Streams(" + *it + ")";

if (it->substr(it->find_last_of(".") + 1) == "cmfm")
{
cout << "push thread: " << post_url_string << endl;
thread_ptr thread_n(new thread(push_thread, l_istreams[l_index], opts, post_url_string, *it));
threads.push_back(thread_n);
}
else
{
cout << "push thread: " << post_url_string << endl;
thread_ptr thread_n(new thread(push_thread, l_istreams[l_index], opts, post_url_string, *it));
threads.push_back(thread_n);
}
l_index++;
}
l_index++;
}


for (auto& th : threads)
th->join();
}
Loading