diff --git a/kgdata/wikidata/datasets/entity_pagerank.py b/kgdata/wikidata/datasets/entity_pagerank.py index 8a15ff1..3efafbe 100644 --- a/kgdata/wikidata/datasets/entity_pagerank.py +++ b/kgdata/wikidata/datasets/entity_pagerank.py @@ -65,8 +65,15 @@ def entity_pagerank() -> Dataset[EntityPageRank]: .sortBy(lambda x: (x[0], int(x[1:]))) # type: ignore .zipWithIndex() .map(tab_ser) - .save_like_dataset(idmap_ds, checksum=False) + .save_like_dataset( + idmap_ds, + checksum=False, + auto_coalesce=True, + max_num_partitions=512, + ) ) + # write the total number of entity + (cfg.entity_pagerank / "idmap.txt").write_text(str(idmap_ds.get_rdd().count())) edges_dataset = Dataset( cfg.entity_pagerank / "graph/*.gz", @@ -137,12 +144,10 @@ def create_edges_npy(infiles: List[str], outfile: str): ) if not pagerank_ds.has_complete_data(): assert does_result_dir_exist( - cfg.entity_pagerank / "graphtool_pagerank_en", allow_override=False + cfg.entity_pagerank / "graphtool_pagerank", allow_override=False ), "Must run graph-tool pagerank at `kgdata/scripts/pagerank_v2.py` first" - n_files = len( - glob(str(cfg.entity_pagerank / "graphtool_pagerank_en" / "*.npz")) - ) + n_files = len(glob(str(cfg.entity_pagerank / "graphtool_pagerank" / "*.npz"))) def deserialize_np(dat: bytes) -> List[Tuple[int, float]]: f = BytesIO(dat) @@ -158,7 +163,7 @@ def process_join( ( ExtendedRDD.binaryFiles( - cfg.entity_pagerank / "graphtool_pagerank_en" / "*.npz", + cfg.entity_pagerank / "graphtool_pagerank" / "*.npz", ) .repartition(n_files) .flatMap(lambda x: deserialize_np(x[1])) @@ -170,9 +175,7 @@ def process_join( pagerank_stat_outfile = cfg.entity_pagerank / f"pagerank.pkl" if not pagerank_stat_outfile.exists(): - n_files = len( - glob(str(cfg.entity_pagerank / "graphtool_pagerank_en" / "*.npz")) - ) + n_files = len(glob(str(cfg.entity_pagerank / "graphtool_pagerank" / "*.npz"))) def deserialize_np2(dat: bytes) -> np.ndarray: f = BytesIO(dat) @@ -182,7 +185,7 @@ def deserialize_np2(dat: bytes) -> np.ndarray: rdd = ( get_spark_context() .binaryFiles( - str(cfg.entity_pagerank / "graphtool_pagerank_en" / "*.npz"), + str(cfg.entity_pagerank / "graphtool_pagerank" / "*.npz"), ) .repartition(n_files) .map(lambda x: deserialize_np2(x[1])) diff --git a/pyproject.toml b/pyproject.toml index 7bb8cfa..43496a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "kgdata" -version = "5.0.0a6" +version = "5.0.0a7" description = "Library to process dumps of knowledge graphs (Wikipedia, DBpedia, Wikidata)" readme = "README.md" authors = [{ name = "Binh Vu", email = "binh@toan2.com" }] diff --git a/scripts/pagerank_v2.py b/scripts/pagerank_v2.py index 84ca707..17a1710 100644 --- a/scripts/pagerank_v2.py +++ b/scripts/pagerank_v2.py @@ -11,12 +11,12 @@ wd_dir = Path(os.environ["WD_DIR"]) working_dir = wd_dir / "entity_pagerank" -pagerank_outdir = working_dir / "graphtool_pagerank_en" -edge_files = sorted(glob.glob(str((working_dir / "graphtool_en" / "part-*.npz")))) +pagerank_outdir = working_dir / "graphtool_pagerank" +edge_files = sorted(glob.glob(str((working_dir / "graphtool" / "part-*.npz")))) logger.info("Creating graph from data...") g = Graph(directed=True) -n_vertices = int((working_dir / "idmap_en.txt").read_text()) +n_vertices = int((working_dir / "idmap.txt").read_text()) g.add_vertex(n=n_vertices) logger.info("Creating graph... added vertices")