diff --git a/kgdata/dataset.py b/kgdata/dataset.py
index 6aec872..f31ac6b 100644
--- a/kgdata/dataset.py
+++ b/kgdata/dataset.py
@@ -32,7 +32,7 @@
 from kgdata.config import init_dbdir_from_env
 from kgdata.misc.query import PropQuery, every
 from kgdata.spark import ExtendedRDD, SparkLikeInterface, get_spark_context
-from kgdata.spark.common import does_result_dir_exist
+from kgdata.spark.common import does_result_dir_exist, text_file
 from kgdata.spark.extended_rdd import DatasetSignature
 
 V = TypeVar("V")
@@ -95,7 +95,7 @@ def get_files(
         return files
 
     def get_rdd(self) -> RDD[T_co]:
-        rdd = get_spark_context().textFile(str(self.file_pattern))
+        rdd = text_file(Path(self.file_pattern))
         if self.prefilter is not None:
             rdd = rdd.filter(self.prefilter)
 
diff --git a/kgdata/dbpedia/datasets/entity_types.py b/kgdata/dbpedia/datasets/entity_types.py
index b1db326..ab4dce1 100644
--- a/kgdata/dbpedia/datasets/entity_types.py
+++ b/kgdata/dbpedia/datasets/entity_types.py
@@ -24,7 +24,6 @@ def entity_types(lang: str = "en") -> Dataset[tuple[str, list[str]]]:
         .get_extended_rdd()
         .map(get_instanceof)
         .map(orjson.dumps)
-        .auto_coalesce(cache=True)
         .save_like_dataset(ds, auto_coalesce=True, shuffle=True)
     )
 
diff --git a/kgdata/dbpedia/datasets/generic_extractor_dump.py b/kgdata/dbpedia/datasets/generic_extractor_dump.py
index 7d69b7b..5a2b243 100644
--- a/kgdata/dbpedia/datasets/generic_extractor_dump.py
+++ b/kgdata/dbpedia/datasets/generic_extractor_dump.py
@@ -41,7 +41,7 @@ def generic_extractor_dump(lang: str = "en") -> Dataset[RDFResource]:
     )
 
     (
-        ExtendedRDD.textFile(str(split_dump_dir / "*/*.gz"))
+        ExtendedRDD.textFile(split_dump_dir / "*/*.gz")
         .filter(ignore_comment)
         .map(ntriple_loads)
         .groupBy(lambda x: x[0])
diff --git a/kgdata/dbpedia/datasets/mapping_extractor_dump.py b/kgdata/dbpedia/datasets/mapping_extractor_dump.py
index e6644d5..5457be8 100644
--- a/kgdata/dbpedia/datasets/mapping_extractor_dump.py
+++ b/kgdata/dbpedia/datasets/mapping_extractor_dump.py
@@ -40,7 +40,7 @@ def mapping_extractor_dump(lang: str = "en") -> Dataset[RDFResource]:
     )
 
     (
-        ExtendedRDD.textFile(str(split_dump_dir / "*/*.gz"))
+        ExtendedRDD.textFile(split_dump_dir / "*/*.gz")
         .filter(ignore_comment)
         .map(ntriple_loads)
         .groupBy(lambda x: x[0])
diff --git a/kgdata/dbpedia/datasets/ontology_dump.py b/kgdata/dbpedia/datasets/ontology_dump.py
index dbb3763..04659ed 100644
--- a/kgdata/dbpedia/datasets/ontology_dump.py
+++ b/kgdata/dbpedia/datasets/ontology_dump.py
@@ -6,13 +6,14 @@
 from functools import lru_cache
 from typing import Any, Callable, Iterable
 
+from rdflib import OWL, RDF, RDFS, BNode, URIRef
+
 from kgdata.dataset import Dataset
 from kgdata.dbpedia.config import DBpediaDirCfg
 from kgdata.misc.ntriples_parser import Triple, ignore_comment, ntriple_loads
 from kgdata.misc.resource import RDFResource
 from kgdata.spark import ExtendedRDD
 from kgdata.splitter import split_a_file, split_a_list
-from rdflib import OWL, RDF, RDFS, BNode, URIRef
 
 rdf_type = str(RDF.type)
 rdfs_label = str(RDFS.label)
@@ -65,7 +66,7 @@ def ontology_dump() -> Dataset[RDFResource]:
     )
 
     (
-        ExtendedRDD.textFile(str(step1_dir / "*.gz"))
+        ExtendedRDD.textFile(step1_dir / "*.gz")
         .filter(ignore_comment)
         .map(ntriple_loads)
         .groupBy(lambda x: x[0])
diff --git a/kgdata/dbpedia/datasets/redirection_dump.py b/kgdata/dbpedia/datasets/redirection_dump.py
index af2e8cf..29c5965 100644
--- a/kgdata/dbpedia/datasets/redirection_dump.py
+++ b/kgdata/dbpedia/datasets/redirection_dump.py
@@ -29,7 +29,7 @@ def redirection_dump(lang: str = "en"):
     )
 
     (
-        ExtendedRDD.textFile(str(cfg.redirection_dump / f"raw-{lang}/*.gz"))
+        ExtendedRDD.textFile(cfg.redirection_dump / f"raw-{lang}/*.gz")
         .filter(ignore_comment)
         .map(ntriple_loads)
         .map(norm_redirection)  # extracted redirection (source -> target)
diff --git a/kgdata/misc/funcs.py b/kgdata/misc/funcs.py
index 6bbe4d4..74e1839 100644
--- a/kgdata/misc/funcs.py
+++ b/kgdata/misc/funcs.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 import importlib
-from io import BytesIO
+from io import BufferedReader, BytesIO, TextIOWrapper
 from typing import Type
 
 import zstandard as zstd
@@ -46,4 +46,6 @@ def import_attr(attr_ident: str):
 def deser_zstd_records(dat: bytes):
     cctx = zstd.ZstdDecompressor()
     datobj = BytesIO(dat)
-    return [x.decode() for x in cctx.stream_reader(datobj).readall().splitlines()]
+    # splitlines() (unlike readlines()) does not keep a trailing \n on each record,
+    # which matches the behavior of Spark's textFile implementation
+    return cctx.stream_reader(datobj).readall().splitlines()
diff --git a/kgdata/spark/common.py b/kgdata/spark/common.py
index 37faad3..158d1bc 100644
--- a/kgdata/spark/common.py
+++ b/kgdata/spark/common.py
@@ -1,6 +1,7 @@
 """Utility functions for Apache Spark."""
 from __future__ import annotations
 
+import hashlib
 import math
 import os
 import random
@@ -20,6 +21,7 @@
     Tuple,
     TypeVar,
     Union,
+    cast,
 )
 
 import orjson
