Skip to content

Commit

Permalink
update mapreduce & scripts to get representative value of a class
Browse files Browse the repository at this point in the history
  • Loading branch information
Binh Vu committed Aug 29, 2023
1 parent 4fe6a3b commit 744893f
Show file tree
Hide file tree
Showing 17 changed files with 555 additions and 101 deletions.
22 changes: 22 additions & 0 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"version": "2.0.0",
"tasks": [
{
"type": "cargo",
"command": "run",
"args": [
"--package",
"kgdata",
"--bin",
"kgdata",
"--features",
"pyo3/auto-initialize"
],
"problemMatcher": [
"$rustc"
],
"group": "build",
"label": "rust: run kgdata"
}
]
}
17 changes: 13 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ flate2 = { version = "1.0.24", features = [
"zlib-ng",
], default-features = false }
glob = "0.3.1"
hashbrown = { version = "0.13.2", features = ["serde"] }
hashbrown = { version = "0.13.2", features = ["serde", "rayon"] }
log = "0.4.17"
ord_subset = "3.1.1"
petgraph = "0.6.3"
pyo3 = { version = "0.19.1", features = ["anyhow", "multiple-pymethods"] }
rayon = "1.5.3"
pyo3 = { version = "0.19.1", features = [
"anyhow",
"multiple-pymethods",
"hashbrown",
] }
rayon = "1.7.0"
rocksdb = "0.20.1"
serde = { version = "1.0.137", features = ["derive"] }
serde_json = "1.0.81"
Expand Down
File renamed without changes.
2 changes: 2 additions & 0 deletions data/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*
!.gitignore
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ pub fn into_pyerr<E: Into<KGDataError>>(err: E) -> PyErr {
anyerror.into()
}
}

/// Crate-wide shorthand for `Result` with [`KGDataError`] as the error type.
pub type KGResult<T> = Result<T, KGDataError>;
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ fn core(py: Python<'_>, m: &PyModule) -> PyResult<()> {

m.add_function(wrap_pyfunction!(init_env_logger, m)?)?;
python::models::register(py, m)?;
m.add_class::<python::scripts::GetRepresentativeValue>()?;

Ok(())
}
23 changes: 23 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use anyhow::Result;
use hashbrown::HashSet;
use kgdata::mapreduce::*;
use kgdata::python::scripts::GetRepresentativeValue;
use kgdata::{error::into_pyerr, mapreduce::from_jl_files, python::scripts::EntityTypesAndDegrees};
use pyo3::prelude::*;

/// Ad-hoc developer driver for the `GetRepresentativeValue` script.
///
/// NOTE(review): `args` is constructed but never consumed — the block that
/// would run the stats computation is commented out below, so this binary
/// currently only prints a placeholder. Confirm whether the `with_gil`
/// section should be re-enabled or this scaffold removed.
fn main() -> PyResult<()> {
let args = GetRepresentativeValue {
// Hard-coded dataset snapshot path — presumably a local checkout of
// the DBpedia 2022-12-01 dump on a developer machine; parameterize
// (CLI args / env var) before wider use. TODO confirm.
data_dir: "/Volumes/research/kgdata/data/dbpedia/20221201".to_string(),
class_ids: HashSet::from_iter(vec!["http://dbpedia.org/ontology/Person".to_string()]),
kgname: "dbpedia".to_string(),
topk: 1000,
};

// Python::with_gil(|py| {
// let res = GetRepresentativeValue::calculate_stats(py, &args).unwrap();
// println!("{:?}", res);
// });

println!("Hello, world!");
Ok(())
}
73 changes: 70 additions & 3 deletions src/mapreduce/dataset.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,48 @@
use std::path::PathBuf;

use core::hash::Hash;
use hashbrown::HashMap;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use std::path::PathBuf;

use crate::error::KGDataError;

use super::{FromParallelDataset, ParallelDataset};

pub struct Dataset<I> {
items: Vec<I>,
pub items: Vec<I>,
}

/// Dataset backed by an owned `hashbrown::HashMap`, yielding `(K, V)` pairs
/// when iterated in parallel.
pub struct MapDataset<K, V>
where
K: Hash + Eq + Send,
V: Send,
{
/// Backing map; consumed when the dataset is turned into a parallel iterator.
pub map: HashMap<K, V>,
}

