diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..5008ddf Binary files /dev/null and b/.DS_Store differ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..512dddf --- /dev/null +++ b/Dockerfile @@ -0,0 +1,16 @@ +FROM ubuntu:22.04 + +RUN apt-get update && apt-get install -y gcc iproute2 net-tools iputils-ping + +# Copy over the source code and hostfile +COPY . /usr/src/myapp +WORKDIR /usr/src/myapp + +# Compile the program +RUN gcc -o udp_program main.c + +# Use ENTRYPOINT to specify the executable +ENTRYPOINT ["./udp_program"] + +# Default CMD to pass arguments to the entrypoint (can be overridden) +CMD ["-h", "hostfile.txt"] \ No newline at end of file diff --git a/README.md b/README.md index bccb082..ba80567 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,142 @@ -# UDP-Coordinate -A UDP-based heartbeat coordination algorithm for distributed systems. Processes communicate via UDP messages to ensure all participants are ready before proceeding. Implements periodic heartbeats, message exchange, and readiness checks using C and Docker for containerized environments. +# UDP Heartbeat Coordination Program + +## Brief Report on the UDP Heartbeat Coordination Algorithm + +### Introduction + +The UDP Heartbeat Coordination Algorithm enables multiple processes to communicate with each other to ensure that all participants are ready to begin a task. The coordination is achieved through UDP (User Datagram Protocol), where each process sends and receives "heartbeat" messages. This algorithm is particularly useful in distributed systems where synchronization among multiple components is crucial before performing a coordinated task. + +### Algorithm Overview + +The algorithm is implemented in a C program where each process: + +1. Listens for incoming heartbeat messages from other processes using UDP. +2. Sends periodic heartbeat messages to notify other processes of its presence. +3. Monitors the readiness of all other processes and signals "READY" once it has received heartbeats from all other participants. + +The program operates in a Dockerized environment where each process runs in its own container, communicating with others over a shared network. + +### Steps of the Algorithm + +1. **Initialization**: + - The program reads the `hostsfile.txt`, which contains the list of hostnames (or containers) that it needs to communicate with. + - Each process retrieves its hostname (typically from `/etc/hostname` in a Docker environment) and binds a UDP socket to a fixed port. + +2. **Heartbeat Message Exchange**: + - Each process sends a heartbeat message at regular intervals (`HEARTBEAT_INTERVAL`) to all other processes listed in `hostsfile.txt`. The message is of the format: "HEARTBEAT from ". + - The program uses `gethostbyname()` to resolve the hostnames from the `hostsfile.txt` into IP addresses, allowing communication between containers. + +3. **Receiving Heartbeats**: + - The process monitors its UDP socket using the `select()` system call, which checks for incoming messages. The `select()` function ensures that the process does not block indefinitely and allows periodic message sending. + - Upon receiving a heartbeat, the process logs the sender and marks it as "ready." The sender's identity is stored to ensure that heartbeats are only counted once from each process. + +4. **Coordination and Readiness**: + - After receiving a heartbeat from all other processes, the process prints "READY" to indicate that all participants are active and ready. + - If a process does not receive all heartbeats within a predefined timeout (`TIMEOUT`), the process exits with a failure message. + +5. **Concurrency with `select()`**: + - The use of the `select()` function allows the program to simultaneously send and receive messages without needing multithreading. It enables non-blocking I/O by checking the socket for incoming messages with a timeout. + - This ensures the program efficiently manages both heartbeat transmissions and receptions, alternating between the two based on system activity. + +### Key Concepts + +1. **UDP (User Datagram Protocol)**: + - UDP is a connectionless protocol, meaning that messages (datagrams) are sent without establishing a connection. This makes UDP lightweight and fast, but messages may be lost or received out of order. + - The algorithm compensates for this by periodically resending heartbeats, ensuring that missed packets do not cause failure. + +2. **`select()` System Call**: + - `select()` allows the program to wait for activity on the UDP socket for incoming messages. If no activity is detected within the heartbeat interval, the process sends a heartbeat. + - This allows the program to multiplex between sending heartbeats and receiving messages, ensuring efficient coordination. + +3. **Docker Environment**: + - Each process runs in a separate Docker container, and the containers communicate over a Docker network using the hostnames listed in `hostsfile.txt`. + - Docker provides isolation, but processes can still discover and communicate with each other using the shared network. + +### Conclusion + +The UDP Heartbeat Coordination Algorithm ensures reliable synchronization between multiple processes in a distributed system. By using heartbeat messages and non-blocking I/O with `select()`, the algorithm achieves coordination without complex multithreading or blocking I/O. Docker provides a robust environment for simulating real-world distributed systems, making this approach scalable and practical for larger applications. + +### Future Considerations + +- **Error Handling**: Future improvements can add more robust error handling, such as retries for missed heartbeats. +- **Fault Tolerance**: To improve fault tolerance, the system could be enhanced to detect and handle failures in individual processes. + +--- + +## Running the UDP Heartbeat Coordination Program + +### Prerequisites + +Ensure that you have the following installed: +- [Docker](https://www.docker.com/get-started) +- [Docker Compose](https://docs.docker.com/compose/install/) + +### Building and Running the Program with Docker + +#### Step 1: Clone the repository +First, clone the repository containing the UDP Heartbeat Coordination Algorithm: + +```bash +git clone +cd udp-heartbeat-coordination +``` + +#### Step 2: Build the Docker Image + +Use the following command to build the Docker image for the program: + +```bash +docker build -t udp-heartbeat . +``` + +This command creates a Docker image tagged as udp-heartbeat from the Dockerfile. + +#### Step 3: Create a Docker Network + +If you don’t already have a Docker network, create one to allow communication between containers: + +```bash +docker network create mynetwork +``` + +#### Step 4: Run Containers + +Run multiple containers (processes) using the Docker image, ensuring each container uses a different hostname and the same network. + +For example, to run two containers: + +```bash +docker run --name container1 --network mynetwork --hostname container1 udp-heartbeat -h /usr/src/myapp/hostsfile.txt +docker run --name container2 --network mynetwork --hostname container2 udp-heartbeat -h /usr/src/myapp/hostsfile.txt +``` + + • --name: The name of the Docker container. + • --network: The Docker network you created in Step 3. + • --hostname: The hostname used by the program. + • -h /usr/src/myapp/hostsfile.txt: The path to the hostsfile.txt inside the Docker container. + +#### Step 5: Verify Communication + +Each container will send and receive heartbeat messages. When all processes are ready, the message “READY” will be printed to the console. + +### Running the Program with Docker Compose + +```bash +docker-compose up +``` + +#### Verify Output + +Check the logs for each container to verify that the heartbeat messages are being sent and received. The program will print "READY" once all processes have communicated and are ready. + +#### Shut Down Containers + +To stop and remove the containers created by Docker Compose, run: + +```bash +docker-compose down +``` + +This will clean up all the resources used by the containers. + +These instructions will help you build and run the UDP Heartbeat Coordination program using Docker and Docker Compose. Make sure the hostsfile.txt contains the hostnames of all the participating containers for correct coordination. diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..813664f --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,44 @@ +services: + peer1: + image: prj1 + networks: + - mynetwork + hostname: "peer1" + container_name: "peer1" + command: -h hostsfile.txt + + peer2: + image: prj1 + networks: + - mynetwork + hostname: "peer2" + container_name: "peer2" + command: -h hostsfile.txt + + peer3: + image: prj1 + networks: + - mynetwork + hostname: "peer3" + container_name: "peer3" + command: -h hostsfile.txt + + peer4: + image: prj1 + networks: + - mynetwork + hostname: "peer4" + container_name: "peer4" + command: -h hostsfile.txt + + peer5: + image: prj1 + networks: + - mynetwork + hostname: "peer5" + container_name: "peer5" + command: -h hostsfile.txt + +networks: + # The presence of these objects is sufficient to define them + mynetwork: {} diff --git a/hostsfile.txt b/hostsfile.txt new file mode 100644 index 0000000..bf660a5 --- /dev/null +++ b/hostsfile.txt @@ -0,0 +1,5 @@ +peer1 +peer2 +peer3 +peer4 +peer5 diff --git a/main.c b/main.c new file mode 100644 index 0000000..d231461 --- /dev/null +++ b/main.c @@ -0,0 +1,195 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define PORT 12345 +#define BUFFER_SIZE 1024 +#define HEARTBEAT_INTERVAL 2 // Send a heartbeat every 2 seconds +#define TIMEOUT 30 // Timeout in seconds for waiting for all heartbeats + +int ready_count = 0; +int total_processes = 0; + +// Function to read the hostfile and store hostnames +void read_hostfile(const char *hostfile, char hostnames[][BUFFER_SIZE], int *total_processes) { + FILE *file = fopen(hostfile, "r"); + if (!file) { + fprintf(stderr, "Error opening hostfile\n"); + exit(EXIT_FAILURE); + } + + char hostname[BUFFER_SIZE]; + int count = 0; + while (fscanf(file, "%s", hostname) != EOF) { + strcpy(hostnames[count], hostname); + count++; + } + + fclose(file); + *total_processes = count; +} + +// Function to send heartbeat messages to other processes +void send_heartbeat(const char *hostname, int sockfd, const char hostnames[][BUFFER_SIZE], int total_processes) { + char message[BUFFER_SIZE]; + snprintf(message, sizeof(message), "HEARTBEAT from %s", hostname); + + for (int i = 0; i < total_processes; i++) { + if (strcmp(hostnames[i], hostname) != 0) { + struct sockaddr_in peer_addr; + memset(&peer_addr, 0, sizeof(peer_addr)); + peer_addr.sin_family = AF_INET; + peer_addr.sin_port = htons(PORT); + + // Resolve the hostname to an IP address + struct hostent *he = gethostbyname(hostnames[i]); + if (he == NULL) { + fprintf(stderr, "Error resolving hostname %s\n", hostnames[i]); + continue; + } + + memcpy(&peer_addr.sin_addr, he->h_addr_list[0], he->h_length); + + char ip_str[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &(peer_addr.sin_addr), ip_str, INET_ADDRSTRLEN); + + fprintf(stderr, "Sending heartbeat to %s (IP: %s)\n", hostnames[i], ip_str); + sendto(sockfd, message, strlen(message), 0, (const struct sockaddr *)&peer_addr, sizeof(peer_addr)); + } + } +} + +// Function to receive heartbeat messages from other processes +void receive_heartbeat(int sockfd, char received_hosts[][BUFFER_SIZE], int *ready_count, int total_processes) { + char buffer[BUFFER_SIZE]; + struct sockaddr_in cli_addr; + socklen_t len = sizeof(cli_addr); + + int n = recvfrom(sockfd, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&cli_addr, &len); + if (n > 0) { + buffer[n] = '\0'; + + char sender[BUFFER_SIZE]; + sscanf(buffer, "HEARTBEAT from %s", sender); + + char ip_str[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &(cli_addr.sin_addr), ip_str, INET_ADDRSTRLEN); + + fprintf(stderr, "Received heartbeat from %s (IP: %s)\n", sender, ip_str); + + // Check if this heartbeat has already been received + int already_received = 0; + for (int i = 0; i < *ready_count; i++) { + if (strcmp(received_hosts[i], sender) == 0) { + already_received = 1; + break; + } + } + + if (!already_received) { + strcpy(received_hosts[*ready_count], sender); + (*ready_count)++; + fprintf(stderr, "Logged heartbeat from %s\n", sender); + } + } +} + +int main(int argc, char *argv[]) { + // Validate arguments + if (argc != 3 || strcmp(argv[1], "-h") != 0) { + fprintf(stderr, "Usage: %s -h \n", argv[0]); + exit(EXIT_FAILURE); + } + + char *hostfile = argv[2]; + char hostname[BUFFER_SIZE]; + + FILE *f = fopen("/etc/hostname", "r"); + if (f == NULL) { + fprintf(stderr, "Error reading hostname\n"); + exit(EXIT_FAILURE); + } + fscanf(f, "%s", hostname); + fclose(f); + + // Array to store the hostnames + char hostnames[BUFFER_SIZE][BUFFER_SIZE]; + char received_hosts[BUFFER_SIZE][BUFFER_SIZE]; // Track received heartbeats + + // Read the hostfile and get the list of other hosts + read_hostfile(hostfile, hostnames, &total_processes); + + // Create a UDP socket + int sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (sockfd < 0) { + fprintf(stderr, "Socket creation failed\n"); + exit(EXIT_FAILURE); + } + + // Server address setup + struct sockaddr_in servaddr; + memset(&servaddr, 0, sizeof(servaddr)); + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = INADDR_ANY; + servaddr.sin_port = htons(PORT); + + // Bind socket to port + if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) { + fprintf(stderr, "Socket bind failed\n"); + close(sockfd); + exit(EXIT_FAILURE); + } + + fd_set read_fds; + struct timeval timeout; + time_t last_heartbeat_time = time(NULL); + time_t start_time = time(NULL); + + int all_ready = 0; + + while (1) { + FD_ZERO(&read_fds); + FD_SET(sockfd, &read_fds); + + // Timeout for select to check for heartbeats and send them periodically + timeout.tv_sec = HEARTBEAT_INTERVAL; + timeout.tv_usec = 0; + + // Check for incoming heartbeat messages + int activity = select(sockfd + 1, &read_fds, NULL, NULL, &timeout); + if (activity > 0 && FD_ISSET(sockfd, &read_fds)) { + receive_heartbeat(sockfd, received_hosts, &ready_count, total_processes); + } + + // Check if all heartbeats have been received + if (ready_count >= total_processes - 1) { + fprintf(stderr, "READY\n"); + all_ready = 1; + break; + } + + // Send heartbeat if enough time has passed since the last heartbeat + if (difftime(time(NULL), last_heartbeat_time) >= HEARTBEAT_INTERVAL) { + send_heartbeat(hostname, sockfd, hostnames, total_processes); + last_heartbeat_time = time(NULL); + } + + // Check if timeout has been reached + if (difftime(time(NULL), start_time) >= TIMEOUT) { + fprintf(stderr, "Timeout reached. Exiting.\n"); + break; + } + } + + // Clean up + close(sockfd); + + return 0; +} \ No newline at end of file