-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrecognize_parallel.py
69 lines (57 loc) · 2.34 KB
/
recognize_parallel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import subprocess
import time
from pathlib import Path
import hydra
import pandas as pd
def divide_into_chunks(lst, n_parts):
    """Split *lst* into *n_parts* chunks by round-robin assignment.

    Item ``k`` is placed in chunk ``k % n_parts``, so chunk sizes differ by at
    most one. When ``n_parts`` exceeds the number of items, the trailing
    chunks are empty. The special value ``n_parts == -1`` means "one chunk
    per item".

    Args:
        lst: Any iterable of items (e.g. a list or a NumPy array).
        n_parts: Number of chunks (a positive integer), or ``-1`` for one
            chunk per item.

    Returns:
        A list of lists holding the items of *lst* in their original
        relative order within each chunk.

    Raises:
        ValueError: If ``n_parts`` is 0 or a negative number other than -1.
    """
    if n_parts == -1:
        return [[item] for item in lst]
    if n_parts <= 0:
        # Without this check, 0 would fail with ZeroDivisionError in the modulo
        # below, and other negatives with an IndexError on the empty `parts`.
        raise ValueError(f"n_parts must be a positive integer or -1, got {n_parts!r}")
    parts = [[] for _ in range(n_parts)]
    for index, item in enumerate(lst):
        parts[index % n_parts].append(item)
    return parts
def get_best_gpu(priority_list, default_gpu):
    """Return the first GPU type in *priority_list* that shows up on an idle node.

    Runs ``sinfo -o "%N %G" --state=idle`` once (node names and generic
    resources of idle SLURM nodes). It then returns the first entry of
    *priority_list* that occurs as a substring of that output.

    Args:
        priority_list: GPU type names, in order of preference.
        default_gpu: Value returned when no preferred GPU is found.

    Returns:
        A matching entry of *priority_list*, or *default_gpu*. The default is
        also returned when the list is empty or ``sinfo`` cannot be run.
    """
    if not priority_list:
        return default_gpu
    cmd = ["sinfo", "-o", "%N %G", "--state=idle"]
    try:
        # The query does not depend on the GPU being checked, so run it once
        # instead of once per priority entry. The timeout guards against a
        # hung controller blocking the whole submission loop.
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=False,
            timeout=60,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"Error querying idle nodes with sinfo: {e}")
        return default_gpu
    if result.returncode != 0:
        print(f"sinfo exited with code {result.returncode}: {result.stderr.strip()}")
    idle_info = result.stdout
    for gpu in priority_list:
        # NOTE(review): plain substring match; a short name could also match
        # a longer GRES name or a node name — confirm this is intended.
        if gpu in idle_info:
            return gpu
    return default_gpu
@hydra.main(config_path="../configs", config_name="recognize_parallel")
def main(cfg):
    """Render one SLURM batch script per chunk of repository links and submit it.

    Steps:

    1. Create ``cfg.bash_script_dir`` and ``cfg.slurm_logs_dir``.
    2. Read the tab-separated ``cfg.metadata_df``, sorted by ``cntFiles``
       (descending).
    3. Split its unique ``repositoryUrl`` values into ``cfg.n_jobs``
       round-robin chunks.
    4. For each non-empty chunk, fill ``cfg.template_script`` via
       ``str.format``, write it as ``recognize_{i}.sh``, and submit it with
       ``sbatch``.

    Submissions are spaced five minutes apart. When ``cfg.n_debug`` is a
    positive integer, at most that many jobs are submitted.
    """
    script_dir = Path(cfg.bash_script_dir)
    script_dir.mkdir(parents=True, exist_ok=True)
    Path(cfg.slurm_logs_dir).mkdir(parents=True, exist_ok=True)

    metadata_df = pd.read_csv(cfg.metadata_df, sep="\t").sort_values("cntFiles", ascending=False)
    # Drop missing URLs: a NaN would make the ",".join below raise TypeError.
    # The descending sort, combined with round-robin chunking, presumably
    # spreads the largest repositories across jobs.
    unique_links = metadata_df["repositoryUrl"].dropna().unique()

    with open(cfg.template_script, encoding="utf-8") as f:
        template = f.read()

    # Round-robin chunking leaves trailing chunks empty when n_jobs exceeds
    # the number of links. Submitting those would launch jobs with no work.
    chunks = [chunk for chunk in divide_into_chunks(unique_links, cfg.n_jobs) if chunk]

    if cfg.n_debug is not None and cfg.n_debug > 0 and len(chunks) > cfg.n_debug:
        # The previous `i > n_debug` check let n_debug + 1 jobs through.
        print(f"Debug mode. Submitting only {cfg.n_debug} job(s).")
        chunks = chunks[: cfg.n_debug]

    submit_interval_s = 5 * 60
    for i, link_chunk in enumerate(chunks):
        if i > 0:
            # Wait only *between* submissions, not after the last one.
            # Presumably this lets the previous job get scheduled before
            # get_best_gpu inspects idle nodes again.
            time.sleep(submit_interval_s)

        script_path = script_dir / f"recognize_{i}.sh"
        links = '"[' + ",".join(link_chunk) + ']"'
        gpu = get_best_gpu(cfg.gpu_priority, cfg.default_gpu)
        instance = template.format(
            id=i,
            gpu=gpu,
            links=links,
            n_debug=cfg.n_debug,
            config=cfg.job_config,
            slurm_logs_dir=cfg.slurm_logs_dir,
            output_folder=cfg.output_folder,
        )
        script_path.write_text(instance, encoding="utf-8")

        cmd = ["sbatch", script_path.as_posix()]
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
        print(result.stdout)
        if result.returncode != 0:
            # Surface submission failures instead of silently discarding stderr.
            print(f"sbatch failed for {script_path} (exit code {result.returncode}): {result.stderr.strip()}")
    print(f"Done. Debug mode is {cfg.n_debug}.")
if __name__ == "__main__":
    # main is decorated with @hydra.main, which supplies the `cfg` argument,
    # so it is invoked here without arguments.
    main()