From 71e63c55d92bf92c9c4d0fc7251ae9f956a1d75e Mon Sep 17 00:00:00 2001 From: Sachin C Date: Fri, 20 Sep 2024 16:35:17 -0400 Subject: [PATCH] Add UDP Heartbeat Coordination Algorithm implementation with Docker and Docker Compose support, including detailed README with usage instructions and algorithm overview. --- .DS_Store | Bin 0 -> 6148 bytes Dockerfile | 16 ++++ README.md | 144 ++++++++++++++++++++++++++++++++- docker-compose.yml | 44 ++++++++++ hostsfile.txt | 5 ++ main.c | 195 +++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 402 insertions(+), 2 deletions(-) create mode 100644 .DS_Store create mode 100644 Dockerfile create mode 100644 docker-compose.yml create mode 100644 hostsfile.txt create mode 100644 main.c diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..5008ddfcf53c02e82d7eee2e57c38e5672ef89f6 GIT binary patch literal 6148 zcmeH~Jr2S!425mzP>H1@V-^m;4Wg<&0T*E43hX&L&p$$qDprKhvt+--jT7}7np#A3 zem<@ulZcFPQ@L2!n>{z**++&mCkOWA81W14cNZlEfg7;MkzE(HCqgga^y>{tEnwC%0;vJ&^%eQ zLs35+`xjp>T0". + - The program uses `gethostbyname()` to resolve the hostnames from the `hostsfile.txt` into IP addresses, allowing communication between containers. + +3. **Receiving Heartbeats**: + - The process monitors its UDP socket using the `select()` system call, which checks for incoming messages. The `select()` function ensures that the process does not block indefinitely and allows periodic message sending. + - Upon receiving a heartbeat, the process logs the sender and marks it as "ready." The sender's identity is stored to ensure that heartbeats are only counted once from each process. + +4. **Coordination and Readiness**: + - After receiving a heartbeat from all other processes, the process prints "READY" to indicate that all participants are active and ready. + - If a process does not receive all heartbeats within a predefined timeout (`TIMEOUT`), the process exits with a failure message. + +5. 
**Concurrency with `select()`**: + - The use of the `select()` function allows the program to simultaneously send and receive messages without needing multithreading. It enables non-blocking I/O by checking the socket for incoming messages with a timeout. + - This ensures the program efficiently manages both heartbeat transmissions and receptions, alternating between the two based on system activity. + +### Key Concepts + +1. **UDP (User Datagram Protocol)**: + - UDP is a connectionless protocol, meaning that messages (datagrams) are sent without establishing a connection. This makes UDP lightweight and fast, but messages may be lost or received out of order. + - The algorithm compensates for this by periodically resending heartbeats, ensuring that missed packets do not cause failure. + +2. **`select()` System Call**: + - `select()` allows the program to wait for activity on the UDP socket for incoming messages. If no activity is detected within the heartbeat interval, the process sends a heartbeat. + - This allows the program to multiplex between sending heartbeats and receiving messages, ensuring efficient coordination. + +3. **Docker Environment**: + - Each process runs in a separate Docker container, and the containers communicate over a Docker network using the hostnames listed in `hostsfile.txt`. + - Docker provides isolation, but processes can still discover and communicate with each other using the shared network. + +### Conclusion + +The UDP Heartbeat Coordination Algorithm ensures reliable synchronization between multiple processes in a distributed system. By using heartbeat messages and non-blocking I/O with `select()`, the algorithm achieves coordination without complex multithreading or blocking I/O. Docker provides a robust environment for simulating real-world distributed systems, making this approach scalable and practical for larger applications. 
+ +### Future Considerations + +- **Error Handling**: Future improvements can add more robust error handling, such as retries for missed heartbeats. +- **Fault Tolerance**: To improve fault tolerance, the system could be enhanced to detect and handle failures in individual processes. + +--- + +## Running the UDP Heartbeat Coordination Program + +### Prerequisites + +Ensure that you have the following installed: +- [Docker](https://www.docker.com/get-started) +- [Docker Compose](https://docs.docker.com/compose/install/) + +### Building and Running the Program with Docker + +#### Step 1: Clone the repository +First, clone the repository containing the UDP Heartbeat Coordination Algorithm: + +```bash +git clone <repository-url> +cd udp-heartbeat-coordination +``` + +#### Step 2: Build the Docker Image + +Use the following command to build the Docker image for the program: + +```bash +docker build -t udp-heartbeat . +``` + +This command creates a Docker image tagged as udp-heartbeat from the Dockerfile. + +#### Step 3: Create a Docker Network + +If you don’t already have a Docker network, create one to allow communication between containers: + +```bash +docker network create mynetwork +``` + +#### Step 4: Run Containers + +Run multiple containers (processes) using the Docker image, ensuring each container uses a different hostname and the same network. + +For example, to run two containers: + +```bash +docker run --name container1 --network mynetwork --hostname container1 udp-heartbeat -h /usr/src/myapp/hostsfile.txt +docker run --name container2 --network mynetwork --hostname container2 udp-heartbeat -h /usr/src/myapp/hostsfile.txt +``` + + • --name: The name of the Docker container. + • --network: The Docker network you created in Step 3. + • --hostname: The hostname used by the program. + • -h /usr/src/myapp/hostsfile.txt: The path to the hostsfile.txt inside the Docker container. + +#### Step 5: Verify Communication + +Each container will send and receive heartbeat messages. 
When all processes are ready, the message “READY” will be printed to the console. + +### Running the Program with Docker Compose + +```bash +docker-compose up +``` + +#### Verify Output + +Check the logs for each container to verify that the heartbeat messages are being sent and received. The program will print "READY" once all processes have communicated and are ready. + +#### Shut Down Containers + +To stop and remove the containers created by Docker Compose, run: + +```bash +docker-compose down +``` + +This will clean up all the resources used by the containers. + +These instructions will help you build and run the UDP Heartbeat Coordination program using Docker and Docker Compose. Make sure the hostsfile.txt contains the hostnames of all the participating containers for correct coordination. diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..813664f --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,44 @@ +services: + peer1: + image: prj1 + networks: + - mynetwork + hostname: "peer1" + container_name: "peer1" + command: -h hostsfile.txt + + peer2: + image: prj1 + networks: + - mynetwork + hostname: "peer2" + container_name: "peer2" + command: -h hostsfile.txt + + peer3: + image: prj1 + networks: + - mynetwork + hostname: "peer3" + container_name: "peer3" + command: -h hostsfile.txt + + peer4: + image: prj1 + networks: + - mynetwork + hostname: "peer4" + container_name: "peer4" + command: -h hostsfile.txt + + peer5: + image: prj1 + networks: + - mynetwork + hostname: "peer5" + container_name: "peer5" + command: -h hostsfile.txt + +networks: + # The presence of these objects is sufficient to define them + mynetwork: {} diff --git a/hostsfile.txt b/hostsfile.txt new file mode 100644 index 0000000..bf660a5 --- /dev/null +++ b/hostsfile.txt @@ -0,0 +1,5 @@ +peer1 +peer2 +peer3 +peer4 +peer5 diff --git a/main.c b/main.c new file mode 100644 index 0000000..d231461 --- /dev/null +++ b/main.c @@ -0,0 +1,195 @@ +#include +#include 
#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