@@ -32,7 +34,7 @@
 # SparkContext singleton
 _sc = None
 
-
+StrPath = Union[Path, str]
 
 R1 = TypeVar("R1")
 R2 = TypeVar("R2")
@@ -506,26 +508,47 @@ def save_as_text_file(
 
     def save_partition(partition: Iterable[str] | Iterable[bytes]):
         partition_id = assert_not_null(TaskContext.get()).partitionId()
-        lst = []
         it = iter(partition)
         first_val = next(it, None)
+        if first_val is None:
+            # empty partition
+            return
+
+        lst = []
+
         if isinstance(first_val, str):
-            lst.append(first_val.encode())
+            first_val = first_val.encode()
+            if not first_val.endswith(b"\n"):
+                lst.append(first_val + b"\n")
+            else:
+                lst.append(first_val)
+
             for x in it:
-                lst.append(x.encode())  # type: ignore
+                x = x.encode()  # type: ignore
+                if not x.endswith(b"\n"):
+                    x = x + b"\n"
+                lst.append(x)
         else:
-            lst.append(first_val)
+            if not first_val.endswith(b"\n"):
+                lst.append(first_val + b"\n")
+            else:
+                lst.append(first_val)
             for x in it:
+                if not x.endswith(b"\n"):
+                    x = x + b"\n"
                 lst.append(x)
-        datasize = sum(len(x) + 1 for x in lst)  # 1 for newline
+
+        lst[-1] = lst[-1][:-1]  # exclude last \n
+
+        datasize = sum(len(x) for x in lst)
         cctx = zstd.ZstdCompressor(level=compression_level, write_content_size=True)
         with open(outdir / f"part-{partition_id:05d}.zst", "wb") as fh:
             with cctx.stream_writer(fh, size=datasize) as f:
-                for record in lst:
-                    f.write(record)
-                    f.write(b"\n")
+                for x in lst:
+                    f.write(x)
+
     outdir.mkdir(parents=True, exist_ok=True)
     rdd.foreachPartition(save_partition)
     (outdir / "_SUCCESS").touch()
     return
 
@@ -534,7 +557,7 @@ def save_partition(partition: Iterable[str] | Iterable[bytes]):
 
 
 def text_file(
-    filepattern: Path, min_partitions: Optional[int] = None, use_unicode: bool = True
+    filepattern: StrPath, min_partitions: Optional[int] = None, use_unicode: bool = True
 ):
     """Drop-in replacement for SparkContext.textFile that supports zstd files."""
     filepattern = Path(filepattern)
@@ -546,15 +569,64 @@
             for file in filepattern.iterdir()
         )
     ) or filepattern.name.endswith(".zst"):
