From 7944c795b6f107e15ed9b899ade8dd39aa50cbfb Mon Sep 17 00:00:00 2001 From: Binh Vu Date: Fri, 19 Jan 2024 22:00:39 +0700 Subject: [PATCH] add meta graph dataset. --- kgdata/wikidata/datasets/entity_types.py | 10 +- kgdata/wikidata/datasets/meta_graph.py | 153 ++++++++++++++++++++--- 2 files changed, 140 insertions(+), 23 deletions(-) diff --git a/kgdata/wikidata/datasets/entity_types.py b/kgdata/wikidata/datasets/entity_types.py index 379643f..01ebc8f 100644 --- a/kgdata/wikidata/datasets/entity_types.py +++ b/kgdata/wikidata/datasets/entity_types.py @@ -1,11 +1,11 @@ from __future__ import annotations import orjson - from kgdata.dataset import Dataset from kgdata.wikidata.config import WikidataDirCfg from kgdata.wikidata.datasets.entities import entities from kgdata.wikidata.models.wdentity import WDEntity +from sm.misc.funcs import filter_duplication def entity_types() -> Dataset[tuple[str, list[str]]]: @@ -31,10 +31,4 @@ def entity_types() -> Dataset[tuple[str, list[str]]]: def get_instanceof(ent: WDEntity) -> tuple[str, list[str]]: - instanceof = "P31" - return ( - ent.id, - list( - {stmt.value.as_entity_id_safe() for stmt in ent.props.get(instanceof, [])} - ), - ) + return (ent.id, filter_duplication(ent.instance_of())) diff --git a/kgdata/wikidata/datasets/meta_graph.py b/kgdata/wikidata/datasets/meta_graph.py index c055228..d85ed72 100644 --- a/kgdata/wikidata/datasets/meta_graph.py +++ b/kgdata/wikidata/datasets/meta_graph.py @@ -2,30 +2,153 @@ from collections import defaultdict from dataclasses import dataclass -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, Iterable, List, Optional, Tuple, TypeAlias, Union import orjson from kgdata.dataset import Dataset from kgdata.wikidata.config import WikidataDirCfg from kgdata.wikidata.datasets.entities import entities +from kgdata.wikidata.datasets.entity_outlinks import entity_outlinks from kgdata.wikidata.datasets.entity_types import entity_types from kgdata.wikidata.models.wdentity import WDEntity +from kgdata.wikidata.models.wdvalue import WDValue, WDValueKind +from sm.misc.funcs import filter_duplication -# def get_meta_graph_dataset(with_dep: bool = True): -# cfg = WikidataDirCfg.get_instance() -# if with_dep: -# deps = [entities(), entity_types()] -# else: -# deps = [] +def meta_graph(): + cfg = WikidataDirCfg.get_instance() -# return Dataset( -# cfg.main_property_connections / "*.gz", -# deserialize=deser_connection, -# name="property-connections", -# dependencies=deps, -# ) + ds = Dataset( + cfg.entity_types / "*.gz", + deserialize=orjson.loads, + name="entity-types", + dependencies=[entities(), entity_outlinks(), entity_types()], + ) + if not ds.has_complete_data(): -# def meta_graph(): -# entity_outlinks() + def join_outlink_and_types( + g: tuple[str, tuple[Iterable[str], Optional[list[str]]]] + ): + (target_ent_id, (source_ent_ids, target_ent_types)) = g + target_ent_types = target_ent_types or [] + return [ + (source_ent_id, (target_ent_id, target_ent_types)) + for source_ent_id in source_ent_ids + ] + + def convert_wdvalue(value: WDValueKind) -> Optional[list[str]]: + if WDValue.is_entity_id(value): + return [value.as_entity_id()] + else: + return None + + def get_raw_meta_entity(entity: WDEntity) -> tuple[str, MetaEntity]: + props = {} + + for pid, stmts in entity.props.items(): + meta_stmts = [] + for stmt in stmts: + meta_stmts.append( + MetaStatement( + value=convert_wdvalue(stmt.value), + qualifiers={ + k: [convert_wdvalue(v) for v in vs] + for k, vs in stmt.qualifiers.items() + }, + ) + ) + props[pid] = meta_stmts + return entity.id, MetaEntity( + classes=filter_duplication(entity.instance_of()), props=props + ) + + def join_target_types_meta_entity( + g: tuple[str, tuple[Iterable[tuple[str, list[str]]], MetaEntity]] + ) -> MetaEntity: + (entity_id, (target_ent_and_types, meta_entity)) = g + target_ent_and_types = list(target_ent_and_types) + map_target_ent_to_types = dict(target_ent_and_types) + assert len(map_target_ent_to_types) == len(target_ent_and_types) + + return MetaEntity( + classes=meta_entity.classes, + props={ + pid: [ + MetaStatement( + value=map_target_ent_to_types[stmt.value[0]] + if stmt.value is not None + else None, + qualifiers={ + k: [ + map_target_ent_to_types[v[0]] + for v in vs + if v is not None + ] + for k, vs in stmt.qualifiers.items() + }, + ) + for stmt in stmts + ] + for pid, stmts in meta_entity.props.items() + }, + ) + + ( + entity_outlinks() + .get_extended_rdd() + .flatMap(lambda x: [(t, x.id) for t in x.targets]) + .groupByKey() + .leftOuterJoin(entity_types().get_extended_rdd()) + .flatMap(join_outlink_and_types) + .groupByKey() + .join(entities().get_extended_rdd().map(get_raw_meta_entity)) + .map(join_target_types_meta_entity) + .map(lambda x: orjson.dumps(x.to_dict())) + .save_like_dataset(ds, auto_coalesce=True, shuffle=True) + ) + + return ds + + +@dataclass +class MetaEntity: + # list of classes that original entity is an instance of + classes: list[str] + props: dict[str, list[MetaStatement]] + + def to_dict(self): + return { + "classes": self.classes, + "props": {k: [v.to_dict() for v in vals] for k, vals in self.props.items()}, + } + + @staticmethod + def from_dict(o): + return MetaEntity( + o["classes"], + { + k: [MetaStatement.from_dict(v) for v in vals] + for k, vals in o["props"].items() + }, + ) + + +# list of classes that target entity is an instance of, None if target entity is a literal +TargetMetaClass: TypeAlias = Optional[list[str]] + + +@dataclass +class MetaStatement: + value: TargetMetaClass # either class id or None (for literal) + qualifiers: dict[str, list[TargetMetaClass]] + + def to_dict(self): + return { + "value": self.value, + "qualifiers": self.qualifiers, + } + + @staticmethod + def from_dict(o): + return MetaStatement(o["value"], o["qualifiers"])