Skip to content

Commit

Permalink
Test: Mock multiple compute service to run MPPTasks. (#5573)
Browse files Browse the repository at this point in the history
ref #4609
  • Loading branch information
ywqzzy authored Aug 23, 2022
1 parent 6b7b360 commit 112a51e
Show file tree
Hide file tree
Showing 30 changed files with 1,048 additions and 257 deletions.
101 changes: 101 additions & 0 deletions dbms/src/Debug/MockComputeServerManager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <Debug/MockComputeServerManager.h>
#include <fmt/core.h>

namespace DB
{
namespace ErrorCodes
{
extern const int IP_ADDRESS_NOT_ALLOWED;
} // namespace ErrorCodes
namespace tests
{
void MockComputeServerManager::addServer(String addr)
{
MockServerConfig config;
for (const auto & server : server_config_map)
{
RUNTIME_CHECK_MSG(
server.second.addr != addr,
"Already register mock compute server with addr = {}",
addr);
}
config.partition_id = server_config_map.size();
config.addr = addr;
server_config_map[config.partition_id] = config;
}

void MockComputeServerManager::startServers(const LoggerPtr & log_ptr, Context & global_context)
{
global_context.setMPPTest();
for (const auto & server_config : server_config_map)
{
TiFlashSecurityConfig security_config;
TiFlashRaftConfig raft_config;
raft_config.flash_server_addr = server_config.second.addr;
Poco::AutoPtr<Poco::Util::LayeredConfiguration> config = new Poco::Util::LayeredConfiguration;
addServer(server_config.first, std::make_unique<FlashGrpcServerHolder>(global_context, *config, security_config, raft_config, log_ptr));
}

prepareMockMPPServerInfo();
}

void MockComputeServerManager::setMockStorage(MockStorage & mock_storage)
{
for (const auto & server : server_map)
{
server.second->setMockStorage(mock_storage);
}
}

void MockComputeServerManager::reset()
{
server_map.clear();
server_config_map.clear();
}

MockMPPServerInfo MockComputeServerManager::getMockMPPServerInfo(size_t partition_id)
{
return {server_config_map[partition_id].partition_id, server_config_map.size()};
}

std::unordered_map<size_t, MockServerConfig> & MockComputeServerManager::getServerConfigMap()
{
return server_config_map;
}

void MockComputeServerManager::prepareMockMPPServerInfo()
{
for (const auto & server : server_map)
{
server.second->setMockMPPServerInfo(getMockMPPServerInfo(server.first));
}
}

void MockComputeServerManager::resetMockMPPServerInfo(size_t partition_num)
{
size_t i = 0;
for (const auto & server : server_map)
{
server.second->setMockMPPServerInfo({i++, partition_num});
}
}

void MockComputeServerManager::addServer(size_t partition_id, std::unique_ptr<FlashGrpcServerHolder> server)
{
server_map[partition_id] = std::move(server);
}
} // namespace tests
} // namespace DB
57 changes: 57 additions & 0 deletions dbms/src/Debug/MockComputeServerManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Debug/MockStorage.h>
#include <Server/FlashGrpcServerHolder.h>

#include <unordered_map>

namespace DB::tests
{

/** Hold Mock Compute Server to manage the lifetime of them.
* Maintains Mock Compute Server info.
*/
class MockComputeServerManager : public ext::Singleton<MockComputeServerManager>
{
public:
/// register an server to run.
void addServer(String addr);

/// call startServers to run all servers in current test.
void startServers(const LoggerPtr & log_ptr, Context & global_context);

/// set MockStorage for Compute Server in order to mock input columns.
void setMockStorage(MockStorage & mock_storage);

/// stop all servers.
void reset();

MockMPPServerInfo getMockMPPServerInfo(size_t partition_id);

std::unordered_map<size_t, MockServerConfig> & getServerConfigMap();

void resetMockMPPServerInfo(size_t partition_num);

private:
void addServer(size_t partition_id, std::unique_ptr<FlashGrpcServerHolder> server);
void prepareMockMPPServerInfo();

private:
std::unordered_map<size_t, std::unique_ptr<FlashGrpcServerHolder>> server_map;
std::unordered_map<size_t, MockServerConfig> server_config_map;
};
} // namespace DB::tests
31 changes: 31 additions & 0 deletions dbms/src/Debug/MockServerInfo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <common/types.h>

namespace DB::tests
{
struct MockServerConfig
{
String addr;
size_t partition_id;
};

struct MockMPPServerInfo
{
size_t partition_id;
size_t partition_num;
};
} // namespace DB::tests
153 changes: 153 additions & 0 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <Debug/MockStorage.h>

namespace DB::tests
{
void MockStorage::addTableSchema(const String & name, const MockColumnInfoVec & columnInfos)
{
name_to_id_map[name] = MockTableIdGenerator::instance().nextTableId();
table_schema[getTableId(name)] = columnInfos;
}

void MockStorage::addTableData(const String & name, const ColumnsWithTypeAndName & columns)
{
table_columns[getTableId(name)] = columns;
}

Int64 MockStorage::getTableId(const String & name)
{
if (name_to_id_map.find(name) != name_to_id_map.end())
{
return name_to_id_map[name];
}
throw Exception(fmt::format("Failed to get table id by table name '{}'", name));
}

bool MockStorage::tableExists(Int64 table_id)
{
return table_schema.find(table_id) != table_schema.end();
}

ColumnsWithTypeAndName MockStorage::getColumns(Int64 table_id)
{
if (tableExists(table_id))
{
return table_columns[table_id];
}
throw Exception(fmt::format("Failed to get columns by table_id '{}'", table_id));
}

MockColumnInfoVec MockStorage::getTableSchema(const String & name)
{
if (tableExists(getTableId(name)))
{
return table_schema[getTableId(name)];
}
throw Exception(fmt::format("Failed to get table schema by table name '{}'", name));
}

/// for exchange receiver
void MockStorage::addExchangeSchema(const String & exchange_name, const MockColumnInfoVec & columnInfos)
{
exchange_schemas[exchange_name] = columnInfos;
}

void MockStorage::addExchangeData(const String & exchange_name, const ColumnsWithTypeAndName & columns)
{
exchange_columns[exchange_name] = columns;
}

bool MockStorage::exchangeExists(const String & executor_id)
{
return exchange_schemas.find(executor_id_to_name_map[executor_id]) != exchange_schemas.end();
}

bool MockStorage::exchangeExistsWithName(const String & name)
{
return exchange_schemas.find(name) != exchange_schemas.end();
}

ColumnsWithTypeAndName MockStorage::getExchangeColumns(const String & executor_id)
{
if (exchangeExists(executor_id))
{
return exchange_columns[executor_id_to_name_map[executor_id]];
}
throw Exception(fmt::format("Failed to get exchange columns by executor_id '{}'", executor_id));
}

void MockStorage::addExchangeRelation(const String & executor_id, const String & exchange_name)
{
executor_id_to_name_map[executor_id] = exchange_name;
}

MockColumnInfoVec MockStorage::getExchangeSchema(const String & exchange_name)
{
if (exchangeExistsWithName(exchange_name))
{
return exchange_schemas[exchange_name];
}
throw Exception(fmt::format("Failed to get exchange schema by exchange name '{}'", exchange_name));
}

// use this function to determine where to cut the columns,
// and how many rows are needed for each partition of MPP task.
CutColumnInfo getCutColumnInfo(size_t rows, Int64 partition_id, Int64 partition_num)
{
int start, per_rows, rows_left, cur_rows;
per_rows = rows / partition_num;
rows_left = rows - per_rows * partition_num;
if (partition_id >= rows_left)
{
start = (per_rows + 1) * rows_left + (partition_id - rows_left) * per_rows;
cur_rows = per_rows;
}
else
{
start = (per_rows + 1) * partition_id;
cur_rows = per_rows + 1;
}
return {start, cur_rows};
}

ColumnsWithTypeAndName MockStorage::getColumnsForMPPTableScan(Int64 table_id, Int64 partition_id, Int64 partition_num)
{
if (tableExists(table_id))
{
auto columns_with_type_and_name = table_columns[table_id];
size_t rows = 0;
for (const auto & col : columns_with_type_and_name)
{
if (rows == 0)
rows = col.column->size();
assert(rows == col.column->size());
}

CutColumnInfo cut_info = getCutColumnInfo(rows, partition_id, partition_num);

ColumnsWithTypeAndName res;
for (const auto & column_with_type_and_name : columns_with_type_and_name)
{
res.push_back(
ColumnWithTypeAndName(
column_with_type_and_name.column->cut(cut_info.first, cut_info.second),
column_with_type_and_name.type,
column_with_type_and_name.name));
}
return res;
}
throw Exception(fmt::format("Failed to get table columns by table_id '{}'", table_id));
}
} // namespace DB::tests
Loading

0 comments on commit 112a51e

Please sign in to comment.