+        if filepattern.is_dir():
+            n_parts = sum(
+                1 for file in filepattern.iterdir() if file.name.startswith("part-")
+            )
+        else:
+            n_parts = sum(1 for _ in filepattern.parent.glob("*.zst"))
+
         return (
             get_spark_context()
-            .binaryFiles(str(filepattern), min_partitions)
-            .flatMap(lambda x: deser_zstd_records(x[1]))
+            .binaryFiles(str(filepattern))
+            .repartition(n_parts)
+            .flatMap(lambda x: deser_zstd_records(x[1]), preservesPartitioning=True)
         )
 
     return get_spark_context().textFile(str(filepattern), min_partitions, use_unicode)
 
 
+def diff_rdd(rdd1: RDD[str], rdd2: RDD[str], key: Callable[[str], str]):
+    """Compare the content of two RDDs
+
+    Parameters
+    ----------
+    rdd1 : RDD[str]
+        first RDD
+    rdd2 : RDD[str]
+        second RDD
+    key : Callable[[str], str]
+        function that extracts a key from a record
+
+    Returns
+    -------
+    None
+        prints (at most 100) keys whose records differ between the two RDDs
+    """
+
+    def convert(x):
+        k = key(x)
+        if not isinstance(x, bytes):
+            x = x.encode()
+        return k, hashlib.sha256(x).digest().hex()
+
+    max_size = 100
+    records = (
+        rdd1.map(convert)
+        .fullOuterJoin(rdd2.map(convert))
+        .filter(lambda x: x[1][0] != x[1][1])
+        .take(max_size)
+    )
+    if len(records) == 0:
+        print("No difference")
+        return
+    print(
+        f"Found {'at least ' if len(records) >= max_size else ''}{len(records)} differences:"
+    )
+    for r in records:
+        print(r[0], r[1][0], r[1][1])
+
+
 @dataclass
 class EmptyBroadcast(Generic[V]):
     value: V
