diff --git a/integration_tests.sh b/integration_tests.sh
index ed5fe4b7..fc03030e 100755
--- a/integration_tests.sh
+++ b/integration_tests.sh
@@ -19,5 +19,5 @@
 else
 	. "${1}"/bin/activate
 fi
 
-pytest src/test/integration
+pytest -sv src/test/integration
diff --git a/src/include/benchmark.h b/src/include/benchmark.h
index 84ed5299..9aecaea6 100644
--- a/src/include/benchmark.h
+++ b/src/include/benchmark.h
@@ -166,6 +166,9 @@ typedef struct threaddata_s {
 	// which workload stage we're currrently on
 	_Atomic(uint32_t) stage_idx;
 
+	// For async linear workloads
+	_Atomic(uint64_t) current_key;
+
 	/*
 	 * note: to stop threads, tdata->finished must be set before tdata->do_work
 	 * to prevent deadlocking
diff --git a/src/include/common.h b/src/include/common.h
index cf7ba21c..a5036870 100644
--- a/src/include/common.h
+++ b/src/include/common.h
@@ -99,6 +99,11 @@ static inline void timespec_add_us(struct timespec* ts, uint64_t us)
 	ts->tv_nsec = nsec % 1000000000LU;
 }
 
+static inline void timespec_add_s(struct timespec* ts, uint64_t s)
+{
+	ts->tv_sec += s;
+}
+
 
 /*
  * returns the length of the given number were it to be printed in decimal
@@ -186,4 +191,3 @@ char* parse_string_literal(const char* restrict str,
 
 void print_hdr_percentiles(struct hdr_histogram* h, const char* name,
 		uint64_t elapsed_s, as_vector* percentiles, FILE *out_file);
-
diff --git a/src/main/benchmark.c b/src/main/benchmark.c
index 9ecffa27..7e1fd03f 100644
--- a/src/main/benchmark.c
+++ b/src/main/benchmark.c
@@ -322,6 +322,7 @@ init_tdata(const args_t* args, cdata_t* cdata, thr_coord_t* coord,
 	tdata->t_idx = t_idx;
 	// always start on the first stage
 	atomic_init(&tdata->stage_idx, 0);
+	atomic_init(&tdata->current_key, args->start_key);
 	atomic_init(&tdata->do_work, true);
 	atomic_init(&tdata->finished, false);
 
diff --git a/src/main/benchmark_init.c b/src/main/benchmark_init.c
index 807274fd..9e0c038f 100644
--- a/src/main/benchmark_init.c
+++ b/src/main/benchmark_init.c
@@ -544,6 +544,8 @@ print_usage(const char* program)
 	printf("-z --threads # Default: 16\n");
 	printf(" Load generating thread count.\n");
+	printf(" This is set to 1 if using --async.\n");
+	printf(" Use --event-loops in async mode.\n");
 	printf("\n");
 
 	printf("-g --throughput # Default: 0\n");
@@ -745,6 +747,7 @@ print_usage(const char* program)
 	printf("-a --async # Default: synchronous mode\n");
 	printf(" Enable asynchronous mode.\n");
+	printf(" Use --event-loops to tune performance in async mode.\n");
 	printf("\n");
 
 	printf("-c --async-max-commands # Default: 50\n");
diff --git a/src/main/common.c b/src/main/common.c
index 9aa1f89a..4fd880fa 100644
--- a/src/main/common.c
+++ b/src/main/common.c
@@ -456,7 +456,6 @@ void print_hdr_percentiles(struct hdr_histogram* h, const char* name,
 	fprintf(out_file, "\n");
 }
 
-
 //==========================================================
 // Local helpers.
 //
diff --git a/src/main/transaction.c b/src/main/transaction.c
index 0038d5a3..978593d3 100644
--- a/src/main/transaction.c
+++ b/src/main/transaction.c
@@ -6,6 +6,10 @@
 #include
 #include
+#include
+#include
+#include
+#include
 
 #ifndef __aarch64__
 #include
 #endif
@@ -17,7 +21,6 @@
 #include
 #include
 #include
-#include
 #include
 
@@ -25,16 +28,20 @@
 // Typedefs & constants.
 //
 
+struct async_data_s;
+
 struct async_data_s {
+	tdata_t* tdata;
 	cdata_t* cdata;
-	stage_t* stage;
 
-	// queue to place this item back on once the callback has finished
-	queue_t* adata_q;
+	thr_coord_t* coord;
+	stage_t* stage;
+
 
 	// keep each async_data in the same event loop to prevent the possibility
 	// of overflowing an event loop due to bad scheduling
 	as_event_loop* ev_loop;
 
+	void (*workload_cb)(struct async_data_s* adata);
+
 	// the time at which the async call was made
 	uint64_t start_time;
@@ -48,6 +55,11 @@ struct async_data_s {
 		delete_op,
 		udf_op
 	} op;
+
+	bool inactive;
+	_Atomic(uint64_t)* tpp;
+
+	pthread_mutex_t done_lock;
 };
 
 
@@ -59,11 +71,16 @@ struct async_data_s {
 LOCAL_HELPER uint32_t _pct_to_fp(float pct);
 LOCAL_HELPER uint32_t _random_fp(as_random*);
 
-// Latency recrding helpers
+// Latency recording helpers
 LOCAL_HELPER void _record_read(cdata_t* cdata, uint64_t dt_us);
 LOCAL_HELPER void _record_write(cdata_t* cdata, uint64_t dt_us);
 LOCAL_HELPER void _record_udf(cdata_t* cdata, uint64_t dt_us);
 
+// Timing helper methods
+LOCAL_HELPER int sleep_for_ns(uint64_t n_secs);
+LOCAL_HELPER struct timespec timespec_diff(struct timespec l, struct timespec r);
+LOCAL_HELPER double timespec_get_ms(struct timespec ts);
+
 // Read/Write singular/batch synchronous operations
 LOCAL_HELPER int _write_record_sync(tdata_t* tdata, cdata_t* cdata,
 		thr_coord_t* coord, as_key* key, as_record* rec);
@@ -157,17 +174,11 @@ LOCAL_HELPER void _async_batch_write_listener(as_error* err, as_batch_read_recor
 		void* udata, as_event_loop* event_loop);
 LOCAL_HELPER void _async_val_listener(as_error* err, as_val* val, void* udata,
 		as_event_loop* event_loop);
-LOCAL_HELPER struct async_data_s* queue_pop_wait(queue_t* adata_q);
-LOCAL_HELPER void linear_writes_async(tdata_t* tdata, cdata_t* cdata,
-		thr_coord_t* coord, const stage_t* stage, queue_t* adata_q);
-LOCAL_HELPER void random_read_write_async(tdata_t* tdata, cdata_t* cdata,
-		thr_coord_t* coord, const stage_t* stage, queue_t* adata_q);
-LOCAL_HELPER void random_read_write_udf_async(tdata_t* tdata, cdata_t* cdata,
-		thr_coord_t* coord, const stage_t* stage, queue_t* adata_q);
-LOCAL_HELPER void linear_deletes_async(tdata_t* tdata, cdata_t* cdata,
-		thr_coord_t* coord, const stage_t* stage, queue_t* adata_q);
-LOCAL_HELPER void random_read_write_delete_async(tdata_t* tdata, cdata_t* cdata,
-		thr_coord_t* coord, const stage_t* stage, queue_t* adata_q);
+LOCAL_HELPER void linear_writes_async(struct async_data_s* adata);
+LOCAL_HELPER void random_read_write_async(struct async_data_s* adata);
+LOCAL_HELPER void random_read_write_udf_async(struct async_data_s* adata);
+LOCAL_HELPER void linear_deletes_async(struct async_data_s* adata);
+LOCAL_HELPER void random_read_write_delete_async(struct async_data_s* adata);
 
 // Main worker thread loop
 LOCAL_HELPER void do_sync_workload(tdata_t* tdata, cdata_t* cdata,
@@ -280,6 +291,41 @@ _record_udf(cdata_t* cdata, uint64_t dt_us)
 	cdata->udf_count++;
 }
 
+/******************************************************************************
+ * Timing helpers
+ *****************************************************************************/
+
+struct timespec timespec_diff(struct timespec l, struct timespec r)
+{
+	struct timespec res;
+	res.tv_sec = l.tv_sec - r.tv_sec;
+	res.tv_nsec = l.tv_nsec - r.tv_nsec;
+
+	return res;
+}
+
+double timespec_get_ms(struct timespec ts) {
+	double res;
+	res = ts.tv_sec * 1000.0;
+	res += ts.tv_nsec / 1.0e6;
+
+	return res;
+}
+
+int sleep_for_ns(uint64_t n_secs)
+{
+	struct timespec sleep_time;
+	int res;
+
+	sleep_time.tv_sec = 0;
+	sleep_time.tv_nsec = n_secs;
+
+	do {
+		res = nanosleep(&sleep_time, &sleep_time);
+	} while (res != 0 && errno == EINTR);
+
+	return res;
+}
 
 /******************************************************************************
  * Read/Write singular/batch synchronous operations
@@ -1172,7 +1218,8 @@ linear_deletes(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord,
 	thr_coordinator_complete(coord);
 }
 
-LOCAL_HELPER void random_read_write_delete(tdata_t* tdata, cdata_t* cdata,
+LOCAL_HELPER void
+random_read_write_delete(tdata_t* tdata, cdata_t* cdata,
 		thr_coord_t* coord, const stage_t* stage)
 {
 	uint32_t read_pct = _pct_to_fp(stage->workload.read_pct);
@@ -1373,15 +1420,9 @@ _async_listener(as_error* err, void* udata, as_event_loop* event_loop)
 					err->code, err->message);
 			}
 		}
-
-		if (err->code == AEROSPIKE_ERR_NO_MORE_CONNECTIONS) {
-			// this event loop is full, try another
-			adata->ev_loop = NULL;
-		}
 	}
 
-	// put this adata object back on the queue
-	queue_push(adata->adata_q, adata);
+	adata->workload_cb(adata);
 }
 
 LOCAL_HELPER void
@@ -1432,128 +1473,111 @@ _async_val_listener(as_error* err, as_val* val, void* udata,
 	}
 }
 
-LOCAL_HELPER struct async_data_s*
-queue_pop_wait(queue_t* adata_q)
-{
-	struct async_data_s* adata;
-
-	while (1) {
-		adata = queue_pop(adata_q);
-		if (adata == NULL) {
-			#ifdef __aarch64__
-			__asm__ __volatile__("yield");
-			#else
-			_mm_pause();
-			#endif
-
-			continue;
-		}
-		break;
-	}
-	return adata;
-}
-
 LOCAL_HELPER void
-linear_writes_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord,
-		const stage_t* stage, queue_t* adata_q)
+linear_writes_async(struct async_data_s* adata)
 {
-	uint64_t key_val, end_key;
-	struct async_data_s* adata;
+	tdata_t* tdata = adata->tdata;
+	cdata_t* cdata = adata->cdata;
+	const stage_t* stage = adata->stage;
 
-	struct timespec wake_time;
-	uint64_t start_time;
+	if (!tdata->do_work) {
+		goto finished;
+	}
+
+	if (*adata->tpp == 0) {
+		// throttle this adata
+		adata->inactive = true;
+		goto finished;
+	}
+	--(*adata->tpp);
 
-	key_val = stage->key_start;
+	uint64_t key_val, end_key;
+	key_val = atomic_fetch_add(&tdata->current_key, 1);
 	end_key = stage->key_end;
 
-	while (tdata->do_work &&
-			key_val < end_key) {
-		adata = queue_pop_wait(adata_q);
+	if (key_val >= end_key) {
+		goto finished;
+	}
 
-		clock_gettime(COORD_CLOCK, &wake_time);
-		start_time = timespec_to_us(&wake_time);
-		adata->start_time = start_time;
+	adata->op = write_op;
 
-		adata->op = write_op;
+	if (stage->batch_write_size <= 1) {
+		as_record* rec;
+		_gen_key(key_val, &adata->key, cdata);
+		rec = _gen_record(tdata->random, cdata, tdata, stage);
 
-		if (stage->batch_write_size <= 1) {
-			as_record* rec;
-			_gen_key(key_val, &adata->key, cdata);
-			rec = _gen_record(tdata->random, cdata, tdata, stage);
+		_write_record_async(&adata->key, rec, adata, tdata, cdata);
+
+		_destroy_record(rec, stage);
+		key_val++;
+	}
+	else {
+		as_batch_records* batch;
 
-			_write_record_async(&adata->key, rec, adata, tdata, cdata);
+		batch = _gen_batch_writes_sequential_keys(cdata, tdata, stage, key_val);
+		_batch_write_record_async(batch, adata, tdata, cdata);
+		key_val += stage->batch_write_size;
+	}
 
-			_destroy_record(rec, stage);
-			key_val++;
-		}
-		else {
-			as_batch_records* batch;
+	return;
 
-			batch = _gen_batch_writes_sequential_keys(cdata, tdata, stage, key_val);
-			_batch_write_record_async(batch, adata, tdata, cdata);
-			key_val += stage->batch_write_size;
-		}
+finished:;
+	int rv;
 
-		uint64_t pause_for =
-				dyn_throttle_pause_for(&tdata->dyn_throttle, start_time);
-		timespec_add_us(&wake_time, pause_for);
-		thr_coordinator_sleep(coord, &wake_time);
+	if ((rv = pthread_mutex_unlock(&adata->done_lock)) != 0) {
+		blog_error("failed to unlock mutex - %d\n", rv);
+		exit(-1);
 	}
-
-	// once we've written everything, there's nothing left to do, so tell
-	// coord we're done and exit
-	thr_coordinator_complete(coord);
 }
 
 LOCAL_HELPER void
-random_read_write_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord,
-		const stage_t* stage, queue_t* adata_q)
+random_read_write_async(struct async_data_s* adata)
 {
-	struct async_data_s* adata;
-
-	struct timespec wake_time;
-	uint64_t start_time;
+	tdata_t* tdata = adata->tdata;
+	cdata_t* cdata = adata->cdata;
+	thr_coord_t* coord = adata->coord;
+	const stage_t* stage = adata->stage;
 
 	uint32_t read_pct = _pct_to_fp(stage->workload.read_pct);
 
-	// since this workload has no target number of transactions to be made, we
-	// are always ready to be reaped, and so we notify the coordinator that we
-	// are finished with our required tasks and can be stopped whenever
-	thr_coordinator_complete(coord);
+	if (!tdata->do_work) {
+		goto finished;
+	}
 
-	while (tdata->do_work) {
+	if (*adata->tpp == 0) {
+		adata->inactive = true;
+		goto finished;
+	}
+	--(*adata->tpp);
 
-		adata = queue_pop_wait(adata_q);
+	// roll the die
+	uint32_t die = _random_fp(tdata->random);
 
-		clock_gettime(COORD_CLOCK, &wake_time);
-		start_time = timespec_to_us(&wake_time);
-		adata->start_time = start_time;
+	if (die < read_pct) {
+		random_read_async(tdata, cdata, coord, stage, adata);
+	}
+	else {
+		random_write_async(tdata, cdata, coord, stage, adata);
+	}
 
-		// roll the die
-		uint32_t die = _random_fp(tdata->random);
+	return;
 
-		if (die < read_pct) {
-			random_read_async(tdata, cdata, coord, stage, adata);
-		}
-		else {
-			random_write_async(tdata, cdata, coord, stage, adata);
-		}
+finished:;
+	int rv;
 
-		uint64_t pause_for =
-				dyn_throttle_pause_for(&tdata->dyn_throttle, start_time);
-		timespec_add_us(&wake_time, pause_for);
-		thr_coordinator_sleep(coord, &wake_time);
+	if ((rv = pthread_mutex_unlock(&adata->done_lock)) != 0) {
+		blog_error("failed to unlock mutex - %d\n", rv);
+		exit(-1);
 	}
 }
 
 LOCAL_HELPER void
-random_read_write_udf_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord,
-		const stage_t* stage, queue_t* adata_q)
+random_read_write_udf_async(struct async_data_s* adata)
 {
-	struct async_data_s* adata;
-
-	struct timespec wake_time;
-	uint64_t start_time;
+	tdata_t* tdata = adata->tdata;
+	cdata_t* cdata = adata->cdata;
+	thr_coord_t* coord = adata->coord;
+	const stage_t* stage = adata->stage;
 
 	uint32_t read_pct = _pct_to_fp(stage->workload.read_pct);
 	uint32_t write_pct = _pct_to_fp(stage->workload.write_pct);
@@ -1561,100 +1585,104 @@ random_read_write_udf_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord,
 	// store the cumulative probability in write_pct
 	write_pct = read_pct + write_pct;
 
-	// since this workload has no target number of transactions to be made, we
-	// are always ready to be reaped, and so we notify the coordinator that we
-	// are finished with our required tasks and can be stopped whenever
-	thr_coordinator_complete(coord);
+	if (!tdata->do_work) {
+		goto finished;
+	}
 
-	while (tdata->do_work) {
+	if (*adata->tpp == 0) {
+		adata->inactive = true;
+		goto finished;
+	}
+	--(*adata->tpp);
 
-		adata = queue_pop_wait(adata_q);
+	// roll the die
+	uint32_t die = _random_fp(tdata->random);
 
-		clock_gettime(COORD_CLOCK, &wake_time);
-		start_time = timespec_to_us(&wake_time);
-		adata->start_time = start_time;
+	if (die < read_pct) {
+		random_read_async(tdata, cdata, coord, stage, adata);
+	}
+	else if (die < write_pct) {
+		random_write_async(tdata, cdata, coord, stage, adata);
+	}
+	else {
+		random_udf_async(tdata, cdata, coord, stage, adata);
+	}
 
-		// roll the die
-		uint32_t die = _random_fp(tdata->random);
+	return;
 
-		if (die < read_pct) {
-			random_read_async(tdata, cdata, coord, stage, adata);
-		}
-		else if (die < write_pct) {
-			random_write_async(tdata, cdata, coord, stage, adata);
-		}
-		else {
-			random_udf_async(tdata, cdata, coord, stage, adata);
-		}
+finished:;
+	int rv;
 
-		uint64_t pause_for =
-				dyn_throttle_pause_for(&tdata->dyn_throttle, start_time);
-		timespec_add_us(&wake_time, pause_for);
-		thr_coordinator_sleep(coord, &wake_time);
+	if ((rv = pthread_mutex_unlock(&adata->done_lock)) != 0) {
+		blog_error("failed to unlock mutex - %d\n", rv);
+		exit(-1);
 	}
 }
 
 LOCAL_HELPER void
-linear_deletes_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord,
-		const stage_t* stage, queue_t* adata_q)
+linear_deletes_async(struct async_data_s* adata)
 {
-	uint64_t key_val, end_key;
-	struct async_data_s* adata;
+	tdata_t* tdata = adata->tdata;
+	cdata_t* cdata = adata->cdata;
+	const stage_t* stage = adata->stage;
 
-	struct timespec wake_time;
-	uint64_t start_time;
+	if (!tdata->do_work) {
+		goto finished;
+	}
 
-	key_val = stage->key_start;
+	if (*adata->tpp == 0) {
+		adata->inactive = true;
+		goto finished;
+	}
+	--(*adata->tpp);
+
+	uint64_t key_val, end_key;
+	key_val = atomic_fetch_add(&tdata->current_key, 1);
 	end_key = stage->key_end;
 
-	while (tdata->do_work &&
-			key_val < end_key) {
-		adata = queue_pop_wait(adata_q);
+	if (key_val >= end_key) {
+		goto finished;
+	}
 
-		clock_gettime(COORD_CLOCK, &wake_time);
-		start_time = timespec_to_us(&wake_time);
-		adata->start_time = start_time;
+	adata->op = delete_op;
 
-		adata->op = delete_op;
+	if (stage->batch_delete_size <= 1) {
 
-		if (stage->batch_delete_size <= 1) {
+		as_record* rec;
+		_gen_key(key_val, &adata->key, cdata);
+		rec = _gen_nil_record(tdata);
 
-			as_record* rec;
-			_gen_key(key_val, &adata->key, cdata);
-			rec = _gen_nil_record(tdata);
+		_write_record_async(&adata->key, rec, adata, tdata, cdata);
 
-			_write_record_async(&adata->key, rec, adata, tdata, cdata);
+		_destroy_record(rec, stage);
+		key_val++;
+	}
+	else {
+		as_batch_records* batch;
 
-			_destroy_record(rec, stage);
-			key_val++;
-		}
-		else {
-			as_batch_records* batch;
+		batch = _gen_batch_deletes_sequential_keys(cdata, tdata, stage, key_val);
+		_batch_write_record_async(batch, adata, tdata, cdata);
+		key_val += stage->batch_delete_size;
+	}
 
-			batch = _gen_batch_deletes_sequential_keys(cdata, tdata, stage, key_val);
-			_batch_write_record_async(batch, adata, tdata, cdata);
-			key_val += stage->batch_delete_size;
-		}
+	return;
 
-		uint64_t pause_for =
-				dyn_throttle_pause_for(&tdata->dyn_throttle, start_time);
-		timespec_add_us(&wake_time, pause_for);
-		thr_coordinator_sleep(coord, &wake_time);
-	}
+finished:;
+	int rv;
 
-	// once we've written everything, there's nothing left to do, so tell
-	// coord we're done and exit
-	thr_coordinator_complete(coord);
+	if ((rv = pthread_mutex_unlock(&adata->done_lock)) != 0) {
blog_error("failed to unlock mutex - %d\n", rv); + exit(-1); + } } LOCAL_HELPER void -random_read_write_delete_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, - const stage_t* stage, queue_t* adata_q) +random_read_write_delete_async(struct async_data_s* adata) { - struct async_data_s* adata; - - struct timespec wake_time; - uint64_t start_time; + tdata_t* tdata = adata->tdata; + cdata_t* cdata = adata->cdata; + thr_coord_t* coord = adata->coord; + const stage_t* stage = adata->stage; uint32_t read_pct = _pct_to_fp(stage->workload.read_pct); uint32_t write_pct = _pct_to_fp(stage->workload.write_pct); @@ -1662,36 +1690,37 @@ random_read_write_delete_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coor // store the cumulative probability in write_pct write_pct = read_pct + write_pct; - // since this workload has no target number of transactions to be made, we - // are always ready to be reaped, and so we notify the coordinator that we - // are finished with our required tasks and can be stopped whenever - thr_coordinator_complete(coord); + if (! tdata->do_work) { + goto finished; + } - while (tdata->do_work) { + if (*adata->tpp == 0) { + adata->inactive = true; + goto finished; + } + --(*adata->tpp); - adata = queue_pop_wait(adata_q); + // roll the die + uint32_t die = _random_fp(tdata->random); - clock_gettime(COORD_CLOCK, &wake_time); - start_time = timespec_to_us(&wake_time); - adata->start_time = start_time; + if (die < read_pct) { + random_read_async(tdata, cdata, coord, stage, adata); + } + else if (die < write_pct) { + random_write_async(tdata, cdata, coord, stage, adata); + } + else { + random_delete_async(tdata, cdata, coord, stage, adata); + } - // roll the die - uint32_t die = _random_fp(tdata->random); + return; - if (die < read_pct) { - random_read_async(tdata, cdata, coord, stage, adata); - } - else if (die < write_pct) { - random_write_async(tdata, cdata, coord, stage, adata); - } - else { - random_delete_async(tdata, cdata, coord, stage, adata); - } +finished:; + int rv; - uint64_t pause_for = - dyn_throttle_pause_for(&tdata->dyn_throttle, start_time); - timespec_add_us(&wake_time, pause_for); - thr_coordinator_sleep(coord, &wake_time); + if ((rv = pthread_mutex_unlock(&adata->done_lock)) != 0) { + blog_error("failed to unlock mutex - %d\n", rv); + exit(-1); } } @@ -1728,10 +1757,7 @@ LOCAL_HELPER void do_async_workload(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, stage_t* stage) { - struct async_data_s* adatas; uint32_t t_idx = tdata->t_idx; - uint64_t n_adatas; - queue_t adata_q; // thread 0 is designated to handle async calls, the rest can immediately // terminate @@ -1740,46 +1766,147 @@ do_async_workload(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, return; } - n_adatas = cdata->async_max_commands; - adatas = - (struct async_data_s*) cf_malloc(n_adatas * sizeof(struct async_data_s)); + int rv; + uint64_t n_adatas = cdata->async_max_commands; + struct async_data_s* adatas = + (struct async_data_s*) cf_calloc(sizeof(struct async_data_s), n_adatas); + + _Atomic(uint64_t) throttle_tpp; + atomic_init(&throttle_tpp, UINT64_MAX); + + int cycles = 1000; + double desired_tpp = MAXFLOAT; + + if (stage->tps > 0) { + desired_tpp = stage->tps / (double) cycles; + throttle_tpp = floor(desired_tpp); + } + + switch (stage->workload.type) { + case WORKLOAD_TYPE_RU: + case WORKLOAD_TYPE_RR: + case WORKLOAD_TYPE_RUF: + case WORKLOAD_TYPE_RUD: + // No target num txns - tell coord this is reapable. 
+		thr_coordinator_complete(coord);
+		break;
+	default:
+		break;
+	}
 
-	queue_init(&adata_q, n_adatas);
 	for (uint32_t i = 0; i < n_adatas; i++) {
 		struct async_data_s* adata = &adatas[i];
+		adata->tdata = tdata;
 		adata->cdata = cdata;
+		adata->coord = coord;
 		adata->stage = stage;
-		adata->adata_q = &adata_q;
 		adata->ev_loop = NULL;
+		adata->tpp = &throttle_tpp;
 
-		queue_push(&adata_q, adata);
-	}
+		if ((rv = pthread_mutex_init(&adata->done_lock, NULL)) != 0) {
+			blog_error("failed to initialize mutex - %d\n", rv);
+			exit(-1);
+		}
 
-	switch (stage->workload.type) {
+		if ((rv = pthread_mutex_lock(&adata->done_lock)) != 0) {
+			blog_error("failed to lock mutex - %d\n", rv);
+			exit(-1);
+		}
+
+		switch (stage->workload.type) {
 		case WORKLOAD_TYPE_I:
-			linear_writes_async(tdata, cdata, coord, stage, &adata_q);
+			adata->workload_cb = linear_writes_async;
+			linear_writes_async(adata);
 			break;
 		case WORKLOAD_TYPE_RU:
 		case WORKLOAD_TYPE_RR:
-			random_read_write_async(tdata, cdata, coord, stage, &adata_q);
+			adata->workload_cb = random_read_write_async;
+			random_read_write_async(adata);
 			break;
 		case WORKLOAD_TYPE_RUF:
-			random_read_write_udf_async(tdata, cdata, coord, stage, &adata_q);
+			adata->workload_cb = random_read_write_udf_async;
+			random_read_write_udf_async(adata);
 			break;
 		case WORKLOAD_TYPE_D:
-			linear_deletes_async(tdata, cdata, coord, stage, &adata_q);
+			adata->workload_cb = linear_deletes_async;
+			linear_deletes_async(adata);
 			break;
 		case WORKLOAD_TYPE_RUD:
-			random_read_write_delete_async(tdata, cdata, coord, stage, &adata_q);
+			adata->workload_cb = random_read_write_delete_async;
+			random_read_write_delete_async(adata);
 			break;
+		}
+	}
+
+	struct timespec start;
+	clock_gettime(CLOCK_MONOTONIC, &start);
+
+	uint64_t period_us = 1000000 / cycles;
+
+	double partial = 0.0;
+	while(tdata->do_work && tdata->current_key < stage->key_end) {
+		sleep_for_ns(period_us);
+
+		struct timespec cycle_end;
+		clock_gettime(CLOCK_MONOTONIC, &cycle_end);
+
+		struct timespec delta = timespec_diff(cycle_end, start);
+		double calculated_tpp = timespec_get_ms(delta) * desired_tpp + partial;
+		double actual_tpp = floor(calculated_tpp);
+		throttle_tpp = actual_tpp;
+
+		partial = calculated_tpp - actual_tpp;
+
+		for (int i = 0; i < n_adatas; i++) {
+			struct async_data_s* adata = &adatas[i];
+			// if the adata was set to inactive due to throttling
+			// restart it while more work needs to be done
+			if (adata->inactive && throttle_tpp >= 1) {
+				adata->inactive = false;
+
+				if ((rv = pthread_mutex_lock(&adata->done_lock)) != 0) {
+					blog_error("failed to lock mutex - %d\n", rv);
+					exit(-1);
+				}
+
+				adata->workload_cb(adata);
+			}
+		}
+
+		start = cycle_end;
 	}
 
 	// wait for all the async calls to finish
 	for (uint32_t i = 0; i < n_adatas; i++) {
-		queue_pop_wait(&adata_q);
+		struct async_data_s* adata = &adatas[i];
+
+		if ((rv = pthread_mutex_lock(&adata->done_lock)) != 0) {
+			blog_error("failed to lock mutex - %d\n", rv);
+			exit(-1);
+		}
+
+		if ((rv = pthread_mutex_unlock(&adata->done_lock)) != 0) {
+			blog_error("failed to unlock mutex - %d\n", rv);
+			exit(-1);
+		}
+
+		if ((rv = pthread_mutex_destroy(&adata->done_lock)) != 0) {
+			blog_error("failed to destroy mutex - %d\n", rv);
+			exit(-1);
+		}
+	}
+
+	switch (stage->workload.type) {
+	case WORKLOAD_TYPE_I:
+	case WORKLOAD_TYPE_D:
+		// once we've written everything, there's nothing left to do, so tell
+		// coord we're done and exit
+		thr_coordinator_complete(coord);
+		break;
+	default:
+		break;
 	}
-	queue_free(&adata_q);
 
 	// free the async_data structs
 	cf_free(adatas);