Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a test case for AXL_Cancel'ing a transfer #58

Merged
merged 2 commits into from
Jun 17, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions src/axl.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ typedef enum {
AXL_XFER_STATE_NULL, /* placeholder for invalid state */
AXL_XFER_STATE_CREATED, /* handle has been created */
AXL_XFER_STATE_DISPATCHED, /* transfer has been dispatched */
AXL_XFER_STATE_COMPLETED, /* wait has been called */
AXL_XFER_STATE_WAITING, /* wait has been called */
AXL_XFER_STATE_COMPLETED, /* files are all copied */
AXL_XFER_STATE_CANCELED, /* transfer was AXL_Cancel'd */
} axl_xfer_state_t;

/*
Expand Down Expand Up @@ -640,18 +642,20 @@ int AXL_Wait (int id)
AXL_ERR("Could not find transfer info for UID %d", id);
return AXL_FAILURE;
}

/* check that handle is in correct state to wait */
if (xstate != AXL_XFER_STATE_DISPATCHED) {
AXL_ERR("Invalid state to wait UID %d", id);
return AXL_FAILURE;
}
kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_COMPLETED);
kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_WAITING);

/* lookup status for the transfer, return if done */
int status;
kvtree_util_get_int(file_list, AXL_KEY_STATUS, &status);

if (status == AXL_STATUS_DEST) {
kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_COMPLETED);
return AXL_SUCCESS;
} else if (status == AXL_STATUS_ERROR) {
return AXL_FAILURE;
Expand Down Expand Up @@ -683,6 +687,7 @@ int AXL_Wait (int id)
rc = AXL_FAILURE;
break;
}
kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_COMPLETED);

/* write data to file if we have one */
if (axl_flush_file) {
Expand All @@ -706,7 +711,8 @@ int AXL_Cancel (int id)
}

/* check that handle is in correct state to cancel */
if (xstate != AXL_XFER_STATE_DISPATCHED) {
if (xstate != AXL_XFER_STATE_DISPATCHED &&
xstate != AXL_XFER_STATE_WAITING) {
AXL_ERR("Invalid state to cancel UID %d", id);
return AXL_FAILURE;
}
Expand All @@ -730,6 +736,7 @@ int AXL_Cancel (int id)
#if 0
case AXL_XFER_SYNC:
rc = axl_sync_cancel(id);
rc = AXL_FAILURE;
break;
#endif
/* TODO: add cancel to backends */
Expand All @@ -746,12 +753,17 @@ int AXL_Cancel (int id)
rc = axl_async_cancel_cppr(id); */
break;
#endif
case AXL_XFER_PTHREAD:
rc = axl_pthread_cancel(id);
break;
default:
AXL_ERR("Unknown transfer type (%d)", (int) xtype);
rc = AXL_FAILURE;
break;
}

kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_CANCELED);

/* write data to file if we have one */
if (axl_flush_file) {
kvtree_write_file(axl_flush_file, axl_file_lists);
Expand All @@ -775,7 +787,8 @@ int AXL_Free (int id)

/* check that handle is in correct state to free */
if (xstate != AXL_XFER_STATE_CREATED &&
xstate != AXL_XFER_STATE_COMPLETED)
xstate != AXL_XFER_STATE_COMPLETED &&
xstate != AXL_XFER_STATE_CANCELED)
{
AXL_ERR("Invalid state to free UID %d", id);
return AXL_FAILURE;
Expand Down Expand Up @@ -819,9 +832,6 @@ int AXL_Stop ()
rc = AXL_FAILURE;
}

/* wait */
AXL_Wait(id);

