Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
liulx20 committed Jan 11, 2025
1 parent 19e2c90 commit 05ecabe
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 125 deletions.
50 changes: 31 additions & 19 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ GraphDB& GraphDB::get() {
return db;
}

QueryCache& GraphDB::getQueryCache() const { return query_cache_; }

Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
int32_t thread_num, bool warmup, bool memory_only,
bool enable_auto_compaction) {
Expand Down Expand Up @@ -231,12 +229,17 @@ Result<bool> GraphDB::Open(const GraphDBConfig& config) {
timestamp_t ts = this->version_manager_.acquire_update_timestamp();
auto txn = CompactTransaction(this->graph_, this->contexts_[0].logger,
this->version_manager_, ts);
OutputCypherProfiles("./" + std::to_string(ts) + "_");
txn.Commit();
VLOG(10) << "Finish compaction";
}
}
});
}

unlink((work_dir_ + "/statistics.json").c_str());
unlink((work_dir_ + "/.compiler.yaml").c_str());
graph_.generateStatistics(work_dir_);
query_cache_.cache.clear();

return Result<bool>(true);
Expand Down Expand Up @@ -265,8 +268,9 @@ void GraphDB::Close() {
std::fill(app_factories_.begin(), app_factories_.end(), nullptr);
}

ReadTransaction GraphDB::GetReadTransaction(int thread_id) {
return contexts_[thread_id].session.GetReadTransaction();
ReadTransaction GraphDB::GetReadTransaction() {
uint32_t ts = version_manager_.acquire_read_timestamp();
return {graph_, version_manager_, ts};
}

InsertTransaction GraphDB::GetInsertTransaction(int thread_id) {
Expand Down Expand Up @@ -304,21 +308,6 @@ timestamp_t GraphDB::GetLastCompactionTimestamp() const {
return last_compaction_ts_;
}

const MutablePropertyFragment& GraphDB::graph() const { return graph_; }
MutablePropertyFragment& GraphDB::graph() { return graph_; }

const Schema& GraphDB::schema() const { return graph_.schema(); }

std::shared_ptr<ColumnBase> GraphDB::get_vertex_property_column(
uint8_t label, const std::string& col_name) const {
return graph_.get_vertex_property_column(label, col_name);
}

std::shared_ptr<RefColumnBase> GraphDB::get_vertex_id_column(
uint8_t label) const {
return graph_.get_vertex_id_column(label);
}

AppWrapper GraphDB::CreateApp(uint8_t app_type, int thread_id) {
if (app_factories_[app_type] == nullptr) {
LOG(ERROR) << "Stored procedure " << static_cast<int>(app_type)
Expand Down Expand Up @@ -516,4 +505,27 @@ size_t GraphDB::getExecutedQueryNum() const {
return ret;
}

QueryCache& GraphDB::getQueryCache() const { return query_cache_; }

void GraphDB::OutputCypherProfiles(const std::string& prefix) {
runtime::OprTimer read_timer, write_timer;
int session_num = SessionNum();
for (int i = 0; i < session_num; ++i) {
auto read_app_ptr = GetSession(i).GetApp(Schema::CYPHER_READ_PLUGIN_ID);
auto casted_read_app = dynamic_cast<CypherReadApp*>(read_app_ptr);
if (casted_read_app) {
read_timer += casted_read_app->timer();
}

auto write_app_ptr = GetSession(i).GetApp(Schema::CYPHER_WRITE_PLUGIN_ID);
auto casted_write_app = dynamic_cast<CypherWriteApp*>(write_app_ptr);
if (casted_write_app) {
write_timer += casted_write_app->timer();
}
}

read_timer.output(prefix + "read_profile.log");
write_timer.output(prefix + "write_profile.log");
}

} // namespace gs
30 changes: 19 additions & 11 deletions flex/engines/graph_db/database/graph_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <map>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

Expand Down Expand Up @@ -93,8 +94,6 @@ class GraphDB {

static GraphDB& get();

QueryCache& getQueryCache() const;

/**
* @brief Load the graph from data directory.
* @param schema The schema of graph. It should be the same as the schema,
Expand All @@ -119,7 +118,7 @@ class GraphDB {
*
* @return graph_dir The directory of graph data.
*/
ReadTransaction GetReadTransaction(int thread_id = 0);
ReadTransaction GetReadTransaction();

/** @brief Create a transaction to insert vertices and edges with a default
* allocator.
Expand Down Expand Up @@ -150,15 +149,20 @@ class GraphDB {
*/
UpdateTransaction GetUpdateTransaction(int thread_id = 0);

const MutablePropertyFragment& graph() const;
MutablePropertyFragment& graph();
inline const MutablePropertyFragment& graph() const { return graph_; }
inline MutablePropertyFragment& graph() { return graph_; }

const Schema& schema() const;
inline const Schema& schema() const { return graph_.schema(); }

std::shared_ptr<ColumnBase> get_vertex_property_column(
uint8_t label, const std::string& col_name) const;
inline std::shared_ptr<ColumnBase> get_vertex_property_column(
uint8_t label, const std::string& col_name) const {
return graph_.get_vertex_table(label).get_column(col_name);
}

std::shared_ptr<RefColumnBase> get_vertex_id_column(uint8_t label) const;
inline std::shared_ptr<RefColumnBase> get_vertex_id_column(
uint8_t label) const {
return graph_.get_vertex_id_column(label);
}

AppWrapper CreateApp(uint8_t app_type, int thread_id);

Expand All @@ -172,8 +176,12 @@ class GraphDB {
void UpdateCompactionTimestamp(timestamp_t ts);
timestamp_t GetLastCompactionTimestamp() const;

QueryCache& getQueryCache() const;

std::string work_dir() const { return work_dir_; }

void OutputCypherProfiles(const std::string& prefix);

private:
bool registerApp(const std::string& path, uint8_t index = 0);

Expand All @@ -193,8 +201,6 @@ class GraphDB {

friend class GraphDBSession;

mutable QueryCache query_cache_;

std::string work_dir_;
SessionLocalContext* contexts_;

Expand All @@ -206,6 +212,8 @@ class GraphDB {
std::array<std::string, 256> app_paths_;
std::array<std::shared_ptr<AppFactoryBase>, 256> app_factories_;

mutable QueryCache query_cache_;

std::thread monitor_thread_;
bool monitor_thread_running_;

Expand Down
4 changes: 1 addition & 3 deletions flex/engines/graph_db/runtime/common/columns/edge_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -656,8 +656,7 @@ class SDSLEdgeColumnBuilder : public IContextColumnBuilder {
label_(label),
prop_type_(prop_type),
prop_col_(EdgePropVecBase::make_edge_prop_vec(prop_type)),
sub_types_(sub_types),
cap_(0) {}
sub_types_(sub_types) {}
~SDSLEdgeColumnBuilder() = default;

void reserve(size_t size) override { edges_.reserve(size); }
Expand Down Expand Up @@ -686,7 +685,6 @@ class SDSLEdgeColumnBuilder : public IContextColumnBuilder {
PropertyType prop_type_;
std::shared_ptr<EdgePropVecBase> prop_col_;
std::vector<PropertyType> sub_types_;
size_t cap_;
};

template <typename T>
Expand Down
4 changes: 2 additions & 2 deletions flex/engines/graph_db/runtime/common/rt_any.h
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ class SetImpl : public SetImplBase {
return set_ == (dynamic_cast<const SetImpl<T>&>(p)).set_;
}

void insert(const RTAny& val) {
void insert(const RTAny& val) override {
set_.insert(TypedConverter<T>::to_typed(val));
}
void insert(const T& val) { set_.insert(val); }
Expand Down Expand Up @@ -989,7 +989,7 @@ class SetImpl<VertexRecord> : public SetImplBase {
return set_ == (dynamic_cast<const SetImpl<VertexRecord>&>(p)).set_;
}

void insert(const RTAny& val) {
void insert(const RTAny& val) override {
insert(TypedConverter<VertexRecord>::to_typed(val));
}
void insert(VertexRecord val) {
Expand Down
164 changes: 92 additions & 72 deletions flex/storages/rt_mutable_graph/mutable_property_fragment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ void MutablePropertyFragment::Open(const std::string& work_dir,
// We will reserve the at least 4096 slots for each vertex label
size_t vertex_capacity =
std::max(lf_indexers_[i].capacity(), (size_t) 4096);
if (vertex_capacity >= lf_indexers_[i].size()) {
if (vertex_capacity >= lf_indexers_[i].capacity()) {
lf_indexers_[i].reserve(vertex_capacity);
}
vertex_data_[i].resize(vertex_capacity);
Expand Down Expand Up @@ -389,15 +389,6 @@ const Schema& MutablePropertyFragment::schema() const { return schema_; }

Schema& MutablePropertyFragment::mutable_schema() { return schema_; }

Table& MutablePropertyFragment::get_vertex_table(label_t vertex_label) {
return vertex_data_[vertex_label];
}

const Table& MutablePropertyFragment::get_vertex_table(
label_t vertex_label) const {
return vertex_data_[vertex_label];
}

vid_t MutablePropertyFragment::vertex_num(label_t vertex_label) const {
return static_cast<vid_t>(lf_indexers_[vertex_label].size());
}
Expand Down Expand Up @@ -464,69 +455,98 @@ MutablePropertyFragment::get_incoming_edges_mut(label_t label, vid_t u,
return get_ie_csr(label, neighbor_label, edge_label)->edge_iter_mut(u);
}

CsrBase* MutablePropertyFragment::get_oe_csr(label_t label,
label_t neighbor_label,
label_t edge_label) {
size_t index = label * vertex_label_num_ * edge_label_num_ +
neighbor_label * edge_label_num_ + edge_label;
return oe_[index];
}

const CsrBase* MutablePropertyFragment::get_oe_csr(label_t label,
label_t neighbor_label,
label_t edge_label) const {
size_t index = label * vertex_label_num_ * edge_label_num_ +
neighbor_label * edge_label_num_ + edge_label;
return oe_[index];
}

CsrBase* MutablePropertyFragment::get_ie_csr(label_t label,
label_t neighbor_label,
label_t edge_label) {
size_t index = neighbor_label * vertex_label_num_ * edge_label_num_ +
label * edge_label_num_ + edge_label;
return ie_[index];
}

const CsrBase* MutablePropertyFragment::get_ie_csr(label_t label,
label_t neighbor_label,
label_t edge_label) const {
size_t index = neighbor_label * vertex_label_num_ * edge_label_num_ +
label * edge_label_num_ + edge_label;
return ie_[index];
}

std::shared_ptr<ColumnBase> MutablePropertyFragment::get_vertex_property_column(
uint8_t label, const std::string& prop) const {
return vertex_data_[label].get_column(prop);
}
void MutablePropertyFragment::generateStatistics(
const std::string& work_dir) const {
std::string filename = work_dir + "/statistics.json";
size_t vertex_count = 0;

std::string ss = "\"vertex_type_statistics\": [\n";
size_t vertex_label_num = schema_.vertex_label_num();
for (size_t idx = 0; idx < vertex_label_num; ++idx) {
ss += "{\n\"type_id\": " + std::to_string(idx) + ", \n";
ss += "\"type_name\": \"" + schema_.get_vertex_label_name(idx) + "\", \n";
size_t count = lf_indexers_[idx].size();
ss += "\"count\": " + std::to_string(count) + "\n}";
vertex_count += count;
if (idx != vertex_label_num - 1) {
ss += ", \n";
} else {
ss += "\n";
}
}
ss += "]\n";
size_t edge_count = 0;

size_t edge_label_num = schema_.edge_label_num();
std::vector<std::thread> count_threads;
std::vector<size_t> edge_count_list(dual_csr_list_.size(), 0);
for (size_t src_label = 0; src_label < vertex_label_num; ++src_label) {
const auto& src_label_name = schema_.get_vertex_label_name(src_label);
for (size_t dst_label = 0; dst_label < vertex_label_num; ++dst_label) {
const auto& dst_label_name = schema_.get_vertex_label_name(dst_label);
for (size_t edge_label = 0; edge_label < edge_label_num; ++edge_label) {
const auto& edge_label_name = schema_.get_edge_label_name(edge_label);
if (schema_.exist(src_label_name, dst_label_name, edge_label_name)) {
size_t index = src_label * vertex_label_num * edge_label_num +
dst_label * edge_label_num + edge_label;
if (dual_csr_list_[index] != NULL) {
count_threads.emplace_back([&edge_count_list, index, this] {
edge_count_list[index] = dual_csr_list_[index]->EdgeNum();
});
}
}
}
}
}
for (auto& t : count_threads) {
t.join();
}
ss += ",\n";
ss += "\"edge_type_statistics\": [";

for (size_t edge_label = 0; edge_label < edge_label_num; ++edge_label) {
const auto& edge_label_name = schema_.get_edge_label_name(edge_label);

ss += "{\n\"type_id\": " + std::to_string(edge_label) + ", \n";
ss += "\"type_name\": \"" + edge_label_name + "\", \n";
ss += "\"vertex_type_pair_statistics\": [\n";
bool first = true;
std::string props_content{};
for (size_t src_label = 0; src_label < vertex_label_num; ++src_label) {
const auto& src_label_name = schema_.get_vertex_label_name(src_label);
for (size_t dst_label = 0; dst_label < vertex_label_num; ++dst_label) {
const auto& dst_label_name = schema_.get_vertex_label_name(dst_label);
size_t index = src_label * vertex_label_num * edge_label_num +
dst_label * edge_label_num + edge_label;
if (schema_.exist(src_label_name, dst_label_name, edge_label_name)) {
if (!first) {
ss += ",\n";
}
first = false;
ss += "{\n\"source_vertex\" : \"" + src_label_name + "\", \n";
ss += "\"destination_vertex\" : \"" + dst_label_name + "\", \n";
ss += "\"count\" : " + std::to_string(edge_count_list[index]) + "\n";
edge_count += edge_count_list[index];
ss += "}";
}
}
}

std::shared_ptr<RefColumnBase> MutablePropertyFragment::get_vertex_id_column(
uint8_t label) const {
if (lf_indexers_[label].get_type() == PropertyType::kInt64) {
return std::make_shared<TypedRefColumn<int64_t>>(
dynamic_cast<const TypedColumn<int64_t>&>(
lf_indexers_[label].get_keys()));
} else if (lf_indexers_[label].get_type() == PropertyType::kInt32) {
return std::make_shared<TypedRefColumn<int32_t>>(
dynamic_cast<const TypedColumn<int32_t>&>(
lf_indexers_[label].get_keys()));
} else if (lf_indexers_[label].get_type() == PropertyType::kUInt64) {
return std::make_shared<TypedRefColumn<uint64_t>>(
dynamic_cast<const TypedColumn<uint64_t>&>(
lf_indexers_[label].get_keys()));
} else if (lf_indexers_[label].get_type() == PropertyType::kUInt32) {
return std::make_shared<TypedRefColumn<uint32_t>>(
dynamic_cast<const TypedColumn<uint32_t>&>(
lf_indexers_[label].get_keys()));
} else if (lf_indexers_[label].get_type() == PropertyType::kStringView) {
return std::make_shared<TypedRefColumn<std::string_view>>(
dynamic_cast<const TypedColumn<std::string_view>&>(
lf_indexers_[label].get_keys()));
} else {
LOG(ERROR) << "Unsupported vertex id type: "
<< lf_indexers_[label].get_type();
return nullptr;
ss += "\n]\n}";
if (edge_label != edge_label_num - 1) {
ss += ", \n";
} else {
ss += "\n";
}
}
ss += "]\n";
{
std::ofstream out(filename);
out << "{\n\"total_vertex_count\": " << vertex_count << ",\n";
out << "\"total_edge_count\": " << edge_count << ",\n";
out << ss;
out << "}\n";
out.close();
}
}

Check notice on line 552 in flex/storages/rt_mutable_graph/mutable_property_fragment.cc

View check run for this annotation

codefactor.io / CodeFactor

flex/storages/rt_mutable_graph/mutable_property_fragment.cc#L458-L552

Complex Method
Expand Down
Loading

0 comments on commit 05ecabe

Please sign in to comment.