Skip to content

Commit

Permalink
Merge pull request #1 from stream-labs/fix-crash-closing
Browse files Browse the repository at this point in the history
Fix crash closing
  • Loading branch information
eddyStreamlabs authored Dec 5, 2018
2 parents d6e01f8 + 8c10cac commit e090530
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 50 deletions.
31 changes: 18 additions & 13 deletions crash-handler-process/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ VOID DisconnectAndReconnect(DWORD);
BOOL ConnectToNewClient(HANDLE, LPOVERLAPPED);

std::vector<Process*> processes;
bool exitApp = false;
bool* exitApp = nullptr;
bool doRestartApp = false;
bool monitoring = false;
bool closeAll = false;
std::mutex mu;
std::mutex* mu = new std::mutex();

static thread_local std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>> converter;
std::string from_utf16_wide_to_utf8(const wchar_t* from, size_t length = -1)
Expand Down Expand Up @@ -187,24 +187,28 @@ void terminalCriticalProcesses(void) {
}
}

void checkProcesses(std::mutex &m) {
void checkProcesses(std::mutex* m) {

while (!exitApp) {
m.lock();
while (!(*exitApp)) {
m->lock();
bool alive = true;
size_t index = 0;

if (monitoring && processes.size() == 0) {
exitApp = true;
*exitApp = true;
break;
}

while (alive && index < processes.size()) {
monitoring = true;
std::unique_lock<std::mutex> ulock(processes.at(index)->mutex);
processes.at(index)->mutex.lock();
alive = processes.at(index)->getAlive();
processes.at(index)->mutex.unlock();
index++;
}

m->unlock();

if (!alive) {
index--;
if (!processes.at(index)->getCritical()) {
Expand Down Expand Up @@ -243,14 +247,13 @@ void checkProcesses(std::mutex &m) {
else {
closeAll = true;
}
exitApp = true;
*exitApp = true;
}
else {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
m.unlock();
}
exitApp = true;
*exitApp = true;
}

void close(bool doCloseALl) {
Expand Down Expand Up @@ -315,16 +318,18 @@ int main(int argc, char** argv)
uint64_t currentPID = GetCurrentProcessId();
write_pid_file(pid_path, currentPID);

std::thread processManager(checkProcesses, std::ref(mu));
exitApp = new bool(false);

std::thread processManager(checkProcesses, mu);

std::unique_ptr<NamedSocket> sock = NamedSocket::create();

while (!exitApp && !sock->read(&processes, std::ref(mu)))
while (!(*exitApp) && !sock->read(&processes, mu, exitApp))
{

}

exitApp = true;
*exitApp = true;
if (processManager.joinable())
processManager.join();
close(closeAll);
Expand Down
108 changes: 73 additions & 35 deletions crash-handler-process/namedsocket-win.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <iostream>
#include <mutex>
#include <algorithm>
#include <vector>

#define CONNECTING_STATE 0
#define READING_STATE 1
Expand All @@ -25,6 +26,8 @@ typedef struct
PIPEINST Pipe[INSTANCES];
HANDLE hEvents[INSTANCES];

std::vector<std::thread*> requests;

BOOL ConnectToNewClient(HANDLE hPipe, LPOVERLAPPED lpo)
{
BOOL fConnected, fPendingIO = FALSE;
Expand Down Expand Up @@ -135,10 +138,74 @@ NamedSocket_win::~NamedSocket_win() {
DisconnectNamedPipe(m_handle);
}

bool NamedSocket_win::read(std::vector<Process*>* processes, std::mutex &mu) {
void acknowledgeUnregister(void) {
std::string buffer = "exit";
HANDLE hPipe = CreateFile(
TEXT("\\\\.\\pipe\\exit-slobs-crash-handler"),
GENERIC_READ |
GENERIC_WRITE,
0,
NULL,
OPEN_EXISTING,
0,
NULL);

if (hPipe == INVALID_HANDLE_VALUE)
return;

if (GetLastError() == ERROR_PIPE_BUSY)
return;

DWORD bytesWritten;

WriteFile(
hPipe,
buffer.data(),
buffer.size(), &bytesWritten,
NULL);

CloseHandle(hPipe);
}

void processRequest(std::vector<char> p_buffer, std::vector<Process*>* processes, std::mutex* mu, bool* exitApp) {
Message msg(p_buffer);
mu->lock();
switch (msg.readBool()) {
case REGISTER: {
bool isCritical = msg.readBool();
uint32_t pid = msg.readUInt32();
processes->push_back(new Process(pid, isCritical));
break;
}
case UNREGISTER: {
uint32_t pid = msg.readUInt32();
auto it = std::find_if(processes->begin(), processes->end(), [&pid](Process* p) {
return p->getPID() == pid;
});

if (it != processes->end()) {
Process* p = (Process*)(*it);
p->stopWorker();

processes->erase(it);

if (p->getCritical())
*exitApp = true;
}
acknowledgeUnregister();
break;
}
case EXIT:
*exitApp = true;
default:
break;
}
mu->unlock();
}

bool NamedSocket_win::read(std::vector<Process*>* processes, std::mutex* mu, bool* exit) {
DWORD i, dwWait, cbRet, dwErr;
BOOL fSuccess;
bool exitApp = false;

dwWait = WaitForMultipleObjects(
INSTANCES,
Expand Down Expand Up @@ -208,38 +275,9 @@ bool NamedSocket_win::read(std::vector<Process*>* processes, std::mutex &mu) {
// The read operation completed successfully.
if (Pipe[i].cbRead > 0) {
Pipe[i].fPendingIO = FALSE;
Message msg(Pipe[i].chRequest);

switch (msg.readBool()) {
case REGISTER: {
bool isCritical = msg.readBool();
uint32_t pid = msg.readUInt32();
processes->push_back(new Process(pid, isCritical));
break;
}
case UNREGISTER: {
uint32_t pid = msg.readUInt32();
auto it = std::find_if(processes->begin(), processes->end(), [&pid](Process* p) {
return p->getPID() == pid;
});

mu.lock();
if (it != processes->end()) {
Process* p = (Process*)(*it);
p->stopWorker();
if (p->getWorker()->joinable())
p->getWorker()->join();

processes->erase(it);
}
mu.unlock();
break;
}
case EXIT:
exitApp = true;
default:
break;
}

// Start thread here
requests.push_back(new std::thread(processRequest, Pipe[i].chRequest, processes, mu, exit));
}
dwErr = GetLastError();
if (!fSuccess && (dwErr == ERROR_IO_PENDING))
Expand All @@ -251,7 +289,7 @@ bool NamedSocket_win::read(std::vector<Process*>* processes, std::mutex &mu) {
break;
}
}
return exitApp;
return false;
}

bool NamedSocket_win::flush() {
Expand Down
2 changes: 1 addition & 1 deletion crash-handler-process/namedsocket-win.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class NamedSocket_win : public NamedSocket {

public:
virtual bool connect() override;
virtual bool read(std::vector<Process*>*, std::mutex &mu) override;
virtual bool read(std::vector<Process*>*, std::mutex* mu, bool* exit) override;
virtual void disconnect() override;
virtual bool flush() override;

Expand Down
2 changes: 1 addition & 1 deletion crash-handler-process/namedsocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class NamedSocket {
static std::unique_ptr<NamedSocket> create();
virtual bool connect() = 0;
virtual void disconnect() = 0;
virtual bool read(std::vector<Process*>*, std::mutex &mu) = 0;
virtual bool read(std::vector<Process*>*, std::mutex* mu, bool* exit) = 0;
virtual bool flush() = 0;
virtual HANDLE getHandle() = 0;
};
2 changes: 2 additions & 0 deletions crash-handler-process/process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ void check(void* p) {
proc->setAlive(true);

if (!WaitForSingleObject(proc->getHandle(), INFINITE)) {
proc->mutex.lock();
proc->setAlive(false);
proc->stopWorker();
proc->mutex.unlock();
}
}

Expand Down

0 comments on commit e090530

Please sign in to comment.