/* and free it */
if (AXL_Free(id) != AXL_SUCCESS) {
rc = AXL_FAILURE;
Expand Down
2 changes: 1 addition & 1 deletion src/axl_async_bbapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ int axl_async_test_bbapi (int id) {
/* get info about transfer */
BBTransferInfo_t tinfo;
int rc = BB_GetTransferInfo(thandle, &tinfo);
bb_check(rc);

/* check its status */
int status = AXL_STATUS_INPROG;
Expand Down Expand Up @@ -320,7 +321,6 @@ int axl_async_wait_bbapi (int id) {
kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id);

/* Sleep until test changes set status */
int rc;
int status = AXL_STATUS_INPROG;
while (status == AXL_STATUS_INPROG) {
/* delegate work to test call to update status */
Expand Down
62 changes: 41 additions & 21 deletions src/axl_pthread.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>
#include "axl_internal.h"
#include "kvtree_util.h"
#include "axl_sync.h"
Expand All @@ -19,7 +20,7 @@
*
* - The number of CPU threads
* - MAX_THREADS
* - The number of files being transfered
* - The number of files being transferred
*/
#define MAX_THREADS 16 /* We don't see much scaling past 16 threads */

Expand Down Expand Up @@ -75,6 +76,8 @@ axl_pthread_func(void *arg)
kvtree_elem *elem;
kvtree *elem_hash;

pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

while (1) {
pthread_mutex_lock(&pdata->lock);

Expand All @@ -83,7 +86,7 @@ axl_pthread_func(void *arg)
if (!work) {
/* No more work to do */
pthread_mutex_unlock(&pdata->lock);
pthread_exit((void *) AXL_SUCCESS);
break;
} else {
/* Take our work out of the queue */
pdata->head = pdata->head->next;
Expand All @@ -109,10 +112,12 @@ axl_pthread_func(void *arg)

free(work);

if (rc != AXL_SUCCESS)
if (rc != AXL_SUCCESS) {
pthread_exit((void *) AXL_FAILURE);
}
}
return NULL;

return AXL_SUCCESS;
}

static struct axl_pthread_data *
Expand Down Expand Up @@ -283,10 +288,11 @@ int axl_pthread_test (int id)
{
kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id);
int64_t ptr;
struct axl_pthread_data *pdata;
struct axl_pthread_data *pdata = NULL;

kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr);
pdata = (struct axl_pthread_data *) ptr;
assert(pdata);
gonsie marked this conversation as resolved.
Show resolved Hide resolved

/* Is there still work in the workqueue? */
if (pdata->head) {
Expand All @@ -301,12 +307,12 @@ int axl_pthread_wait (int id)
{
/* get pointer to file list for this dataset */
kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id);
int64_t ptr;
struct axl_pthread_data *pdata;
int64_t ptr = 0;
struct axl_pthread_data *pdata = NULL;
int rc;
int final_rc = AXL_SUCCESS;
int i;
void *rc_ptr;
void *rc_ptr = NULL;

kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr);
pdata = (struct axl_pthread_data *) ptr;
Expand All @@ -319,23 +325,22 @@ int axl_pthread_wait (int id)
/* All our threads are now started. Wait for them to finish */
for (i = 0; i < pdata->threads; i++) {
rc = pthread_join(pdata->tid[i], &rc_ptr);

/*
* Thread either finished successfully, or was canceled by
* AXL_Cancel(). Both statuses are valid.
*/
if (rc != 0 && rc_ptr != PTHREAD_CANCELED) {
if (rc != 0) {
AXL_ERR("pthread_join(%d) failed (%d)", i, rc);
return AXL_FAILURE;
}

/*
* Check the rc that the thread actually reported. The thread
* returns a void * that we encode our rc value in.
* returns a void * that we encode our rc value in. If the
* thread was canceled, that totally valid and fine.
*/
rc = (int) ((unsigned long) rc_ptr);
if (rc) {
final_rc |= AXL_FAILURE;
if (rc_ptr != PTHREAD_CANCELED) {
rc = (int) ((unsigned long) rc_ptr);
if (rc) {
AXL_ERR("pthread join rc_ptr was set as %d\n", rc);
final_rc |= AXL_FAILURE;
}
}
}
if (final_rc != AXL_SUCCESS) {
Expand All @@ -360,17 +365,31 @@ int axl_pthread_cancel (int id)
struct axl_pthread_data *pdata;
int rc = 0;
int i;
void *rc_ptr;

kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr);
pdata = (struct axl_pthread_data *) ptr;
assert(pdata);

for (i = 0; i < pdata->threads; i++) {
rc |= pthread_cancel(pdata->tid[i]);
/* send the thread a cancellation request */
int rc = pthread_cancel(pdata->tid[i]);
if (rc) {
AXL_ERR("pthread_cancel failed, rc %d\n");
break;
}

/* wait for the thread to actually exit */
pthread_join(pdata->tid[i], &rc_ptr);
if (rc_ptr != 0 && rc_ptr != PTHREAD_CANCELED) {
AXL_ERR("pthread_join failed, rc_ptr %p", rc_ptr);
rc |= (int) ((unsigned long) rc_ptr);
}
}
if (rc != 0) {
AXL_ERR("Bad return code from pthread_cancel()\n");
AXL_ERR("Bad return code from canceling a thread\n");
}
return AXL_FAILURE;
return rc;
}

void axl_pthread_free (int id)
Expand All @@ -387,6 +406,7 @@ void axl_pthread_free (int id)
kvtree_util_get_int64(file_list, AXL_KEY_PTHREAD_DATA, &ptr);
pdata = (struct axl_pthread_data *) ptr;
if (pdata) {
kvtree_util_set_int64(file_list, AXL_KEY_PTHREAD_DATA, 0);
axl_pthread_free_pdata(pdata);
}
}
7 changes: 7 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ LIST(APPEND axl_test_srcs
axl_cp.c
)

# Build with debug symbols (-g)
# SET(CMAKE_BUILD_TYPE Debug)

ADD_EXECUTABLE(axl_cp ${axl_test_srcs})

TARGET_LINK_LIBRARIES(axl_cp axl)
Expand All @@ -20,8 +23,12 @@ ADD_TEST(sync_test test_axl.sh sync)
ADD_TEST(pthreads_test test_axl.sh pthread)
IF(BBAPI_FOUND)
ADD_TEST(bbapi_test test_axl.sh bbapi)
ADD_TEST(bbapi_cancel_test test_axl.sh -n 1500 -c 0.5 bbapi)
ENDIF(BBAPI_FOUND)

# Create 1500 files (~871MB) and cancel the test 200ms into the transfer
ADD_TEST(pthreads_cancel_test test_axl.sh -n 1500 -c 0.2 pthread)

####################
# make a verbose "test" target named "check"
####################
Expand Down
31 changes: 25 additions & 6 deletions test/axl_cp.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,31 @@ usage(void)

void sig_func(int signum)
{
int rc;
if (id == -1) {
/* AXL not initialized yet */
return;
}

printf("axl_cp: canceling\n");
AXL_Cancel(id);
AXL_Wait(id);
AXL_Free(id);
AXL_Finalize();
rc = AXL_Cancel(id);
if (rc != AXL_SUCCESS) {
printf("axl_cp SIGTERM: AXL_Cancel failed (%d)", rc);
exit(rc);
}

rc = AXL_Free(id);
if (rc != AXL_SUCCESS) {
printf("axl_cp SIGTERM: AXL_Free failed (%d)", rc);
exit(rc);
}

rc = AXL_Finalize();
if (rc != AXL_SUCCESS) {
printf("axl_cp SIGTERM: AXL_Finalized failed (%d)", rc);
exit(rc);
}

exit(AXL_SUCCESS);
}

int
Expand All @@ -91,12 +106,16 @@ main(int argc, char **argv) {
memset(&action, 0, sizeof(action));
action.sa_handler = sig_func;
sigaction(SIGTERM, &action, NULL);
char *state_file = NULL;

while ((opt = getopt(argc, argv, "rRX:")) != -1) {
switch (opt) {
case 'X':
xfer_str = optarg;
break;
case 'S':
state_file = optarg;
break;
case 'r':
case 'R':
recursive = 1;
Expand Down Expand Up @@ -144,7 +163,7 @@ main(int argc, char **argv) {
xfer = AXL_XFER_SYNC;
}

rc = AXL_Init(NULL);
rc = AXL_Init(state_file);
if (rc != AXL_SUCCESS) {
printf("AXL_Init() failed (error %d)\n", rc);
return rc;
Expand Down
Loading