#define PORT 12345
#define BUFFER_SIZE 1024
#define MAX_HOSTS BUFFER_SIZE  // Row capacity of the hostname tables
#define HEARTBEAT_INTERVAL 2   // Send a heartbeat every 2 seconds
#define TIMEOUT 30             // Timeout in seconds for waiting for all heartbeats

int ready_count = 0;
int total_processes = 0;

// Read one whitespace-delimited token from fp into dst (capacity cap >= 1).
// Characters that do not fit are consumed and dropped, so dst is always
// NUL-terminated and never overflows. Returns 1 if a token was read, 0 at EOF.
static int read_token(FILE *fp, char *dst, size_t cap) {
    int c;

    do {
        c = fgetc(fp);
    } while (c != EOF && isspace(c));

    if (c == EOF) {
        return 0;
    }

    size_t len = 0;
    while (c != EOF && !isspace(c)) {
        if (len + 1 < cap) {
            dst[len++] = (char)c;
        }
        c = fgetc(fp);
    }
    dst[len] = '\0';
    return 1;
}

// Read whitespace-separated hostnames from `hostfile` into `hostnames` and
// store how many were read in *total_processes.
//
// Exits the process if the file cannot be opened. At most MAX_HOSTS entries
// are stored (the table capacity used by main); any further entries are
// ignored with a warning instead of overflowing the table.
void read_hostfile(const char *hostfile, char hostnames[][BUFFER_SIZE], int *total_processes) {
    FILE *file = fopen(hostfile, "r");
    if (!file) {
        fprintf(stderr, "Error opening hostfile\n");
        exit(EXIT_FAILURE);
    }

    int count = 0;
    while (count < MAX_HOSTS && read_token(file, hostnames[count], BUFFER_SIZE)) {
        count++;
    }

    if (count == MAX_HOSTS) {
        char extra[BUFFER_SIZE];
        if (read_token(file, extra, sizeof(extra))) {
            fprintf(stderr, "Hostfile lists more than %d hosts; extra entries ignored\n", MAX_HOSTS);
        }
    }

    fclose(file);
    *total_processes = count;
}

