
750 configure workers #777

Merged: 29 commits from 750_configure_workers into dev, Nov 5, 2024

Commits
27de763  Use itertuples over iterrows since iterrows is an enormous memory hog. (mauch, Oct 22, 2024)
b4c6088  Drop sources_df columns before renaming id column to avoid a copy of … (mauch, Oct 22, 2024)
e180a4e  Decrease default partition size to 15MB (mauch, Oct 22, 2024)
9e9668d  Dont split (large-in-memory) list of DataFrames into dask bags (No pe… (mauch, Oct 22, 2024)
d7e7b21  Don't write forced parquets in parallel (No perfomance hit for this). (mauch, Oct 22, 2024)
2ee8052  Initial configuration updates for processing options. (mauch, Oct 22, 2024)
0a08b7b  Dont overwrite input DataFrame when writing parquets. (mauch, Oct 23, 2024)
ff6269c  Update CHANGELOG.md (mauch, Oct 23, 2024)
2d82245  Merge branch 'dev' into optimise_mem (mauch, Oct 29, 2024)
67ea721  Address review comments. (mauch, Oct 29, 2024)
c98720b  Copy YAML objects before revalidation so the can be garbage collected. (mauch, Oct 29, 2024)
5c24a7d  Appease flake8 (mauch, Oct 29, 2024)
cf8256e  Add processing options as optional with defaults. (mauch, Oct 29, 2024)
4ca1c8b  filter processing config to parallel association. (mauch, Oct 29, 2024)
8e710f9  Add a funtion to determine the number of workers and partitions for D… (mauch, Oct 31, 2024)
772ec63  Merge branch 'optimise_mem' into 750_configure_workers (mauch, Oct 31, 2024)
8cd8eee  Use config values for num_workers and max_partition_size throughout p… (mauch, Nov 1, 2024)
3fed937  Correct working in config template. (mauch, Nov 1, 2024)
a9e2b5a  Update CHANGELOG.md (mauch, Nov 4, 2024)
0f596d9  Remove unused imports. (mauch, Nov 4, 2024)
15783ef  Merge branch 'dev' into 750_configure_workers (mauch, Nov 4, 2024)
0231f2c  Bump strictyaml to 1.6.2 (mauch, Nov 4, 2024)
c23753a  Use YAML 'null' to create Python None for all cores option. (mauch, Nov 4, 2024)
5f9cd36  Make None the default in `calculate_workers_and_partitions` instead of 0 (mauch, Nov 4, 2024)
c0e6bd1  Merge branch '750_configure_workers' of https://github.com/askap-vast… (mauch, Nov 4, 2024)
137e425  Updated run config docs (ddobie, Nov 4, 2024)
f27869a  Allow null for num_workers_io and improve validation of processing pa… (mauch, Nov 4, 2024)
5a8f826  Merge branch '750_configure_workers' of https://github.com/askap-vast… (mauch, Nov 5, 2024)
2789445  Update num_workers_io default in docs. (mauch, Nov 5, 2024)
CHANGELOG.md (2 additions, 0 deletions)

@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### Added

- Added configuration options to specify number of workers and maximum partition size for parallel operations. [#777](https://github.com/askap-vast/vast-pipeline/pull/777)
- Added vast_pipeline.utils.delete_run.py to enable deletion of pipeline runs using raw SQL [#775](https://github.com/askap-vast/vast-pipeline/pull/775)

#### Changed
@@ -27,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### List of PRs

- [#777](https://github.com/askap-vast/vast-pipeline/pull/777): feat: Allow user to specify number of cores and memory size of partitions via configuration.
- [#776](https://github.com/askap-vast/vast-pipeline/pull/776): fix: Minor memory optimisations
- [#775](https://github.com/askap-vast/vast-pipeline/pull/775): fix, feat: Enabled deletion of pipeline runs directly using SQL rather than via django
- [#734](https://github.com/askap-vast/vast-pipeline/pull/734): Shortened forced fits measurement names
docs/using/runconfig.md (15 additions, 0 deletions)

@@ -127,6 +127,21 @@ Below is an example of a default `config.yaml` file. Note that no images or othe
# aggregate pair metrics that are stored in Source objects.
source_aggregate_pair_metrics_min_abs_vs: 4.3

processing:
# Options to control use of Dask parallelism
# NOTE: These are advanced options and you should only change them if you know what you are doing.

# The total number of workers available to Dask ('null' means use one less than all cores)
num_workers: null

# The number of workers to use for disk IO operations (e.g. when reading images for forced extraction)
num_workers_io: 5

# The default maximum size (in MB) to allow per partition of Dask DataFrames
# Increasing this will create fewer partitions and will potentially increase the memory footprint
# of parallel tasks.
max_partition_mb: 15

```
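For context on the `null` default above: strictyaml's `NullNone` validator (used by the pipeline's config schema, see vast_pipeline/pipeline/config.py later in this diff) turns a literal YAML `null` into Python None. A minimal sketch of that parsing behaviour, not pipeline code, assuming strictyaml >= 1.6:

```python
# Illustrative sketch only: how a YAML 'null' becomes Python None, which the
# pipeline then interprets as "use one less than all available cores".
from strictyaml import Int, Map, NullNone, Optional, load

# Assumed mini-schema mirroring two of the new 'processing' options.
schema = Map({
    Optional("num_workers", default=None): NullNone() | Int(),
    Optional("max_partition_mb", default=15): Int(),
})

parsed = load("num_workers: null\nmax_partition_mb: 20\n", schema)
assert parsed.data == {"num_workers": None, "max_partition_mb": 20}
```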

!!! note
poetry.lock (734 additions, 656 deletions)

Large diffs are not rendered by default.

pyproject.toml (1 addition, 1 deletion)

@@ -35,7 +35,7 @@ whitenoise = "^5.2.0"
gevent = { version = "^21.1.2", optional = true }
gunicorn = { version = "^20.0.4", optional = true }
forced-phot = { git = "https://github.com/askap-vast/forced_phot.git" }
strictyaml = "^1.3.2"
strictyaml = "^1.6.2"
colorcet = "^2.0.6"
matplotlib = "^3.5.0"
holoviews = "^1.14.7"
vast_pipeline/config_template.yaml.j2 (15 additions, 0 deletions)

@@ -163,3 +163,18 @@ variability:
# Only measurement pairs where the Vs metric exceeds this value are selected for the
# aggregate pair metrics that are stored in Source objects.
source_aggregate_pair_metrics_min_abs_vs: {{ source_aggregate_pair_metrics_min_abs_vs }}

processing:
# Options to control use of Dask parallelism
# NOTE: These are advanced options and you should only change them if you know what you are doing.

# The total number of workers available to Dask ('null' means use one less than all cores)
num_workers: {{ num_workers }}

# The number of workers to use for disk IO operations (e.g. when reading images for forced extraction)
num_workers_io: {{ num_workers_io }}

# The default maximum size (in MB) to allow per partition of Dask DataFrames
# Increasing this will create fewer partitions and will potentially increase the memory footprint
# of parallel tasks.
max_partition_mb: {{ max_partition_mb }}
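For reference, the block above is plain Jinja2 templating; a toy render of just the new section (illustrative only, the real rendering goes through `make_config_template` in vast_pipeline/pipeline/config.py):

```python
# Illustrative sketch only: rendering the new 'processing' block of the
# template with default values; 'null' is written as text so that strictyaml
# later parses it back into Python None.
from jinja2 import Template

snippet = Template(
    "processing:\n"
    "  num_workers: {{ num_workers }}\n"
    "  num_workers_io: {{ num_workers_io }}\n"
    "  max_partition_mb: {{ max_partition_mb }}\n"
)
print(snippet.render(num_workers="null", num_workers_io=5, max_partition_mb=15))
# processing:
#   num_workers: null
#   num_workers_io: 5
#   max_partition_mb: 15
```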
vast_pipeline/pipeline/association.py (8 additions, 8 deletions)

@@ -7,7 +7,6 @@
import pandas as pd
from typing import Tuple, Dict, List
import dask.dataframe as dd
from psutil import cpu_count

from astropy import units as u
from astropy.coordinates import SkyCoord
@@ -20,7 +19,7 @@
reconstruct_associtaion_dfs
)
from vast_pipeline.pipeline.config import PipelineConfig
from vast_pipeline.utils.utils import StopWatch, calculate_n_partitions
from vast_pipeline.utils.utils import StopWatch, calculate_workers_and_partitions


logger = logging.getLogger(__name__)
@@ -1586,11 +1585,12 @@ def parallel_association(
# Add an increment to any new source values when using add_mode to avoid
# getting duplicates in the result laater
id_incr_par_assoc = max(done_source_ids) if add_mode else 0

n_cpu = cpu_count() - 1
logger.debug(f"Running association with {n_cpu} CPUs")
n_partitions = calculate_n_partitions(images_df, n_cpu)

n_workers, n_partitions = calculate_workers_and_partitions(
images_df,
n_cpu=config['processing']['num_workers'],
max_partition_mb=config['processing']['max_partition_mb']
)
logger.debug(f"Running association with {n_workers} CPUs")
# pass each skyreg_group through the normal association process.
results = (
dd.from_pandas(images_df.set_index('skyreg_group'), npartitions=n_partitions)
@@ -1608,7 +1608,7 @@
id_incr_par_assoc=id_incr_par_assoc,
parallel=True,
meta=meta
).compute(n_workers=n_cpu, scheduler='processes')
).compute(n_workers=n_workers, scheduler='processes')
)

# results are the normal dataframe of results with the columns:
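The implementation of `calculate_workers_and_partitions` lives in vast_pipeline/utils/utils.py and is not part of the rendered diff. A hypothetical sketch of its contract, inferred from the call sites in this PR; the function name and signature match the calls above, but the partition heuristic below is an assumption, not the actual code:

```python
# Hypothetical sketch, NOT the actual implementation: derive a Dask worker
# count and a partition count from the 'processing' config options.
from typing import Optional, Tuple

import pandas as pd
from psutil import cpu_count


def calculate_workers_and_partitions(
    df: Optional[pd.DataFrame],
    n_cpu: Optional[int] = None,
    max_partition_mb: int = 15,
) -> Tuple[int, int]:
    # None (YAML 'null' in the run config) means one less than all cores.
    n_workers = cpu_count() - 1 if n_cpu is None else n_cpu

    # Callers that only need a worker count pass df=None and ignore the
    # partition figure (as forced_extraction.py does later in this diff).
    n_partitions = 1
    if df is not None:
        mem_mb = df.memory_usage(deep=True).sum() / 1e6
        # At least one partition per worker; more when partitions would
        # otherwise exceed the configured size cap.
        n_partitions = max(n_workers, int(mem_mb / max_partition_mb) + 1)

    return n_workers, n_partitions
```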
vast_pipeline/pipeline/config.py (24 additions, 1 deletion)

@@ -16,7 +16,7 @@


def make_config_template(template_path: str, **kwargs) -> str:
"""Generate the contents of a run configuration file from on a Jinja2 template.
"""Generate the contents of a run configuration file from a Jinja2 template.

Args:
template_path: Path to a Jinja2 template.
@@ -114,6 +114,22 @@ class PipelineConfig:
"source_aggregate_pair_metrics_min_abs_vs": yaml.Float(),
}
),
yaml.Optional("processing"): yaml.Map(
{
yaml.Optional(
"num_workers",
default=settings.PIPE_RUN_CONFIG_DEFAULTS['num_workers']):
yaml.NullNone() | yaml.Int() | yaml.Str(),
yaml.Optional(
"num_workers_io",
default=settings.PIPE_RUN_CONFIG_DEFAULTS['num_workers_io']):
yaml.NullNone() | yaml.Int() | yaml.Str(),
yaml.Optional(
"max_partition_mb",
default=settings.PIPE_RUN_CONFIG_DEFAULTS['max_partition_mb']):
yaml.Int()
}
)
}
)
# path to default run config template
@@ -511,6 +527,13 @@ def validate(self, user: User = None):
if not os.path.exists(file):
raise PipelineConfigError(f"{file} does not exist.")

# ensure num_workers and num_workers_io are
# either None (from null in config yaml) or an integer
for param_name in ('num_workers', 'num_workers_io'):
param_value = self['processing'][param_name]
if (param_value is not None) and (type(param_value) is not int):
raise PipelineConfigError(f"{param_name} can only be an integer or 'null'")

def check_prev_config_diff(self) -> bool:
"""
Checks if the previous config file differs from the current config file. Used in
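One subtlety behind the explicit type check added above: the schema union `NullNone() | Int() | Str()` also accepts arbitrary strings (the `Str()` arm is presumably there to tolerate template-rendered text), so a bad value can pass schema validation and must be rejected in `validate()`. A small illustration, assuming strictyaml >= 1.6:

```python
# Illustrative sketch only: all three documents pass the schema; the last
# parses as the string 'four' and is what the validate() check rejects.
from strictyaml import Int, Map, NullNone, Str, load

schema = Map({"num_workers": NullNone() | Int() | Str()})

print(load("num_workers: null", schema).data)  # {'num_workers': None}
print(load("num_workers: 4", schema).data)     # {'num_workers': 4}
print(load("num_workers: four", schema).data)  # {'num_workers': 'four'}
```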
vast_pipeline/pipeline/finalise.py (10 additions, 3 deletions)

@@ -88,7 +88,9 @@ def final_operations(
source_aggregate_pair_metrics_min_abs_vs: float,
add_mode: bool,
done_source_ids: List[int],
previous_parquets: Dict[str, str]
previous_parquets: Dict[str, str],
n_cpu: int = 0,
max_partition_mb: int = 15
) -> Tuple[int, int]:
"""
Performs the final operations of the pipeline:
@@ -136,7 +138,9 @@
)
log_total_memory_usage()

srcs_df = parallel_groupby(sources_df)
srcs_df = parallel_groupby(sources_df,
n_cpu=n_cpu,
max_partition_mb=max_partition_mb)

mem_usage = get_df_memory_usage(srcs_df)
logger.info('Groupby-apply time: %.2f seconds', timer.reset())
@@ -179,7 +183,10 @@
# create measurement pairs, aka 2-epoch metrics
if calculate_pairs:
timer.reset()
measurement_pairs_df = calculate_measurement_pair_metrics(sources_df)
measurement_pairs_df = calculate_measurement_pair_metrics(
sources_df,
n_cpu=n_cpu,
max_partition_mb=max_partition_mb)
logger.info(
'Measurement pair metrics time: %.2f seconds',
timer.reset())
vast_pipeline/pipeline/forced_extraction.py (15 additions, 9 deletions)

@@ -5,7 +5,6 @@
import pandas as pd
import dask.dataframe as dd
import dask.bag as db
from psutil import cpu_count
from glob import glob

from astropy import units as u
@@ -19,7 +18,7 @@
from vast_pipeline.pipeline.loading import make_upload_measurements

from forced_phot import ForcedPhot
from ..utils.utils import StopWatch
from ..utils.utils import StopWatch, calculate_workers_and_partitions
from vast_pipeline.image.utils import open_fits


@@ -259,7 +258,7 @@ def finalise_forced_dfs(
def parallel_extraction(
df: pd.DataFrame, df_images: pd.DataFrame, df_sources: pd.DataFrame,
min_sigma: float, edge_buffer: float, cluster_threshold: float,
allow_nan: bool, add_mode: bool, p_run_path: str
allow_nan: bool, add_mode: bool, p_run_path: str, n_workers: int = 5
) -> pd.DataFrame:
"""
Parallelize forced extraction with Dask
@@ -289,6 +288,8 @@ def parallel_extraction(
True when the pipeline is running in add image mode.
p_run_path:
The system path of the pipeline run output.
n_workers:
The desired number of workers for Dask

Returns:
Dataframe with forced extracted measurements data, columns are
@@ -374,7 +375,7 @@ def image_data_func(image_name: str) -> Dict[str, Any]:
npartitions=len(list_meas_parquets)
)
.map(get_data_from_parquet, p_run_path, add_mode)
.compute()
.compute(num_workers=n_workers, scheduler="processes")
)
mapping = pd.DataFrame(mapping)
# remove not used columns from images_df and merge into mapping
@@ -393,7 +394,6 @@ def image_data_func(image_name: str) -> Dict[str, Any]:
)
del col_to_drop

n_cpu = cpu_count() - 1
bags = db.from_sequence(list_to_map, npartitions=len(list_to_map))
forced_dfs = (
bags.map(lambda x: extract_from_image(
@@ -402,7 +402,7 @@
allow_nan=allow_nan,
**x
))
.compute()
.compute(num_workers=n_workers, scheduler='processes')
)
del bags
# create intermediates dfs combining the mapping data and the forced
@@ -497,7 +497,8 @@ def forced_extraction(
sources_df: pd.DataFrame, cfg_err_ra: float, cfg_err_dec: float,
p_run: Run, extr_df: pd.DataFrame, min_sigma: float, edge_buffer: float,
cluster_threshold: float, allow_nan: bool, add_mode: bool,
done_images_df: pd.DataFrame, done_source_ids: List[int]
done_images_df: pd.DataFrame, done_source_ids: List[int],
n_cpu: int = 5
) -> Tuple[pd.DataFrame, int]:
"""
Check and extract expected measurements, and associated them with the
@@ -533,6 +534,8 @@
done_source_ids:
List of the source ids that were already present in the previous
run (used in add image mode).
n_cpu:
The desired number of workers for Dask.

Returns:
The `sources_df` with the extracted sources added.
@@ -604,11 +607,14 @@
f" (from {total_to_extract} total)"
)

# Don't care about n_partitions in this step
n_workers, _ = calculate_workers_and_partitions(None, n_cpu)

timer.reset()
extr_df = parallel_extraction(
extr_df, images_df, sources_df[['source', 'image', 'flux_peak']],
min_sigma, edge_buffer, cluster_threshold, allow_nan, add_mode,
p_run.path
p_run.path, n_workers=n_workers
)
logger.info(
'Force extraction step time: %.2f seconds', timer.reset()
@@ -712,7 +718,7 @@ def forced_extraction(
n_forced = (
dd.read_parquet(forced_parquets, columns=['id'])
.count()
.compute()
.compute(num_workers=n_workers, scheduler='processes')
.values[0]
)
else:
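The compute calls in this file all follow the same dask.bag pattern: explicit process-based scheduling with a worker cap taken from the run configuration. A self-contained toy version of that pattern (the `square` map stands in for `extract_from_image`, and `n_workers` for `config['processing']['num_workers_io']`):

```python
# Toy, self-contained version of the bag pattern used above.
import dask.bag as db


def square(x: int) -> int:
    # Stand-in for the real per-image work (extract_from_image).
    return x ** 2


if __name__ == "__main__":
    n_workers = 5  # stand-in for the num_workers_io config option
    bags = db.from_sequence(range(20), npartitions=20)
    results = bags.map(square).compute(
        num_workers=n_workers, scheduler="processes"
    )
    print(results[:5])  # [0, 1, 4, 9, 16]
```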
vast_pipeline/pipeline/main.py (10 additions, 3 deletions)

@@ -266,6 +266,8 @@ def process_pipeline(self, p_run: Run) -> None:
sources_df.loc[sources_df['forced'] == False, missing_source_cols],
images_df,
skyregs_df,
n_cpu=self.config['processing']['num_workers'],
max_partition_mb=self.config['processing']['max_partition_mb']
)

# STEP #4 New source analysis
@@ -274,7 +276,9 @@ def process_pipeline(self, p_run: Run) -> None:
missing_sources_df,
self.config["new_sources"]["min_sigma"],
self.config["source_monitoring"]["edge_buffer_scale"],
p_run
p_run,
n_cpu=self.config['processing']['num_workers_io'],
max_partition_mb=self.config['processing']['max_partition_mb']
)

# Drop column no longer required in missing_sources_df.
@@ -299,7 +303,8 @@ def process_pipeline(self, p_run: Run) -> None:
self.config["source_monitoring"]["allow_nan"],
self.add_mode,
done_images_df,
done_source_ids
done_source_ids,
n_cpu=self.config['processing']['num_workers_io']
)
mem_usage = get_df_memory_usage(sources_df)
logger.debug(f"Step 5: sources_df memory usage: {mem_usage}MB")
@@ -319,7 +324,9 @@ def process_pipeline(self, p_run: Run) -> None:
self.config["variability"]["source_aggregate_pair_metrics_min_abs_vs"],
self.add_mode,
done_source_ids,
self.previous_parquets
self.previous_parquets,
n_cpu=self.config['processing']['num_workers'],
max_partition_mb=self.config['processing']['max_partition_mb']
)

log_total_memory_usage()