Skip to content

Commit

Permalink
feat: Add scripting support
Browse files Browse the repository at this point in the history
  • Loading branch information
obaraelijah committed Jul 28, 2024
1 parent ec434f5 commit da3aa73
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 1 deletion.
15 changes: 15 additions & 0 deletions scripts/semaphore.x9
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
(def-redis-fn sum-list
(li)
(apply + (map int (redis "lrange" li 0 -1))))

(defn any?
(p l)
(reduce (fn (a b) (or a b)) (map p l)))

(def-redis-fn is-prime?
(key)
(bind
(n (int (redis "get" key)))
(not
(any? (fn (x) (= 0 (% n x)))
(range 2 n)))))
86 changes: 85 additions & 1 deletion src/scripting.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
use crate::server::process_command;
use crate::types::RedisValueRef;
use num_traits::cast::ToPrimitive;
use std::{error::Error, sync::Arc};
use tokio::sync::mpsc::{Receiver, Sender};

use crate::startup::Config;
use crate::types::DumpFile;
use crate::types::RedisValueRef;
use crate::{logger::LOGGER, types::StateStoreRef};
use x9::ast::Expr;
use x9::ffi::{ForeignData, IntoX9Function, Variadic, X9Interpreter};

fn bytes_to_string(s: &[u8]) -> String {
String::from_utf8_lossy(s).to_string()
}

struct FFIError {
reason: String,
}
Expand Down Expand Up @@ -78,3 +85,80 @@ impl ForeignData for RedisValueRef {
Ok(res)
}
}
#[derive(Debug)]
pub enum Program {
String(String),
Function(String, Vec<RedisValueRef>),
}

pub struct ScriptingEngine {
interpreter: X9Interpreter,
#[allow(clippy::type_complexity)]
prog_revc: Receiver<(
Program,
OneShotSender<Result<RedisValueRef, Box<dyn Error + Send>>>,
)>,
// prog_send: Sender<Result<RedisValueRef, Box<dyn Error + Send>>>,
cmd_send: Arc<Sender<(Vec<RedisValueRef>, OneShotSender<RedisValueRef>)>>,
}

impl ScriptingEngine {
pub fn new(
prog_revc: Receiver<(
Program,
OneShotSender<Result<RedisValueRef, Box<dyn Error + Send>>>,
)>,
cmd_send: Sender<(Vec<RedisValueRef>, OneShotSender<RedisValueRef>)>,
state_store: StateStoreRef,
opts: &Config,
) -> Result<Self, Box<dyn Error>> {
let res = Self {
interpreter: X9Interpreter::new(),
prog_revc,
cmd_send: Arc::new(cmd_send),
};
res.setup_interpreter(state_store);
res.load_scripts_dir(opts)?;
Ok(res)
}

fn load_scripts_dir(&self, opts: &Config) -> Result<(), Box<dyn Error>> {
if let Some(path) = &opts.scripts_dir {
info!(LOGGER, "Loading scripts in {:?}", path);
self.interpreter.load_lib_dir(path)
} else {
Ok(())
}
}

fn add_redis_fn(&self) {
let send_clone = self.cmd_send.clone();
let send_fn = move |args: Variadic<RedisValueRef>| {
let args = args.into_vec();
let (sx, mut rx) = oneshot_channel();
if let Err(e) = send_clone.blocking_send((args, sx)) {
return Err(FFIError::boxed(format!(
"redis-proto failed to send the command: {}",
e
)));
}
loop {
match rx.try_recv() {
Ok(ret_value) => return Ok(ret_value),
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Closed) => {
return Err(FFIError::boxed(
"redix-proto failed to return a value!".into(),
))
}
}
}
};
self.interpreter.add_function("redis", send_fn.to_x9_fn());
}

fn setup_interpreter(&self, state_store: StateStoreRef) {
// "redis"
self.add_redis_fn();
}
}

0 comments on commit da3aa73

Please sign in to comment.