// Send "HEARTBEAT from <hostname>" over `sockfd` to every entry of
// `hostnames` except the one equal to `hostname` (this process).
//
// Peers that cannot be resolved to an IPv4 address, or for which sendto()
// fails, are reported on stderr and skipped; the remaining peers are still
// contacted.
void send_heartbeat(const char *hostname, int sockfd, const char hostnames[][BUFFER_SIZE], int total_processes) {
    char message[BUFFER_SIZE];
    snprintf(message, sizeof(message), "HEARTBEAT from %s", hostname);
    size_t message_len = strlen(message);

    for (int i = 0; i < total_processes; i++) {
        if (strcmp(hostnames[i], hostname) == 0) {
            continue;  // Never send to ourselves
        }

        struct sockaddr_in peer_addr;
        memset(&peer_addr, 0, sizeof(peer_addr));
        peer_addr.sin_family = AF_INET;
        peer_addr.sin_port = htons(PORT);

        // Resolve the hostname to an IP address
        struct hostent *he = gethostbyname(hostnames[i]);
        if (he == NULL) {
            fprintf(stderr, "Error resolving hostname %s\n", hostnames[i]);
            continue;
        }

        // Only copy the address if it really is an IPv4 address of the
        // expected size; otherwise memcpy could write past sin_addr.
        if (he->h_addrtype != AF_INET ||
            he->h_length != (int)sizeof(peer_addr.sin_addr) ||
            he->h_addr_list[0] == NULL) {
            fprintf(stderr, "Error resolving hostname %s\n", hostnames[i]);
            continue;
        }
        memcpy(&peer_addr.sin_addr, he->h_addr_list[0], sizeof(peer_addr.sin_addr));

        char ip_str[INET_ADDRSTRLEN];
        if (inet_ntop(AF_INET, &peer_addr.sin_addr, ip_str, sizeof(ip_str)) == NULL) {
            snprintf(ip_str, sizeof(ip_str), "?");
        }

        fprintf(stderr, "Sending heartbeat to %s (IP: %s)\n", hostnames[i], ip_str);
        if (sendto(sockfd, message, message_len, 0,
                   (const struct sockaddr *)&peer_addr, sizeof(peer_addr)) < 0) {
            fprintf(stderr, "Error sending heartbeat to %s: %s\n", hostnames[i], strerror(errno));
        }
    }
}

// Receive one datagram from `sockfd` and, if it is a well-formed heartbeat
// from a sender not seen before, append the sender name to `received_hosts`
// and increment *ready_count.
//
// Malformed datagrams are logged and ignored. `total_processes` is unused
// and kept only so the signature stays the same.
void receive_heartbeat(int sockfd, char received_hosts[][BUFFER_SIZE], int *ready_count, int total_processes) {
    (void)total_processes;

    char buffer[BUFFER_SIZE];
    struct sockaddr_in cli_addr;
    socklen_t len = sizeof(cli_addr);
    memset(&cli_addr, 0, sizeof(cli_addr));

    // Leave one byte free for the terminating NUL written below.
    ssize_t n = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&cli_addr, &len);
    if (n < 0) {
        fprintf(stderr, "Error receiving heartbeat: %s\n", strerror(errno));
        return;
    }
    if (n == 0) {
        return;
    }
    buffer[n] = '\0';

    // buffer holds at most BUFFER_SIZE - 1 characters, so the %s token
    // always fits in sender.
    char sender[BUFFER_SIZE];
    if (sscanf(buffer, "HEARTBEAT from %s", sender) != 1) {
        fprintf(stderr, "Ignoring malformed datagram\n");
        return;
    }

    char ip_str[INET_ADDRSTRLEN];
    if (inet_ntop(AF_INET, &cli_addr.sin_addr, ip_str, sizeof(ip_str)) == NULL) {
        snprintf(ip_str, sizeof(ip_str), "?");
    }

    fprintf(stderr, "Received heartbeat from %s (IP: %s)\n", sender, ip_str);

    // Check if this heartbeat has already been received
    for (int i = 0; i < *ready_count; i++) {
        if (strcmp(received_hosts[i], sender) == 0) {
            return;
        }
    }

    // Refuse to grow past the table capacity used by main.
    if (*ready_count >= MAX_HOSTS) {
        fprintf(stderr, "Heartbeat table full; ignoring %s\n", sender);
        return;
    }

    memcpy(received_hosts[*ready_count], sender, strlen(sender) + 1);
    (*ready_count)++;
    fprintf(stderr, "Logged heartbeat from %s\n", sender);
}

