Skip to content

Commit

Permalink
Revert "Yq 3560 Add row dispatcher to dqrun (#9697)" (#10266)
Browse files Browse the repository at this point in the history
  • Loading branch information
maximyurchuk authored Oct 9, 2024
1 parent 2b4c9f9 commit 0d12e54
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 109 deletions.
7 changes: 3 additions & 4 deletions ydb/core/fq/libs/control_plane_storage/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ PEERDIR(
library/cpp/lwtrace
library/cpp/protobuf/interop
ydb/core/base
ydb/core/external_sources
ydb/core/fq/libs/actors/logging
ydb/core/fq/libs/common
ydb/core/fq/libs/config
Expand All @@ -34,13 +33,13 @@ PEERDIR(
ydb/core/fq/libs/shared_resources
ydb/core/fq/libs/ydb
ydb/core/mon
ydb/library/db_pool
ydb/library/security
ydb/library/yql/providers/s3/path_generator
ydb/library/yql/public/issue
ydb/public/api/protos
ydb/public/sdk/cpp/client/ydb_scheme
ydb/public/sdk/cpp/client/ydb_table
ydb/library/db_pool
ydb/library/yql/providers/s3/path_generator
ydb/library/yql/public/issue
)

YQL_LAST_ABI_VERSION()
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ void Init(
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg = NYql::NDq::CreateReadActorFactoryConfig(protoConfig.GetGateways().GetS3());

RegisterDqInputTransformLookupActorFactory(*asyncIoFactory);

NYql::TPqGatewayServices pqServices(
yqSharedResources->UserSpaceYdbDriver,
pqCmConnections,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class TLocalServiceHolder {
NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
IMetricsRegistryPtr metricsRegistry,
const std::function<IActor*(void)>& metricsPusherFactory,
bool withSpilling,
TVector<std::pair<TActorId, TActorSetupCmd>>&& additionalLocalServices)
bool withSpilling)
: MetricsRegistry(metricsRegistry
? metricsRegistry
: CreateMetricsRegistry(GetSensorsGroupFor(NSensorComponent::kDq))
Expand Down Expand Up @@ -90,9 +89,6 @@ class TLocalServiceHolder {
NDq::MakeDqLocalFileSpillingServiceID(nodeId),
TActorSetupCmd(spillingActor, TMailboxType::Simple, 0));
}
for (auto& [actorId, setupCmd] : additionalLocalServices) {
ServiceNode->AddLocalService(actorId, std::move(setupCmd));
}

auto statsCollector = CreateStatsCollector(1, *ServiceNode->GetSetup(), MetricsRegistry->GetSensors());

Expand Down Expand Up @@ -252,8 +248,7 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I
NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
IMetricsRegistryPtr metricsRegistry,
const std::function<IActor*(void)>& metricsPusherFactory, bool withSpilling,
TVector<std::pair<TActorId, TActorSetupCmd>>&& additionalLocalServices)
const std::function<IActor*(void)>& metricsPusherFactory, bool withSpilling)
{
return MakeHolder<TLocalServiceHolder>(functionRegistry,
compFactory,
Expand All @@ -265,17 +260,15 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I
threads,
metricsRegistry,
metricsPusherFactory,
withSpilling,
std::move(additionalLocalServices));
withSpilling);
}

TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
bool withSpilling, NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
IMetricsRegistryPtr metricsRegistry,
const std::function<IActor*(void)>& metricsPusherFactory,
TVector<std::pair<TActorId, TActorSetupCmd>>&& additionalLocalServices)
const std::function<IActor*(void)>& metricsPusherFactory)
{
int startPort = 31337;
TRangeWalker<int> portWalker(startPort, startPort+100);
Expand All @@ -294,8 +287,7 @@ TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctio
threads,
metricsRegistry,
metricsPusherFactory,
withSpilling,
std::move(additionalLocalServices)),
withSpilling),
CreateDqGateway("[::1]", grpcPort.Addr.GetPort()));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
Expand All @@ -18,7 +17,6 @@ TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctio
bool withSpilling,
NDq::IDqAsyncIoFactory::TPtr = nullptr, int threads = 16,
IMetricsRegistryPtr metricsRegistry = {},
const std::function<NActors::IActor*(void)>& metricsPusherFactory = {},
TVector<std::pair<NActors::TActorId, NActors::TActorSetupCmd>>&& additionalLocalServices = {});
const std::function<NActors::IActor*(void)>& metricsPusherFactory = {});

} // namespace NYql
52 changes: 1 addition & 51 deletions ydb/library/yql/tools/dqrun/dqrun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,8 @@
#include <ydb/library/yql/public/result_format/yql_result_format_data.h>

#include <ydb/core/fq/libs/actors/database_resolver.h>
#include <ydb/core/fq/libs/config/protos/fq_config.pb.h>
#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
#include <ydb/core/fq/libs/init/init.h>
#include <ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h>

#include <ydb/core/util/pb.h>

#include <yt/cpp/mapreduce/interface/init.h>
Expand Down Expand Up @@ -184,17 +180,6 @@ void ReadGatewaysConfig(const TString& configFile, TGatewaysConfig* config, THas
}
}

