Skip to content

Commit

Permalink
Io plugin (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Aug 8, 2024
1 parent 7680fdd commit a55bed8
Show file tree
Hide file tree
Showing 15 changed files with 620 additions and 12 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ Cargo.lock
.idea/
venv/
target/
rust-toolchain.toml
rust-toolchain.toml
*.so
**/*.pyc
__pycache__/
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ resolver = "2"
members = [
"example/derive_expression/expression_lib",
"example/extend_polars_python_dispatch/extend_polars",
"example/io_plugin/io_plugin",
"pyo3-polars",
"pyo3-polars-derive",
]
Expand All @@ -14,6 +15,12 @@ polars-ffi = { version = "0.41.0", default-features = false }
polars-plan = { version = "0.41.0", default-feautres = false }
polars-lazy = { version = "0.41.0", default-features = false }

[workspace.dependencies.arrow]
package = "polars-arrow"
version = "0.41.3"
path = "../polars/crates/polars-arrow"
default-features = false

[patch.crates-io]
polars = { git = "https://github.com/pola-rs/polars.git" }
polars-core = { git = "https://github.com/pola-rs/polars.git" }
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (c) 2020 Ritchie Vink
Copyright (c) 2024 Ritchie Vink

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ struct PigLatinKwargs {
fn pig_latinnify(inputs: &[Series], kwargs: PigLatinKwargs) -> PolarsResult<Series> {
let ca = inputs[0].str()?;
let out: StringChunked =
ca.apply_to_buffer(|value, output| pig_latin_str(value, kwargs.capitalize, output));
ca.apply_into_string_amortized(|value, output| pig_latin_str(value, kwargs.capitalize, output));
Ok(out.into_series())
}
```
Expand Down
14 changes: 8 additions & 6 deletions example/derive_expression/expression_lib/src/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ fn pig_latin_str(value: &str, capitalize: bool, output: &mut String) {
#[polars_expr(output_type=String)]
fn pig_latinnify(inputs: &[Series], kwargs: PigLatinKwargs) -> PolarsResult<Series> {
let ca = inputs[0].str()?;
let out: StringChunked =
ca.apply_to_buffer(|value, output| pig_latin_str(value, kwargs.capitalize, output));
let out: StringChunked = ca.apply_into_string_amortized(|value, output| {
pig_latin_str(value, kwargs.capitalize, output)
});
Ok(out.into_series())
}

Expand Down Expand Up @@ -63,8 +64,9 @@ fn pig_latinnify_with_paralellism(
let ca = inputs[0].str()?;

if context.parallel() {
let out: StringChunked =
ca.apply_to_buffer(|value, output| pig_latin_str(value, kwargs.capitalize, output));
let out: StringChunked = ca.apply_into_string_amortized(|value, output| {
pig_latin_str(value, kwargs.capitalize, output)
});
Ok(out.into_series())
} else {
POOL.install(|| {
Expand All @@ -75,7 +77,7 @@ fn pig_latinnify_with_paralellism(
.into_par_iter()
.map(|(offset, len)| {
let sliced = ca.slice(offset as i64, len);
let out = sliced.apply_to_buffer(|value, output| {
let out = sliced.apply_into_string_amortized(|value, output| {
pig_latin_str(value, kwargs.capitalize, output)
});
out.downcast_iter().cloned().collect::<Vec<_>>()
Expand Down Expand Up @@ -155,7 +157,7 @@ fn append_kwargs(input: &[Series], kwargs: MyKwargs) -> PolarsResult<Series> {
let ca = input.str().unwrap();

Ok(ca
.apply_to_buffer(|val, buf| {
.apply_into_string_amortized(|val, buf| {
write!(
buf,
"{}-{}-{}-{}-{}",
Expand Down
2 changes: 1 addition & 1 deletion example/extend_polars_python_dispatch/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ run: install
source venv/bin/activate && python run.py

run-release: install-release
source venv/bin/activate && python run.py
source venv/bin/activate && python run.py
25 changes: 25 additions & 0 deletions example/io_plugin/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

SHELL=/bin/bash

venv: ## Set up virtual environment
python3 -m venv venv
venv/bin/pip install -r requirements.txt

install: venv
unset CONDA_PREFIX && \
source venv/bin/activate && maturin develop -m io_plugin/Cargo.toml

install-release: venv
unset CONDA_PREFIX && \
source venv/bin/activate && maturin develop --release -m io_plugin/Cargo.toml

clean:
-@rm -r venv
-@cd extend_polars && cargo clean


run: install
source venv/bin/activate && python run.py

run-release: install-release
source venv/bin/activate && python run.py
15 changes: 15 additions & 0 deletions example/io_plugin/io_plugin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "io_plugin"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "io_plugin"
crate-type = ["cdylib"]

[dependencies]
polars = { workspace = true, features = ["fmt", "dtype-date", "timezones", "lazy"], default-features = false }
pyo3 = { version = "0.22.2", features = ["abi3-py38"] }
pyo3-polars = { version = "*", path = "../../../pyo3-polars", features = ["derive", "lazy"] }
rand = { version = "0.8.5", features = [] }
45 changes: 45 additions & 0 deletions example/io_plugin/io_plugin/io_plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from .io_plugin import new_bernoulli, new_uniform, RandomSource
from typing import Any, Iterator
from polars.io.plugins import register_io_source
import polars as pl


def scan_random(samplers: list[Any], size: int = 1000) -> pl.LazyFrame:
def source_generator(
with_columns: list[str] | None,
predicate: pl.Expr | None,
n_rows: int | None,
batch_size: int | None,
) -> Iterator[pl.DataFrame]:
"""
Generator function that creates the source.
This function will be registered as IO source.
"""

new_size = size
if n_rows is not None and n_rows < size:
new_size = n_rows

src = RandomSource(samplers, batch_size, new_size)
if with_columns is not None:
src.set_with_columns(with_columns)

# Set the predicate.
predicate_set = True
if predicate is not None:
try:
src.try_set_predicate(predicate)
except pl.exceptions.ComputeError:
predicate_set = False

while (out := src.next()) is not None:
# If the source could not apply the predicate
# (because it wasn't able to deserialize it), we do it here.
if not predicate_set and predicate is not None:
out = out.filter(predicate)

yield out

# create src again to compute the schema
src = RandomSource(samplers, 0, 0)
return register_io_source(callable=source_generator, schema=src.schema())
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ requires = ["maturin>=1.0,<2.0"]
build-backend = "maturin"

[project]
name = "extend_polars"
requires-python = ">=3.7"
name = "io_plugin"
requires-python = ">=3.10"
classifiers = [
"Programming Language :: Rust",
"Programming Language :: Python :: Implementation :: CPython",
Expand Down
123 changes: 123 additions & 0 deletions example/io_plugin/io_plugin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
mod samplers;

use crate::samplers::PySampler;
use polars::prelude::*;
use pyo3::prelude::*;
use pyo3_polars::error::PyPolarsErr;
use pyo3_polars::{PyDataFrame, PyExpr, PySchema};

#[pyclass]
pub struct RandomSource {
columns: Vec<PySampler>,
size_hint: usize,
n_rows: usize,
predicate: Option<Expr>,
with_columns: Option<Vec<usize>>,
}

#[pymethods]
impl RandomSource {
#[new]
#[pyo3(signature = (columns, size_hint, n_rows))]
fn new_source(
columns: Vec<PySampler>,
size_hint: Option<usize>,
n_rows: Option<usize>,
) -> Self {
let n_rows = n_rows.unwrap_or(usize::MAX);
let size_hint = size_hint.unwrap_or(10_000);

Self {
columns,
size_hint,
n_rows,
predicate: None,
with_columns: None,
}
}

fn schema(&self) -> PySchema {
let schema = self
.columns
.iter()
.map(|s| {
let s = s.0.lock().unwrap();
Field::new(s.name(), s.dtype())
})
.collect::<Schema>();
PySchema(Arc::new(schema))
}

fn try_set_predicate(&mut self, predicate: PyExpr) {
self.predicate = Some(predicate.0);
}

fn set_with_columns(&mut self, columns: Vec<String>) {
let schema = self.schema().0;

let indexes = columns
.iter()
.map(|name| {
schema
.index_of(name.as_ref())
.expect("schema should be correct")
})
.collect();

self.with_columns = Some(indexes)
}

fn next(&mut self) -> PyResult<Option<PyDataFrame>> {
if self.n_rows > 0 {
// Apply projection pushdown.
// This prevents unneeded sampling.
let s_iter = if let Some(idx) = &self.with_columns {
Box::new(idx.iter().copied().map(|i| &self.columns[i]))
as Box<dyn Iterator<Item = _>>
} else {
Box::new(self.columns.iter())
};

let columns = s_iter
.map(|s| {
let mut s = s.0.lock().unwrap();

// Apply slice pushdown.
// This prevents unneeded sampling.
s.next_n(std::cmp::min(self.size_hint, self.n_rows))
})
.collect::<Vec<_>>();

let mut df = DataFrame::new(columns).map_err(PyPolarsErr::from)?;
self.n_rows = self.n_rows.saturating_sub(self.size_hint);

// Apply predicate pushdown.
// This is done after the fact, but there could be sources where this could be applied
// lower.
if let Some(predicate) = &self.predicate {
df = df
.lazy()
.filter(predicate.clone())
._with_eager(true)
.collect()
.map_err(PyPolarsErr::from)?;
}

Ok(Some(PyDataFrame(df)))
} else {
Ok(None)
}
}
}

#[pymodule]
fn io_plugin(m: &Bound<PyModule>) -> PyResult<()> {
m.add_class::<RandomSource>().unwrap();
m.add_class::<PySampler>().unwrap();
m.add_wrapped(wrap_pyfunction!(samplers::new_bernoulli))
.unwrap();
m.add_wrapped(wrap_pyfunction!(samplers::new_uniform))
.unwrap();

Ok(())
}
Loading

0 comments on commit a55bed8

Please sign in to comment.