Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update server to handle connections in subprocesses #67

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 18 additions & 32 deletions server.cu
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@
#include <stdio.h>
#include <string>
#include <sys/socket.h>
#include <thread>
#include <unistd.h>
#include <unordered_map>
#include <pthread.h>
#include <sys/uio.h>

#include "codegen/gen_server.h"
Expand All @@ -25,7 +23,6 @@ typedef struct
int connfd;
int read_request_id;
int write_request_id;
pthread_mutex_t read_mutex, write_mutex;
struct iovec write_iov[128];
int write_iov_count = 0;
} conn_t;
Expand All @@ -52,24 +49,13 @@ int request_handler(const conn_t *conn)
void client_handler(int connfd)
{
conn_t conn = {connfd};
if (pthread_mutex_init(&conn.read_mutex, NULL) < 0 ||
pthread_mutex_init(&conn.write_mutex, NULL) < 0)
{
std::cerr << "Error initializing mutex." << std::endl;
return;
}

#ifdef VERBOSE
printf("Client connected.\n");
#endif

while (1)
{
if (pthread_mutex_lock(&conn.read_mutex) < 0)
{
std::cerr << "Error locking mutex." << std::endl;
break;
}
int n = read(connfd, &conn.read_request_id, sizeof(int));
if (n == 0)
{
Expand All @@ -82,18 +68,10 @@ void client_handler(int connfd)
break;
}

// TODO: this can't be multithreaded as some of the __cuda* functions
// assume that they are running in the same thread as the one that
// calls cudaLaunchKernel. we'll need to find a better way to map
// function calls to threads. maybe each rpc maps to an optional
// thread id that is passed to the handler?
if (request_handler(&conn) < 0)
std::cerr << "Error handling request." << std::endl;
}

if (pthread_mutex_destroy(&conn.read_mutex) < 0 ||
pthread_mutex_destroy(&conn.write_mutex) < 0)
std::cerr << "Error destroying mutex." << std::endl;
close(connfd);
}

Expand All @@ -112,15 +90,11 @@ int rpc_write(const void *conn, const void *data, const size_t size)
int rpc_end_request(const void *conn)
{
int request_id = ((conn_t *)conn)->read_request_id;
if (pthread_mutex_unlock(&((conn_t *)conn)->read_mutex) < 0)
return -1;
return request_id;
}

int rpc_start_response(const void *conn, const int request_id)
{
if (pthread_mutex_lock(&((conn_t *)conn)->write_mutex) < 0)
return -1;
((conn_t *)conn)->write_request_id = request_id;
((conn_t *)conn)->write_iov_count = 1;
return 0;
Expand All @@ -130,8 +104,7 @@ int rpc_end_response(const void *conn, void *result)
{
((conn_t *)conn)->write_iov[0] = (struct iovec){&((conn_t *)conn)->write_request_id, sizeof(int)};
((conn_t *)conn)->write_iov[((conn_t *)conn)->write_iov_count++] = (struct iovec){result, sizeof(int)};
if (writev(((conn_t *)conn)->connfd, ((conn_t *)conn)->write_iov, ((conn_t *)conn)->write_iov_count) < 0 ||
pthread_mutex_unlock(&((conn_t *)conn)->write_mutex) < 0)
if (writev(((conn_t *)conn)->connfd, ((conn_t *)conn)->write_iov, ((conn_t *)conn)->write_iov_count) < 0)
return -1;
return 0;
}
Expand Down Expand Up @@ -197,10 +170,23 @@ int main()
continue;
}

std::thread client_thread(client_handler, connfd);

// detach the thread so it runs independently
client_thread.detach();
pid_t pid = fork();
if (pid < 0)
{
std::cerr << "Fork failed." << std::endl;
close(connfd);
continue;
}
else if (pid == 0)
{
close(sockfd);
client_handler(connfd);
exit(0);
}
else
{
close(connfd);
}
}

close(sockfd);
Expand Down
Loading