add files to compute basic stats on pseudo crawl dataset #382

Open
wants to merge 8 commits into master
169 changes: 169 additions & 0 deletions dashboard/python_scripts/compute_stats.py
@@ -0,0 +1,169 @@
import os
import json
import logging
import subprocess
from argparse import ArgumentParser
from pathlib import Path
from statistics import mean

import datasets
from datasets import config, load_from_disk
from datasets.utils.logging import set_verbosity_info

set_verbosity_info()
logger = logging.getLogger(__name__)


def get_args():
parser = ArgumentParser()
parser.add_argument(
"--dataset-path",
type=str,
required=True,
help="path to the parquet dataset folder",
)
parser.add_argument(
"--save-path-stats-json",
type=str,
required=True,
help="Where to save the stats json.",
)
parser.add_argument(
"--save-path-stats-full-json", type=str, help="Where to save the stats json."
Member commented:
Suggested change:
- "--save-path-stats-full-json", type=str, help="Where to save the stats json."
+ "--save-path-stats-full-json", type=str, required=True, help="Where to save the stats json."

)
parser.add_argument(
"--save-batch-size", type=int, required=True, help="Batch size when writing."
)
parser.add_argument("--use-datasets-caching", action="store_true")
Member commented:
We never use it. Let's remove it.

Contributor Author commented:
In fact, it seems to me that I do use use_datasets_caching.

Contributor Author commented:
I'll remove save-batch-size.

parser.add_argument(
"--num-proc", type=int, default=1, help="Number of procs use for preprocessing."
)
parser.add_argument(
"--seed-id",
type=int,
required=True,
help="Value of the seed id.",
)
parser.add_argument(
"--num-examples",
type=int,
default=None,
help="Optional argument to select a subset (used for debugging purposes). Example `10`.",
)
args = parser.parse_args()

return args


def main():
# Setup logging
logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
datefmt="%m/%d/%Y %H:%M:%S",
level=logging.INFO,
)
args = get_args()
logger.info(
f"** The job is runned with the following arguments: **\n{args}\n **** "
)

if os.path.isfile(args.save_path_stats_json):
logger.info(f" --- Statistics already computed for seed id {args.seed_id} ")
return

logger.info(f" --- Statistics not already computed for seed id {args.seed_id} ")
if not args.use_datasets_caching:
datasets.set_caching_enabled(False)
else:
logger.info(
f"the datasets results will be cached at {config.HF_DATASETS_CACHE}."
)
Comment on lines +74 to +80
Member commented:
Suggested change:
- logger.info(f" --- Statistics not already computed for seed id {args.seed_id} ")
- if not args.use_datasets_caching:
-     datasets.set_caching_enabled(False)
- else:
-     logger.info(
-         f"the datasets results will be cached at {config.HF_DATASETS_CACHE}."
-     )
+ logger.info(f" --- Statistics not already computed for seed id {args.seed_id} ")

Contributor Author commented:
In fact, it seems to me that I do use use_datasets_caching.


ds = load_from_disk(args.dataset_path)

if args.num_examples is not None:
ds = ds.select([i for i in range(args.num_examples)])

selected_mime_types = ["text/html"]
splits = {
**{
mime_type: ds.filter(
lambda mime_types_: [
mime_type_ == mime_type for mime_type_ in mime_types_
],
input_columns="content_mime_detected",
batched=True,
num_proc=args.num_proc,
)
for mime_type in selected_mime_types
},
"others": ds.filter(
lambda mime_types_: [
mime_type_ not in selected_mime_types for mime_type_ in mime_types_
],
input_columns="content_mime_detected",
batched=True,
num_proc=args.num_proc,
Comment on lines +101 to +106
Member commented:
I'm not exactly sure what happens when I run this code, but sometimes when this is empty it gives me back the entire dataset instead of an empty dataset.
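One way to make that failure mode visible would be a hypothetical sanity check, run after the splits are built, verifying that the "text/html" split really only contains that mime type (this is not part of the PR, just a sketch):

html_split = splits["text/html"]
if len(html_split) > 0:
    # Dataset.unique() returns the distinct values of a column.
    unexpected = set(html_split.unique("content_mime_detected")) - {"text/html"}
    assert not unexpected, f"Unexpected mime types in the text/html split: {unexpected}"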

),
}

data_stats = {f"{split_name}_total": len(ds) for split_name, ds in splits.items()}
Member commented:
Suggested change:
- data_stats = {f"{split_name}_total": len(ds) for split_name, ds in splits.items()}
+ data_stats = {f"{split_name}_total_samples": len(ds) for split_name, ds in splits.items()}

Contributor Author commented:
I agree that this naming is better, but are you sure you want to change it? We have already computed some statistics named with f"{split_name}_total".


ds_html = splits["text/html"]

logger.info(f"the currents splits are {data_stats}.")

def get_length_text(example):
example["length_text"] = (
len(example["text"]) if example["text"] is not None else 0
)
return example

cols_to_remove = [
col
for col in ds.column_names
if col not in ["content_languages", "url_host_tld"]
]
ds_html = ds_html.map(
get_length_text,
batched=False,
Member commented:
Suggested change:
- batched=False,
+ batched=True,

You just have to code a batched version of get_length_text.
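For reference, a minimal sketch of what such a batched version could look like, assuming the same "text" column and the dict-of-lists convention that datasets uses for batched map (not the author's actual implementation):

def get_length_text(batch):
    # In batched mode, batch["text"] is a list of texts; None counts as length 0.
    batch["length_text"] = [
        len(text) if text is not None else 0 for text in batch["text"]
    ]
    return batch

# Hypothetical call site, mirroring the map() call in this diff:
# ds_html = ds_html.map(get_length_text, batched=True,
#                       num_proc=args.num_proc, remove_columns=cols_to_remove)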

num_proc=args.num_proc,
remove_columns=cols_to_remove,
)

data_stats["html_empty_text"] = len([e for e in ds_html["length_text"] if e == 0])

non_empty_texts = [e for e in ds_html["length_text"] if e != 0]
data_stats["html_mean_length_non_empty_text"] = (
mean(non_empty_texts) if non_empty_texts != [] else None
)
data_stats["seed_id"] = args.seed_id

logger.info(
f"There is {data_stats['html_empty_text']} empty text rows out of {len(ds_html)} rows."
)

save_path = Path(args.save_path_stats_json)
save_path_tmp = f"{str(save_path.absolute())}.tmp"
logger.info(f"Saving the dataset at {save_path_tmp}")
with open(save_path_tmp, "w", encoding="utf-8") as f:
json.dump(data_stats, f, ensure_ascii=False, indent=4)
logger.info(f"Moving the saved dataset to {str(save_path.absolute())}")
subprocess.run(["mv", save_path_tmp, str(save_path.absolute())])
Member commented:
Yeah, I'd recommend os.rename; subprocess doesn't tell you if it fails.
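A minimal sketch of that swap, assuming the temporary file and the destination live on the same filesystem (which os.rename requires for an atomic move):

import os

# Unlike subprocess.run(["mv", ...]) without check=True, os.rename raises
# an OSError if the move fails, so errors are not silently ignored.
os.rename(save_path_tmp, str(save_path.absolute()))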


save_path = Path(args.save_path_stats_full_json)
tmp_file_name = f"tmp-{str(save_path.name)}"
save_path_tmp = os.path.join(save_path.parent, tmp_file_name)
logger.info(f"Saving the dataset at {save_path_tmp}")
ds_html.to_json(
save_path_tmp,
batch_size=args.save_batch_size,
num_proc=args.num_proc,
compression="gzip",
)
logger.info(f"Moving the saved dataset to {str(save_path.absolute())}")
Comment on lines +154 to +164
Member commented:
Suggested change:
- save_path = Path(args.save_path_stats_full_json)
- tmp_file_name = f"tmp-{str(save_path.name)}"
- save_path_tmp = os.path.join(save_path.parent, tmp_file_name)
- logger.info(f"Saving the dataset at {save_path_tmp}")
- ds_html.to_json(
-     save_path_tmp,
-     batch_size=args.save_batch_size,
-     num_proc=args.num_proc,
-     compression="gzip",
- )
- logger.info(f"Moving the saved dataset to {str(save_path.absolute())}")
+ save_path_full = Path(args.save_path_stats_full_json)
+ save_path_full_tmp = save_path_full.rename(f"{save_path_full.name}.tmp")
+ logger.info(f"Saving the dataset at {save_path_full_tmp}")
+ ds_html.to_json(
+     save_path_full_tmp,
+     batch_size=args.save_batch_size,
+     num_proc=args.num_proc,
+     compression="gzip",
+ )
+ logger.info(f"Moving the saved dataset to {str(save_path_full.absolute())}")

Two things:
  • move with os.rename or shutil.move (a sketch follows below)
  • You're not scared of overwriting the previous full_json?
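A minimal sketch covering both points; the guard against overwriting an existing full_json is an assumption about the desired behaviour, not something the script currently does:

import shutil
from pathlib import Path

save_path_full = Path(args.save_path_stats_full_json)
if save_path_full.exists():
    # Hypothetical guard: refuse to clobber a previously written full_json.
    raise FileExistsError(f"{save_path_full} already exists, refusing to overwrite it.")
# shutil.move raises on failure and also works across filesystems.
shutil.move(save_path_tmp, str(save_path_full.absolute()))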

subprocess.run(["mv", save_path_tmp, str(save_path.absolute())])


if __name__ == "__main__":
main()
44 changes: 44 additions & 0 deletions dashboard/slurm_scripts/compute_stats_on_pseudo_crawl.slurm
@@ -0,0 +1,44 @@
#!/bin/bash
#SBATCH --job-name=pseudo_crawl_compute_stats_v5
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1 # crucial - only 1 task per dist per node!
#SBATCH --cpus-per-task=4 # number of cores per tasks
#SBATCH --hint=nomultithread # we get physical cores not logical
#SBATCH --partition=cpu_p1
#SBATCH --time 10:00:00 # maximum execution time (HH:MM:SS)
#SBATCH --output=/gpfswork/rech/six/uty16tp/code/big_science/logs/compute_stats_v5/%x-%j.out # output file name
#SBATCH --array=1-604
Member commented:
You need to add the new seeds. And I need to update the csv with the new seeds...

#SBATCH --account=six@cpu

set -x -e

source $six_ALL_CCFRWORK/start-prod
conda activate thomas_data_tooling # Debug deepspeed temporarily

CC_INDEX_FOLDER=$six_ALL_CCFRSCRATCH/pseudo_crawl/cc
DATA_TOOLING_REPO=$WORK/repos/sync_data_tooling/data_tooling

pushd $DATA_TOOLING_REPO

SEED_ID=$(python cc_pseudo_crawl/load_all_seed_ids.py --seed-path "$DATA_TOOLING_REPO"/cc_pseudo_crawl/sourcing_sheet_seeds/ --seed-index $SLURM_ARRAY_TASK_ID)

echo "Computing stats on seed id ${SEED_ID}"

DATASET_PATH=$six_ALL_CCFRSCRATCH/pseudo_crawl/datasets-seeds/bigscience-catalogue-data/pseudo_crawl_seed--seed-id--"$SEED_ID"
SAVE_STATS_PATH=$six_ALL_CCFRSCRATCH/pseudo_crawl/datasets-stats/bigscience-catalogue-data/seed_id="$SEED_ID"/stats.json
SAVE_STATS_PATH_DIR=$six_ALL_CCFRSCRATCH/pseudo_crawl/datasets-stats/bigscience-catalogue-data/seed_id="$SEED_ID"/full
SAVE_STATS_PATH_FULL=$SAVE_STATS_PATH_DIR/full.jsonl.gz

mkdir -p $SAVE_STATS_PATH_DIR

export HF_DATASETS_OFFLINE=1
export HF_DATASETS_CACHE=$SCRATCH/to_delete

python dashboard/python_scripts/compute_stats.py \
--dataset-path $DATASET_PATH \
--num-proc 4 \
--save-path-stats-json $SAVE_STATS_PATH \
--save-path-stats-full-json $SAVE_STATS_PATH_FULL \
--use-datasets-caching \
--seed-id $SEED_ID \
--save-batch-size 2 # 100 # 10 # 2