void ReadFqConfig(const TString& fqCfgFile, NFq::NConfig::TConfig* fqConfig) {
if (fqCfgFile.empty()) {
return;
}
auto configData = TFileInput(fqCfgFile).ReadAll();
using ::google::protobuf::TextFormat;
if (!TextFormat::ParseFromString(configData, fqConfig)) {
ythrow yexception() << "Bad format of fq configuration";
}
}

void PatchGatewaysConfig(TGatewaysConfig* config, const TString& mrJobBin, const TString& mrJobUdfsDir,
size_t numThreads, bool keepTemp)
{
Expand Down Expand Up @@ -498,35 +483,9 @@ int RunProgram(TProgramPtr program, const TRunOptions& options, const THashMap<T
return 0;
}

void InitFq(const NFq::NConfig::TConfig& fqConfig, TVector<std::pair<TActorId, TActorSetupCmd>>& additionalLocalServices) {
if (fqConfig.HasRowDispatcher() && fqConfig.GetRowDispatcher().GetEnabled()) {
NFq::IYqSharedResources::TPtr iSharedResources = NFq::CreateYqSharedResources(
fqConfig,
NKikimr::CreateYdbCredentialsProviderFactory,
MakeIntrusive<NMonitoring::TDynamicCounters>());
NFq::TYqSharedResources::TPtr yqSharedResources = NFq::TYqSharedResources::Cast(iSharedResources);
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory;

NFq::NConfig::TCommonConfig commonConfig;
auto rowDispatcher = NFq::NewRowDispatcherService(
fqConfig.GetRowDispatcher(),
commonConfig,
NKikimr::CreateYdbCredentialsProviderFactory,
yqSharedResources,
credentialsFactory,
"/tenant",
MakeIntrusive<NMonitoring::TDynamicCounters>());

additionalLocalServices.emplace_back(
NFq::RowDispatcherServiceActorId(),
TActorSetupCmd(rowDispatcher.release(), TMailboxType::Simple, 0));
}
}

int RunMain(int argc, const char* argv[])
{
TString gatewaysCfgFile;
TString fqCfgFile;
TString progFile;
TVector<TString> tablesMappingList;
THashMap<TString, TString> tablesMapping;
Expand Down Expand Up @@ -622,10 +581,6 @@ int RunMain(int argc, const char* argv[])
.Optional()
.RequiredArgument("FILE")
.StoreResult(&gatewaysCfgFile);
opts.AddLongOption("fq-cfg", "federated query configuration file")
.Optional()
.RequiredArgument("FILE")
.StoreResult(&fqCfgFile);
opts.AddLongOption("fs-cfg", "Path to file storage config")
.Optional()
.StoreResult(&fileStorageCfg);
Expand Down Expand Up @@ -919,9 +874,6 @@ int RunMain(int argc, const char* argv[])
setting->SetValue("1");
}

NFq::NConfig::TConfig fqConfig;
ReadFqConfig(fqCfgFile, &fqConfig);

if (res.Has("enable-spilling")) {
auto* setting = gatewaysConfig.MutableDq()->AddDefaultSettings();
setting->SetName("SpillingEngine");
Expand Down Expand Up @@ -1088,8 +1040,6 @@ int RunMain(int argc, const char* argv[])
clusters.emplace(to_lower(cluster.GetName()), TString{NYql::SolomonProviderName});
}
}
TVector<std::pair<TActorId, TActorSetupCmd>> additionalLocalServices;
InitFq(fqConfig, additionalLocalServices);

std::function<NActors::IActor*(void)> metricsPusherFactory = {};

Expand All @@ -1114,7 +1064,7 @@ int RunMain(int argc, const char* argv[])
bool enableSpilling = res.Has("enable-spilling");
dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories, enableSpilling,
CreateAsyncIoFactory(driver, httpGateway, ytFileServices, genericClient, credentialsFactory, *funcRegistry, requestTimeout, maxRetries, pqGateway), threads,
metricsRegistry, metricsPusherFactory, std::move(additionalLocalServices));
metricsRegistry, metricsPusherFactory);
}

dataProvidersInit.push_back(GetDqDataProviderInitializer(&CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage));
Expand Down
20 changes: 0 additions & 20 deletions ydb/library/yql/tools/dqrun/examples/fq.conf

This file was deleted.

15 changes: 0 additions & 15 deletions ydb/library/yql/tools/dqrun/examples/gateways.conf
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ Dq {
Name: "EnableDqReplicate"
Value: "true"
}
DefaultSettings {
Name: "_TableTimeout"
Value: "600000"
}
}

Generic {
Expand Down Expand Up @@ -137,14 +133,3 @@ SqlCore {
TranslationFlags: ["FlexibleTypes", "DisableAnsiOptionalAs", "EmitAggApply"]
}

Pq {
ClusterMapping {
Name: "pq"
Endpoint: "localhost:2135"
Database: "local"
ClusterType: CT_DATA_STREAMS
UseSsl: True
SharedReading:False
}
}

2 changes: 0 additions & 2 deletions ydb/library/yql/tools/dqrun/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ ENDIF()
ydb/library/yql/utils/actor_system
ydb/core/fq/libs/actors
ydb/core/fq/libs/db_id_async_resolver_impl
ydb/core/fq/libs/init
ydb/core/external_sources

ydb/library/yql/udfs/common/clickhouse/client
)
Expand Down

0 comments on commit 0d12e54

Please sign in to comment.