diff --git a/src/axl.c b/src/axl.c index 35b5738..9715886 100644 --- a/src/axl.c +++ b/src/axl.c @@ -39,7 +39,9 @@ typedef enum { AXL_XFER_STATE_NULL, /* placeholder for invalid state */ AXL_XFER_STATE_CREATED, /* handle has been created */ AXL_XFER_STATE_DISPATCHED, /* transfer has been dispatched */ - AXL_XFER_STATE_COMPLETED, /* wait has been called */ + AXL_XFER_STATE_WAITING, /* wait has been called */ + AXL_XFER_STATE_COMPLETED, /* files are all copied */ + AXL_XFER_STATE_CANCELED, /* transfer was AXL_Cancel'd */ } axl_xfer_state_t; /* @@ -640,18 +642,20 @@ int AXL_Wait (int id) AXL_ERR("Could not find transfer info for UID %d", id); return AXL_FAILURE; } + /* check that handle is in correct state to wait */ if (xstate != AXL_XFER_STATE_DISPATCHED) { AXL_ERR("Invalid state to wait UID %d", id); return AXL_FAILURE; } - kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_COMPLETED); + kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_WAITING); /* lookup status for the transfer, return if done */ int status; kvtree_util_get_int(file_list, AXL_KEY_STATUS, &status); if (status == AXL_STATUS_DEST) { + kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_COMPLETED); return AXL_SUCCESS; } else if (status == AXL_STATUS_ERROR) { return AXL_FAILURE; @@ -683,6 +687,7 @@ int AXL_Wait (int id) rc = AXL_FAILURE; break; } + kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_COMPLETED); /* write data to file if we have one */ if (axl_flush_file) { @@ -706,7 +711,8 @@ int AXL_Cancel (int id) } /* check that handle is in correct state to cancel */ - if (xstate != AXL_XFER_STATE_DISPATCHED) { + if (xstate != AXL_XFER_STATE_DISPATCHED && + xstate != AXL_XFER_STATE_WAITING) { AXL_ERR("Invalid state to cancel UID %d", id); return AXL_FAILURE; } @@ -730,6 +736,7 @@ int AXL_Cancel (int id) #if 0 case AXL_XFER_SYNC: rc = axl_sync_cancel(id); + rc = AXL_FAILURE; break; #endif /* TODO: add cancel to backends */ @@ -746,12 +753,17 @@ int AXL_Cancel (int id) rc = axl_async_cancel_cppr(id); */ break; #endif + case AXL_XFER_PTHREAD: + rc = axl_pthread_cancel(id); + break; default: AXL_ERR("Unknown transfer type (%d)", (int) xtype); rc = AXL_FAILURE; break; } + kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_CANCELED); + /* write data to file if we have one */ if (axl_flush_file) { kvtree_write_file(axl_flush_file, axl_file_lists); @@ -775,7 +787,8 @@ int AXL_Free (int id) /* check that handle is in correct state to free */ if (xstate != AXL_XFER_STATE_CREATED && - xstate != AXL_XFER_STATE_COMPLETED) + xstate != AXL_XFER_STATE_COMPLETED && + xstate != AXL_XFER_STATE_CANCELED) { AXL_ERR("Invalid state to free UID %d", id); return AXL_FAILURE; @@ -819,9 +832,6 @@ int AXL_Stop () rc = AXL_FAILURE; } - /* wait */ - AXL_Wait(id); - /* and free it */ if (AXL_Free(id) != AXL_SUCCESS) { rc = AXL_FAILURE; diff --git a/src/axl_async_bbapi.c b/src/axl_async_bbapi.c index f6eed51..a4846a8 100644 --- a/src/axl_async_bbapi.c +++ b/src/axl_async_bbapi.c @@ -278,6 +278,7 @@ int axl_async_test_bbapi (int id) { /* get info about transfer */ BBTransferInfo_t tinfo; int rc = BB_GetTransferInfo(thandle, &tinfo); + bb_check(rc); /* check its status */ int status = AXL_STATUS_INPROG; @@ -320,7 +321,6 @@ int axl_async_wait_bbapi (int id) { kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id); /* Sleep until test changes set status */ - int rc; int status = AXL_STATUS_INPROG; while (status == AXL_STATUS_INPROG) { /* delegate work to test call to update status */ diff --git a/src/axl_pthread.c b/src/axl_pthread.c index 7ca1767..411d1fd 100644 --- a/src/axl_pthread.c +++ b/src/axl_pthread.c @@ -1,6 +1,7 @@ #include #include #include +#include #include "axl_internal.h" #include "kvtree_util.h" #include "axl_sync.h" @@ -19,7 +20,7 @@ * * - The number of CPU threads * - MAX_THREADS - * - The number of files being transfered + * - The number of files being transferred */ #define MAX_THREADS 16 /* We don't see much scaling past 16 threads */ @@ -75,6 +76,8 @@ axl_pthread_func(void *arg) kvtree_elem *elem; kvtree *elem_hash; + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); + while (1) { pthread_mutex_lock(&pdata->lock); @@ -83,7 +86,7 @@ axl_pthread_func(void *arg) if (!work) { /* No more work to do */ pthread_mutex_unlock(&pdata->lock); - pthread_exit((void *) AXL_SUCCESS); + break; } else { /* Take our work out of the queue */ pdata->head = pdata->head->next; @@ -109,10 +112,12 @@ axl_pthread_func(void *arg) free(work); - if (rc != AXL_SUCCESS) + if (rc != AXL_SUCCESS) { pthread_exit((void *) AXL_FAILURE); + } } - return NULL; + + return AXL_SUCCESS; } static struct axl_pthread_data * @@ -283,10 +288,11 @@ int axl_pthread_test (int id) { kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id); int64_t ptr; - struct axl_pthread_data *pdata; + struct axl_pthread_data *pdata = NULL; kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr); pdata = (struct axl_pthread_data *) ptr; + assert(pdata); /* Is there still work in the workqueue? */ if (pdata->head) { @@ -301,12 +307,12 @@ int axl_pthread_wait (int id) { /* get pointer to file list for this dataset */ kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id); - int64_t ptr; - struct axl_pthread_data *pdata; + int64_t ptr = 0; + struct axl_pthread_data *pdata = NULL; int rc; int final_rc = AXL_SUCCESS; int i; - void *rc_ptr; + void *rc_ptr = NULL; kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr); pdata = (struct axl_pthread_data *) ptr; @@ -319,23 +325,22 @@ int axl_pthread_wait (int id) /* All our threads are now started. Wait for them to finish */ for (i = 0; i < pdata->threads; i++) { rc = pthread_join(pdata->tid[i], &rc_ptr); - - /* - * Thread either finished successfully, or was canceled by - * AXL_Cancel(). Both statuses are valid. - */ - if (rc != 0 && rc_ptr != PTHREAD_CANCELED) { + if (rc != 0) { AXL_ERR("pthread_join(%d) failed (%d)", i, rc); return AXL_FAILURE; } /* * Check the rc that the thread actually reported. The thread - * returns a void * that we encode our rc value in. + * returns a void * that we encode our rc value in. If the + * thread was canceled, that totally valid and fine. */ - rc = (int) ((unsigned long) rc_ptr); - if (rc) { - final_rc |= AXL_FAILURE; + if (rc_ptr != PTHREAD_CANCELED) { + rc = (int) ((unsigned long) rc_ptr); + if (rc) { + AXL_ERR("pthread join rc_ptr was set as %d\n", rc); + final_rc |= AXL_FAILURE; + } } } if (final_rc != AXL_SUCCESS) { @@ -360,17 +365,31 @@ int axl_pthread_cancel (int id) struct axl_pthread_data *pdata; int rc = 0; int i; + void *rc_ptr; kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr); pdata = (struct axl_pthread_data *) ptr; + assert(pdata); for (i = 0; i < pdata->threads; i++) { - rc |= pthread_cancel(pdata->tid[i]); + /* send the thread a cancellation request */ + int rc = pthread_cancel(pdata->tid[i]); + if (rc) { + AXL_ERR("pthread_cancel failed, rc %d\n"); + break; + } + + /* wait for the thread to actually exit */ + pthread_join(pdata->tid[i], &rc_ptr); + if (rc_ptr != 0 && rc_ptr != PTHREAD_CANCELED) { + AXL_ERR("pthread_join failed, rc_ptr %p", rc_ptr); + rc |= (int) ((unsigned long) rc_ptr); + } } if (rc != 0) { - AXL_ERR("Bad return code from pthread_cancel()\n"); + AXL_ERR("Bad return code from canceling a thread\n"); } - return AXL_FAILURE; + return rc; } void axl_pthread_free (int id) @@ -387,6 +406,7 @@ void axl_pthread_free (int id) kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr); pdata = (struct axl_pthread_data *) ptr; if (pdata) { + kvtree_util_set_int64(file_list, AXL_KEY_PTHREAD_DATA, 0); axl_pthread_free_pdata(pdata); } } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 6ae870c..af90d11 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -6,6 +6,9 @@ LIST(APPEND axl_test_srcs axl_cp.c ) +# Build with debug symbols (-g) +# SET(CMAKE_BUILD_TYPE Debug) + ADD_EXECUTABLE(axl_cp ${axl_test_srcs}) TARGET_LINK_LIBRARIES(axl_cp axl) @@ -20,8 +23,12 @@ ADD_TEST(sync_test test_axl.sh sync) ADD_TEST(pthreads_test test_axl.sh pthread) IF(BBAPI_FOUND) ADD_TEST(bbapi_test test_axl.sh bbapi) + ADD_TEST(bbapi_cancel_test test_axl.sh -n 1500 -c 0.5 bbapi) ENDIF(BBAPI_FOUND) +# Create 1500 files (~871MB) and cancel the test 200ms into the transfer +ADD_TEST(pthreads_cancel_test test_axl.sh -n 1500 -c 0.2 pthread) + #################### # make a verbose "test" target named "check" #################### diff --git a/test/axl_cp.c b/test/axl_cp.c index fb5c8a1..5d81202 100644 --- a/test/axl_cp.c +++ b/test/axl_cp.c @@ -62,16 +62,31 @@ usage(void) void sig_func(int signum) { + int rc; if (id == -1) { /* AXL not initialized yet */ return; } - printf("axl_cp: canceling\n"); - AXL_Cancel(id); - AXL_Wait(id); - AXL_Free(id); - AXL_Finalize(); + rc = AXL_Cancel(id); + if (rc != AXL_SUCCESS) { + printf("axl_cp SIGTERM: AXL_Cancel failed (%d)", rc); + exit(rc); + } + + rc = AXL_Free(id); + if (rc != AXL_SUCCESS) { + printf("axl_cp SIGTERM: AXL_Free failed (%d)", rc); + exit(rc); + } + + rc = AXL_Finalize(); + if (rc != AXL_SUCCESS) { + printf("axl_cp SIGTERM: AXL_Finalized failed (%d)", rc); + exit(rc); + } + + exit(AXL_SUCCESS); } int @@ -91,12 +106,16 @@ main(int argc, char **argv) { memset(&action, 0, sizeof(action)); action.sa_handler = sig_func; sigaction(SIGTERM, &action, NULL); + char *state_file = NULL; while ((opt = getopt(argc, argv, "rRX:")) != -1) { switch (opt) { case 'X': xfer_str = optarg; break; + case 'S': + state_file = optarg; + break; case 'r': case 'R': recursive = 1; @@ -144,7 +163,7 @@ main(int argc, char **argv) { xfer = AXL_XFER_SYNC; } - rc = AXL_Init(NULL); + rc = AXL_Init(state_file); if (rc != AXL_SUCCESS) { printf("AXL_Init() failed (error %d)\n", rc); return rc; diff --git a/test/test_axl.sh b/test/test_axl.sh index 50837ea..7dd9480 100755 --- a/test/test_axl.sh +++ b/test/test_axl.sh @@ -4,11 +4,48 @@ # # Make some files in /tmp and copy them using AXL. # -# -# Usage: test_axl [xfer_type] -# -# xfer_type: sync|pthread|bbapi|dw (defaults to sync if none specified) -# + +function usage +{ +echo " + Usage: test_axl [-c sec [-k]] [-n num_files] [xfer_type] + + -c sec: Cancel transfer after 'sec' seconds (can be decimal number) + -n num_files: Number of files to create (default 50) + xfer_type: sync|pthread|bbapi|dw (defaults to sync if none specified) +" +} + +function isnum +{ + [[ "$1" =~ ^[0-9.]+$ ]] +} + +while getopts "c:kn:" opt; do + case "${opt}" in + c) + sec=${OPTARG} + if ! isnum $sec ; then + echo "'$sec' is not a number of seconds" + usage + exit 1 + fi + ;; + n) + num_files=${OPTARG} + if ! isnum $num_files ; then + echo "'$num_files' is not a number" + usage + exit + fi + ;; + + *) + usage + exit + esac +done +shift $((OPTIND-1)) xfer=$1 if [ -z "$xfer" ] ; then @@ -16,14 +53,14 @@ if [ -z "$xfer" ] ; then fi case $xfer in - sync) ;; - pthread) ;; - bbapi) ;; - dw) ;; - *) - echo "Invalid transfer type '$xfer'" - exit 1; - ;; + sync) ;; + pthread) ;; + bbapi) ;; + dw) ;; + *) + echo "Invalid transfer type '$xfer'" + exit 1; + ;; esac src=$(mktemp -d) @@ -41,7 +78,11 @@ function ctrl_c() { } function create_files { - + num=$1 + if [ -z "$num" ] ; then + # default to creating 50 files + num=50 + fi # Create a directory structure dirs=(./ home project/dir1 project/dir2 docs/c1/data docs/c2/data docs/c2) for i in "${dirs[@]}" ; do @@ -50,35 +91,33 @@ function create_files { num_dirs=${#dirs[@]} # Make 50 files and put them in our directories - for i in {1..50} ; do + for ((i=0; i < $num; i++)) ; do dirnum=$(($i % $num_dirs)) tmp="$src/${dirs[$dirnum]}" dd if=/dev/zero of="$tmp/$i.file" bs=1k count=$i &>/dev/null done } +# $1 Transfer type +# $2 Timeout in seconds (optional). This is needed for the AXL_Cancel tests. function run_test { xfer=$1 + s=$2 + + if [ -z "$s" ] ; then + # No timeout specified, just make it super long + s=9999 + fi rc=0; - echo -en "Testing $xfer transfer...\t" - if ! out1="$(./axl_cp -X $xfer -r $src/* $dest)" ; then - echo "failed copy" + out1="$(timeout --preserve-status $s ./axl_cp -X $xfer -r $src/* $dest)" + rc=$? + if [ "$rc" != "0" ] ; then + echo "failed copy, rc=$rc" echo "$out1" rc=1 - else - # Compare the two directories - if ! out2="$(diff -qr $src $dest)" ; then - echo "failed diff" - echo "$out1" - echo "---" - echo "$out2" - rc=2 - else - echo "success" - fi fi - rm -fr "$dest"/* + if [ "$rc" != "0" ] ; then false else @@ -86,13 +125,51 @@ function run_test fi } -create_files +create_files $num_files # Run our tests -if ! run_test $xfer ; then - # our test failed - cleanup - exit 1 +if [ -z "$sec" ] ; then + echo -en "Testing $xfer transfer...\t" +else + echo -en "Testing $xfer transfer cancel after $sec seconds...\t" +fi + +# There are two types of tests we do here. +# +# 1. A basic copy +# 2. A basic copy where we cancel it partway though to test AXL_Cancel +# +# First run our test, and optionally cancel it +if ! run_test $xfer $sec ; then + # Our copy failed for some reason (independent of the cancellation) + cleanup + exit 1 +else + # Files are copied, verify they're all there and correct + if ! out2="$(diff -qr $src $dest)" ; then + # Files aren't all there. If we canceled the transfer this is + # good, since they shouldn't be all there. Otherwise they + # should be there. + if [ -n "$sec" ] ; then + echo "success" + else + echo "failed. transfer output was:" + echo "$out1" + echo "--- diff was ---" + echo "$out2" + cleanup + exit 1 + fi + + else + if [ -n "$sec" ] ; then + echo "failure. Files were all copied before the cancel" + cleanup + exit 1 + else + echo "success" + fi + fi fi cleanup