Skip to content

Commit

Permalink
Stop the test if one of the threads terminated
Browse files Browse the repository at this point in the history
  • Loading branch information
davidBar-On committed Feb 24, 2024
1 parent e004dc3 commit cf29e22
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/iperf.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ struct iperf_stream
int remote_port;
int socket;
int id;
int thread_number;
int sender;
/* XXX: is settings just a pointer to the same struct in iperf_test? if not,
should it be? */
Expand Down
1 change: 1 addition & 0 deletions src/iperf_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ enum {
IEPTHREADJOIN=152, // Unable to join thread (check perror)
IEPTHREADATTRINIT=153, // Unable to initialize thread attribute (check perror)
IEPTHREADATTRDESTROY=154, // Unable to destroy thread attribute (check perror)
IEPTHREADNOTRUNNING=155, // A thread stopped running unexpectedly
/* Stream errors */
IECREATESTREAM = 200, // Unable to create a new stream (check herror/perror)
IEINITSTREAM = 201, // Unable to initialize stream (check herror/perror)
Expand Down
49 changes: 39 additions & 10 deletions src/iperf_client_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
#endif /* TCP_CA_NAME_MAX */
#endif /* HAVE_TCP_CONGESTION */

// variable for number of active threads count (for checking if any failed)
static volatile int running_threads = 0;
static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;

void *
iperf_client_worker_run(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
Expand All @@ -75,6 +79,12 @@ iperf_client_worker_run(void *s) {
return NULL;

cleanup_and_fail:
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(sp->test, "Thread number %d FD %d terminated unexpectedly\n", sp->thread_number, sp->socket);
}
pthread_mutex_lock(&running_mutex);
running_threads--; // Indicate that the thread failed
pthread_mutex_unlock(&running_mutex);
return NULL;
}

Expand Down Expand Up @@ -545,6 +555,7 @@ iperf_run_client(struct iperf_test * test)
int64_t timeout_us;
int64_t rcv_timeout_us;
int i_errno_save;
int total_num_streams = 0;

if (NULL == test)
{
Expand Down Expand Up @@ -678,13 +689,23 @@ iperf_run_client(struct iperf_test * test)
goto cleanup_and_fail;
}

pthread_mutex_lock(&running_mutex);
running_threads = 0;
total_num_streams = 0;
pthread_mutex_unlock(&running_mutex);
SLIST_FOREACH(sp, &test->streams, streams) {
pthread_mutex_lock(&running_mutex);
running_threads++; // Count running threads
sp->thread_number = running_threads;
pthread_mutex_unlock(&running_mutex);
total_num_streams++;

if (pthread_create(&(sp->thr), &attr, &iperf_client_worker_run, sp) != 0) {
i_errno = IEPTHREADCREATE;
goto cleanup_and_fail;
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d created\n", sp->socket);
iperf_printf(test, "Thread number %d using FD %d created\n", sp->thread_number, sp->socket);
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
Expand Down Expand Up @@ -725,18 +746,18 @@ iperf_run_client(struct iperf_test * test)
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADCANCEL;
errno = rc;
iperf_err(test, "sender cancel in pthread_cancel - %s", iperf_strerror(i_errno));
iperf_err(test, "sender cancel in pthread_cancel of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
goto cleanup_and_fail;
}
rc = pthread_join(sp->thr, NULL);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADJOIN;
errno = rc;
iperf_err(test, "sender cancel in pthread_join - %s", iperf_strerror(i_errno));
iperf_err(test, "sender cancel in pthread_join of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
goto cleanup_and_fail;
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
iperf_printf(test, "Thread number %d FD %d stopped\n", sp->thread_number, sp->socket);
}
}
}
Expand All @@ -751,6 +772,14 @@ iperf_run_client(struct iperf_test * test)
if (iperf_set_send_state(test, TEST_END) != 0)
goto cleanup_and_fail;
}

/* Terminate if one of the threads failed */
if (running_threads != total_num_streams) {
i_errno = IEPTHREADNOTRUNNING;
iperf_err(test, "Number of running threads is %d but expected %d", running_threads, test->num_streams);
goto cleanup_and_fail;
}

}
}