/// Borrowed view over a `Vec`, letting a pipeline run over the items by
/// reference without taking ownership of the vector.
pub struct RefDataset<'t, I> {
/// Borrowed items; parallel iteration yields `&'t I`.
pub items: &'t Vec<I>,
}

// Marker impls: these types qualify as datasets because they can be turned
// into rayon parallel iterators (see the IntoParallelIterator impls below).
impl<I> ParallelDataset for Dataset<I> where I: Send {}

impl<'t, I> ParallelDataset for RefDataset<'t, I> where I: Sync + 't {}

impl<'t, I> RefDataset<'t, I>
where
I: 't,
{
/// Wraps a borrowed vector without copying its items.
pub fn new(items: &'t Vec<I>) -> Self {
Self { items }
}
}

// Marker impl: owned-map datasets participate in the mapreduce pipeline.
impl<K, V> ParallelDataset for MapDataset<K, V>
where
K: Hash + Eq + Send,
V: Send,
{
}

impl<I> IntoParallelIterator for Dataset<I>
where
I: Send,
Expand All @@ -24,6 +55,31 @@ where
}
}

// Borrowed datasets iterate in parallel over `&I` via rayon's slice iterator.
impl<'t, I> IntoParallelIterator for RefDataset<'t, I>
where
I: Sync + 't,
{
type Iter = rayon::slice::Iter<'t, I>;
type Item = &'t I;

fn into_par_iter(self) -> Self::Iter {
self.items.into_par_iter()
}
}

// Owned-map datasets iterate in parallel over `(K, V)` pairs using
// hashbrown's rayon integration (the crate's "rayon" feature).
impl<K, V> IntoParallelIterator for MapDataset<K, V>
where
K: Hash + Eq + Send,
V: Send,
{
type Iter = hashbrown::hash_map::rayon::IntoParIter<K, V>;
type Item = (K, V);

fn into_par_iter(self) -> Self::Iter {
self.map.into_par_iter()
}
}

impl Dataset<PathBuf> {
pub fn files(glob: &str) -> Result<Self, KGDataError> {
let items = glob::glob(glob)?
Expand Down Expand Up @@ -61,3 +117,14 @@ where
Ok(Dataset { items })
}
}

/// Collects any sequential iterator into an owned in-memory [`Dataset`].
impl<I> FromIterator<I> for Dataset<I> {
    fn from_iter<T>(iter: T) -> Self
    where
        T: IntoIterator<Item = I>,
    {
        // Materialize the items up front; parallelism happens later via
        // the dataset's IntoParallelIterator impl.
        let items: Vec<I> = iter.into_iter().collect();
        Self { items }
    }
}
34 changes: 34 additions & 0 deletions src/mapreduce/foldop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use rayon::prelude::*;

use super::ParallelDataset;

/// Deferred `fold` stage over a [`ParallelDataset`].
///
/// Mirrors rayon's `ParallelIterator::fold`: `identity` builds a fresh
/// accumulator and `op` folds each upstream item into it. Nothing runs
/// until `into_par_iter` is called.
#[derive(Clone)]
pub struct FoldOp<D: ParallelDataset, ID, F> {
/// Upstream dataset whose items are folded.
pub base: D,
/// Factory for the initial accumulator value.
pub identity: ID,
/// Folding function combining an accumulator with one upstream item.
pub op: F,
}

// Execution: delegate to rayon's `fold`. Per rayon's documentation the
// resulting iterator yields one folded accumulator per parallel split,
// not a single combined value — callers reduce afterwards if they need one.
impl<T, D, ID, F> IntoParallelIterator for FoldOp<D, ID, F>
where
D: ParallelDataset,
F: (Fn(T, D::Item) -> T) + Sync + Send,
ID: Fn() -> T + Sync + Send,
T: Send,
{
type Iter = rayon::iter::Fold<D::Iter, ID, F>;
type Item = T;

fn into_par_iter(self) -> Self::Iter {
self.base.into_par_iter().fold(self.identity, self.op)
}
}

// Marker impl: a fold stage is itself a dataset, so further pipeline
// stages can be chained on top of it.
impl<D, T, ID, F> ParallelDataset for FoldOp<D, ID, F>
where
D: ParallelDataset,
F: (Fn(T, D::Item) -> T) + Sync + Send,
ID: Fn() -> T + Sync + Send,
T: Send,
{
}
64 changes: 55 additions & 9 deletions src/mapreduce/functions.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,47 @@
use std::{fs::File, io::BufRead, io::BufReader, path::PathBuf};
use std::{ffi::OsStr, fs::File, io::BufRead, io::BufReader, path::PathBuf};

