Skip to content

Commit

Permalink
add meta graph dataset.
Browse files Browse the repository at this point in the history
  • Loading branch information
Binh Vu committed Jan 19, 2024
1 parent 1ace2dc commit 7944c79
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 23 deletions.
10 changes: 2 additions & 8 deletions kgdata/wikidata/datasets/entity_types.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from __future__ import annotations

import orjson

from kgdata.dataset import Dataset
from kgdata.wikidata.config import WikidataDirCfg
from kgdata.wikidata.datasets.entities import entities
from kgdata.wikidata.models.wdentity import WDEntity
from sm.misc.funcs import filter_duplication


def entity_types() -> Dataset[tuple[str, list[str]]]:
Expand All @@ -31,10 +31,4 @@ def entity_types() -> Dataset[tuple[str, list[str]]]:


def get_instanceof(ent: WDEntity) -> tuple[str, list[str]]:
    """Return the entity id paired with the classes it is an instance of.

    Args:
        ent: the Wikidata entity to inspect.

    Returns:
        A ``(entity_id, class_ids)`` tuple, where ``class_ids`` is the result
        of ``ent.instance_of()`` passed through ``filter_duplication``.
    """
    # A single return: going through a set here instead would make the order
    # of the class ids depend on set iteration order.
    # NOTE(review): presumably `ent.instance_of()` reads the "P31" statements
    # -- confirm against WDEntity.
    return (ent.id, filter_duplication(ent.instance_of()))
153 changes: 138 additions & 15 deletions kgdata/wikidata/datasets/meta_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,153 @@

from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, Iterable, List, Optional, Tuple, TypeAlias, Union

import orjson
from kgdata.dataset import Dataset
from kgdata.wikidata.config import WikidataDirCfg
from kgdata.wikidata.datasets.entities import entities
from kgdata.wikidata.datasets.entity_outlinks import entity_outlinks
from kgdata.wikidata.datasets.entity_types import entity_types
from kgdata.wikidata.models.wdentity import WDEntity
from kgdata.wikidata.models.wdvalue import WDValue, WDValueKind
from sm.misc.funcs import filter_duplication

# def get_meta_graph_dataset(with_dep: bool = True):
# cfg = WikidataDirCfg.get_instance()