// Entry point: `prog -h <hostfile>`.
//
// Reads this host's name from /etc/hostname and the peer list from the
// hostfile, binds a UDP socket on PORT, then alternates between receiving
// heartbeats and sending one every HEARTBEAT_INTERVAL seconds. Prints
// "READY" and returns EXIT_SUCCESS once a heartbeat from every other host
// has been logged; returns EXIT_FAILURE if TIMEOUT seconds pass first or
// select() fails.
int main(int argc, char *argv[]) {
    // Validate arguments
    if (argc != 3 || strcmp(argv[1], "-h") != 0) {
        fprintf(stderr, "Usage: %s -h <hostfile>\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    char *hostfile = argv[2];
    char hostname[BUFFER_SIZE];

    FILE *f = fopen("/etc/hostname", "r");
    if (f == NULL) {
        fprintf(stderr, "Error reading hostname\n");
        exit(EXIT_FAILURE);
    }
    int have_hostname = read_token(f, hostname, sizeof(hostname));
    fclose(f);
    if (!have_hostname) {
        fprintf(stderr, "Error reading hostname\n");
        exit(EXIT_FAILURE);
    }

    // Static storage: the two tables are 1 MiB each, too large for the stack.
    static char hostnames[MAX_HOSTS][BUFFER_SIZE];
    static char received_hosts[MAX_HOSTS][BUFFER_SIZE];  // Track received heartbeats

    // Read the hostfile and get the list of other hosts
    read_hostfile(hostfile, hostnames, &total_processes);

    // Create a UDP socket
    int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0) {
        fprintf(stderr, "Socket creation failed\n");
        exit(EXIT_FAILURE);
    }

    // Server address setup
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);

    // Bind socket to port
    if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        fprintf(stderr, "Socket bind failed\n");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    time_t last_heartbeat_time = time(NULL);
    time_t start_time = time(NULL);

    int all_ready = 0;

    while (1) {
        fd_set read_fds;
        FD_ZERO(&read_fds);
        FD_SET(sockfd, &read_fds);

        // Timeout for select to check for heartbeats and send them periodically
        struct timeval timeout;
        timeout.tv_sec = HEARTBEAT_INTERVAL;
        timeout.tv_usec = 0;

        // Check for incoming heartbeat messages
        int activity = select(sockfd + 1, &read_fds, NULL, NULL, &timeout);
        if (activity < 0) {
            if (errno != EINTR) {
                fprintf(stderr, "select failed: %s\n", strerror(errno));
                break;
            }
        } else if (activity > 0 && FD_ISSET(sockfd, &read_fds)) {
            receive_heartbeat(sockfd, received_hosts, &ready_count, total_processes);
        }

        // Check if all heartbeats have been received
        if (ready_count >= total_processes - 1) {
            // A process that starts late can hear every peer before its own
            // first periodic send. Send once more before leaving so peers
            // are not left waiting for a heartbeat that never comes.
            send_heartbeat(hostname, sockfd, (const char (*)[BUFFER_SIZE])hostnames, total_processes);
            fprintf(stderr, "READY\n");
            all_ready = 1;
            break;
        }

        // Send heartbeat if enough time has passed since the last heartbeat
        if (difftime(time(NULL), last_heartbeat_time) >= HEARTBEAT_INTERVAL) {
            send_heartbeat(hostname, sockfd, (const char (*)[BUFFER_SIZE])hostnames, total_processes);
            last_heartbeat_time = time(NULL);
        }

        // Check if timeout has been reached
        if (difftime(time(NULL), start_time) >= TIMEOUT) {
            fprintf(stderr, "Timeout reached. Exiting.\n");
            break;
        }
    }

    // Clean up
    close(sockfd);

    return all_ready ? EXIT_SUCCESS : EXIT_FAILURE;
}