Skip to content

Commit

Permalink
fix(interactive): Implement HttpIrMetaReader to Get Meta Data From …
Browse files Browse the repository at this point in the history
…Remote Http Service (#3908)

1. Implement `HttpIrMetaReader` to get meta data from remote http
service.
2. Implement `DynamicIrMetaFetcher` to fetch schema or statistics
periodically, the interval can be set by configurations.

---------

Co-authored-by: xiaolei.zl <[email protected]>
Co-authored-by: BingqingLyu <[email protected]>
Co-authored-by: Longbin Lai <[email protected]>
  • Loading branch information
4 people authored Jun 26, 2024
1 parent d6017c7 commit 02c3bfa
Show file tree
Hide file tree
Showing 34 changed files with 713 additions and 111 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,18 @@ jobs:
./tests/hqps/query_test ${GS_TEST_DIR}/flex/ldbc-sf01-long-date/audit_graph_schema.yaml \
/tmp/csr-data-dir/
- name: Test get graph meta from admin service
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
run: |
cd ${GITHUB_WORKSPACE}/flex/tests/hqps
sed -i 's/default_graph: ldbc/default_graph: modern_graph/g' ./engine_config_test.yaml
pip3 install argparse
pip3 install neo4j
bash hqps_compiler_get_meta_test.sh ${INTERACTIVE_WORKSPACE} ./engine_config_test.yaml
sed -i 's/default_graph: modern_graph/default_graph: ldbc/g' ./engine_config_test.yaml
- name: Run codegen test.
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest
Expand Down
56 changes: 52 additions & 4 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,13 @@ seastar::future<admin_query_result> admin_actor::run_get_graph_meta(
add_runnable_info(plugin_meta);
}
auto& graph_meta = meta_res.value();
graph_meta.plugin_metas = all_plugin_metas;
// There can also be procedures that are built into the graph meta.
for (auto& plugin_meta : graph_meta.plugin_metas) {
add_runnable_info(plugin_meta);
}
graph_meta.plugin_metas.insert(graph_meta.plugin_metas.end(),
all_plugin_metas.begin(),
all_plugin_metas.end());
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(std::move(graph_meta.ToJson())));
} else {
Expand Down Expand Up @@ -694,6 +700,12 @@ seastar::future<admin_query_result> admin_actor::get_procedures_by_graph_name(
for (auto& plugin_meta : all_plugin_metas) {
add_runnable_info(plugin_meta);
}
for (auto& plugin_meta : graph_meta_res.value().plugin_metas) {
add_runnable_info(plugin_meta);
}
all_plugin_metas.insert(all_plugin_metas.end(),
graph_meta_res.value().plugin_metas.begin(),
graph_meta_res.value().plugin_metas.end());
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(to_json_str(all_plugin_metas)));
} else {
Expand Down Expand Up @@ -1123,13 +1135,49 @@ seastar::future<admin_query_result> admin_actor::service_status(
res["bolt_port"] = hqps_service.get_service_config().bolt_port;
res["gremlin_port"] = hqps_service.get_service_config().gremlin_port;
if (running_graph_res.ok()) {
auto graph_meta =
auto graph_meta_res =
metadata_store_->GetGraphMeta(running_graph_res.value());
if (graph_meta.ok()) {
res["graph"] = nlohmann::json::parse(graph_meta.value().ToJson());
if (graph_meta_res.ok()) {
auto& graph_meta = graph_meta_res.value();
// Add the plugin meta.
auto get_all_procedure_res =
metadata_store_->GetAllPluginMeta(running_graph_res.value());
if (get_all_procedure_res.ok()) {
VLOG(10) << "Successfully get all procedures: "
<< get_all_procedure_res.value().size();
auto& all_plugin_metas = get_all_procedure_res.value();
VLOG(10) << "original all plugins : " << all_plugin_metas.size();
for (auto& plugin_meta : all_plugin_metas) {
add_runnable_info(plugin_meta);
}
for (auto& plugin_meta : graph_meta.plugin_metas) {
add_runnable_info(plugin_meta);
}

VLOG(10) << "original graph meta: " << graph_meta.plugin_metas.size();
for (auto& plugin_meta : all_plugin_metas) {
if (plugin_meta.runnable) {
graph_meta.plugin_metas.emplace_back(plugin_meta);
}
}
VLOG(10) << "got graph meta: " << graph_meta.ToJson();
res["graph"] = nlohmann::json::parse(graph_meta.ToJson());
} else {
LOG(ERROR) << "Fail to get all procedures: "
<< get_all_procedure_res.status().error_message();
return seastar::make_exception_future<admin_query_result>(
get_all_procedure_res.status());
}
} else {
LOG(ERROR) << "Fail to get graph meta: "
<< graph_meta_res.status().error_message();
res["graph"] = {};
return seastar::make_exception_future<admin_query_result>(
graph_meta_res.status());
}
} else {
res["graph"] = {};
LOG(INFO) << "No graph is running";
}
res["start_time"] = hqps_service.get_start_time();
} else {
Expand Down
3 changes: 2 additions & 1 deletion flex/engines/http_server/service/hqps_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ bool HQPSService::start_compiler_subprocess(
std::stringstream ss;
ss << "java -cp " << interactive_class_path;
if (!graph_schema_path.empty()) {
ss << " -Dgraph.schema=" << graph_schema_path;
ss << " -Dgraph.schema=http://localhost:" << service_config_.admin_port
<< "/v1/service/status";
}
ss << " " << COMPILER_SERVER_CLASS_NAME;
ss << " " << service_config_.engine_config_path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def __init__(self, uri: str):
# get service port
service_status = self.get_service_status()
if not service_status.is_ok():
raise Exception("Failed to get service status")
raise Exception("Failed to get service status: ", service_status.get_status_message())
service_port = service_status.get_value().hqps_port
# replace the port in uri
uri = uri.split(":")
Expand Down
Empty file.
65 changes: 62 additions & 3 deletions flex/interactive/sdk/python/test/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def tearDown(self):
print("delete graph: ", rep2)

def test_example(self):
self.createGraph()
self._graph_id = self.createGraph()
self.bulkLoading()
self.waitJobFinish()
self.list_graph()
Expand All @@ -103,6 +103,7 @@ def test_example(self):
self.createCypherProcedure()
self.createCppProcedure()
self.restart()
self.restartOnNewGraph()
self.getStatistics()
self.callProcedure()
self.callProcedureWithHttp()
Expand Down Expand Up @@ -154,8 +155,7 @@ def createGraph(self):
create_graph.var_schema = create_schema
resp = self._sess.create_graph(create_graph)
assert resp.is_ok()
self._graph_id = resp.get_value().graph_id
print("create graph: ", self._graph_id)
return resp.get_value().graph_id

def bulkLoading(self):
assert os.environ.get("FLEX_DATA_DIR") is not None
Expand Down Expand Up @@ -263,6 +263,65 @@ def restart(self):
print("restart: ", resp.get_value())
# wait 5 seconds
time.sleep(5)
# get service status
resp = self._sess.get_service_status()
assert resp.is_ok()
print("get service status: ", resp.get_value())

def restartOnNewGraph(self):
    """Move the service onto a freshly created graph and back again.

    Steps, in order: read the current service status and remember the id of
    the graph it is running on; create a new graph; start the service on the
    new graph; restart the service; stop the service and check that the
    reported status is "Stopped"; delete the new graph; start the service on
    the original graph again and wait 5 seconds.

    Raises:
        Exception: if the service reports "Running" without a graph id, or
            reports a status other than "Running" or "Stopped".
        AssertionError: if any service call fails, or if the service was not
            running on a graph when the method was entered.
    """
    original_graph_id = None
    status_res = self._sess.get_service_status()
    assert status_res.is_ok()
    status = status_res.get_value()
    if status.status == "Running":
        if status.graph is not None and status.graph.id is not None:
            original_graph_id = status.graph.id
        else:
            raise Exception("service status error, graph id is None")
    elif status.status == "Stopped":
        pass
    else:
        # Stringify explicitly: concatenating the status object itself to a
        # str raises TypeError and hides the real failure.
        raise Exception("service status error " + str(status))
    assert (
        original_graph_id is not None
    ), "service must be running on a graph before switching graphs"
    # create new graph
    new_graph_id = self.createGraph()
    # start service
    print("start service on new graph: ", new_graph_id)
    start_service_res = self._sess.start_service(
        start_service_request=StartServiceRequest(graph_id=new_graph_id)
    )
    assert start_service_res.is_ok()
    # restart service
    print("restart service on new graph: ", new_graph_id)
    restart_res = self._sess.restart_service()
    assert restart_res.is_ok()
    # get status
    print("get service status: ")
    status_res = self._sess.get_service_status()
    assert status_res.is_ok()
    print("get service status: ", status_res.get_value().status)
    # stop
    print("stop service: ")
    stop_res = self._sess.stop_service()
    assert stop_res.is_ok()
    # get status
    print("get service status: ")
    status_res = self._sess.get_service_status()
    assert status_res.is_ok()
    print("get service status: ", status_res.get_value().status)
    assert status_res.get_value().status == "Stopped"
    # after stop, we should be able to delete the graph
    print("delete graph: ", new_graph_id)
    delete_res = self._sess.delete_graph(new_graph_id)
    assert delete_res.is_ok()
    # start on original graph
    print("start service on original graph: ", original_graph_id)
    start_service_res = self._sess.start_service(
        start_service_request=StartServiceRequest(graph_id=original_graph_id)
    )
    assert start_service_res.is_ok()
    print("finish restartOnNewGraph")
    # give the service time to come up before the next test step
    time.sleep(5)

def getStatistics(self):
resp = self._sess.get_graph_statistics(self._graph_id)
Expand Down
2 changes: 2 additions & 0 deletions flex/openapi/openapi_interactive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,8 @@ components:
type: boolean
creation_time:
type: integer
update_time:
type: integer
UpdateProcedureRequest:
x-body-name: update_procedure_request
type: object
Expand Down
15 changes: 15 additions & 0 deletions flex/storages/metadata/graph_meta_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ PluginMeta PluginMeta::FromJson(const nlohmann::json& json) {
}
if (json.contains("name")) {
meta.name = json["name"].get<std::string>();
if (meta.id.empty()) {
meta.id = meta.name;
}
}
if (json.contains("bound_graph")) {
meta.bound_graph = json["bound_graph"].get<GraphId>();
Expand All @@ -155,6 +158,8 @@ PluginMeta PluginMeta::FromJson(const nlohmann::json& json) {
}
if (json.contains("type")) {
meta.type = json["type"].get<std::string>();
} else {
meta.type = "cpp"; // default is cpp
}
if (json.contains("option")) {
meta.setOptionFromJsonString(json["option"].dump());
Expand Down Expand Up @@ -337,6 +342,11 @@ CreateGraphMetaRequest CreateGraphMetaRequest::FromJson(
} else {
request.creation_time = GetCurrentTimeStamp();
}
if (json.contains("stored_procedures")) {
for (auto& plugin : json["stored_procedures"]) {
request.plugin_metas.push_back(PluginMeta::FromJson(plugin));
}
}
return request;
}

Expand All @@ -351,6 +361,11 @@ std::string CreateGraphMetaRequest::ToString() const {
json["data_update_time"] = 0;
}
json["creation_time"] = creation_time;
json["stored_procedures"] = nlohmann::json::array();
for (auto& plugin_meta : plugin_metas) {
json["stored_procedures"].push_back(
nlohmann::json::parse(plugin_meta.ToJson()));
}
return json.dump();
}

Expand Down
2 changes: 2 additions & 0 deletions flex/storages/metadata/graph_meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ struct CreateGraphMetaRequest {
std::optional<uint64_t> data_update_time;
int64_t creation_time;

std::vector<PluginMeta> plugin_metas;

static CreateGraphMetaRequest FromJson(const std::string& json_str);

std::string ToString() const;
Expand Down
8 changes: 8 additions & 0 deletions flex/tests/hqps/engine_config_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ compiler:
- FilterIntoJoinRule
- FilterMatchRule
- NotMatchToAntiJoinRule
meta:
reader:
schema:
uri: http://localhost:7777/v1/service/status
interval: 1000 # ms
statistics:
uri: http://localhost:7777/v1/graph/%s/statistics
interval: 86400000 # ms
endpoint:
default_listen_address: localhost
bolt_connector:
Expand Down
99 changes: 99 additions & 0 deletions flex/tests/hqps/hqps_compiler_get_meta_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#!/bin/bash
# Copyright 2020 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Exit immediately when a command fails (bash errexit).
set -e
# Absolute path of the directory that contains this script.
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
# Two levels above the script directory.
FLEX_HOME=${SCRIPT_DIR}/../../
# Server binary that start_engine_service launches.
SERVER_BIN=${FLEX_HOME}/build/bin/interactive_server
# interactive_engine directory next to FLEX_HOME; not referenced elsewhere in this script.
GIE_HOME=${FLEX_HOME}/../interactive_engine/
# Port constants; not referenced elsewhere in this script.
# NOTE(review): presumably these mirror the ports in the engine config -- confirm.
ADMIN_PORT=7777
QUERY_PORT=10000

#
# Validate the command line: exactly two arguments are required, an existing
# workspace directory and an existing engine config file.
if [ "$#" -ne 2 ]; then
  echo "only receives: $# args, need 2"
  echo "Usage: $0 <INTERACTIVE_WORKSPACE> <ENGINE_CONFIG>"
  exit 1
fi

INTERACTIVE_WORKSPACE=$1
ENGINE_CONFIG_PATH=$2
# The expansions are quoted so that an empty argument is rejected (unquoted,
# `[ ! -d ]` evaluates to false and the check is skipped) and so that paths
# containing whitespace are tested as a single word.
if [ ! -d "${INTERACTIVE_WORKSPACE}" ]; then
  echo "INTERACTIVE_WORKSPACE: ${INTERACTIVE_WORKSPACE} not exists"
  exit 1
fi
if [ ! -f "${ENGINE_CONFIG_PATH}" ]; then
  echo "ENGINE_CONFIG: ${ENGINE_CONFIG_PATH} not exists"
  exit 1
fi


# Escape sequences that err() and info() wrap around their messages.
RED='\033[0;31m'
GREEN='\033[0;32m'
NC='\033[0m' # No Color
# Write a timestamped -ERROR- message, wrapped in ${RED}/${NC}, to stderr.
err() {
  local stamp
  stamp=$(date +'%Y-%m-%d %H:%M:%S')
  echo -e "${RED}[${stamp}] -ERROR- $* ${NC}" >&2
}

# Write a timestamped -INFO- message, wrapped in ${GREEN}/${NC}, to stdout.
info() {
  local stamp
  stamp=$(date +'%Y-%m-%d %H:%M:%S')
  echo -e "${GREEN}[${stamp}] -INFO- $* ${NC}"
}


# Send SIGKILL to every process whose `ps -ef` line matches
# "interactive_server" or "GraphServer", then report whether any survived.
# Never fails: it is also installed as the EXIT trap of this script.
kill_service(){
  info "Kill Service first"
  # Drop the grep process itself from the match, as the liveness check in
  # start_engine_service does, so kill is not handed an already-dead PID.
  # `|| true` keeps `set -e` from aborting when nothing matches.
  ps -ef | grep "interactive_server" | grep -v grep | awk '{print $2}' | xargs kill -9 || true
  ps -ef | grep "GraphServer" | grep -v grep | awk '{print $2}' | xargs kill -9 || true
  sleep 3
  # check if service is killed
  if ps -ef | grep -E "interactive_server|GraphServer" | grep -v grep > /dev/null; then
    err "Kill Service failed: some processes are still running"
  else
    info "Kill Service success"
  fi
}

# kill service when exit: the EXIT trap runs kill_service whenever the script
# terminates, including the early exits triggered by `set -e`.
trap kill_service EXIT

# Launch ${SERVER_BIN} in the background with the admin service and the
# compiler enabled, wait 10 seconds, then verify the process is still alive.
# Exits the script with status 1 when the binary is missing or the server is
# not running after the wait.
start_engine_service(){
  #check SERVER_BIN exists
  if [ ! -f "${SERVER_BIN}" ]; then
    err "SERVER_BIN not found"
    exit 1
  fi

  # Build the command as an array so that arguments containing whitespace
  # (for example the workspace path) reach the server as single words.
  local cmd=("${SERVER_BIN}" -c "${ENGINE_CONFIG_PATH}" --enable-admin-service true)
  cmd+=(-w "${INTERACTIVE_WORKSPACE}" --start-compiler true)

  echo "Start engine service with command: ${cmd[*]}"
  "${cmd[@]}" &
  sleep 10
  #check interactive_server is running, if not, exit
  if ! ps -ef | grep "interactive_server" | grep -v grep; then
    err "interactive_server is not running"
    exit 1
  fi

  info "Start engine service success"
}

# Run the helper script test_count_vertices.py against the local neo4j
# endpoint; per the original note it issues: MATCH (n) RETURN count(n)
run_cypher_test() {
  local endpoint="neo4j://localhost:7687"
  python3 ./test_count_vertices.py --endpoint "${endpoint}"
}

# Clear out leftover processes from earlier runs before starting.
kill_service
start_engine_service
# The compiler service will fail to start if the graph meta cannot be retrieved
run_cypher_test
kill_service




Loading

0 comments on commit 02c3bfa

Please sign in to comment.