# if with_dep:
# deps = [entities(), entity_types()]
# else:
# deps = []
def meta_graph():
    """Build (if missing) and return the meta graph dataset.

    For every entity, each statement value and qualifier value that is an
    entity id is replaced by the list of classes (types) of that target
    entity; statement values that are not entity ids become None. The result
    is serialized as one JSON document per entity (``MetaEntity.to_dict()``).

    Returns:
        The ``Dataset`` handle; its files are generated on first call when
        ``has_complete_data()`` reports that they are missing.
    """
    cfg = WikidataDirCfg.get_instance()

    # NOTE(review): the location (`cfg.entity_types`) and name
    # ("entity-types") look copied from the entity_types dataset, and records
    # are deserialized as plain dicts rather than MetaEntity -- confirm whether
    # a dedicated meta-graph directory/name/deserializer is intended.
    ds = Dataset(
        cfg.entity_types / "*.gz",
        deserialize=orjson.loads,
        name="entity-types",
        dependencies=[entities(), entity_outlinks(), entity_types()],
    )

    if not ds.has_complete_data():

        # The helpers below are closures used only by the pipeline at the end
        # of this branch.
        def join_outlink_and_types(
            g: tuple[str, tuple[Iterable[str], Optional[list[str]]]]
        ):
            """Re-key (target, (sources, target types)) by each source entity."""
            (target_ent_id, (source_ent_ids, target_ent_types)) = g
            # the left outer join yields None for targets without a types record
            target_ent_types = target_ent_types or []
            return [
                (source_ent_id, (target_ent_id, target_ent_types))
                for source_ent_id in source_ent_ids
            ]

        def convert_wdvalue(value: WDValueKind) -> Optional[list[str]]:
            """Wrap an entity-id value in a one-element list; None otherwise."""
            if WDValue.is_entity_id(value):
                # temporarily holds the target entity id; it is swapped for the
                # target's classes in join_target_types_meta_entity
                return [value.as_entity_id()]
            else:
                return None

        def get_raw_meta_entity(entity: WDEntity) -> tuple[str, MetaEntity]:
            """Convert an entity into (id, MetaEntity) with unresolved targets.

            Statement and qualifier values still carry target entity ids
            (see convert_wdvalue), not yet the targets' classes.
            """
            props = {}

            for pid, stmts in entity.props.items():
                meta_stmts = []
                for stmt in stmts:
                    meta_stmts.append(
                        MetaStatement(
                            value=convert_wdvalue(stmt.value),
                            qualifiers={
                                k: [convert_wdvalue(v) for v in vs]
                                for k, vs in stmt.qualifiers.items()
                            },
                        )
                    )
                props[pid] = meta_stmts
            return entity.id, MetaEntity(
                classes=filter_duplication(entity.instance_of()), props=props
            )

        def join_target_types_meta_entity(
            g: tuple[str, tuple[Iterable[tuple[str, list[str]]], MetaEntity]]
        ) -> MetaEntity:
            """Replace target entity ids in a raw MetaEntity by their classes."""
            (entity_id, (target_ent_and_types, meta_entity)) = g
            target_ent_and_types = list(target_ent_and_types)
            map_target_ent_to_types = dict(target_ent_and_types)
            # each target is expected to appear at most once per source entity
            assert len(map_target_ent_to_types) == len(target_ent_and_types)

            # NOTE(review): the lookups below raise KeyError when a referenced
            # entity id is not among this entity's outlink targets -- assumes
            # entity_outlinks covers statement and qualifier values; confirm.
            # NOTE(review): qualifier literals (None) are dropped here while
            # statement literals are kept as None -- confirm this is intended.
            return MetaEntity(
                classes=meta_entity.classes,
                props={
                    pid: [
                        MetaStatement(
                            value=map_target_ent_to_types[stmt.value[0]]
                            if stmt.value is not None
                            else None,
                            qualifiers={
                                k: [
                                    map_target_ent_to_types[v[0]]
                                    for v in vs
                                    if v is not None
                                ]
                                for k, vs in stmt.qualifiers.items()
                            },
                        )
                        for stmt in stmts
                    ]
                    for pid, stmts in meta_entity.props.items()
                },
            )

        # Pipeline:
        #   1. invert outlinks into (target, source) pairs and group by target
        #   2. attach each target's types (left outer join, so targets without
        #      a types record are kept)
        #   3. re-key by source and group: source -> [(target, target types)]
        #   4. join with the raw meta entities and resolve ids to classes
        #   5. serialize to JSON and save into the dataset location
        # NOTE(review): step 4 uses `.join`; if this behaves like Spark's inner
        # join, entities without any outlink are absent from the output --
        # confirm this is intended.
        (
            entity_outlinks()
            .get_extended_rdd()
            .flatMap(lambda x: [(t, x.id) for t in x.targets])
            .groupByKey()
            .leftOuterJoin(entity_types().get_extended_rdd())
            .flatMap(join_outlink_and_types)
            .groupByKey()
            .join(entities().get_extended_rdd().map(get_raw_meta_entity))
            .map(join_target_types_meta_entity)
            .map(lambda x: orjson.dumps(x.to_dict()))
            .save_like_dataset(ds, auto_coalesce=True, shuffle=True)
        )

    return ds


@dataclass
class MetaEntity:
    """Class-level view of an entity: its classes and its meta statements."""

    # list of classes that original entity is an instance of
    classes: list[str]
    # property id -> meta statements of that property
    props: dict[str, list[MetaStatement]]

    def to_dict(self):
        """Return a JSON-serializable dict with keys "classes" and "props"."""
        return {
            "classes": self.classes,
            "props": {
                pid: [stmt.to_dict() for stmt in stmts]
                for pid, stmts in self.props.items()
            },
        }

    @classmethod
    def from_dict(cls, o):
        """Rebuild an instance from the dict produced by ``to_dict``.

        A classmethod (rather than a staticmethod hard-coding the class) so
        that subclasses get instances of their own type.
        """
        return cls(
            o["classes"],
            {
                pid: [MetaStatement.from_dict(stmt) for stmt in stmts]
                for pid, stmts in o["props"].items()
            },
        )


# List of classes that the target entity of a statement or qualifier is an
# instance of; None if the target is a literal rather than an entity.
TargetMetaClass: TypeAlias = Optional[list[str]]


@dataclass
class MetaStatement:
    """Class-level view of a statement: target classes instead of values."""

    # classes of the target entity, or None when the value is a literal
    value: TargetMetaClass
    # qualifier property id -> one entry (same convention as `value`) per
    # qualifier value
    qualifiers: dict[str, list[TargetMetaClass]]

    def to_dict(self):
        """Return a JSON-serializable dict with keys "value" and "qualifiers"."""
        return {
            "value": self.value,
            "qualifiers": self.qualifiers,
        }

    @classmethod
    def from_dict(cls, o):
        """Rebuild an instance from the dict produced by ``to_dict``.

        A classmethod (rather than a staticmethod hard-coding the class) so
        that subclasses get instances of their own type.
        """
        return cls(o["value"], o["qualifiers"])

0 comments on commit 7944c79

Please sign in to comment.