Skip to content

Commit

Permalink
doAllEvenlyPartition for ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
AdityaAtulTewari committed May 24, 2024
1 parent 1bbc9e0 commit de77f47
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 34 deletions.
39 changes: 20 additions & 19 deletions include/pando-lib-galois/import/ingest_rmat_el.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ struct ELEdge {
};

void loadELFilePerThread(
galois::WaitGroup::HandleType wgh, pando::Array<char> filename, std::uint64_t segmentsPerThread,
std::uint64_t numThreads, std::uint64_t threadID,
galois::ThreadLocalVector<pando::Vector<ELEdge>> localReadEdges,
pando::Array<char> filename, std::uint64_t segmentsPerThread, std::uint64_t numThreads,
std::uint64_t threadID, galois::ThreadLocalVector<pando::Vector<ELEdge>> localReadEdges,
galois::ThreadLocalStorage<galois::HashTable<std::uint64_t, std::uint64_t>> perThreadRename,
std::uint64_t numVertices);

Expand Down Expand Up @@ -70,19 +69,19 @@ ReturnType initializeELDACSR(pando::Array<char> filename, std::uint64_t numVerti
PANDO_CHECK(fmap(hashRef, initialize, 0));
}

std::uint64_t hosts = static_cast<std::uint64_t>(pando::getPlaceDims().node.id);

galois::WaitGroup wg;
PANDO_CHECK(wg.initialize(numThreads));
PANDO_CHECK(wg.initialize(0));
auto wgh = wg.getHandle();

PANDO_MEM_STAT_NEW_KERNEL("loadELFilePerThread Start");
for (std::uint64_t i = 0; i < numThreads; i++) {
pando::Place place = pando::Place{pando::NodeIndex{static_cast<std::int64_t>(i % hosts)},
pando::anyPod, pando::anyCore};
PANDO_CHECK(pando::executeOn(place, &galois::loadELFilePerThread, wgh, filename, 1, numThreads,
i, localReadEdges, perThreadRename, numVertices));
}
auto elFileTuple = galois::make_tpl(filename, 1, localReadEdges, perThreadRename, numVertices);
PANDO_CHECK(galois::doAllEvenlyPartition(
wgh, elFileTuple, numThreads,
+[](decltype(elFileTuple) elFileTuple, uint64_t i, uint64_t numThreads) {
auto [filename, striping, localReadEdges, perThreadRename, numVertices] = elFileTuple;
galois::loadELFilePerThread(filename, striping, numThreads, i, localReadEdges,
perThreadRename, numVertices);
}));
PANDO_CHECK(wg.wait());
PANDO_MEM_STAT_NEW_KERNEL("loadELFilePerThread End");