diff --git a/kgdata/spark/extended_rdd.py b/kgdata/spark/extended_rdd.py
index 05f81d0..eea3729 100644
--- a/kgdata/spark/extended_rdd.py
+++ b/kgdata/spark/extended_rdd.py
@@ -28,6 +28,7 @@
 
 from kgdata.misc.funcs import deser_zstd_records
 from kgdata.spark.common import (
+    StrPath,
     are_records_unique,
     estimate_num_partitions,
     get_spark_context,
@@ -57,7 +58,6 @@ def __lt__(self, other: SupportsOrdering) -> bool:
 
 S = TypeVar("S", bound=SupportsOrdering)
 
-StrPath = Union[Path, str]
 NEW_DATASET_NAME = "__new__"
 NO_CHECKSUM = (b"\x00" * 32).hex()
 
@@ -335,9 +335,11 @@ def save_as_dataset(
             save_as_text_file(self.rdd, Path(outdir), compression, compression_level)
         else:
             tmp_dir = str(outdir) + "_tmp"
+            if os.path.exists(tmp_dir):
+                shutil.rmtree(tmp_dir)
             save_as_text_file(self.rdd, Path(tmp_dir), compression, compression_level)
 
-            rdd = text_file(Path(tmp_dir))
+            rdd = text_file(tmp_dir)
             num_partitions = math.ceil(
                 sum((os.path.getsize(file) for file in glob.glob(tmp_dir + "/part-*")))
                 / partition_size
@@ -356,9 +358,7 @@ def save_as_dataset(
         name = name or os.path.basename(outdir)
         if checksum:
             # compute checksum and save it to a file -- reload from the file so we do not have to process the data again.
-            ds_checksum = ExtendedRDD(
-                get_spark_context().textFile(outdir), self.sig
-            ).hash()
+            ds_checksum = ExtendedRDD(text_file(outdir), self.sig).hash()
         else:
             ds_checksum = b"\x00" * 32
 
@@ -543,6 +543,9 @@ def parallelize(
 
     def take(self: ExtendedRDD[T], num: int) -> list[T]:
         return self.rdd.take(num)
 
+    def count(self) -> int:
+        return self.rdd.count()
+
     def union(self: ExtendedRDD[T], other: ExtendedRDD[U]) -> ExtendedRDD[T | U]:
         return ExtendedRDD(self.rdd.union(other.rdd), self.sig.use(other.sig))
diff --git a/kgdata/wikidata/datasets/entities.py b/kgdata/wikidata/datasets/entities.py
index 3b2b3ed..b59d98a 100644
--- a/kgdata/wikidata/datasets/entities.py
+++ b/kgdata/wikidata/datasets/entities.py
@@ -58,7 +58,7 @@ def entities(lang: str = "en") -> Dataset[WDEntity]:
         dependencies=[entity_dump()],
     )
     fixed_ds = Dataset(
-        cfg.entities / lang / "*.gz",
+        cfg.entities / lang / "*.zst",
         deserialize=deser_entity,
         name=f"entities/{lang}/fixed",
         dependencies=[entity_dump(), entity_ids(), entity_redirections()],
@@ -130,6 +130,7 @@ def entities(lang: str = "en") -> Dataset[WDEntity]:
                 auto_coalesce=True,
                 shuffle=True,
                 trust_dataset_dependencies=True,
+                compression_level=9,
             )
         )
         need_verification = True
@@ -300,5 +301,14 @@ def extract_invalid_qualifier(ent: WDEntity) -> Optional[WDEntity]:
 
 
 if __name__ == "__main__":
-    WikidataDirCfg.init("~/kgdata/wikidata/20211213")
+    WikidataDirCfg.init("/var/tmp/kgdata/wikidata/20230619")
+    # from sm.misc.ray_helper import ray_map
+
+    # def deser(infile):
+    #     (infile, "rb")
+
+    # ray_map(
+    #     (WikidataDirCfg.get_instance().entities / "en").glob("*.zst"),
+    # )
+    print("Total:", entities().get_rdd().count())
 
diff --git a/kgdata/wikidata/datasets/entity_dump.py b/kgdata/wikidata/datasets/entity_dump.py
index b903a5b..37a47d6 100644
--- a/kgdata/wikidata/datasets/entity_dump.py
+++ b/kgdata/wikidata/datasets/entity_dump.py
@@ -60,4 +60,4 @@ def _record_postprocess(record: str):
 
 if __name__ == "__main__":
     WikidataDirCfg.init("/var/tmp/kgdata/wikidata/20230619")
-    entity_dump()
+    print(entity_dump().get_extended_rdd().count())
diff --git a/kgdata/wikidata/datasets/entity_redirections.py b/kgdata/wikidata/datasets/entity_redirections.py
index be467d7..f2d4b2a 100644
--- a/kgdata/wikidata/datasets/entity_redirections.py
+++ b/kgdata/wikidata/datasets/entity_redirections.py
@@ -152,6 +152,7 @@ def entity_redirections() -> Dataset[tuple[str, str]]:
                 dataset=unk_target_ds,
                 checksum=False,
                 auto_coalesce=True,
+                trust_dataset_dependencies=True,
             )
         )
 
diff --git a/kgdata/wikidata/datasets/meta_graph.py b/kgdata/wikidata/datasets/meta_graph.py
index d85ed72..d84795b 100644
--- a/kgdata/wikidata/datasets/meta_graph.py
+++ b/kgdata/wikidata/datasets/meta_graph.py
@@ -5,6 +5,8 @@
 from typing import Dict, Iterable, List, Optional, Tuple, TypeAlias, Union
 
 import orjson
+from sm.misc.funcs import filter_duplication
+
 from kgdata.dataset import Dataset
 from kgdata.wikidata.config import WikidataDirCfg
 from kgdata.wikidata.datasets.entities import entities
@@ -12,16 +14,15 @@
 from kgdata.wikidata.datasets.entity_types import entity_types
 from kgdata.wikidata.models.wdentity import WDEntity
 from kgdata.wikidata.models.wdvalue import WDValue, WDValueKind
-from sm.misc.funcs import filter_duplication
 
 
 def meta_graph():
     cfg = WikidataDirCfg.get_instance()
 
     ds = Dataset(
-        cfg.entity_types / "*.gz",
+        cfg.meta_graph / "*.gz",
         deserialize=orjson.loads,
-        name="entity-types",
+        name="meta-graph",
         dependencies=[entities(), entity_outlinks(), entity_types()],
     )
 
diff --git a/scripts/aggregate_datasets_signature.py b/scripts/aggregate_datasets_signature.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/build.sh b/scripts/build.sh
index 2d688ff..81adcf1 100644
--- a/scripts/build.sh
+++ b/scripts/build.sh
@@ -75,7 +75,7 @@ wikidata_dataset entity_outlinks
 wikidata_dataset entity_pagerank
 wikidata_dataset entity_wiki_aliases
 
-wikidata_dataset main_property_connections
+wikidata_dataset meta_graph
 
 # ======================================================================
 # WIKIPEDIA Datasets
diff --git a/scripts/compare_dataset.py b/scripts/compare_dataset.py
new file mode 100644
index 0000000..59d8920
--- /dev/null
+++ b/scripts/compare_dataset.py
@@ -0,0 +1,92 @@
+# %%
+
+import hashlib
+from pathlib import Path
+
+import orjson
+
+from kgdata.spark.common import diff_rdd, get_spark_context, text_file
+
+files1 = "/var/tmp/kgdata/wikidata/20230619/024_entity_types/*.gz"
+files2 = "/data/binhvu/data/wikidata/20230619/024_entity_types/*.gz"
+
+
+# %%
+
+
+def norm(x):
+    a, ts = orjson.loads(x)
+    return orjson.dumps([a, sorted(ts)])
+
+
+rdd1 = text_file(files1).map(norm)
+rdd2 = text_file(files2).map(norm)
+
+diff_rdd(rdd1, rdd2, lambda x: orjson.loads(x)[0])
+# %%
+
+val1 = rdd1.map(orjson.loads).filter(lambda x: x[0] == "Q98130237").take(1)
+val2 = rdd2.map(orjson.loads).filter(lambda x: x[0] == "Q98130237").take(1)
+
+# %%
+
+
+def convert(x):
+    key = orjson.loads(x)["id"]
+    return key, x
+
+
+def hash(self):
+    """Hash the RDD. To get a commutative hash, we use add function with little worry about hashing items to zero.
+
+    Reference: https://kevinventullo.com/2018/12/24/hashing-unordered-sets-how-far-will-cleverness-take-you/
+    """
+
+    zero = (0).to_bytes(32, byteorder="little")
+    maxint = (2**256 - 1).to_bytes(32, byteorder="little")
+
+    def hash(line: str | bytes):
+        if isinstance(line, str):
+            line = line.encode()
+
+        return hashlib.sha256(line).digest()
+
+    def sum_hash(hash1: bytes, hash2: bytes):
+        val = int.from_bytes(hash1, byteorder="little", signed=False) + int.from_bytes(
+            hash2, byteorder="little", signed=False
+        )
+        return (
+            val % int.from_bytes(maxint, byteorder="little", signed=False)
+        ).to_bytes(32, byteorder="little")
+
+    return self.map(hash).fold(zero, sum_hash)
+
+
+# print(text_file(files2).count())
+# %%
+filepattern = Path(files1)
+if filepattern.is_dir():
+    n_parts = sum(1 for file in filepattern.iterdir() if file.name.startswith("part-"))
+else:
+    n_parts = sum(1 for _ in filepattern.parent.glob("*.zst"))
+
+# %%
+text_file(files1).getNumPartitions()
+
+# %%
+
+rdd2 = text_file(files2).map(convert)
+rdd1 = text_file(files1).map(convert)
+
+lst1 = rdd1.take(1000)
+lst2 = rdd2.take(1000)
+
+# %%
+
+dict1 = dict(lst1)
+dict2 = dict(lst2)
+# %%
+
+set(dict1.keys()).intersection(set(dict2.keys()))
+
+# %%
diff --git a/scripts/download-data.ipynb b/scripts/download-data.ipynb
index 1f03018..557d526 100644
--- a/scripts/download-data.ipynb
+++ b/scripts/download-data.ipynb
@@ -435,13 +435,23 @@
     "    wget.monitor()"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "id": "9ec0c028",
+   "metadata": {},
+   "source": [
+    "convert bz2 to zst in parallel for faster decompression"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
    "id": "3b74a6c3-8238-433d-8a82-e2dc5a16a9b8",
    "metadata": {},
    "outputs": [],
-   "source": []
+   "source": [
+    "!lbzip2 -cd wikidata-20240101-all.json.bz2 | zstd -9 -o wikidata-20240101-all.json.zst"
+   ]
   }
  ],
 "metadata": {