Skip to content

Commit

Permalink
Add timeout to restart pinpoint if no data came in
Browse files Browse the repository at this point in the history
  • Loading branch information
UsualSpec committed Jul 9, 2024
1 parent e934d22 commit 49be8f8
Showing 1 changed file with 51 additions and 5 deletions.
56 changes: 51 additions & 5 deletions client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <shared_mutex>
#include <stdexcept>
#include <string>
#include <sys/poll.h>
#include <sys/wait.h>
#include <thread>
#include <unistd.h>
Expand Down Expand Up @@ -246,12 +247,55 @@ void AutopowerClient::getAndSavePpData() {
"INSERT INTO measurement_data (internal_measurement_id, measurement_value, measurement_timestamp) VALUES ($1, $2, TO_TIMESTAMP($3))");
// start transaction
pqxx::work txn(pgcon);
bool gotData = false; // log to output that we got data from pinpoint
while (read(pipe_comm[0], &rdbuffer, 1) > 0) { // new character arrived
bool gotData = false; // log to output that we got data from pinpoint
int ppWaitTime = 10;

try { // calculate wait time based on sampling interval
ppWaitTime += (std::stoll(ppSamplingInterval) / 1000);
} catch (std::exception &e) {
std::cerr << "Warning: Could not parse sampling interval: " << e.what() << ". Setting wait time for pinpoint failure detection to 10 seconds." << std::endl;
}

while (true) { // get next character with timeout
if (!gotData) {
gotData = true;
}

// Poll the pipe with a timeout before reading (see https://stackoverflow.com/a/56048171 for an example). If no data arrives within ppWaitTime seconds, e.g. because the power meter lost power, pinpoint is killed so that it can be started again.
struct pollfd ppPollFd;
ppPollFd.fd = pipe_comm[0];
ppPollFd.events = POLLIN;

int pollStatus = poll(&ppPollFd, 1, ppWaitTime * 1000); // check if new sample available
if (pollStatus == -1) {
std::string errorMsg = "Error: Could not poll on Pinpoint pipe. Killing pinpoint and trying again.";

if (kill(pid, SIGQUIT) != 0) { // tell child to quit as we detected an error.
std::cerr << "Could not kill pinpoint" << std::endl;
errorMsg += " Also could not kill pinpoint. Please also check the client for hardware errors.";
}

std::cerr << errorMsg << std::endl;
putStatusToServer(1, errorMsg);
break;
} else if (pollStatus == 0) {
std::string errorMsg = "Error: Pinpoint didn't respond within a reasonable time with new data. Killing pinpoint and trying again.";

if (kill(pid, SIGQUIT) != 0) { // tell child to terminate as we detected an error.
errorMsg += "Also could not kill pinpoint. Please also check for hardware errors.";
}

std::cerr << errorMsg << std::endl;
putStatusToServer(1, errorMsg);
break;
}

int readChar = read(pipe_comm[0], &rdbuffer, 1); // read new character
if (readChar <= 0) {
// no new data
break;
}

if (rdbuffer == '\n') {

struct CMsmtSample msmtPoint = parseMsmt(line);
Expand Down Expand Up @@ -281,11 +325,13 @@ void AutopowerClient::getAndSavePpData() {
}

close(pipe_comm[0]);

if (!measuring()) {
if (kill(pid, SIGTERM) != 0) { // tell child to terminate. This is the good case.
std::cerr << "Could not kill pinpoint" << std::endl;
}
}

int retStatus = 0;
waitpid(pid, &retStatus, 0);
setHasExited(true); // log the exit of pinpoint. Allows to get status
Expand Down Expand Up @@ -451,16 +497,16 @@ bool AutopowerClient::streamMeasurementData(std::string measId) {
getMmStartSql += " AND was_uploaded = false";
} else {
// MUST escape via readTxn.esc, because no way was found to pass parameters to a cursor.
getMmStartSql+= " AND measurements.shared_measurement_id = '" + readTxn.esc(measId) + "'";
getMmStartSql += " AND measurements.shared_measurement_id = '" + readTxn.esc(measId) + "'";
}

// https://stackoverflow.com/questions/16128142/how-to-use-pqxxstateless-cursor-class-from-libpqxx

pqxx::stateless_cursor<pqxx::cursor_base::read_only, pqxx::cursor_base::owned> tuplesToStream(readTxn, getMmStartSql, "uploadTupleCurs", false); // the tuples returned from the database to be uploaded

int numTuplesNotWritten = 0;
for (size_t idx = 0; true; idx++) {
pqxx::result res = tuplesToStream.retrieve(idx, idx+1);
pqxx::result res = tuplesToStream.retrieve(idx, idx + 1);
if (res.empty()) {
// on cursor end --> exit
break;
Expand Down

0 comments on commit 49be8f8

Please sign in to comment.