Skip to content

Commit

Permalink
Add a test case for AXL_Cancel'ing a transfer
Browse files Browse the repository at this point in the history
- Make canceling pthread transfers actually work
- Add "number of files" (-n) and "seconds until cancel" (-s)
  options to test_axl.
- Add test case to cancel a pthreads and BB API transfer
- Fix some unused variable build warnings for BB API code

Signed-off-by: Tony Hutter <[email protected]>
  • Loading branch information
tonyhutter authored and gonsie committed Jun 17, 2019
1 parent 598b74e commit bf31fd3
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 70 deletions.
24 changes: 17 additions & 7 deletions src/axl.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ typedef enum {
AXL_XFER_STATE_NULL, /* placeholder for invalid state */
AXL_XFER_STATE_CREATED, /* handle has been created */
AXL_XFER_STATE_DISPATCHED, /* transfer has been dispatched */
AXL_XFER_STATE_COMPLETED, /* wait has been called */
AXL_XFER_STATE_WAITING, /* wait has been called */
AXL_XFER_STATE_COMPLETED, /* files are all copied */
AXL_XFER_STATE_CANCELED, /* transfer was AXL_Cancel'd */
} axl_xfer_state_t;

/*
Expand Down Expand Up @@ -640,18 +642,20 @@ int AXL_Wait (int id)
AXL_ERR("Could not find transfer info for UID %d", id);
return AXL_FAILURE;
}

/* check that handle is in correct state to wait */
if (xstate != AXL_XFER_STATE_DISPATCHED) {
AXL_ERR("Invalid state to wait UID %d", id);
return AXL_FAILURE;
}
kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_COMPLETED);
kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_WAITING);

/* lookup status for the transfer, return if done */
int status;
kvtree_util_get_int(file_list, AXL_KEY_STATUS, &status);

if (status == AXL_STATUS_DEST) {
kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_COMPLETED);
return AXL_SUCCESS;
} else if (status == AXL_STATUS_ERROR) {
return AXL_FAILURE;
Expand Down Expand Up @@ -683,6 +687,7 @@ int AXL_Wait (int id)
rc = AXL_FAILURE;
break;
}
kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_COMPLETED);

/* write data to file if we have one */
if (axl_flush_file) {
Expand All @@ -706,7 +711,8 @@ int AXL_Cancel (int id)
}

/* check that handle is in correct state to cancel */
if (xstate != AXL_XFER_STATE_DISPATCHED) {
if (xstate != AXL_XFER_STATE_DISPATCHED &&
xstate != AXL_XFER_STATE_WAITING) {
AXL_ERR("Invalid state to cancel UID %d", id);
return AXL_FAILURE;
}
Expand All @@ -730,6 +736,7 @@ int AXL_Cancel (int id)
#if 0
case AXL_XFER_SYNC:
rc = axl_sync_cancel(id);
rc = AXL_FAILURE;
break;
#endif
/* TODO: add cancel to backends */
Expand All @@ -746,12 +753,17 @@ int AXL_Cancel (int id)
rc = axl_async_cancel_cppr(id); */
break;
#endif
case AXL_XFER_PTHREAD:
rc = axl_pthread_cancel(id);
break;
default:
AXL_ERR("Unknown transfer type (%d)", (int) xtype);
rc = AXL_FAILURE;
break;
}

kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_CANCELED);

/* write data to file if we have one */
if (axl_flush_file) {
kvtree_write_file(axl_flush_file, axl_file_lists);
Expand All @@ -775,7 +787,8 @@ int AXL_Free (int id)

/* check that handle is in correct state to free */
if (xstate != AXL_XFER_STATE_CREATED &&
xstate != AXL_XFER_STATE_COMPLETED)
xstate != AXL_XFER_STATE_COMPLETED &&
xstate != AXL_XFER_STATE_CANCELED)
{
AXL_ERR("Invalid state to free UID %d", id);
return AXL_FAILURE;
Expand Down Expand Up @@ -819,9 +832,6 @@ int AXL_Stop ()
rc = AXL_FAILURE;
}

/* wait */
AXL_Wait(id);

