diff --git a/cpp/Makefile b/cpp/Makefile index 221cd94..f8c5342 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -7,14 +7,22 @@ CXXCOMPILE = $(CXX) $(INCLUDES) $(CXXFLAGS) -c -o $@ CXXLINK = $(CXX) $(INCLUDES) $(CXXFLAGS) -o $@ default: all -all: rendler crawl_executor render_executor +all: rendler crawl_executor render_executor v1 HEADERS = rendler_helper.hpp - +HEADERS_V1 = $(HEADERS) rendler_v1_executor.hpp crawl_executor: crawl_executor.cpp $(HEADERS) $(CXXLINK) $< $(LDFLAGS) -lboost_regex -lcurl +rendler_v1_executor.o: rendler_v1_executor.cpp $(HEADERS_V1) + $(CXXLINK) -c $< + +%_v1_executor: %_v1_executor.cpp rendler_v1_executor.o $(HEADERS_V1) + $(CXXLINK) $< rendler_v1_executor.o $(LDFLAGS) -lboost_regex -lcurl + +v1: rendler_v1 crawl_v1_executor render_v1_executor + %: %.cpp $(HEADERS) $(CXXLINK) $< $(LDFLAGS) diff --git a/cpp/crawl_v1_executor.cpp b/cpp/crawl_v1_executor.cpp new file mode 100644 index 0000000..f99b825 --- /dev/null +++ b/cpp/crawl_v1_executor.cpp @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +#include + +#include + +#include "rendler_helper.hpp" +#include "rendler_v1_executor.hpp" + +using std::cout; +using std::endl; +using std::vector; + +using mesos::vectorToString; + + +static int writer(char *data, size_t size, size_t nmemb, string *writerData) +{ + assert(writerData != NULL); + writerData->append(data, size*nmemb); + return size * nmemb; +} + + +class CrawlV1Executor : public RendlerV1Executor +{ +public: + CrawlV1Executor(const FrameworkID& _frameworkId, + const ExecutorID& _executorId) + : RendlerV1Executor("CrawlV1Executor", _frameworkId, _executorId) {} + + virtual ~CrawlV1Executor() {} + +protected: + void runTask(const TaskInfo& task) override + { + string url = task.data(); + cout << "Running crawl task " << task.task_id().value() + << " Fetch: " << url; + + string buffer; + vector result; + result.push_back(task.task_id().value()); + result.push_back(url); + + CURL *conn; + conn = curl_easy_init(); + assert(conn != NULL); + assert(curl_easy_setopt(conn, CURLOPT_URL, url.c_str()) == CURLE_OK); + assert(curl_easy_setopt(conn, CURLOPT_FOLLOWLOCATION, 1L) == CURLE_OK); + assert(curl_easy_setopt(conn, CURLOPT_WRITEFUNCTION, writer) == CURLE_OK); + assert(curl_easy_setopt(conn, CURLOPT_WRITEDATA, &buffer) == CURLE_OK); + + if (curl_easy_perform(conn) != CURLE_OK) { + return; + } + + char *tmp; + assert(curl_easy_getinfo(conn, CURLINFO_EFFECTIVE_URL, &tmp) == CURLE_OK); + string redirectUrl = url; + if (tmp != NULL) { + redirectUrl = tmp; + } + curl_easy_cleanup(conn); + + size_t scheme = redirectUrl.find_first_of("://"); + size_t sp = redirectUrl.find_first_of('/', + scheme == string::npos ? 0 : scheme + 3); // skip the http:// part. + size_t lsp = redirectUrl.find_last_of('/'); // skip the http:// part. + string baseUrl = redirectUrl.substr(0, sp); // No trailing slash. + string dirUrl = redirectUrl.substr(0, lsp); // No trailing slash. + + cout << "redirectUrl " << redirectUrl << " baseUrl: " << baseUrl << endl; + cout << "dirUrl " << dirUrl << endl; + + const boost::regex hrefRE("]*?href\\s*=\\s*([\"'])(.*?)\\1"); + const boost::regex urlRE("^([a-zA-Z]+://).*"); + + boost::smatch matchHref; + string::const_iterator f = buffer.begin(); + string::const_iterator l = buffer.end(); + + while (f != buffer.end() && + boost::regex_search(f, l, matchHref, hrefRE)) { + string link = matchHref[2]; + f = matchHref[0].second; + + boost::smatch matchService; + string::const_iterator lb = link.begin(); + string::const_iterator le = link.end(); + + // Remove the anchor + if (link.find_first_of('#') != string::npos) { + link.erase(link.find_first_of('#')); + } + + if (link.empty()) { + continue; + } + + if (link[0] == '/') { + link = baseUrl + link; + } else if (!boost::regex_search(lb, le, matchService, urlRE)) { + // Relative URL + link = dirUrl + "/" + link; + } + result.push_back(link); + }; + + sendFrameworkMessage(vectorToString(result)); + sendStatusUpdate(task, TASK_FINISHED); + } +}; + + +int main(int argc, char** argv) +{ + FrameworkID frameworkId; + ExecutorID executorId; + + Option value; + + value = os::getenv("MESOS_FRAMEWORK_ID"); + if (value.isNone()) { + EXIT(EXIT_FAILURE) + << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment"; + } + frameworkId.set_value(value.get()); + + value = os::getenv("MESOS_EXECUTOR_ID"); + if (value.isNone()) { + EXIT(EXIT_FAILURE) + << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment"; + } + executorId.set_value(value.get()); + + process::Owned crawler( + new CrawlV1Executor(frameworkId, executorId)); + + process::spawn(crawler.get()); + process::wait(crawler.get()); + + return 0; +} diff --git a/cpp/render_v1_executor.cpp b/cpp/render_v1_executor.cpp new file mode 100644 index 0000000..e4809bd --- /dev/null +++ b/cpp/render_v1_executor.cpp @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include + +#include + +#include + +#include "rendler_helper.hpp" +#include "rendler_v1_executor.hpp" + +using std::cout; +using std::endl; +using std::vector; + +using mesos::vectorToString; + +static string renderJSPath; +static string workDirPath; + + +class RenderV1Executor : public RendlerV1Executor +{ +public: + RenderV1Executor(const FrameworkID& _frameworkId, + const ExecutorID& _executorId) + : RendlerV1Executor("RenderV1Executor", _frameworkId, _executorId) {} + + virtual ~RenderV1Executor() {} + +protected: + void runTask(const TaskInfo& task) override + { + string url = task.data(); + cout << "Running render task (" << task.task_id().value() << "): " << url; + string filename = workDirPath + task.task_id().value() + ".png"; + + vector result; + result.push_back(task.task_id().value()); + result.push_back(url); + result.push_back(filename); + + string cmd = "QT_QPA_PLATFORM=offscreen phantomjs " + renderJSPath + " " + url + " " + filename; + assert(system(cmd.c_str()) != -1); + + sendFrameworkMessage(vectorToString(result)); + sendStatusUpdate(task, TASK_FINISHED); + } +}; + + +int main(int argc, char** argv) +{ + FrameworkID frameworkId; + ExecutorID executorId; + + Option value; + + value = os::getenv("MESOS_FRAMEWORK_ID"); + if (value.isNone()) { + EXIT(EXIT_FAILURE) + << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment"; + } + frameworkId.set_value(value.get()); + + value = os::getenv("MESOS_EXECUTOR_ID"); + if (value.isNone()) { + EXIT(EXIT_FAILURE) + << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment"; + } + executorId.set_value(value.get()); + + std::string path = os::realpath(::dirname(argv[0])).get(); + renderJSPath = path + "/render.js"; + workDirPath = path + "/rendler-work-dir/"; + + process::Owned renderer( + new RenderV1Executor(frameworkId, executorId)); + + process::spawn(renderer.get()); + process::wait(renderer.get()); + + return 0; +} diff --git a/cpp/rendler.cpp b/cpp/rendler.cpp index 9aa4242..e925b44 100644 --- a/cpp/rendler.cpp +++ b/cpp/rendler.cpp @@ -96,6 +96,7 @@ class Rendler : public Scheduler for (size_t i = 0; i < offers.size(); i++) { const Offer& offer = offers[i]; Resources remaining = offer.resources(); + remaining.unallocate(); static Resources TASK_RESOURCES = Resources::parse( "cpus:" + stringify(CPUS_PER_TASK) + diff --git a/cpp/rendler_v1.cpp b/cpp/rendler_v1.cpp new file mode 100644 index 0000000..c5a095e --- /dev/null +++ b/cpp/rendler_v1.cpp @@ -0,0 +1,513 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +#include "rendler_helper.hpp" + +using namespace mesos::v1; + +using std::cout; +using std::endl; +using std::string; +using std::vector; +using std::queue; +using std::map; + +using mesos::stringToVector; + +using mesos::v1::scheduler::Call; +using mesos::v1::scheduler::Event; + +const float CPUS_PER_TASK = 0.2; +const int32_t MEM_PER_TASK = 32; + +static queue crawlQueue; +static queue renderQueue; +static map > crawlResults; +static map renderResults; +static map processed; +static size_t nextUrlId = 0; +static process::Owned rendler; + +static void shutdown(); +static void SIGINTHandler(int signum); + +class RendlerV1 : public process::Process +{ +public: + RendlerV1(const FrameworkInfo& _framework, + const ExecutorInfo& _crawler, + const ExecutorInfo& _renderer, + const string& _master, + const string& _seedUrl) + : framework(_framework), + crawler(_crawler), + renderer(_renderer), + master(_master), + state(INITIALIZING), + seedUrl(_seedUrl), + tasksLaunched(0), + tasksFinished(0), + frameworkMessagesReceived(0) + { + crawlQueue.push(seedUrl); + renderQueue.push(seedUrl); + processed[seedUrl] = nextUrlId++; + size_t scheme = seedUrl.find_first_of("://"); + size_t lsp = seedUrl.find_last_of( + '/', scheme == string::npos ? 0 : scheme + 3); // skip the http:// part + baseUrl = seedUrl.substr(0, lsp); // No trailing slash + } + + virtual ~RendlerV1() {} + + void connected() + { + state = CONNECTED; + doReliableRegistration(); + } + + void disconnected() + { + state = DISCONNECTED; + } + + void received(queue events) + { + while (!events.empty()) { + Event event = events.front(); + events.pop(); + + switch (event.type()) { + case Event::SUBSCRIBED: { + framework.mutable_id()->CopyFrom(event.subscribed().framework_id()); + state = SUBSCRIBED; + + cout << "Subscribed with ID " << framework.id() << endl; + break; + } + + case Event::OFFERS: { + resourceOffers(google::protobuf::convert(event.offers().offers())); + break; + } + + case Event::INVERSE_OFFERS: { + break; + } + + case Event::RESCIND: { + break; + } + + case Event::RESCIND_INVERSE_OFFER: { + break; + } + + case Event::UPDATE: { + statusUpdate(event.update().status()); + break; + } + + case Event::MESSAGE: { + frameworkMessage(event.message().executor_id(), + event.message().agent_id(), + event.message().data()); + break; + } + + case Event::FAILURE: { + break; + } + + case Event::ERROR: { + cout << event.error().message() << endl; + process::terminate(self()); + } + + case Event::HEARTBEAT: { + break; + } + + case Event::UNKNOWN: { + LOG(WARNING) << "Received an UNKNOWN event and ignored"; + break; + } + } + } + } + +protected: + virtual void initialize() + { + // We initialize the library here to ensure that callbacks are only invoked + // after the process has spawned. + mesos.reset(new scheduler::Mesos( + master, + mesos::ContentType::PROTOBUF, + process::defer(self(), &Self::connected), + process::defer(self(), &Self::disconnected), + process::defer(self(), &Self::received, lambda::_1), + None())); + } + + virtual void finalize() + { + shutdown(); + + Call call; + CHECK(framework.has_id()); + call.mutable_framework_id()->CopyFrom(framework.id()); + call.set_type(Call::TEARDOWN); + + mesos->send(call); + } + +private: + void doReliableRegistration() + { + if (state == SUBSCRIBED) { + return; + } + + Call call; + if (framework.has_id()) { + call.mutable_framework_id()->CopyFrom(framework.id()); + } + call.set_type(Call::SUBSCRIBE); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(framework); + + mesos->send(call); + + // Re-registrate after 1 second if not subscribed then. + process::delay(Seconds(1), + self(), + &Self::doReliableRegistration); + } + + void resourceOffers(const vector& offers) + { + foreach (const Offer& offer, offers) { + static Resources TASK_RESOURCES = Resources::parse( + "cpus:" + stringify(CPUS_PER_TASK) + + ";mem:" + stringify(MEM_PER_TASK)).get(); + + Resources remaining = offer.resources(); + // We can either unallocate the resources of the offer for V0 + // compatibility, or allocate the task resources with a proper role. + remaining.unallocate(); + + size_t maxTasks = 0; + while (remaining.flatten().contains(TASK_RESOURCES)) { + maxTasks++; + remaining -= TASK_RESOURCES; + } + + // Launch floor(maxTasks/2) crawlers and ceil(maxTasks/2) renderers. + vector tasks; + + for (size_t i = 0; i < maxTasks / 2 && !crawlQueue.empty(); i++) { + string url = crawlQueue.front(); + crawlQueue.pop(); + string urlId = "C" + stringify(processed[url]); + + TaskInfo task; + task.set_name("Crawler " + urlId); + task.mutable_task_id()->set_value(urlId); + task.mutable_agent_id()->MergeFrom(offer.agent_id()); + task.mutable_executor()->MergeFrom(crawler); + task.mutable_resources()->MergeFrom(TASK_RESOURCES); + task.set_data(url); + tasks.push_back(task); + + tasksLaunched++; + cout << "Crawler " << urlId << " " << url << endl; + } + + for (size_t i = maxTasks / 2; i < maxTasks && !renderQueue.empty(); i++) { + string url = renderQueue.front(); + renderQueue.pop(); + string urlId = "R" + stringify(processed[url]); + + TaskInfo task; + task.set_name("Renderer " + urlId); + task.mutable_task_id()->set_value(urlId); + task.mutable_agent_id()->MergeFrom(offer.agent_id()); + task.mutable_executor()->MergeFrom(renderer); + task.mutable_resources()->MergeFrom(TASK_RESOURCES); + task.set_data(url); + tasks.push_back(task); + + tasksLaunched++; + cout << "Renderer " << urlId << " " << url << endl; + } + + Call call; + CHECK(framework.has_id()); + call.mutable_framework_id()->CopyFrom(framework.id()); + call.set_type(Call::ACCEPT); + + Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(offer.id()); + + Offer::Operation* operation = accept->add_operations(); + operation->set_type(Offer::Operation::LAUNCH); + foreach (const TaskInfo& task, tasks) { + operation->mutable_launch()->add_task_infos()->CopyFrom(task); + } + + mesos->send(call); + } + } + + void statusUpdate(const TaskStatus& status) + { + if (status.has_uuid()) { + Call call; + CHECK(framework.has_id()); + call.mutable_framework_id()->CopyFrom(framework.id()); + call.set_type(Call::ACKNOWLEDGE); + + Call::Acknowledge* ack = call.mutable_acknowledge(); + ack->mutable_agent_id()->CopyFrom(status.agent_id()); + ack->mutable_task_id()->CopyFrom(status.task_id()); + ack->set_uuid(status.uuid()); + + mesos->send(call); + } + + if (status.state() == TASK_FINISHED) { + cout << "Task " << status.task_id().value() << " finished" << endl; + tasksFinished++; + } + + if (tasksFinished == tasksLaunched && + crawlQueue.empty() && + renderQueue.empty()) { + // We don't wait to receive pending framework messages, if any. Framework + // messages are not reliable, so we can't easily recover dropped messages + // anyway. + int missing_messages = tasksFinished - frameworkMessagesReceived; + if (missing_messages > 0) { + cout << "Noticed that " << missing_messages + << " framework messages were not received" << endl; + } + + process::terminate(self()); + } + } + + void frameworkMessage(const ExecutorID& executorId, + const AgentID& agentId, + const string& data) + { + vector strVector = stringToVector(data); + string taskId = strVector[0]; + string url = strVector[1]; + + if (executorId.value() == crawler.executor_id().value()) { + cout << "Crawler msg received: " << taskId << endl; + + for (size_t i = 2; i < strVector.size(); i++) { + string& newUrl = strVector[i]; + crawlResults[url].push_back(newUrl); + if (processed.count(newUrl) == 0) { + processed[newUrl] = nextUrlId++; + size_t scheme = newUrl.find_first_of("://"); + size_t lsp = newUrl.find_last_of( + '/', scheme == string::npos ? 0 : scheme + 3); // skip the http:// part + if (newUrl.substr(0, lsp) == baseUrl) { + crawlQueue.push(newUrl); + } + renderQueue.push(newUrl); + } + } + } else { + if (access(strVector[2].c_str(), R_OK) == 0) { + renderResults[url] = strVector[2]; + } + } + frameworkMessagesReceived++; + } + + FrameworkInfo framework; + const ExecutorInfo crawler; + const ExecutorInfo renderer; + const string master; + process::Owned mesos; + + enum State + { + INITIALIZING = 0, + CONNECTED = 1, + SUBSCRIBED = 2, + DISCONNECTED = 3 + } state; + + string seedUrl; + string baseUrl; + size_t tasksLaunched; + size_t tasksFinished; + size_t frameworkMessagesReceived; +}; + + +static void shutdown() +{ + printf("RendlerV1 is shutting down\n"); + printf("Writing results to result.dot\n"); + + FILE *f = fopen("result.dot", "w"); + fprintf(f, "digraph G {\n"); + fprintf(f, " node [shape=box];\n"); + + // Add vertices. + map::iterator rit; + for (rit = renderResults.begin(); rit != renderResults.end(); rit++) { + // Prepend character as dot vertices cannot starting with a digit. + string url_hash = "R" + stringify(processed[rit->first]); + string& filename = rit->second; + fprintf(f, + " %s[label=\"\" image=\"%s\"];\n", + url_hash.c_str(), + filename.c_str()); + } + + // Add edges. + map >::iterator cit; + for (cit = crawlResults.begin(); cit != crawlResults.end(); cit++) { + if (renderResults.find(cit->first) == renderResults.end()) { + continue; + } + string from_hash = "R" + stringify(processed[cit->first]); + vector& adjList = cit->second; + + for (size_t i = 0; i < adjList.size(); i++) { + string to_hash = "R" + stringify(processed[adjList[i]]); + if (renderResults.find(adjList[i]) != renderResults.end()) { + // DOT format is: + // A -> B; + fprintf(f, " %s -> %s;\n", from_hash.c_str(), to_hash.c_str()); + } + } + } + + fprintf(f, "}\n"); + fclose(f); +} + + +static void SIGINTHandler(int signum) +{ + if (rendler.get()) { + process::terminate(rendler->self()); + } + rendler.reset(); + exit(0); +} + + +#define shift argc--,argv++ +int main(int argc, char** argv) +{ + string seedUrl, master; + shift; + while (true) { + string s = argc>0 ? argv[0] : "--help"; + if (argc > 1 && s == "--seedUrl") { + seedUrl = argv[1]; + shift; shift; + } else if (argc > 1 && s == "--master") { + master = argv[1]; + shift; shift; + } else { + break; + } + } + + if (master.length() == 0 || seedUrl.length() == 0) { + printf("Usage: rendler_v1 --seedUrl --master :\n"); + exit(1); + } + + // Find this executable's directory to locate executor. + string path = realpath(dirname(argv[0]), NULL); + string crawlerUri = path + "/crawl_v1_executor"; + string rendererUri = path + "/render_v1_executor"; + cout << crawlerUri << endl; + cout << rendererUri << endl; + + // Set up the signal handler for SIGINT for clean shutdown. + struct sigaction action; + action.sa_handler = SIGINTHandler; + sigemptyset(&action.sa_mask); + action.sa_flags = 0; + sigaction(SIGINT, &action, NULL); + + process::initialize(); + + const Result user = os::user(); + CHECK_SOME(user); + + FrameworkInfo framework; + framework.set_user(user.get()); + framework.set_name("RendlerV1 Framework (C++)"); + //framework.set_role(role); + framework.set_principal("rendler-cpp"); + + ExecutorInfo crawler; + crawler.mutable_executor_id()->set_value("Crawler"); + crawler.mutable_command()->set_value(crawlerUri); + crawler.set_name("Crawl Executor (C++)"); + crawler.set_source("cpp"); + + ExecutorInfo renderer; + renderer.mutable_executor_id()->set_value("Renderer"); + renderer.mutable_command()->set_value(rendererUri); + renderer.set_name("Render Executor (C++)"); + renderer.set_source("cpp"); + + rendler.reset(new RendlerV1(framework, crawler, renderer, master, seedUrl)); + + process::spawn(rendler.get()); + process::wait(rendler.get()); + + return 0; +} diff --git a/cpp/rendler_v1_executor.cpp b/cpp/rendler_v1_executor.cpp new file mode 100644 index 0000000..bc549e8 --- /dev/null +++ b/cpp/rendler_v1_executor.cpp @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include + +#include +#include + +#include +#include + +#include "rendler_v1_executor.hpp" + +using std::cout; +using std::endl; +using std::mem_fn; + + +static void* start(void* arg) +{ + lambda::function* thunk = (lambda::function*) arg; + (*thunk)(); + delete thunk; + return NULL; +} + + +RendlerV1Executor::RendlerV1Executor(const string& _name, + const FrameworkID& _frameworkId, + const ExecutorID& _executorId) + : frameworkId(_frameworkId), + executorId(_executorId), + state(INITIALIZING) {} + + +RendlerV1Executor::~RendlerV1Executor() {} + + +void RendlerV1Executor::connected() +{ + state = CONNECTED; + doReliableRegistration(); +} + + +void RendlerV1Executor::disconnected() +{ + state = DISCONNECTED; +} + + +void RendlerV1Executor::received(queue events) +{ + while (!events.empty()) { + Event event = events.front(); + events.pop(); + + switch (event.type()) { + case Event::SUBSCRIBED: { + cout << name << " subscribed on " + << event.subscribed().agent_info().hostname() << endl; + state = SUBSCRIBED; + break; + } + + case Event::LAUNCH: { + launchTask(event.launch().task()); + break; + } + + case Event::LAUNCH_GROUP: { + const TaskGroupInfo& taskGroup = event.launch_group().task_group(); + foreach (const TaskInfo& task, taskGroup.tasks()) { + launchTask(task); + } + break; + } + + case Event::KILL: { + break; + } + + case Event::ACKNOWLEDGED: { + // Remove the corresponding update. + updates.erase(UUID::fromBytes(event.acknowledged().uuid()).get()); + // Remove the corresponding task. + tasks.erase(event.acknowledged().task_id()); + break; + } + + case Event::MESSAGE: { + break; + } + + case Event::SHUTDOWN: { + break; + } + + case Event::ERROR: { + break; + } + + case Event::UNKNOWN: { + LOG(WARNING) << "Received an UNKNOWN event and ignored"; + break; + } + } + } +} + + +void RendlerV1Executor::sendStatusUpdate(const TaskInfo& task, + const TaskState& state) +{ + UUID uuid = UUID::random(); + + TaskStatus status; + status.mutable_task_id()->CopyFrom(task.task_id()); + status.mutable_executor_id()->CopyFrom(executorId); + status.set_state(state); + status.set_source(TaskStatus::SOURCE_EXECUTOR); + status.set_uuid(uuid.toBytes()); + + Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.mutable_executor_id()->CopyFrom(executorId); + call.set_type(Call::UPDATE); + + Call::Update* update = call.mutable_update(); + update->mutable_status()->CopyFrom(status); + + // Capture the status update so we can repeat if there is no ack. + updates[uuid] = call.update(); + + mesos->send(call); +} + + +void RendlerV1Executor::sendFrameworkMessage(const string& data) +{ + Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.mutable_executor_id()->CopyFrom(executorId); + call.set_type(Call::MESSAGE); + + call.mutable_message()->set_data(data); + + mesos->send(call); +} + + +void RendlerV1Executor::initialize() +{ + // We initialize the library here to ensure that callbacks are only invoked + // after the process has spawned. + mesos.reset(new Mesos( + mesos::ContentType::PROTOBUF, + process::defer(self(), &Self::connected), + process::defer(self(), &Self::disconnected), + process::defer(self(), &Self::received, lambda::_1))); +} + + +void RendlerV1Executor::doReliableRegistration() +{ + if (state == SUBSCRIBED || state == DISCONNECTED) { + return; + } + + Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.mutable_executor_id()->CopyFrom(executorId); + + call.set_type(Call::SUBSCRIBE); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + + // Send all unacknowledged updates. + foreachvalue (const Call::Update& update, updates) { + subscribe->add_unacknowledged_updates()->MergeFrom(update); + } + + // Send all unacknowledged tasks. + foreachvalue (const TaskInfo& task, tasks) { + subscribe->add_unacknowledged_tasks()->MergeFrom(task); + } + + mesos->send(call); + + // Re-register after one second if not subscribed by then. + process::delay(Seconds(1), self(), &Self::doReliableRegistration); +} + + +void RendlerV1Executor::launchTask(const TaskInfo& task) +{ + cout << "Starting task " << task.task_id().value() << endl; + + tasks[task.task_id()] = task; + + lambda::function* thunk = + new lambda::function( + lambda::bind(mem_fn(&RendlerV1Executor::runTask), this, task)); + + pthread_t pthread; + if (pthread_create(&pthread, NULL, &start, thunk) != 0) { + sendStatusUpdate(task, TASK_FAILED); + } else { + pthread_detach(pthread); + sendStatusUpdate(task, TASK_RUNNING); + } +} diff --git a/cpp/rendler_v1_executor.hpp b/cpp/rendler_v1_executor.hpp new file mode 100644 index 0000000..052ae2c --- /dev/null +++ b/cpp/rendler_v1_executor.hpp @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __RENDLER_V1_EXECUTOR_HPP__ +#define __RENDLER_V1_EXECUTOR_HPP__ + +#include +#include + +#include + +#include +#include + +#include +#include + +using namespace mesos::v1; + +using std::queue; +using std::string; + +using mesos::v1::executor::Call; +using mesos::v1::executor::Event; +using mesos::v1::executor::Mesos; + + +class RendlerV1Executor : public process::Process +{ +public: + RendlerV1Executor(const string& _name, + const FrameworkID& _frameworkId, + const ExecutorID& _executorId); + virtual ~RendlerV1Executor(); + + void connected(); + void disconnected(); + void received(queue events); + void sendStatusUpdate(const TaskInfo& task, const TaskState& state); + void sendFrameworkMessage(const string& data); + +protected: + virtual void initialize(); + virtual void runTask(const TaskInfo& task) = 0; + +private: + void doReliableRegistration(); + void launchTask(const TaskInfo& task); + + const string name; + const FrameworkID frameworkId; + const ExecutorID executorId; + process::Owned mesos; + + enum State + { + INITIALIZING = 0, + CONNECTED = 1, + SUBSCRIBED = 2, + DISCONNECTED = 3 + } state; + + hashmap updates; // Unacknowledged updates. + hashmap tasks; // Unacknowledged tasks. +}; + +#endif // __RENDLER_V1_EXECUTOR_HPP__