Skip to content

Commit

Permalink
snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma27 committed Nov 6, 2024
1 parent 7bcf65e commit d9394d0
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 73 deletions.
130 changes: 63 additions & 67 deletions src/hydra-eval-jobs/hydra-eval-jobs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,20 +502,69 @@ int main(int argc, char * * argv)
auto store = openStore();

std::set<std::string> namedConstituentsJobs;
std::map<std::string, std::set<std::string>> expandedConstituents;
std::map<std::string, std::unordered_map<std::string, std::string>> brokenChildJobs;
for (auto i = state->jobs.begin(); i != state->jobs.end(); ++i) {
auto jobName = i.key();
auto & job = i.value();

if (job.find("namedConstituents") != job.end()) {
auto named = job.find("namedConstituents");
if (named != job.end()) {
bool globConstituents = job.value<bool>("globConstituents", false);
std::unordered_map<std::string, std::string> brokenJobs;

auto isBroken = [&brokenJobs, &jobName](
const std::string & childJobName, nlohmann::json & job) -> bool {
if (job.find("error") != job.end()) {
std::string error = job["error"];
printError("aggregate job '%s' references broken job '%s': %s", jobName, childJobName, error);
brokenJobs[childJobName] = error;
return true;
} else {
return false;
}
};

std::set<std::string> results;
for (const std::string & childJobName : *named) {
auto childJob = state->jobs.find(childJobName);
if (childJob == state->jobs.end()) {
if (!globConstituents) {
printError("aggregate job '%s' references non-existent job '%s'", jobName, childJobName);
brokenJobs[childJobName] = "does not exist";
} else {
bool expansionFound = false;
for (auto job = state->jobs.begin(); job != state->jobs.end(); job++) {
auto jobName = job.key();
if (fnmatch(childJobName.c_str(), jobName.c_str(), 0) == 0
&& !isBroken(jobName, *job)
&& childJobName != jobName
) {
results.insert(jobName);
expansionFound = true;
}
}
if (!expansionFound) {
warn("aggregate job '%s' references constituent glob pattern '%s' with no matches", jobName, childJobName);
brokenJobs[childJobName] = "constituent glob pattern had no matches";
}
}
} else if (!isBroken(childJobName, *childJob)) {
results.insert(childJobName);
}
}

namedConstituentsJobs.insert(jobName);
expandedConstituents.insert({jobName, results});
brokenChildJobs.insert({jobName, brokenJobs});
}
}

auto aggregatesWithNamedConstituents = topoSort(
namedConstituentsJobs,
{[&namedConstituentsJobs, &state](const std::string & jobName) {
{[&namedConstituentsJobs, &state, &expandedConstituents](const std::string & jobName) {
if (namedConstituentsJobs.find(jobName) != namedConstituentsJobs.end()) {
return std::set<std::string>(state->jobs[jobName]["namedConstituents"]);
return expandedConstituents.at(jobName);
} else {
return std::set<std::string>();
}
Expand All @@ -528,77 +577,24 @@ int main(int argc, char * * argv)
std::reverse(aggregatesWithNamedConstituents.begin(), aggregatesWithNamedConstituents.end());

for (auto & jobName : aggregatesWithNamedConstituents) {
auto brokenJobs = brokenChildJobs.at(jobName);
auto & job = state->jobs[jobName];
printError("FOO: %s", jobName);
bool globConstituents = job.value<bool>("globConstituents", false);
auto named = job.find("namedConstituents");

std::unordered_map<std::string, std::string> brokenJobs;
auto isBroken = [&brokenJobs, &jobName](
const std::string & childJobName, nlohmann::json & job) -> bool {
if (job.find("error") != job.end()) {
std::string error = job["error"];
printError("aggregate job '%s' references broken job '%s': %s", jobName, childJobName, error);
brokenJobs[childJobName] = error;
return true;
} else {
return false;
}
};
auto getNonBrokenJobsOrRecordError = [&state, &isBroken, &jobName, &brokenJobs, &globConstituents](
const std::string & childJobName) -> std::vector<nlohmann::json> {
auto childJob = state->jobs.find(childJobName);
std::vector<nlohmann::json> results;
if (childJob == state->jobs.end()) {
if (!globConstituents) {
printError("aggregate job '%s' references non-existent job '%s'", jobName, childJobName);
brokenJobs[childJobName] = "does not exist";
} else {
for (auto job = state->jobs.begin(); job != state->jobs.end(); job++) {
auto jobName = job.key();
if (fnmatch(childJobName.c_str(), jobName.c_str(), 0) == 0
&& !isBroken(jobName, *job)
) {
results.push_back(*job);
}
}
if (results.empty()) {
warn("aggregate job '%s' references constituent glob pattern '%s' with no matches", jobName, childJobName);
brokenJobs[childJobName] = "constituent glob pattern had no matches";
}
}
} else if (!isBroken(childJobName, *childJob)) {
results.push_back(*childJob);
}
return results;
};

if (myArgs.dryRun) {
for (std::string jobName2 : *named) {
auto foundJobs = getNonBrokenJobsOrRecordError(jobName2);
if (foundJobs.empty()) {
continue;
}
for (auto & childJob : foundJobs) {
std::string constituentDrvPath = childJob["drvPath"];
job["constituents"].push_back(constituentDrvPath);
}
auto foundJobs = expandedConstituents.at(jobName);
for (auto & childJobName : foundJobs) {
std::string constituentDrvPath = state->jobs[childJobName]["drvPath"];
job["constituents"].push_back(constituentDrvPath);
}
} else {
auto drvPath = store->parseStorePath((std::string) job["drvPath"]);
auto drv = store->readDerivation(drvPath);

for (std::string jobName2 : *named) {
auto foundJobs = getNonBrokenJobsOrRecordError(jobName2);
if (foundJobs.empty()) {
continue;
}
for (auto & childJob : foundJobs) {
auto childDrvPath = store->parseStorePath((std::string) childJob["drvPath"]);
auto childDrv = store->readDerivation(childDrvPath);
job["constituents"].push_back(store->printStorePath(childDrvPath));
drv.inputDrvs.map[childDrvPath].value = {childDrv.outputs.begin()->first};
}
auto foundJobs = expandedConstituents.at(jobName);
for (auto & childJobName : foundJobs) {
auto childDrvPath = store->parseStorePath((std::string) state->jobs[childJobName]["drvPath"]);
auto childDrv = store->readDerivation(childDrvPath);
job["constituents"].push_back(store->printStorePath(childDrvPath));
drv.inputDrvs.map[childDrvPath].value = {childDrv.outputs.begin()->first};
}

if (brokenJobs.empty()) {
Expand Down
14 changes: 8 additions & 6 deletions t/evaluator/evaluate-constituents-globbing.t
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ my ($res, $stdout, $stderr) = captureStdoutStderr(60,
("hydra-eval-jobset", $jobsetCtx->{"project"}->name, $jobset->name)
);

print STDERR "HIER: $stdout\n$stderr\n";

subtest "non_match_aggregate failed" => sub {
ok(utf8::decode($stderr), "Stderr output is UTF8-clean");
like(
Expand Down Expand Up @@ -48,11 +50,11 @@ subtest "basic globbing works" => sub {
is($sortedConstituentNames[1], "empty-dir-B", "second constituent of 'ok_aggregate' is 'empty-dir-B'");
};

#subtest "transitivity is OK" => sub {
#ok(defined $builds->{"indirect_aggregate"}, "'indirect_aggregate' is part of the jobset evaluation");
#my @constituents = $builds->{"indirect_aggregate"}->constituents->all;
#is(1, scalar @constituents, "'indirect_aggregate' has one constituent");
#is($constituents[0]->nixname, "direct_aggregate", "'indirect_aggregate' has 'direct_aggregate' as single constituent");
#};
subtest "transitivity is OK" => sub {
ok(defined $builds->{"indirect_aggregate"}, "'indirect_aggregate' is part of the jobset evaluation");
my @constituents = $builds->{"indirect_aggregate"}->constituents->all;
is(1, scalar @constituents, "'indirect_aggregate' has one constituent");
is($constituents[0]->nixname, "direct_aggregate", "'indirect_aggregate' has 'direct_aggregate' as single constituent");
};

done_testing;

0 comments on commit d9394d0

Please sign in to comment.