/* and free it */
if (AXL_Free(id) != AXL_SUCCESS) {
rc = AXL_FAILURE;
Expand Down
2 changes: 1 addition & 1 deletion src/axl_async_bbapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ int axl_async_test_bbapi (int id) {
/* get info about transfer */
BBTransferInfo_t tinfo;
int rc = BB_GetTransferInfo(thandle, &tinfo);
bb_check(rc);

/* check its status */
int status = AXL_STATUS_INPROG;
Expand Down Expand Up @@ -320,7 +321,6 @@ int axl_async_wait_bbapi (int id) {
kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id);

/* Sleep until test changes set status */
int rc;
int status = AXL_STATUS_INPROG;
while (status == AXL_STATUS_INPROG) {
/* delegate work to test call to update status */
Expand Down
62 changes: 41 additions & 21 deletions src/axl_pthread.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>
#include "axl_internal.h"
#include "kvtree_util.h"
#include "axl_sync.h"
Expand All @@ -19,7 +20,7 @@
*
* - The number of CPU threads
* - MAX_THREADS
* - The number of files being transfered
* - The number of files being transferred
*/
#define MAX_THREADS 16 /* We don't see much scaling past 16 threads */

Expand Down Expand Up @@ -75,6 +76,8 @@ axl_pthread_func(void *arg)
kvtree_elem *elem;
kvtree *elem_hash;

pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

while (1) {
pthread_mutex_lock(&pdata->lock);

Expand All @@ -83,7 +86,7 @@ axl_pthread_func(void *arg)
if (!work) {
/* No more work to do */
pthread_mutex_unlock(&pdata->lock);
pthread_exit((void *) AXL_SUCCESS);
break;
} else {
/* Take our work out of the queue */
pdata->head = pdata->head->next;
Expand All @@ -109,10 +112,12 @@ axl_pthread_func(void *arg)

free(work);

if (rc != AXL_SUCCESS)
if (rc != AXL_SUCCESS) {
pthread_exit((void *) AXL_FAILURE);
}
}
return NULL;

return AXL_SUCCESS;
}

static struct axl_pthread_data *
Expand Down Expand Up @@ -283,10 +288,11 @@ int axl_pthread_test (int id)
{
kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id);
int64_t ptr;
struct axl_pthread_data *pdata;
struct axl_pthread_data *pdata = NULL;

kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr);
pdata = (struct axl_pthread_data *) ptr;
assert(pdata);

/* Is there still work in the workqueue? */
if (pdata->head) {
Expand All @@ -301,12 +307,12 @@ int axl_pthread_wait (int id)
{
/* get pointer to file list for this dataset */
kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id);
int64_t ptr;
struct axl_pthread_data *pdata;
int64_t ptr = 0;
struct axl_pthread_data *pdata = NULL;
int rc;
int final_rc = AXL_SUCCESS;
int i;
void *rc_ptr;
void *rc_ptr = NULL;

kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr);
pdata = (struct axl_pthread_data *) ptr;
Expand All @@ -319,23 +325,22 @@ int axl_pthread_wait (int id)
/* All our threads are now started. Wait for them to finish */
for (i = 0; i < pdata->threads; i++) {
rc = pthread_join(pdata->tid[i], &rc_ptr);

/*
* Thread either finished successfully, or was canceled by
* AXL_Cancel(). Both statuses are valid.
*/
if (rc != 0 && rc_ptr != PTHREAD_CANCELED) {
if (rc != 0) {
AXL_ERR("pthread_join(%d) failed (%d)", i, rc);
return AXL_FAILURE;
}

/*
* Check the rc that the thread actually reported. The thread
* returns a void * that we encode our rc value in.
* returns a void * that we encode our rc value in. If the
* thread was canceled, that totally valid and fine.
*/
rc = (int) ((unsigned long) rc_ptr);
if (rc) {
final_rc |= AXL_FAILURE;
if (rc_ptr != PTHREAD_CANCELED) {
rc = (int) ((unsigned long) rc_ptr);
if (rc) {
AXL_ERR("pthread join rc_ptr was set as %d\n", rc);
final_rc |= AXL_FAILURE;
}
}
}
if (final_rc != AXL_SUCCESS) {
Expand All @@ -360,17 +365,31 @@ int axl_pthread_cancel (int id)
struct axl_pthread_data *pdata;
int rc = 0;
int i;
void *rc_ptr;

kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr);
pdata = (struct axl_pthread_data *) ptr;
assert(pdata);

for (i = 0; i < pdata->threads; i++) {
rc |= pthread_cancel(pdata->tid[i]);
/* send the thread a cancellation request */
int rc = pthread_cancel(pdata->tid[i]);
if (rc) {
AXL_ERR("pthread_cancel failed, rc %d\n");
break;
}

/* wait for the thread to actually exit */
pthread_join(pdata->tid[i], &rc_ptr);
if (rc_ptr != 0 && rc_ptr != PTHREAD_CANCELED) {
AXL_ERR("pthread_join failed, rc_ptr %p", rc_ptr);
rc |= (int) ((unsigned long) rc_ptr);
}
}
if (rc != 0) {
AXL_ERR("Bad return code from pthread_cancel()\n");
AXL_ERR("Bad return code from canceling a thread\n");
}
return AXL_FAILURE;
return rc;
}

void axl_pthread_free (int id)
Expand All @@ -387,6 +406,7 @@ void axl_pthread_free (int id)
kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr);
pdata = (struct axl_pthread_data *) ptr;
if (pdata) {
kvtree_util_set_int64(file_list, AXL_KEY_PTHREAD_DATA, 0);
axl_pthread_free_pdata(pdata);
}
}
7 changes: 7 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ LIST(APPEND axl_test_srcs
axl_cp.c
)

# Build with debug symbols (-g)
# SET(CMAKE_BUILD_TYPE Debug)

ADD_EXECUTABLE(axl_cp ${axl_test_srcs})

TARGET_LINK_LIBRARIES(axl_cp axl)
Expand All @@ -20,8 +23,12 @@ ADD_TEST(sync_test test_axl.sh sync)
ADD_TEST(pthreads_test test_axl.sh pthread)
IF(BBAPI_FOUND)
ADD_TEST(bbapi_test test_axl.sh bbapi)
ADD_TEST(bbapi_cancel_test test_axl.sh -n 1500 -c 0.5 bbapi)
ENDIF(BBAPI_FOUND)

# Create 1500 files (~871MB) and cancel the test 200ms into the transfer
ADD_TEST(pthreads_cancel_test test_axl.sh -n 1500 -c 0.2 pthread)

####################
# make a verbose "test" target named "check"
####################
Expand Down
31 changes: 25 additions & 6 deletions test/axl_cp.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,31 @@ usage(void)

void sig_func(int signum)
{
int rc;
if (id == -1) {
/* AXL not initialized yet */
return;
}

printf("axl_cp: canceling\n");
AXL_Cancel(id);
AXL_Wait(id);
AXL_Free(id);
AXL_Finalize();
rc = AXL_Cancel(id);
if (rc != AXL_SUCCESS) {
printf("axl_cp SIGTERM: AXL_Cancel failed (%d)", rc);
exit(rc);
}

rc = AXL_Free(id);
if (rc != AXL_SUCCESS) {
printf("axl_cp SIGTERM: AXL_Free failed (%d)", rc);
exit(rc);
}

rc = AXL_Finalize();
if (rc != AXL_SUCCESS) {
printf("axl_cp SIGTERM: AXL_Finalized failed (%d)", rc);
exit(rc);
}

exit(AXL_SUCCESS);
}

int
Expand All @@ -91,12 +106,16 @@ main(int argc, char **argv) {
memset(&action, 0, sizeof(action));
action.sa_handler = sig_func;
sigaction(SIGTERM, &action, NULL);
char *state_file = NULL;

while ((opt = getopt(argc, argv, "rRX:")) != -1) {
switch (opt) {
case 'X':
xfer_str = optarg;
break;
case 'S':
state_file = optarg;
break;
case 'r':
case 'R':
recursive = 1;
Expand Down Expand Up @@ -144,7 +163,7 @@ main(int argc, char **argv) {
xfer = AXL_XFER_SYNC;
}

rc = AXL_Init(NULL);
rc = AXL_Init(state_file);
if (rc != AXL_SUCCESS) {
printf("AXL_Init() failed (error %d)\n", rc);
return rc;
Expand Down
Loading

0 comments on commit bf31fd3

Please sign in to comment.