Quick memory optimisations #776

Merged: 11 commits, Nov 4, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### Changed

+ - Small memory optimisations: use `itertuples` in favour of `iterrows`, and loop over mappings rather than converting them to lists up-front. [#776](https://github.com/askap-vast/vast-pipeline/pull/776)
- Updated clearpiperun to delete runs using raw SQL rather than via django [#775](https://github.com/askap-vast/vast-pipeline/pull/775)
- Shortened forced fits measurement names to ensure they fit within the character limits - remove image prefix and limited to 1000 forced fits per source [#734](https://github.com/askap-vast/vast-pipeline/pull/734)
- Cleaned up Code of Conduct including adding Zenodo DOI [#773](https://github.com/askap-vast/vast-pipeline/pull/773)
@@ -26,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### List of PRs

+ - [#776](https://github.com/askap-vast/vast-pipeline/pull/776): fix: Minor memory optimisations
- [#775](https://github.com/askap-vast/vast-pipeline/pull/775): fix, feat: Enabled deletion of pipeline runs directly using SQL rather than via django
- [#734](https://github.com/askap-vast/vast-pipeline/pull/734): Shortened forced fits measurement names
- [#773](https://github.com/askap-vast/vast-pipeline/pull/773): docs: Cleaned up Code of Conduct including adding Zenodo DOI
25 changes: 9 additions & 16 deletions vast_pipeline/pipeline/config.py
@@ -402,10 +402,12 @@ def validate(self, user: User = None):
# of the user's input format.
# Ensure all input file types have the same epochs.
try:
+ schema = yaml.Map({epoch: yaml.Seq(yaml.Str()) for epoch in epochs_image})
for input_type in inputs.keys():
- self._yaml["inputs"][input_type].revalidate(
-     yaml.Map({epoch: yaml.Seq(yaml.Str()) for epoch in epochs_image})
- )
+ # Generate a new YAML object on-the-fly per input to avoid saving
+ # a validation schema per file in the PipelineConfig object
+ # (these can consume a lot of RAM for long lists of input files).
+ yaml.load(self._yaml["inputs"][input_type].as_yaml(), schema=schema)
except yaml.YAMLValidationError:
# number of epochs could be different or the name of the epochs may not match
# find out which by counting the number of unique epochs per input type
@@ -438,20 +440,11 @@ def validate(self, user: User = None):
# This could be combined with the number of epochs validation above, but we want
# to give specific feedback to the user on failure.
try:
+ schema = yaml.Map(
+     {epoch: yaml.FixedSeq([yaml.Str()] * epoch_n_files["image"][epoch])
+      for epoch in epochs_image})
for input_type in inputs.keys():
- self._yaml["inputs"][input_type].revalidate(
-     yaml.Map(
-         {
-             epoch: yaml.FixedSeq(
-                 [
-                     yaml.Str()
-                     for _ in range(epoch_n_files["image"][epoch])
-                 ]
-             )
-             for epoch in epochs_image
-         }
-     )
- )
+ yaml.load(self._yaml["inputs"][input_type].as_yaml(), schema=schema)
except yaml.YAMLValidationError:
# map input type to a mapping of epoch to file count
file_counts_str = ""
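The change in both hunks is the same: build one schema up-front and re-parse each input's YAML text against it with `yaml.load`, instead of calling `revalidate`, which leaves a schema reference attached to every stored `YAML` object. A minimal standalone sketch of the pattern with strictyaml (the epoch names and file lists here are invented for illustration):

```python
import strictyaml as yaml

# Invented stand-ins for the pipeline's per-epoch input file lists.
docs = {
    "image": "epoch01:\n- image1.fits\n- image2.fits\n",
    "selavy": "epoch01:\n- cat1.xml\n- cat2.xml\n",
}
epochs = ["epoch01"]

# Build the schema once and share it across every input type...
schema = yaml.Map({epoch: yaml.Seq(yaml.Str()) for epoch in epochs})

for input_type, text in docs.items():
    # ...then parse each document against the shared schema and discard
    # the result. Nothing extra is retained per input, whereas
    # YAML.revalidate(schema) keeps a schema reference alive on each
    # stored YAML object.
    yaml.load(text, schema=schema)
```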
6 changes: 3 additions & 3 deletions vast_pipeline/pipeline/finalise.py
@@ -357,9 +357,9 @@ def final_operations(
make_upload_associations(sources_df_upload)

# write associations to parquet file
- sources_df.rename(columns={'id': 'meas_id'})[
-     ['source_id', 'meas_id', 'd2d', 'dr']
- ].to_parquet(os.path.join(p_run.path, 'associations.parquet'))
+ sources_df[['source_id', 'id', 'd2d', 'dr']]. \
+     rename(columns={'id': 'meas_id'}). \
+     to_parquet(os.path.join(p_run.path, 'associations.parquet'))

if calculate_pairs:
# get the Source object primary keys for the measurement pairs
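Reordering the chained calls matters because `rename` returns a copy of whatever frame it is called on: selecting the four output columns first means only those columns are copied, not the full `sources_df`. A small self-contained sketch of the difference (the extra column names are invented):

```python
import numpy as np
import pandas as pd

# Stand-in for sources_df: many columns, only four of which are written out.
rng = np.random.default_rng(0)
sources_df = pd.DataFrame({
    "source_id": rng.integers(0, 100, 1000),
    "id": rng.integers(0, 100, 1000),
    "d2d": rng.random(1000),
    "dr": rng.random(1000),
    **{f"col{i}": rng.random(1000) for i in range(6)},
})

# Before: rename() copies all ten columns, then four are kept.
before = sources_df.rename(columns={"id": "meas_id"})[
    ["source_id", "meas_id", "d2d", "dr"]]

# After: select the four columns first, so rename() copies only those.
after = sources_df[["source_id", "id", "d2d", "dr"]].rename(
    columns={"id": "meas_id"})

assert before.equals(after)  # identical output, smaller intermediate copy
```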
34 changes: 18 additions & 16 deletions vast_pipeline/pipeline/forced_extraction.py
@@ -413,11 +413,15 @@ def image_data_func(image_name: str) -> Dict[str, Any]:
))

# compute the rest of the columns
- intermediate_df = (
-     db.from_sequence(intermediate_df)
-     .map(lambda x: finalise_forced_dfs(**x))
-     .compute()
- )
+ # NOTE: Avoid using dask bags to parallelise the mapping over
+ # DataFrames: the DataFrames tend to get very large in memory, and
+ # dask bags make a copy of the output before collecting the results.
+ # The speed penalty for doing this step without parallelism is minimal.
+ intermediate_df = list(map(
+     lambda x: finalise_forced_dfs(**x),
+     intermediate_df
+ ))
df_out = (
pd.concat(intermediate_df, axis=0, sort=False)
.rename(
@@ -457,10 +461,10 @@ def write_group_to_parquet(
pass


- def parallel_write_parquet(
+ def write_forced_parquet(
df: pd.DataFrame, run_path: str, add_mode: bool = False) -> None:
'''
- Parallelize writing parquet files for forced measurements.
+ Write parquet files for forced measurements.

Args:
df:
@@ -479,15 +483,13 @@ def get_fname(n): return os.path.join(
run_path,
'forced_measurements_' + n.replace('.', '_') + '.parquet'
)
- dfs = list(map(lambda x: (df[df['image'] == x], get_fname(x)), images))
- n_cpu = cpu_count() - 1
-
- # writing parquets using Dask bag
- bags = db.from_sequence(dfs)
- bags = bags.starmap(
-     lambda df, fname: write_group_to_parquet(df, fname, add_mode))
- bags.compute(num_workers=n_cpu)
+ # Avoid saving the mapping to a list since this copies the entire
+ # DataFrame, which can already be very large in memory at this point.
+ dfs = map(lambda x: (df[df['image'] == x], get_fname(x)), images)

+ # Write parquets
+ for this_df, fname in dfs:
+     write_group_to_parquet(this_df, fname, add_mode)
pass
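Keeping `dfs` as a lazy `map` object means each per-image slice of the DataFrame is created only when the write loop reaches it, so at most one slice is alive at a time; wrapping the `map` in `list(...)` would materialise every slice, effectively a grouped copy of the whole DataFrame, before any file was written. A toy sketch of the pattern (the image names and the `print` stand-in for `write_group_to_parquet` are illustrative only):

```python
import pandas as pd

df = pd.DataFrame({
    "image": ["image1.fits", "image2.fits"] * 1000,
    "flux_peak": range(2000),
})
images = df["image"].unique()

# Lazy: each (slice, name) pair is built only when the loop asks for it,
# so at most one per-image copy of the rows exists at any moment.
pairs = map(lambda x: (df[df["image"] == x], x), images)

for group, name in pairs:
    # Stand-in for write_group_to_parquet(group, fname, add_mode).
    print(name, len(group))
```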


@@ -689,7 +691,7 @@ def forced_extraction(
logger.info(
'Saving forced measurements to specific parquet file...'
)
- parallel_write_parquet(extr_df, p_run.path, add_mode)
+ write_forced_parquet(extr_df, p_run.path, add_mode)

# Required to rename this column for the image add mode.
extr_df = extr_df.rename(columns={'time': 'datetime'})
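For the `finalise_forced_dfs` step in the first hunk above, the same trade-off applies: the dask bag distributes the calls across workers, but every worker materialises its output DataFrame and the results are copied again when collected, while the serial `map` holds only the growing result list. A toy comparison, assuming the per-chunk work is cheap (the `finalise` function here is an invented stand-in):

```python
import dask.bag as db
import pandas as pd

def finalise(df: pd.DataFrame) -> pd.DataFrame:
    # Invented stand-in for finalise_forced_dfs: some per-chunk transform.
    return df.assign(flux_peak=df["flux_peak"] * 2.0)

chunks = [pd.DataFrame({"flux_peak": range(10_000)}) for _ in range(8)]

# Dask-bag version: parallel, but the outputs are copied between the
# workers and the scheduler when the results are collected.
bag_result = db.from_sequence(chunks).map(finalise).compute()

# Serial version: no extra copy of the outputs, at a minimal speed cost
# when the per-chunk work is cheap relative to the data size.
map_result = list(map(finalise, chunks))
```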
30 changes: 15 additions & 15 deletions vast_pipeline/pipeline/model_generator.py
@@ -30,11 +30,11 @@ def measurement_models_generator(
An iterable generator object containing the yielded Measurement
objects.
"""
- for i, row in meas_df.iterrows():
+ for row in meas_df.itertuples():
one_m = Measurement()
for fld in one_m._meta.get_fields():
- if getattr(fld, 'attname', None) and fld.attname in row.index:
-     setattr(one_m, fld.attname, row[fld.attname])
+ if getattr(fld, 'attname', None) and hasattr(row, fld.attname):
+     setattr(one_m, fld.attname, getattr(row, fld.attname))
yield one_m


@@ -55,19 +55,19 @@ def source_models_generator(
Returns:
An iterable generator object containing the yielded Source objects.
"""
- for i, row in src_df.iterrows():
+ for row in src_df.itertuples():
# generate an IAU compliant source name, see
# https://cdsweb.u-strasbg.fr/Dic/iau-spec.html
name = (
-     f"J{deg2hms(row['wavg_ra'], precision=1, truncate=True)}"
-     f"{deg2dms(row['wavg_dec'], precision=0, truncate=True)}"
+     f"J{deg2hms(row.wavg_ra, precision=1, truncate=True)}"
+     f"{deg2dms(row.wavg_dec, precision=0, truncate=True)}"
).replace(":", "")
src = Source()
src.run_id = pipeline_run.id
src.name = name
for fld in src._meta.get_fields():
- if getattr(fld, 'attname', None) and fld.attname in row.index:
-     setattr(src, fld.attname, row[fld.attname])
+ if getattr(fld, 'attname', None) and hasattr(row, fld.attname):
+     setattr(src, fld.attname, getattr(row, fld.attname))

yield src
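Throughout this module the `iterrows` loops become `itertuples` loops: `iterrows` constructs a full pandas Series for every row, while `itertuples` yields a plain namedtuple, which is considerably lighter and faster over long frames, with fields read as attributes (`row.wavg_ra`) rather than by key. A small sketch, with one caveat worth noting:

```python
import pandas as pd

df = pd.DataFrame({"wavg_ra": [12.5, 187.0], "wavg_dec": [-45.0, 3.2]})

# itertuples yields lightweight namedtuples rather than building a full
# Series object per row, as iterrows does.
for row in df.itertuples():
    print(row.Index, row.wavg_ra, row.wavg_dec)

# Caveat: columns whose names are not valid Python identifiers are
# renamed positionally (_1, _2, ...), so the attribute-based lookups in
# this module assume well-formed column names.
```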

@@ -88,12 +88,12 @@
An iterable generator object containing the yielded Association objects.
"""
logger.debug(f"Building {len(assoc_df)} association generators")
- for i, row in assoc_df.iterrows():
+ for row in assoc_df.itertuples():
yield Association(
-     meas_id=row['id'],
-     source_id=row['source_id'],
-     d2d=row['d2d'],
-     dr=row['dr'],
+     meas_id=row.id,
+     source_id=row.source_id,
+     d2d=row.d2d,
+     dr=row.dr,
)
logger.debug(f"Built {len(assoc_df)} association generators")

@@ -113,5 +113,5 @@
Returns:
An iterable generator object containing the yielded Association objects.
"""
- for i, row in related_df.iterrows():
-     yield RelatedSource(**row.to_dict())
+ for row in related_df.itertuples(index=False):
+     yield RelatedSource(**row._asdict())
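In this last generator, `itertuples(index=False)` leaves the index out of the namedtuple, so `row._asdict()` maps exactly column name to value and can be splatted straight into the model constructor, replacing the heavier per-row `Series.to_dict()`. A sketch (the column names are invented; whichever columns `related_df` carries must match the `RelatedSource` fields):

```python
import pandas as pd

related_df = pd.DataFrame({
    "from_source_id": [1, 1, 2],
    "to_source_id": [2, 3, 3],
})

# index=False keeps the index out of the namedtuple, so _asdict() yields
# exactly {column name: value} for each row.
for row in related_df.itertuples(index=False):
    kwargs = row._asdict()
    print(kwargs)  # e.g. {'from_source_id': 1, 'to_source_id': 2}
```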
10 changes: 9 additions & 1 deletion vast_pipeline/utils/utils.py
@@ -392,7 +392,7 @@ def timeStamped(fname, fmt="%Y-%m-%d-%H-%M-%S_{fname}"):
return datetime.now().strftime(fmt).format(fname=fname)


- def calculate_n_partitions(df, n_cpu, partition_size_mb=100):
+ def calculate_n_partitions(df, n_cpu, partition_size_mb=15):
"""
This function will calculate how many partitions a dataframe should be
split into.
@@ -401,6 +401,14 @@ def calculate_n_partitions(df, n_cpu, partition_size_mb=100):
df: The pandas dataframe to be partitioned.
n_cpu: The number of available CPUs.
partition_size_mb: The optimal partition size in MB.
+     NOTE: The default partition size of 15 MB is chosen because
+     many of the parallelised operations on partitioned
+     DataFrames can consume far more memory than the size of the
+     partition itself. 15 MB avoids consuming too much memory at
+     significant levels of parallelism (e.g. n_cpu > 10) without
+     significant cost to processing speed.

Returns:
The optimal number of partitions.
"""