Skip to content

Commit

Permalink
finish fast_fair PMDK allocator
Browse files Browse the repository at this point in the history
  • Loading branch information
MaTianmao committed Nov 26, 2019
1 parent cbe5f5a commit 8976c6c
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 86 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -g -march=native -mavx -mavx2

## Instruction options for Cache line flush
add_definitions(-DCLFLUSH)
add_definitions(-DVARIABLE_LENGTH)
add_definitions(-DUSE_PMDK)
#add_definitions(-DCLFLUSH_OPT)
#add_definitions(-DCLWB)

Expand Down
4 changes: 2 additions & 2 deletions benchmark/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ static void usage_exit(FILE *out) {
"Command line options : nstore <options> \n"
" -h --help : Print help message \n"
" -t --type : Index type : 0 (PART) 1 (FAST_FAIR) \n"
" -K --key_type : Key type : 0 (Integer) 1 (String) \n"
" -K --key_type : Key type : 0 (Integer) 1 (String) \n"
" -n --num_threads : Number of workers \n"
" -k --keys : Number of key-value pairs at begin\n"
" -s --non_share_memory : Use different index instances among "
Expand All @@ -101,7 +101,7 @@ static void parse_arguments(int argc, char *argv[], Config &state) {
state.type = PART;
state.num_threads = 4;
state.key_type = Integer;
state.init_keys = 1000000;
state.init_keys = 10000000;
state.time = 5;
state.share_memory = true;
state.duration = 1;
Expand Down
35 changes: 24 additions & 11 deletions benchmark/coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "Tree.h"
#include "benchmarks.h"
#include "config.h"
#include "fast_fair_1.h"
#include "fast_fair_2.h"
#include "nvm_mgr.h"
#include "threadinfo.h"
#include "timer.h"
Expand Down Expand Up @@ -360,6 +360,7 @@ template <typename K, typename V, int size> class Coordinator {
bar = new boost::barrier(conf.num_threads + 1);

PART_ns::Key *k = new PART_ns::Key();
printf("init keys: %d\n", (int)conf.init_keys);
for (unsigned long i = 0; i < conf.init_keys; i++) {
if (conf.key_type == Integer) {
long kk = benchmark->nextInitIntKey();
Expand Down Expand Up @@ -392,33 +393,42 @@ template <typename K, typename V, int size> class Coordinator {
}

printf("[COORDINATOR]\tFinish benchmark..\n");
printf("[COORDINATOR]\ttotal throughput: %.3lf Mtps\n",
(double)final_result.throughput / 1000000.0 / conf.duration);
printf("[RESULT]\ttotal throughput: %.3lf Mtps, %d threads, %s, "
"%s, benchmark %d\n",
(double)final_result.throughput / 1000000.0 / conf.duration,
conf.num_threads, (conf.type == PART) ? "ART" : "FF",
(conf.key_type == Integer) ? "Int" : "Str", conf.benchmark);

delete art;
delete[] pid;
delete[] results;
} else if (conf.type == FAST_FAIR) {
// FAST_FAIR
printf("test FAST_FAIR---------------------\n");
// fastfair::init_pmem();
// fastfair::btree *bt =
// new (fastfair::allocate(sizeof(fastfair::btree)))
// fastfair::btree();
#ifdef USE_PMDK
fastfair::init_pmem();
fastfair::btree *bt =
new (fastfair::allocate(sizeof(fastfair::btree)))
fastfair::btree();
std::cout << "[FF]\tPM create tree\n";
#else
fastfair::btree *bt = new fastfair::btree();
std::cout << "[FF]\tmemory create tree\n";
#endif
Benchmark *benchmark = getBenchmark(conf);

Result *results = new Result[conf.num_threads];
memset(results, 0, sizeof(Result) * conf.num_threads);

std::thread **pid = new std::thread *[conf.num_threads];
bar = new boost::barrier(conf.num_threads + 1);

printf("init keys: %d\n", (int)conf.init_keys);
for (unsigned long i = 0; i < conf.init_keys; i++) {
if (conf.key_type == Integer) {
long kk = benchmark->nextInitIntKey();
bt->btree_insert(kk, (char *)kk);
// std::cout<<"insert key "<<kk << " id: "<<i<<"\n";
// std::cout << "insert key " << kk << "
// id: " << i << "\n";
} else if (conf.key_type == String) {
std::string s = benchmark->nextInitStrKey();
bt->btree_insert((char *)s.c_str(), (char *)s.c_str());
Expand All @@ -444,8 +454,11 @@ template <typename K, typename V, int size> class Coordinator {
}

printf("[COORDINATOR]\tFinish benchmark..\n");
printf("[COORDINATOR]\ttotal throughput: %.3lf Mtps\n",
(double)final_result.throughput / 1000000.0 / conf.duration);
printf("[RESULT]\ttotal throughput: %.3lf Mtps, %d threads, %s, "
"%s, benchmark %d\n",
(double)final_result.throughput / 1000000.0 / conf.duration,
conf.num_threads, (conf.type == PART) ? "ART" : "FF",
(conf.key_type == Integer) ? "Int" : "Str", conf.benchmark);

delete[] pid;
delete[] results;
Expand Down
108 changes: 56 additions & 52 deletions fast_fair/fast_fair_1.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <time.h>
#include <unistd.h>
#include <new>
#include <vector>

#ifdef LOCK_INIT
#include "tbb/concurrent_vector.h"
#endif
Expand Down Expand Up @@ -214,11 +213,7 @@ class page {

/**
 * Allocation function for this type: hands back `size` bytes of storage
 * aligned to a 64-byte boundary, obtained from posix_memalign.
 *
 * The former USE_PMDK branch was an unimplemented TODO that returned an
 * uninitialized pointer, so the posix_memalign path is now taken
 * unconditionally.
 *
 * @param size number of bytes requested by the new-expression
 * @return pointer to 64-byte aligned storage; never null
 * @throws std::bad_alloc when posix_memalign reports failure
 */
void *operator new(size_t size) {
  void *ret = nullptr;
  // posix_memalign signals failure through its return value and leaves the
  // output pointer unspecified, so the result must not be used unchecked.
  if (posix_memalign(&ret, 64, size) != 0) {
    throw std::bad_alloc();
  }
  return ret;
}

Expand Down Expand Up @@ -558,41 +553,47 @@ class page {

array_end->ptr = (char *)NULL;

clflush((char *)this, CACHE_LINE_SIZE);
if (flush) {
clflush((char *)this, CACHE_LINE_SIZE);
}
} else {
int i = *num_entries - 1, inserted = 0, to_flush_cnt = 0;
records[*num_entries + 1].ptr = records[*num_entries].ptr;
if ((uint64_t) &
(records[*num_entries + 1].ptr) % CACHE_LINE_SIZE == 0)
clflush((char *)&(records[*num_entries + 1].ptr),
sizeof(char *));
if (flush) {
if ((uint64_t) &
(records[*num_entries + 1].ptr) % CACHE_LINE_SIZE == 0)
clflush((char *)&(records[*num_entries + 1].ptr),
sizeof(char *));
}

// FAST
for (i = *num_entries - 1; i >= 0; i--) {
if (key < records[i].key.ikey) {
records[i + 1].ptr = records[i].ptr;
records[i + 1].key.ikey = records[i].key.ikey;

uint64_t records_ptr = (uint64_t)(&records[i + 1]);

int remainder = records_ptr % CACHE_LINE_SIZE;
bool do_flush =
(remainder == 0) ||
((((int)(remainder + sizeof(entry)) /
CACHE_LINE_SIZE) == 1) &&
((remainder + sizeof(entry)) % CACHE_LINE_SIZE) != 0);
if (do_flush) {
clflush((char *)records_ptr, CACHE_LINE_SIZE);
to_flush_cnt = 0;
} else
++to_flush_cnt;

if (flush) {
uint64_t records_ptr = (uint64_t)(&records[i + 1]);

int remainder = records_ptr % CACHE_LINE_SIZE;
bool do_flush = (remainder == 0) ||
((((int)(remainder + sizeof(entry)) /
CACHE_LINE_SIZE) == 1) &&
((remainder + sizeof(entry)) %
CACHE_LINE_SIZE) != 0);
if (do_flush) {
clflush((char *)records_ptr, CACHE_LINE_SIZE);
to_flush_cnt = 0;
} else
++to_flush_cnt;
}
} else {
records[i + 1].ptr = records[i].ptr;
records[i + 1].key.ikey = key;
records[i + 1].ptr = ptr;

clflush((char *)&records[i + 1], sizeof(entry));
if (flush)
clflush((char *)&records[i + 1], sizeof(entry));
inserted = 1;
break;
}
Expand All @@ -601,7 +602,8 @@ class page {
records[0].ptr = (char *)hdr.leftmost_ptr;
records[0].key.ikey = key;
records[0].ptr = ptr;
clflush((char *)&records[0], sizeof(entry));
if (flush)
clflush((char *)&records[0], sizeof(entry));
}
}

Expand Down Expand Up @@ -634,11 +636,12 @@ class page {
} else {
int i = *num_entries - 1, inserted = 0, to_flush_cnt = 0;
records[*num_entries + 1].ptr = records[*num_entries].ptr;

if ((uint64_t) &
(records[*num_entries + 1].ptr) % CACHE_LINE_SIZE == 0)
clflush((char *)&(records[*num_entries + 1].ptr),
sizeof(char *));
if (flush) {
if ((uint64_t) &
(records[*num_entries + 1].ptr) % CACHE_LINE_SIZE == 0)
clflush((char *)&(records[*num_entries + 1].ptr),
sizeof(char *));
}

// FAST
for (i = *num_entries - 1; i >= 0; i--) {
Expand All @@ -648,26 +651,28 @@ class page {
records[i + 1].ptr = records[i].ptr;
records[i + 1].key.skey = records[i].key.skey;

uint64_t records_ptr = (uint64_t)(&records[i + 1]);

int remainder = records_ptr % CACHE_LINE_SIZE;
bool do_flush =
(remainder == 0) ||
((((int)(remainder + sizeof(entry)) /
CACHE_LINE_SIZE) == 1) &&
((remainder + sizeof(entry)) % CACHE_LINE_SIZE) != 0);
if (do_flush) {
clflush((char *)records_ptr, CACHE_LINE_SIZE);
to_flush_cnt = 0;
} else
++to_flush_cnt;

if (flush) {
uint64_t records_ptr = (uint64_t)(&records[i + 1]);

int remainder = records_ptr % CACHE_LINE_SIZE;
bool do_flush = (remainder == 0) ||
((((int)(remainder + sizeof(entry)) /
CACHE_LINE_SIZE) == 1) &&
((remainder + sizeof(entry)) %
CACHE_LINE_SIZE) != 0);
if (do_flush) {
clflush((char *)records_ptr, CACHE_LINE_SIZE);
to_flush_cnt = 0;
} else
++to_flush_cnt;
}
} else {
records[i + 1].ptr = records[i].ptr;
records[i + 1].key.skey = key;
records[i + 1].ptr = ptr;

clflush((char *)&records[i + 1], sizeof(entry));
if (flush)
clflush((char *)&records[i + 1], sizeof(entry));
inserted = 1;
break;
}
Expand All @@ -676,7 +681,8 @@ class page {
records[0].ptr = (char *)hdr.leftmost_ptr;
records[0].key.skey = key;
records[0].ptr = ptr;
clflush((char *)&records[0], sizeof(entry));
if (flush)
clflush((char *)&records[0], sizeof(entry));
}
}

Expand Down Expand Up @@ -753,7 +759,7 @@ class page {
} else { // FAIR
// overflow
// create a new node
page *sibling = new page(hdr.level); // 重载了new运算符
page *sibling = new page(hdr.level);
register int m = (int)ceil(num_entries / 2);
uint64_t split_key = records[m].key.ikey;

Expand Down Expand Up @@ -1888,15 +1894,13 @@ void btree::setNewRoot(char *new_root) {

/**
 * Builds a key_item holding a copy of `key`.
 *
 * Storage of sizeof(key_item) + key_len bytes is obtained from
 * posix_memalign with 64-byte alignment; key_len is recorded in the item
 * and exactly key_len bytes are copied from `key` into its key buffer.
 *
 * @param key     source bytes to copy
 * @param key_len number of bytes to copy (a terminating NUL is copied only
 *                if the caller counted it in key_len)
 * @param flush   when true, clflush is applied to the whole new item;
 *                when false, no flush is issued here
 * @return pointer to the newly allocated item; never null
 * @throws std::bad_alloc when posix_memalign reports failure
 */
key_item *btree::make_key_item(char *key, size_t key_len, bool flush) {
  const size_t total_size = sizeof(key_item) + key_len;

  void *raw = nullptr;
  // posix_memalign leaves the output pointer unspecified on failure, so the
  // return code has to be checked before the storage is touched.
  if (posix_memalign(&raw, 64, total_size) != 0) {
    throw std::bad_alloc();
  }

  key_item *new_key = (key_item *)raw;
  new_key->key_len = key_len;
  memcpy(new_key->key, key, key_len);

  // Honor the caller's choice: flushing unconditionally would make the
  // `flush` parameter meaningless.
  if (flush)
    clflush((char *)new_key, total_size);

  return new_key;
}
Expand Down
Loading

0 comments on commit 8976c6c

Please sign in to comment.