Commit: ups
blester125 committed Jul 31, 2024
1 parent a76f2fd commit 3e31f23
Showing 3 changed files with 166 additions and 1 deletion.
104 changes: 104 additions & 0 deletions licensed_pile/remove_html.py
@@ -0,0 +1,104 @@
#!/usr/bin/env python3

import argparse
import multiprocessing as mp
from tempfile import TemporaryDirectory

import bs4

from licensed_pile import logs, utils
from licensed_pile.write import ShardParallelProcessor

parser = argparse.ArgumentParser(description="Remove HTML from dolma documents.")
parser.add_argument(
    "--input",
    required=True,
    help="The input version, this directory should be where the `documents` dir lives.",
)
parser.add_argument(
    "--output",
    required=True,
    help="The output version, this directory should be where the `documents` dir will live.",
)
parser.add_argument(
    "--filename",
    default="*.jsonl.gz",
    help="The filename to match with globs, probably needs to be escaped.",
)
# TODO: Respect this flag
parser.add_argument(
    "--overwrite",
    action="store_true",
    help="Should we overwrite previously processed examples?",
)
parser.add_argument(
    "--debug",
    action="store_true",
    help="Should we log when documents are not changed by preprocessing.",
)
parser.add_argument(
    "--processes",
    type=int,
    default=mp.cpu_count(),
    help="Number of processors for multicore.",
)

logs.configure_logging("dolma.RemoveHTMLParallel", level="INFO")


class RemoveHTMLParallel(ShardParallelProcessor):
    @classmethod
    def process_example(cls, example, **kwargs):
        try:
            example["text"] = bs4.BeautifulSoup(
                example["text"], "html.parser"
            ).get_text()
        except bs4.ParserRejectedMarkup:
            logger = cls.get_logger()
            # If this exception is raised, it will be before the assignment so
            # example["text"] is still the original text.
            logger.warning(
                "Failed to remove HTML from %s/%s, probably due to text that looks like an HTML tag, keeping text as is.",
                example["source"],
                example["id"],
                extra={
                    "file": kwargs.get("source_file"),
                    "line": kwargs.get("line_number"),
                    "source": example["source"],
                    "example_id": example["id"],
                },
                exc_info=True,
            )
        except Exception:
            # Any other parsing failure: log it and keep the original text.
            logger = cls.get_logger()
            logger.error(
                "Failed HTML parsing in %s/%s",
                example["source"],
                example["id"],
                extra={
                    "file": kwargs.get("source_file"),
                    "line": kwargs.get("line_number"),
                    "source": example["source"],
                    "example_id": example["id"],
                },
                exc_info=True,
            )
        # Just pass the text through for now
        return example


def main(args):
    with TemporaryDirectory() as tempdir:
        processor = RemoveHTMLParallel(
            source_prefix=utils.dolma_input(args.input, args.filename),
            destination_prefix=utils.dolma_output(args.output),
            metadata_prefix=tempdir,
            num_processes=args.processes,
        )
        processor(debug=args.debug, overwrite=args.overwrite)


if __name__ == "__main__":
    # Dolma examples use spawn over fork, unsure why, but let's follow them.
    mp.set_start_method("spawn")
    args = parser.parse_args()
    main(args)
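
The heavy lifting above is a single BeautifulSoup call per document: parsing example["text"] with the "html.parser" backend and calling get_text() drops the tags and keeps the visible text. A minimal sketch of the same transformation on a standalone document dict (the sample document and the run paths below are made up for illustration):

import bs4

doc = {"id": "0", "source": "demo", "text": "<p>Hello <b>world</b>!</p>"}
# Same call the processor makes inside its try/except.
doc["text"] = bs4.BeautifulSoup(doc["text"], "html.parser").get_text()
print(doc["text"])  # -> Hello world!

A typical invocation, assuming a dolma-style layout where `documents` lives under the version directory, might look like:

python licensed_pile/remove_html.py --input data/my-source/v0 --output data/my-source/v1 --processes 8
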
59 changes: 59 additions & 0 deletions licensed_pile/scripts/id_to_shard.py
@@ -0,0 +1,59 @@
#!/usr/bin/env python3

import argparse
import glob
import json
import multiprocessing as mp
import os
import re
from tempfile import TemporaryDirectory

import smart_open

from licensed_pile import utils, write


class IdToShardParallel(write.ShardParallelProcessor):
    @classmethod
    def process_example(cls, example, **kwargs):
        # Keep only the id; the shard number is recovered from the output
        # filename after processing.
        return {"id": example["id"]}


def main():
    mp.set_start_method("spawn")
    parser = argparse.ArgumentParser(
        description="Build a mapping from dolma document ids to the shards that contain them."
    )
    parser.add_argument(
        "--input",
        help="The input version, this directory should be where the `documents` dir lives.",
        required=True,
    )
    parser.add_argument(
        "--output",
        help="Where to write the JSON file mapping document ids to shards.",
        default="id_to_shards.json",
    )
    parser.add_argument(
        "--processes",
        type=int,
        default=mp.cpu_count(),
        help="Number of processors for multicore.",
    )
    args = parser.parse_args()

    args.input = utils.dolma_input(args.input)

    with TemporaryDirectory() as tempdir:
        processor = IdToShardParallel(
            source_prefix=args.input,
            destination_prefix=tempdir,
            metadata_prefix=tempdir,
            num_processes=args.processes,
        )
        processor()

        id_to_shard = {}
        for shard_file in glob.iglob(
            os.path.join(tempdir, os.path.basename(args.input))
        ):
            # Output shards are expected to start with a five-digit index, e.g. 00000_*.jsonl.gz
            if shard := re.search(r"^(\d{5})_", os.path.basename(shard_file)):
                shard = shard.group(1)
                with smart_open.smart_open(shard_file) as f:
                    ids = [json.loads(l)["id"] for l in f if l]
                id_to_shard |= dict.fromkeys(ids, shard)
    with smart_open.smart_open(args.output, "w") as wf:
        json.dump(id_to_shard, wf)


if __name__ == "__main__":
    main()
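
The result is a single JSON object mapping each document id to the five-digit prefix of the shard file that contains it (the shard number comes from the output filename, matched by the regex above). A small sketch of consuming that mapping; the ids and shard numbers are illustrative:

import json

with open("id_to_shards.json") as f:
    id_to_shard = json.load(f)

# e.g. {"doc-000123": "00000", "doc-004567": "00003"}
print(id_to_shard["doc-000123"])  # -> 00000
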
4 changes: 3 additions & 1 deletion licensed_pile/write.py
@@ -136,7 +136,9 @@ def process_single(
     if debug:
         og = copy.deepcopy(data["text"])

-    processed = cls.process_example(data, **kwargs)
+    processed = cls.process_example(
+        data, source_file=source_path, line_number=i, **kwargs
+    )

     if processed is None:
         logger.warning(
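
This is the hook that makes the logging `extra` fields in remove_html.py work: process_single now forwards the shard path and line number into every process_example call as keyword arguments. A minimal subclass sketch that uses the new kwargs (the class name and metadata fields are made up for illustration):

from licensed_pile import write

class ProvenanceTagger(write.ShardParallelProcessor):
    @classmethod
    def process_example(cls, example, **kwargs):
        # Record where this document came from using the newly threaded kwargs.
        example.setdefault("metadata", {})
        example["metadata"]["shard_file"] = kwargs.get("source_file")
        example["metadata"]["shard_line"] = kwargs.get("line_number")
        return example
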