Expand All @@ -763,18 +792,18 @@ iperf_run_client(struct iperf_test * test)
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADCANCEL;
errno = rc;
iperf_err(test, "receiver cancel in pthread_cancel - %s", iperf_strerror(i_errno));
iperf_err(test, "receiver cancel in pthread_cancel of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
goto cleanup_and_fail;
}
rc = pthread_join(sp->thr, NULL);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADJOIN;
errno = rc;
iperf_err(test, "receiver cancel in pthread_join - %s", iperf_strerror(i_errno));
iperf_err(test, "receiver cancel in pthread_join of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
goto cleanup_and_fail;
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
iperf_printf(test, "Thread number %d FD %d stopped\n", sp->thread_number, sp->socket);
}
}
}
Expand Down Expand Up @@ -804,16 +833,16 @@ iperf_run_client(struct iperf_test * test)
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADCANCEL;
errno = rc;
iperf_err(test, "cleanup_and_fail in pthread_cancel - %s", iperf_strerror(i_errno));
iperf_err(test, "cleanup_and_fail in pthread_cancel of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
}
rc = pthread_join(sp->thr, NULL);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADJOIN;
errno = rc;
iperf_err(test, "cleanup_and_fail in pthread_join - %s", iperf_strerror(i_errno));
iperf_err(test, "cleanup_and_fail in pthread_join of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
iperf_printf(test, "Thread number %d FD %d stopped\n", sp->thread_number, sp->socket);
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
Expand Down
4 changes: 4 additions & 0 deletions src/iperf_error.c
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,10 @@ iperf_strerror(int int_errno)
snprintf(errstr, len, "unable to destroy thread attributes");
perr = 1;
break;
case IEPTHREADNOTRUNNING:
snprintf(errstr, len, "a thread stopped running unexpectedly");
perr = 1;
break;
default:
snprintf(errstr, len, "int_errno=%d", int_errno);
perr = 1;
Expand Down
38 changes: 34 additions & 4 deletions src/iperf_server_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@
#endif /* TCP_CA_NAME_MAX */
#endif /* HAVE_TCP_CONGESTION */

// variable for number of active threads count
static volatile int running_threads = 0;
static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;

void *
iperf_server_worker_run(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
Expand All @@ -90,6 +94,12 @@ iperf_server_worker_run(void *s) {
return NULL;

cleanup_and_fail:
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(sp->test, "Thread number %d FD %d terminated unexpectedly\n", sp->thread_number, sp->socket);
}
pthread_mutex_lock(&running_mutex);
running_threads--; // Indicate that the thread failed
pthread_mutex_unlock(&running_mutex);
return NULL;
}

Expand Down Expand Up @@ -424,16 +434,16 @@ cleanup_server(struct iperf_test *test)
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADCANCEL;
errno = rc;
iperf_err(test, "cleanup_server in pthread_cancel - %s", iperf_strerror(i_errno));
iperf_err(test, "cleanup_server in pthread_cancel of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
}
rc = pthread_join(sp->thr, NULL);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADJOIN;
errno = rc;
iperf_err(test, "cleanup_server in pthread_join - %s", iperf_strerror(i_errno));
iperf_err(test, "cleanup_server in pthread_join of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
iperf_printf(test, "Thread number %d FD %d stopped\n", sp->thread_number, sp->socket);
}
}
i_errno = i_errno_save;
Expand Down Expand Up @@ -511,6 +521,7 @@ iperf_run_server(struct iperf_test *test)
int64_t t_usecs;
int64_t timeout_us;
int64_t rcv_timeout_us;
int total_num_streams = 0;

if (test->logfile)
if (iperf_open_logfile(test) < 0)
Expand Down Expand Up @@ -872,14 +883,24 @@ iperf_run_server(struct iperf_test *test)
cleanup_server(test);
};

pthread_mutex_lock(&running_mutex);
running_threads = 0;
total_num_streams = 0;
pthread_mutex_unlock(&running_mutex);
SLIST_FOREACH(sp, &test->streams, streams) {
pthread_mutex_lock(&running_mutex);
running_threads++; // Count running threads
sp->thread_number = running_threads;
pthread_mutex_unlock(&running_mutex);
total_num_streams++;

if (pthread_create(&(sp->thr), &attr, &iperf_server_worker_run, sp) != 0) {
i_errno = IEPTHREADCREATE;
cleanup_server(test);
return -1;
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d created\n", sp->socket);
iperf_printf(test, "Thread number %d FD %d created\n", sp->thread_number, sp->socket);
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
Expand All @@ -893,6 +914,15 @@ iperf_run_server(struct iperf_test *test)
}
}

/* Terminate if any thread failed */
if (test->state == TEST_RUNNING) {
if (running_threads != total_num_streams) {
i_errno = IEPTHREADNOTRUNNING;
iperf_err(test, "Number of running threads is %d but expected %d", running_threads, test->num_streams);
cleanup_server(test);
}
}

if (result == 0 ||
(timeout != NULL && timeout->tv_sec == 0 && timeout->tv_usec == 0)) {
/* Run the timers. */
Expand Down

0 comments on commit cf29e22

Please sign in to comment.