updates needed when processing the full stack exchange dumps #79

Merged (1 commit) on May 28, 2024
stackexchange/extract.sh: 4 additions, 1 deletion
@@ -1,5 +1,8 @@
 #!/usr/bin/env sh
 
-for file in data/dump/*.7z; do
+data_dir=${1:-"data"}
+data_dir=${data_dir%/}
+
+for file in ${data_dir}/dump/*.7z; do
     7z x -o${file%.7z} "${file}"
 done
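Every script in this PR picks up the same two-line preamble, so it is worth spelling out what the POSIX parameter expansions do. The snippet below is a minimal standalone sketch; the script name, the echo, and the invocations are illustrative and not part of the PR.

    #!/usr/bin/env sh
    # ${1:-"data"}: use the first positional argument if given, otherwise default to "data".
    data_dir=${1:-"data"}
    # ${data_dir%/}: strip one trailing "/" so later paths such as ${data_dir}/dump don't contain "//".
    data_dir=${data_dir%/}
    echo "Using data directory: ${data_dir}"

Called as `sh example.sh my-data/` this prints `Using data directory: my-data`; with no argument it prints `Using data directory: data`.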
stackexchange/get-data.sh: 6 additions, 3 deletions
@@ -2,7 +2,10 @@
 
 set -e
 
-./get-dumps.sh
-./preprocess-sites.sh
+data_dir=${1:-"data"}
+data_dir=${data_dir%/}
+
+./get-dumps.sh ${data_dir}
+./preprocess-sites.sh ${data_dir}
 # process stack overflow
-python preprocess.py --input data/dump/stackoverflow.com --output data/stack-exchange/v0/stackoverflow.com --shelve
+python preprocess.py --input ${data_dir}/dump/stackoverflow.com --output ${data_dir}/stackexchange/v0/stackoverflow.com --shelve
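With the argument threaded through get-data.sh, get-dumps.sh, extract.sh, get-stackoverflow.sh, and preprocess-sites.sh, the whole pipeline can be pointed at a different location. An illustrative invocation; the scratch path is made up:

    # Default: download, extract, and preprocess under ./data as before.
    ./get-data.sh
    # Hypothetical example: stage everything on a larger scratch volume instead.
    ./get-data.sh /mnt/scratch/stackexchange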
stackexchange/get-dumps.sh: 8 additions, 4 deletions
@@ -1,7 +1,11 @@
 #!/usr/bin/env sh
 
+data_dir=${1:-"data"}
+data_dir=${data_dir%/}
+
+echo "Downloading Sites map from Internet Archive and saving to ${data_dir}."
 # Get all the stackexchange dumps from archive.org
-wget https://archive.org/download/stackexchange/Sites.xml -P data/
+wget https://archive.org/download/stackexchange/Sites.xml -P ${data_dir}
 # List the download links based on the xml file and the archive.org base dir.
 # Shuffle so any restarts probably start with new files.
 # Run 4 wget processes and call each one with 10 URLs.
@@ -12,6 +16,6 @@ wget https://archive.org/download/stackexchange/Sites.xml -P data/
 # -P data/dump save all files with the prefix data/dump
 # Note: The progress bars from the different wget processes overwrite each other
 # They are still useful to ensure that progress is being made.
-python list-sites.py | shuf | xargs -n10 -P4 wget -w 2 -t 10 -c -nc -P data/dump --show-progress
-./extract.sh
-./get-stackoverflow.sh
+python list-sites.py | shuf | xargs -n10 -P4 wget -w 2 -t 10 -c -nc -P ${data_dir}/dump --show-progress
+./extract.sh ${data_dir}
+./get-stackoverflow.sh ${data_dir}
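The download line itself is unchanged apart from the `-P` prefix; for readers skimming the diff, here is a commented restatement of what the flags do (same behavior as the script, no new functionality):

    # list-sites.py prints one archive.org URL per line.
    # shuf        randomizes the order so a restarted run likely begins on new files.
    # xargs -n10  hands 10 URLs to each wget invocation.
    # xargs -P4   keeps 4 wget processes running in parallel.
    # wget -w 2   waits 2 seconds between retrievals; -t 10 retries each URL up to 10 times.
    # wget -c -nc resumes partial downloads and never re-downloads finished files.
    python list-sites.py | shuf | xargs -n10 -P4 wget -w 2 -t 10 -c -nc -P "${data_dir}/dump" --show-progress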
stackexchange/get-stackoverflow.sh: 7 additions, 4 deletions
@@ -1,8 +1,11 @@
 #!/usr/bin/env bash
 
+data_dir=${1:-"data"}
+data_dir=${data_dir%/}
+
 # Stackoverflow is larger than the other sites so they distribute each .xml file
 # as its own .7z compressed file.
-mkdir -p data/dump/stackoverflow.com
+mkdir -p ${data_dir}/dump/stackoverflow.com
 
 STACKOVERFLOW=(
     "https://archive.org/download/stackexchange/stackoverflow.com-Badges.7z"
@@ -17,8 +20,8 @@ STACKOVERFLOW=(
 for url in "${STACKOVERFLOW[@]}"; do
     file="${url##*.com-}"
     file="${file%.7z}"
-    wget -c -nc -P data/dump/stackoverflow.com --show-progress "${url}"
-    if [[ ! -f "data/dump/stackoverflow.com/${file}.xml" ]]; then
-        7z x -odata/dump/stackoverflow.com/ data/dump/stackoverflow.com/"${url##*/}"
+    wget -c -nc -P ${data_dir}/dump/stackoverflow.com --show-progress "${url}"
+    if [[ ! -f "${data_dir}/dump/stackoverflow.com/${file}.xml" ]]; then
+        7z x -o${data_dir}/dump/stackoverflow.com/ ${data_dir}/dump/stackoverflow.com/"${url##*/}"
     fi
 done
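For reference, the `${url##*.com-}` and `${file%.7z}` expansions in the loop peel the per-table file name out of each archive URL. A worked example, standalone, using a URL from the list above:

    url="https://archive.org/download/stackexchange/stackoverflow.com-Badges.7z"
    file="${url##*.com-}"   # remove the longest prefix ending in ".com-" -> Badges.7z
    file="${file%.7z}"      # remove the ".7z" suffix                     -> Badges
    echo "${file}"          # prints: Badges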
stackexchange/preprocess-sites.sh: 7 additions, 4 deletions
@@ -1,12 +1,15 @@
 #!/usr/bin/env bash
 
-for site_dump in data/dump/*/; do
+data_dir=${1:-"data"}
+data_dir=${data_dir%/}
+
+for site_dump in ${data_dir}/dump/*/; do
     site=$(basename ${site_dump})
     if [[ "${site}" != "stackoverflow.com" ]]; then
-        output="data/stack-exchange/v0/${site}"
+        output="${data_dir}/stackexchange/v0/${site}"
         if [[ ! -d ${output} ]]; then
-            echo "python preprocess.py --input ${site_dump} --output data/stack-exchange/v0/${site}"
-            time python preprocess.py --input ${site_dump} --output data/stack-exchange/v0/${site}
+            echo "python preprocess.py --input ${site_dump} --output ${output}"
+            time python preprocess.py --input ${site_dump} --output ${output}
         fi
     fi
 done
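For orientation, this is roughly the layout the scripts produce under the default data directory after this PR; both site directories shown appear elsewhere in the PR, and the sketch is only a summary of the paths used above:

    # data/
    # ├── dump/                      downloaded .7z archives and extracted XML dumps
    # │   ├── askubuntu.com/
    # │   └── stackoverflow.com/
    # └── stackexchange/
    #     └── v0/                    output written by preprocess.py
    #         ├── askubuntu.com/
    #         └── stackoverflow.com/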
stackexchange/preprocess.py: 42 additions, 28 deletions
@@ -6,10 +6,10 @@
 import datetime
 import functools
 import itertools
-import logging
 import multiprocessing as mp
 import operator as op
 import os
+import re
 import shelve
 import urllib.parse
 from dataclasses import dataclass
@@ -20,18 +20,14 @@
 from markdown_it import MarkdownIt
 
 import licensed_pile.xml as xml
+from licensed_pile import logs
 from licensed_pile.licenses import PermissiveLicenses
 from licensed_pile.write import to_dolma
 
-logging.basicConfig(
-    level=logging.INFO,
-    format="StackExchange Processing: [%(asctime)s] %(levelname)s - %(message)s",
-)
-
 parser = argparse.ArgumentParser(description="Parse a stack exchange dump.")
 parser.add_argument("--input", help="Path to the dump, data/dump/${site}")
 parser.add_argument(
-    "--output", help="Path to the output, data/stack-exchange/v0/${site}/documents"
+    "--output", help="Path to the output, data/stackexchange/v0/${site}/documents"
 )
 parser.add_argument(
     "--processes",
@@ -301,22 +297,35 @@ def _cmp_answers(a, b):
     return sorted(answers, key=functools.cmp_to_key(_cmp_answers))
 
 
+def find_file(directory: str, file_name: str) -> str:
+    """Some dumps use lowercase file names :/"""
+    for f in (file_name, file_name.lower()):
+        if os.path.exists(path := os.path.join(directory, f)):
+            return path
+    logger = logs.configure_logging("stackexchange")
+    logger.error(f"Failed to find {file_name} in {directory}")
+    raise ValueError(f"Failed to find {file_name} in {directory}")
+
+
 def main(args):
+    logger = logs.configure_logging("stackexchange")
     # Note: The Stack Exchange data doesn't lend itself to being shared into the
     # dolma format before the preprocessing is done, therefore we manually use
     # multiprocessing as we go to generate examples in parallel which are
     # eventually stored in the dolma format.
-    site = os.path.basename(args.input)
+    # Make sure that ending the input dir with a `/` doesn't result in an empty
+    # string as the site value.
+    site = os.path.basename(re.sub(r"/$", "", args.input))
     os.makedirs(args.output, exist_ok=True)
 
     date_sort = functools.partial(sorted, key=op.attrgetter("date"))
     # Comments are always sorted by date
     sort_comments = date_sort
     if args.sort == "time":
-        logging.info("Answers will be sorted based on the date.")
+        logger.info("Answers will be sorted based on the date.")
         sort_answers = date_sort
     else:
-        logging.info("Aanswers will be sorted based on votes (accepted answer first).")
+        logger.info("Answers will be sorted based on votes (accepted answer first).")
         sort_answers = vote_sort
 
     # TODO: Does setting the start method to `spawn` help reduce memory usage?
@@ -326,8 +335,8 @@ def main(args):
     # pool will be "finalized" (deleted) before all the data is processed and
     # the program will hang.
     with mp.Pool(processes=args.processes) as pool:
-        logging.info("Building Lookup from user id -> user names")
-        user_xml = xml.iterate_xml(os.path.join(args.input, "Users.xml"), "row")
+        logger.info("Building Lookup from user id -> user names")
+        user_xml = xml.iterate_xml(find_file(args.input, "Users.xml"), "row")
         # This table is fairly small so we don't need to create a shelve for it.
         author_display = collections.defaultdict(set)
         for user_id, user_names in pool.imap_unordered(
@@ -337,10 +346,8 @@ def main(args):
                 continue
             author_display[user_id].update(user_names)
 
-        logging.info("Building Lookup from post id -> authors")
-        history_xml = xml.iterate_xml(
-            os.path.join(args.input, "PostHistory.xml"), "row"
-        )
+        logger.info("Building Lookup from post id -> authors")
+        history_xml = xml.iterate_xml(find_file(args.input, "PostHistory.xml"), "row")
         # It would probably be better/faster to use a database to store these
         # intermediate lookups instead of a shelve (which requires multiple
         # pickle serialization/deserialization) but I didn't want to implement
@@ -371,10 +378,8 @@ def main(args):
         else:
             comments = {}
         if args.include_comments:
-            logging.info("Building Lookup from post/answer id -> comments")
-            comment_xml = xml.iterate_xml(
-                os.path.join(args.input, "Comments.xml"), "row"
-            )
+            logger.info("Building Lookup from post/answer id -> comments")
+            comment_xml = xml.iterate_xml(find_file(args.input, "Comments.xml"), "row")
             for post_id, user_id, text, date, license in pool.imap_unordered(
                 process_comment, comment_xml, chunksize=100
             ):
@@ -398,7 +403,7 @@ def main(args):
             for cid, cs in comments.items():
                 comments[cid] = sort_comments(cs)
         else:
-            logging.info("Comments will not be included in the text output.")
+            logger.info("Comments will not be included in the text output.")
 
         if args.shelve:
             parsed_dump = shelve.open(os.path.join(args.output, "questions.shelve"))
@@ -407,39 +412,47 @@ def main(args):
 
         # Questions are the "document" level for this dataset, therefore we do
         # not need to sort them.
-        logging.info("Parsing Questions")
-        post_xml = xml.iterate_xml(os.path.join(args.input, "Posts.xml"), "row")
+        logger.info("Parsing Questions")
+        post_xml = xml.iterate_xml(find_file(args.input, "Posts.xml"), "row")
         for post_id, text, date, license, accepted_id in pool.imap_unordered(
             process_question, post_xml, chunksize=100
         ):
             if post_id is None:
                 continue
+            if post_id not in post_authors:
+                logger.warning(
+                    f"Failed to find authors associated with post: {post_id}"
+                )
             parsed_dump[post_id] = Question(
                 text=text,
                 id=post_id,
-                authors=post_authors[post_id],
+                authors=post_authors.get(post_id, {"Unknown"}),
                 # Comments are sorted in chronological order.
                 comments=comments.get(post_id, []),
                 date=date,
                 license=license,
                 accepted_answer=accepted_id,
             )
 
-        logging.info("Parsing Answers")
+        logger.info("Parsing Answers")
         # Reinitialize the iterator over the Posts as it was consumed when
         # looking for questions. We do this as a second pass so we know that
        # there will always be a question we can attach this answer to.
-        post_xml = xml.iterate_xml(os.path.join(args.input, "Posts.xml"), "row")
+        post_xml = xml.iterate_xml(find_file(args.input, "Posts.xml"), "row")
         for question_id, answer_id, answer, date, score, license in pool.imap_unordered(
             process_answer, post_xml, chunksize=100
         ):
             if question_id is None:
                 continue
+            if answer_id not in post_authors:
+                logger.warning(
+                    f"Failed to find authors associated with answer: {answer_id}"
+                )
             question = parsed_dump[question_id]
             question.answers.append(
                 Answer(
                     text=answer,
-                    authors=post_authors[answer_id],
+                    authors=post_authors.get(answer_id, {"Unknown"}),
                     # Comments are sorted in chronological order.
                     comments=comments.get(answer_id, []),
                     date=date,
@@ -460,7 +473,7 @@ def main(args):
             parsed_dump[qid] = q
 
         # Use iterators so we don't need to have the full dataset loaded at once.
-        logging.info("Formatting Questions as Dolma Documents")
+        logger.info("Formatting Questions as Dolma Documents")
         # Even on rather large datasets, such as askubuntu.com, and shelves it
         # was faster to do the comment/answer sorting and run format dolma in
         # the main process. I assume the cost to serialize and deserialize the
@@ -482,4 +495,5 @@ def main(args):
 
 if __name__ == "__main__":
     args = parser.parse_args()
+    logs.configure_logging("stackexchange")
     main(args)
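Two of the Python changes are easy to miss in the diff: the site name is now derived after stripping a trailing slash from `--input`, and author lookups fall back to `{"Unknown"}` instead of raising a `KeyError`. A minimal standalone sketch of both behaviours; the example path and author table are made up, not taken from the PR:

    import os
    import re

    # os.path.basename returns "" for a path that ends in "/", which is why the
    # PR strips the trailing slash before taking the basename.
    path = "data/dump/askubuntu.com/"
    print(os.path.basename(path))                     # ""
    print(os.path.basename(re.sub(r"/$", "", path)))  # "askubuntu.com"

    # dict.get with a default mirrors post_authors.get(post_id, {"Unknown"}):
    post_authors = {101: {"alice"}}
    print(post_authors.get(101, {"Unknown"}))         # {'alice'}
    print(post_authors.get(999, {"Unknown"}))         # {'Unknown'}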