Commit

update

blester125 committed Jul 30, 2024
1 parent 7bc6e07 commit a76f2fd
Showing 15 changed files with 431 additions and 174 deletions.
37 changes: 29 additions & 8 deletions licensed_pile/write.py
@@ -56,6 +56,20 @@ def to_dolma(
wf.write(data + "\n")


def smart_open_exists(path):
    """Check if a (possibly remote) path exists by trying to open it with smart_open."""
    try:
        with smart_open.open(path):
            return True
    except Exception:
        return False


def create_shadow(path):
    h, t = os.path.split(path)
    # Prefix the basename with "shadow." so smart_open's extension-based
    # inference (e.g. compression from .gz) still works on the shadow file.
    return os.path.join(h, f"shadow.{t}")


class ShardParallelProcessor(BaseParallelProcessor):
"""Handle read/writes to jsonl.gz so our processor code only needs to processing a single example."""

@@ -93,10 +107,14 @@ def process_single(
**kwargs,
):
logger = cls.get_logger()
overwrite = kwargs.pop("overwrite", False)
logger.debug("Processing %s into %s", source_path, destination_path)
with smart_open.open(source_path) as f, smart_open.open(
destination_path, "w"
) as wf:
if not overwrite and smart_open_exists(destination_path):
logger.info("%s already exists, skipping", destination_path)
cls.increment_progressbar(queue, shards=1)
return
shadow_path = create_shadow(destination_path)
with smart_open.open(source_path) as f, smart_open.open(shadow_path, "w") as wf:
document_count = 0
update_interval = kwargs.pop("update_interval", 1)
debug = kwargs.pop("debug", False)
@@ -126,6 +144,7 @@ def process_single(
source_path,
i,
)
document_count += 1
continue

if debug and og == processed["text"]:
@@ -142,8 +161,10 @@ def process_single(
update_interval *= 2
document_count = 0
except Exception as e:
logger.warning(
"Failed to process %s:%s %s", source_path, i, e, exc_info=True
)
return
cls.increment_progressbar(queue, shards=1, documents=document_count)
e.add_note(f"Exception occurred while processing {source_path}:{i}")
logger.warning("Failed to process %s:%s", source_path, i, exc_info=True)
raise
# return
# Move the shadow file into place; os.rename only works for local paths at the moment.
os.rename(shadow_path, destination_path)
cls.increment_progressbar(queue, shards=1, documents=document_count)
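The change above switches `process_single` to a shadow-file write: output goes to `shadow.<name>` and is only renamed over the real destination once the shard finishes, so an interrupted run never leaves behind a truncated file that a later run would skip as already existing. A minimal standalone sketch of the same pattern (local filesystem only, independent of the dolma processor API):

```python
import contextlib
import os


@contextlib.contextmanager
def shadow_write(path):
    # Write to shadow.<name> in the same directory, then atomically move it
    # into place only if the with-block completed without raising.
    head, tail = os.path.split(path)
    shadow = os.path.join(head, f"shadow.{tail}")
    with open(shadow, "w") as f:
        yield f
    os.rename(shadow, path)  # like the diff above, this only works locally


# Usage: nothing appears at out.jsonl unless every record was written.
with shadow_write("out.jsonl") as f:
    f.write("{}\n")
```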
1 change: 1 addition & 0 deletions wiki/.gitignore
@@ -0,0 +1 @@
data/*
7 changes: 7 additions & 0 deletions wiki/dump/README.md
@@ -0,0 +1,7 @@
# MediaWiki

## Steps:

1. Run `download.sh YYYYMMDD` to download the XML dumps
2. Run `to_dolma.sh YYYYMMDD` (the date must match) to convert them to the dolma format (a sketch for inspecting the resulting shards follows this list)
3. Run `python preprocess.py --input ... --output ...`
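The dolma conversion produces gzipped JSONL shards. A quick way to sanity-check them before preprocessing, assuming the shards end up under the `to_dolma.sh` default output directory and use the usual dolma `id`/`source`/`text` fields (a sketch, not part of the pipeline):

```python
import glob
import json

import smart_open  # already a dependency of licensed_pile/write.py

# Adjust the glob to wherever to_dolma.sh wrote its output.
shards = sorted(glob.glob("data/wiki/dump/raw/**/*.jsonl.gz", recursive=True))
with smart_open.open(shards[0]) as f:
    for i, line in enumerate(f):
        doc = json.loads(line)
        print(doc["id"], doc["source"], len(doc.get("text", "")))
        if i >= 4:  # just peek at the first few documents
            break
```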
4 changes: 3 additions & 1 deletion wiki/dump/download.sh
@@ -5,7 +5,7 @@ data_dir=${2:-"data"}
data_dir=${data_dir%/}

if [ -z ${DATE} ]; then
echo "usage: download.sh [date] data/" 2> /dev/null
echo "usage: download.sh [date YYYYMMDD] data/" 2> /dev/null
exit 1
fi

@@ -20,6 +20,8 @@ declare -a wikis=(
wiktionary
)

mkdir -p "${data_dir}/dumps"

for wiki in ${wikis[@]}; do
filename="en${wiki}-${DATE}-pages-meta-current.xml.bz2"
url="https://dumps.wikimedia.org/en${wiki}/${DATE}/${filename}"
4 changes: 2 additions & 2 deletions wiki/dump/to_dolma.sh
@@ -7,12 +7,12 @@ output_dir=${3:-"data/wiki/dump/raw"}
output_dir=${output_dir%/}

if [ -z ${DATE} ]; then
echo "usage: to_dolma.sh [date] dump/ data/wiki/raw/documents" 2> /dev/null
echo "usage: to_dolma.sh [date YYYYMMDD] dump/ data/wiki/raw/documents" 2> /dev/null
exit 1
fi

declare -a wikis=(
# wiki
wiki
wikibooks
wikinews
wikiquote
14 changes: 14 additions & 0 deletions wiki/parser/.gitignore
@@ -0,0 +1,14 @@
.DS_Store
.idea
*.log
tmp/

*.tern-port
node_modules/
npm-debug.log*
yarn-debug.log*
yarn-error.log*
*.tsbuildinfo
.npm
.eslintcache
logs/
34 changes: 34 additions & 0 deletions wiki/parser/README.md
@@ -0,0 +1,34 @@
# WTF WIKIPEDIA parsing server

We use the dolma format and a server running `wtf_wikipedia` for wikitext parsing, rather than the `dumpster-dip` tool, because we want to be able to parse wikitext even when it is not in the standard XML dump format.

## Starting the Server

1. Install HAProxy: `sudo apt install haproxy`
2. Install nvm and node
3. Install dependencies: `npm install`
4. Edit `haproxy.cfg` to include one `server ${name} 127.0.0.1:${port} check` line for each server you plan to run.
5. Move/link `haproxy.cfg` to `/etc/haproxy/haproxy.cfg`
6. Restart HAProxy (`systemctl restart haproxy` on systemd-based systems)
7. Run `./start.sh ${numservers}`. The count should match the number of `server` lines in `haproxy.cfg`.
8. Go to `localhost:8404/stats` to check that each server is seen by HAProxy (a scripted smoke test is sketched after this list).
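Once everything is up, a minimal end-to-end check, assuming the frontend and health-check settings from `haproxy.cfg` and the request format documented in `parser.js` (a sketch, not part of the repository):

```python
import requests

BASE = "http://127.0.0.1:5000"  # haproxy frontend from haproxy.cfg

# /health returns 200 with an empty body when a backend server is up.
print("health:", requests.get(f"{BASE}/health", timeout=10).status_code)

# Send one small document through the load balancer to a parser worker.
doc = {"id": "smoke-test-0", "source": "manual", "wikitext": "'''Hello''' [[world]]"}
resp = requests.post(BASE, json=doc, timeout=300)
resp.raise_for_status()
print(resp.json())
```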

## Why?

Each server uses a worker pool with `1` worker. `wtf_wikipedia` is synchronous code, so we run it in a worker thread to be able to use timeouts to cancel execution on long-running documents. This also helps when parsing causes an OoM error: the error happens in the worker thread instead of the main server.
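A rough Python analogue of that design, purely illustrative (the actual implementation is the `workerpool` setup in `parser.js`/`worker.js`): run the synchronous parse in a separate worker that the caller can abandon and kill when a timeout expires, so a runaway or OoM-killed parse never takes down the main process.

```python
import multiprocessing as mp


def parse_wikitext(text: str) -> dict:
    # Placeholder for the synchronous parser (wtf_wikipedia in the real setup).
    return {"plaintext": text}


def _worker(conn, text):
    conn.send(parse_wikitext(text))
    conn.close()


def parse_with_timeout(text: str, timeout: float = 120.0) -> dict:
    recv_end, send_end = mp.Pipe(duplex=False)
    proc = mp.Process(target=_worker, args=(send_end, text))
    proc.start()
    if recv_end.poll(timeout):  # result arrived within the timeout
        result = recv_end.recv()
        proc.join()
        return result
    proc.terminate()  # runaway (or OoM-killed) worker: kill it,
    proc.join()       # the main process stays alive
    raise TimeoutError(f"parse did not finish within {timeout}s")
```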

We then run multiple copies of the server behind the load balancer, which allows recovery in cases where the main server process itself crashes.

### v8 garbage collection

V8, and therefore Node, has a fairly complex garbage collector, including separate heaps for persistent objects and short-lived "young" objects. Despite various efforts to set the sizes of these heaps (64 GB and 32 GB per worker in our code, configured in the options passed to the worker pool constructor), I have seen a lot of JavaScript OoM errors, even though the error output suggests the heap is much smaller than those limits.

There were also cases where, with a large worker pool and a single server, the main server process itself hit OoM errors. This crashes the whole server and grinds the dolma conversion to a halt. Even with command-line arguments setting the heap size, this still happened, again with the heap apparently far from full. When it does happen, the load balancer stops routing traffic to that server and our start script brings a new copy online; once it is live it is added back to the pool.

These errors tend to happen on pages that have over 2 million characters.

## Settings

Throughput seems best when each server is working on one document and has already received a second document queued to process next. As the Python code is synchronous, this means we need roughly twice as many dolma processes as we have servers. The extra Python processes keep the servers from waiting on Python string manipulation.
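For the Python side, a hedged sketch of what each dolma process's call to the service might look like, assuming the haproxy frontend address and the 408/500 status codes used in `parser.js`; the retry/skip policy here is illustrative, not necessarily what `preprocess.py` does:

```python
import time

import requests

PARSER_URL = "http://127.0.0.1:5000"  # haproxy frontend


def parse_remote(doc_id: str, source: str, wikitext: str, retries: int = 3):
    for attempt in range(retries):
        try:
            resp = requests.post(
                PARSER_URL,
                json={"id": doc_id, "source": source, "wikitext": wikitext},
                timeout=600,
            )
        except requests.ConnectionError:
            # A backend may have crashed and is being restarted by start.sh;
            # back off and let haproxy route to a healthy server.
            time.sleep(2**attempt)
            continue
        if resp.status_code == 408:
            return None  # parse timed out server-side: skip this document
        if resp.ok:
            return resp.json()
        time.sleep(2**attempt)  # 5xx: transient failure, retry
    return None
```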

On a Ryzen 9 7950X with 30 dolma processes and 16 servers, the whole system processes ~5.5k documents/second and takes ~4 hours and 15 minutes to process Wikipedia + talk pages and the pages from the other MediaWiki dumps.
40 changes: 40 additions & 0 deletions wiki/parser/haproxy.cfg
@@ -0,0 +1,40 @@
defaults
mode http
timeout client 10m
timeout connect 10m
timeout server 10m
timeout http-request 10m
balance leastconn

frontend stats
mode http
bind 127.0.0.1:8404
stats enable
stats uri /stats
stats refresh 5s
stats admin if LOCALHOST

frontend wtf
bind 127.0.0.1:5000
default_backend wtf_workers

backend wtf_workers
option httpchk
http-check send meth GET uri /health
http-check expect status 200
server wtf1 127.0.0.1:5001 check
server wtf2 127.0.0.1:5002 check
server wtf3 127.0.0.1:5003 check
server wtf4 127.0.0.1:5004 check
server wtf5 127.0.0.1:5005 check
server wtf6 127.0.0.1:5006 check
server wtf7 127.0.0.1:5007 check
server wtf8 127.0.0.1:5008 check
server wtf9 127.0.0.1:5009 check
server wtf10 127.0.0.1:5010 check
server wtf11 127.0.0.1:5011 check
server wtf12 127.0.0.1:5012 check
server wtf13 127.0.0.1:5013 check
server wtf14 127.0.0.1:5014 check
server wtf15 127.0.0.1:5015 check
server wtf16 127.0.0.1:5016 check
71 changes: 0 additions & 71 deletions wiki/parser/parser-old.js

This file was deleted.

59 changes: 52 additions & 7 deletions wiki/parser/parser.js
@@ -8,42 +8,87 @@ const express = require("express");
// cli parsing
const { program, InvalidArgumentError } = require("commander");
const workerpool = require("workerpool");
const pool = workerpool.pool("./worker.js");

// Convert the cli argument into an actual int.
function parseIntArg(value, prev) {
const parsedValue = parseInt(value, 10);
if (isNaN(parsedValue)) {
throw new InvalidArgumentError("Not an Int.");
}
return parsedValue;
}

// Parse CLI arguments
program
.option("--port <int>", "port", 3000)
.option("--port <int>", "port", parseIntArg, 3000)
.option("--host", "host", "localhost")
.option("--timeout <int>", "timeout (seconds)", 120)
.option("--timeout <int>", "timeout (seconds)", parseIntArg, 120)
.option("--maxworkers <int>", "max #workers in pool", parseIntArg, 1)
.parse();
const args = program.opts(process.argv);

// TODO: make pool settings configurable
console.log(`Starting worker pool with at most ${args.maxworkers} workers.`)
const pool = workerpool.pool("./worker.js", {
maxWorkers: args.maxworkers,
emitStdStreams: false,
workerThreadOpts: {
resourceLimits: {
maxOldGenerationSizeMb: 65536,
maxYoungGenerationSizeMb: 32768,
}}});

const app = express();

// TODO: How to set no size limit?
app.use(express.json({limit: "1000mb"}));
// This is an endpoint the load balancer and the runner script hit to make
// sure the server is running. Sometimes the main server can crash when
// multiple large document requests come in.
app.get("/health", async (req, res) => {
res.status(200).send("");
})
// Endpoint to parse wikitext.
app.post("/", async (req, res) => {
// Document comes as json {"wikitext": str, "id": str, "source": str}
const data = req.body;

console.log(`Parsing wikitext from document ${data['id']} of ${data['source']}`);

// var response = await pool.exec('wtf_parse', [data["wikitext"]]);
// Pass this document to the worker pool. Using a worker pool allows us to
// put a timeout on synchronous code (wtf_wikipedia) as the main server will
// run async and kill the worker if it is taking too long.
pool
// Run the parsing function `wtf_parse` in the worker file `worker.js`
.exec('wtf_parse', [data["wikitext"]])
// If the worker doesn't return a result in this time, an error is thrown
.timeout(args.timeout * 1000)
// When the worker returns, this is run
.then((response) => {
// Log finish and return parsed text.
console.log(`Finished parsing wikitext from document ${data['id']} of ${data['source']}`);
res.json(response);
})
// If there was an error in the worker,
.catch((err) => {
console.log(err.message);
// If this is a timeout error, set the status code.
if (err.message.indexOf("timed out") != -1) {
console.error(`Parsing wikitext from document ${data['id']} of ${data['source']} timed out.`)
// 408 is technically for the server to tell the client that the client timed
// out, but there isn't a status code for a server-side timeout. 504 is only
// for when the server is a proxy, not just long-running.
res.status(408).json({ timeout: err.message });
// Log other errors; these are generally from the worker running out of
// memory
} else {
console.log(`~~~~~~~~~~ Error processing ${data['id']} of ${data['source']} ~~~~~~~~~~`);
console.error(err);
res.status(500).json({ error: "Internal server error"});
res.status(500).json({ error: err.message});
}
});

})
app.listen(args.port)
// Start the server.
app.listen(args.port, () => {
console.log(`Server started on port=${args.port} with timeout=${args.timeout} seconds.`)
})
34 changes: 34 additions & 0 deletions wiki/parser/start.sh
@@ -0,0 +1,34 @@
#!/usr/bin/env bash

NUMSERVERS=${1:-16}

function port {
local id=${1}
if [[ ${id} -ge 10 ]]; then
echo "50${id}"
else
echo "500${id}"
fi
}

function launch {
local id=${1}
node --max-old-space-size=65536 --max-semi-space-size=16384 parser.js --port $(port ${id}) --timeout 180 --maxworkers 1 >> ./logs/worker${id}.log 2>&1 &
}

function ping {
local id=${1}
echo $(curl -I -X GET localhost:$(port ${id})/health 2> /dev/null | head -n 1 | cut -d$" " -f2)
}

mkdir -p logs

while true; do
for i in $(seq 1 $NUMSERVERS); do
if [[ $(ping ${i}) -ne "200" ]]; then
echo "Worker ${i} not running, starting."
launch ${i}
fi
done
sleep 5
done