Expand Down Expand Up @@ -137,19 +136,21 @@ ReturnType initializeELDLCSR(pando::Array<char> filename, std::uint64_t numVerti
const std::uint64_t numVHosts = hosts * vHostsScaleFactor;

galois::WaitGroup wg;
PANDO_CHECK(wg.initialize(numThreads));
PANDO_CHECK(wg.initialize(0));
auto wgh = wg.getHandle();

galois::DAccumulator<std::uint64_t> totVerts;
PANDO_CHECK(totVerts.initialize());

PANDO_MEM_STAT_NEW_KERNEL("loadELFilePerThread Start");
for (std::uint64_t i = 0; i < numThreads; i++) {
pando::Place place = pando::Place{pando::NodeIndex{static_cast<std::int64_t>(i % hosts)},
pando::anyPod, pando::anyCore};
PANDO_CHECK(pando::executeOn(place, &galois::loadELFilePerThread, wgh, filename, 1, numThreads,
i, localReadEdges, perThreadRename, numVertices));
}
auto elFileTuple = galois::make_tpl(filename, 1, localReadEdges, perThreadRename, numVertices);
PANDO_CHECK(galois::doAllEvenlyPartition(
wgh, elFileTuple, numThreads,
+[](decltype(elFileTuple) elFileTuple, uint64_t i, uint64_t numThreads) {
auto [filename, striping, localReadEdges, perThreadRename, numVertices] = elFileTuple;
galois::loadELFilePerThread(filename, striping, numThreads, i, localReadEdges,
perThreadRename, numVertices);
}));

PANDO_CHECK(wg.wait());
PANDO_MEM_STAT_NEW_KERNEL("loadELFilePerThread End");
Expand Down
2 changes: 1 addition & 1 deletion include/pando-lib-galois/loops/do_all.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ class DoAll {
static pando::Status doAllEvenlyPartition(WaitGroup::HandleType wgh, State s, uint64_t workItems,
const F& func) {
constexpr SchedulerPolicy EVENLY_PARITION_SCHEDULER_POLICY = SchedulerPolicy::CORE_STRIPE;
LoopLocalSchedulerStruct<EVENLY_PARITION_SCHEDULER_POLICY> loopLocal;
LoopLocalSchedulerStruct<EVENLY_PARITION_SCHEDULER_POLICY> loopLocal{};
pando::Status err = pando::Status::Success;
if (workItems == 0) {
return err;
Expand Down
6 changes: 2 additions & 4 deletions src/ingest_rmat_el.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ auto generateRMATParser(
}

void galois::loadELFilePerThread(
galois::WaitGroup::HandleType wgh, pando::Array<char> filename, std::uint64_t segmentsPerThread,
std::uint64_t numThreads, std::uint64_t threadID,
galois::ThreadLocalVector<pando::Vector<ELEdge>> localReadEdges,
pando::Array<char> filename, std::uint64_t segmentsPerThread, std::uint64_t numThreads,
std::uint64_t threadID, galois::ThreadLocalVector<pando::Vector<ELEdge>> localReadEdges,
ThreadLocalStorage<HashTable<std::uint64_t, std::uint64_t>> perThreadRename,
std::uint64_t numVertices) {
auto parser =
generateRMATParser(localReadEdges.getLocal(), perThreadRename.getLocal(), numVertices);
PANDO_CHECK(
internal::loadGraphFilePerThread(filename, segmentsPerThread, numThreads, threadID, parser));
wgh.done();
}

const char* galois::elGetOne(const char* line, std::uint64_t& val) {
Expand Down
20 changes: 10 additions & 10 deletions test/import/test_cusp_importer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -753,17 +753,17 @@ TEST(loadGraphFilePerThread, loadEdgeList) {
}

galois::WaitGroup wg;
EXPECT_EQ(pando::Status::Success, wg.initialize(numThreads));
EXPECT_EQ(pando::Status::Success, wg.initialize(0));
auto wgh = wg.getHandle();
for (uint64_t i = 0; i < numThreads; i++) {
std::int64_t nodeId =
static_cast<std::int64_t>(i % static_cast<std::uint64_t>(pando::getPlaceDims().node.id));
pando::Place place = pando::Place{pando::NodeIndex{nodeId}, pando::anyPod, pando::anyCore};
pando::Status err =
pando::executeOn(place, &galois::loadELFilePerThread, wgh, filename, segmentsPerThread,
numThreads, i, localEdges, perThreadRename, numVertices);
EXPECT_EQ(err, pando::Status::Success);
}
auto elFileTuple =
galois::make_tpl(filename, segmentsPerThread, localEdges, perThreadRename, numVertices);
PANDO_CHECK(galois::doAllEvenlyPartition(
wgh, elFileTuple, numThreads,
+[](decltype(elFileTuple) elFileTuple, uint64_t i, uint64_t numThreads) {
auto [filename, striping, localReadEdges, perThreadRename, numVertices] = elFileTuple;
galois::loadELFilePerThread(filename, striping, numThreads, i, localReadEdges,
perThreadRename, numVertices);
}));
EXPECT_EQ(wg.wait(), pando::Status::Success);

for (galois::HashTable<std::uint64_t, std::uint64_t> hash : perThreadRename) {
Expand Down

0 comments on commit de77f47

Please sign in to comment.