use rayon::prelude::*;
use flate2::read::GzDecoder;
use serde::Deserialize;

use crate::error::KGDataError;

use super::*;

pub fn make_try_flat_map_fn<T, E, F, OPI>(func: F) -> impl Fn(T) -> Vec<Result<OPI::Item, E>>
pub fn make_begin_try_flat_map_fn<I, E, F, OPI>(func: F) -> impl Fn(I) -> Vec<Result<OPI::Item, E>>
where
F: Fn(T) -> Result<OPI, E>,
OPI: IntoParallelIterator,
F: Fn(I) -> Result<OPI, E>,
OPI: IntoIterator,
E: Send,
{
move |value| {
let out = func(value);
match out {
Ok(v) => v.into_par_iter().map(Ok).collect::<Vec<_>>(),
Ok(v) => v.into_iter().map(Ok).collect::<Vec<_>>(),
Err(e) => vec![Err(e)],
}
}
}

/// Lifts a fallible `I -> Result<impl IntoIterator, E>` function into one
/// that also accepts an already-failed input, so it can be chained after
/// other fallible pipeline stages.
///
/// An incoming `Err` (or an `Err` produced by `func`) is propagated as a
/// single-element vector; on success every produced item is wrapped in `Ok`.
pub fn make_try_flat_map_fn<I, E, F, OPI>(
    func: F,
) -> impl Fn(Result<I, E>) -> Vec<Result<OPI::Item, E>>
where
    F: Fn(I) -> Result<OPI, E>,
    OPI: IntoIterator,
    E: Send,
{
    move |value| match value.and_then(&func) {
        // Success at both stages: expand the collection, tagging items Ok.
        Ok(items) => items.into_iter().map(Ok).collect(),
        // Failure at either stage: the error becomes the sole element.
        Err(err) => vec![Err(err)],
    }
}

pub fn make_try_fn<I, O, E, F>(func: F) -> impl Fn(Result<I, E>) -> Result<O, E>
where
F: Fn(I) -> Result<O, E>,
Expand Down Expand Up @@ -50,19 +70,45 @@ pub fn from_jl_files<T>(
where
for<'de> T: Deserialize<'de> + Send,
{
let ds = Dataset::files(glob)?.flat_map(make_try_flat_map_fn(deser_json_lines));
let ds = Dataset::files(glob)?.flat_map(make_begin_try_flat_map_fn(deser_json_lines));
Ok(ds)
}

fn deser_json_lines<T>(path: PathBuf) -> Result<Vec<T>, KGDataError>
pub fn deser_json_lines<T>(path: PathBuf) -> Result<Vec<T>, KGDataError>
where
for<'de> T: Deserialize<'de>,
{
let ext = path.extension().and_then(OsStr::to_str).map(str::to_owned);
let file = File::open(path)?;
let reader = BufReader::new(file);

let reader: Box<dyn BufRead> = if let Some(ext) = ext {
match ext.as_str() {
"gz" => Box::new(BufReader::new(GzDecoder::new(file))),
_ => unimplemented!(),
}
} else {
Box::new(BufReader::new(file))
};
reader
.lines()
.map(|line| serde_json::from_str::<T>(&line?).map_err(|err| err.into()))
.collect::<Result<Vec<T>, _>>()
}

/// Merges two multimaps, concatenating the value lists of keys present in
/// both.
///
/// Values from `another` are appended after the values already held in
/// `map` for the same key; keys unique to `another` are moved in with their
/// lists intact. Consumes both inputs and returns the merged map.
pub fn merge_map_list<K, V>(
    mut map: HashMap<K, Vec<V>>,
    another: HashMap<K, Vec<V>>,
) -> HashMap<K, Vec<V>>
where
    K: Hash + Eq,
{
    for (k, v) in another {
        // entry() hashes the key once; the previous get_mut-then-insert
        // pair hashed it twice on the miss path (clippy::map_entry).
        map.entry(k).or_default().extend(v);
    }
    map
}
Loading

0 comments on commit 744893f

Please sign in to comment.