Skip to content

Commit

Permalink
feat(listeners): decouple tcp server from kanata struct
Browse files Browse the repository at this point in the history
  • Loading branch information
LGUG2Z authored and jtroo committed Jul 19, 2022
1 parent 48fba22 commit ff87be4
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 94 deletions.
34 changes: 17 additions & 17 deletions src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ fn parse_cfg_raw(
.iter()
.filter(&deflayer_filter)
.collect::<Vec<_>>();

if layer_exprs.is_empty() {
bail!("No deflayer expressions exist. At least one layer must be defined.")
}
Expand All @@ -257,21 +258,23 @@ fn parse_cfg_raw(
}

let layer_idxs = parse_layer_indexes(&layer_exprs, mapping_order.len())?;
let mut sorted_idxs: Vec<(String, usize)> = layer_idxs
.iter()
.map(|tuple| (tuple.0.clone(), tuple.1.clone()))
.collect();
sorted_idxs.sort_by(|&(_, a), &(_, b)| a.cmp(&b));
let mut sorted_idxs: Vec<(&String, &usize)> =
layer_idxs.iter().map(|tuple| (tuple.0, tuple.1)).collect();

let layer_names: Vec<String> = sorted_idxs
.iter()
.map(|(name, _)| name.clone())
sorted_idxs.sort_by_key(|f| f.1);

#[allow(clippy::needless_collect)]
// Clippy suggests using the sorted_idxs iter directly and manipulating it
// to produce the layer_names vec when creating Vec<LayerInfo> below
let layer_names = sorted_idxs
.into_iter()
.map(|(name, _)| (*name).clone())
.flat_map(|s| {
// Duplicate the same layer for `layer_strings` because the keyberon layout itself has
// two versions of each layer.
std::iter::repeat(s).take(2)
})
.collect();
.collect::<Vec<_>>();

let layer_strings = root_expr_strs
.into_iter()
Expand All @@ -284,14 +287,11 @@ fn parse_cfg_raw(
})
.collect::<Vec<_>>();

let mut layer_info = vec![];

for (name, cfg_text) in layer_names.iter().zip(layer_strings) {
layer_info.push(LayerInfo {
name: name.clone(),
cfg_text,
})
}
let layer_info: Vec<LayerInfo> = layer_names
.into_iter()
.zip(layer_strings)
.map(|(name, cfg_text)| LayerInfo { name, cfg_text })
.collect();

let alias_exprs = root_exprs
.iter()
Expand Down
92 changes: 21 additions & 71 deletions src/kanata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,70 +7,23 @@ use crossbeam_channel::{Receiver, Sender, TryRecvError};
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::io::Write;
use std::net::{TcpListener, TcpStream};
use std::net::TcpStream;
use std::path::PathBuf;
use std::time;

use parking_lot::Mutex;
use std::sync::Arc;

use crate::cfg::LayerInfo;
use crate::custom_action::*;
use crate::keys::*;
use crate::oskbd::*;
use crate::tcp_server::EventNotification;
use crate::{cfg, ValidatedArgs};

use kanata_keyberon::key_code::*;
use kanata_keyberon::layout::*;

#[derive(Debug, Serialize)]
pub enum EventNotification {
LayerChange { new: String },
}

impl EventNotification {
pub fn as_bytes(&self) -> Result<Vec<u8>> {
Ok(serde_json::to_string(self)?.as_bytes().to_vec())
}
}

pub struct NotificationServer {
pub port: i32,
pub connections: Arc<Mutex<HashMap<String, TcpStream>>>,
}

impl NotificationServer {
pub fn new(port: i32) -> Self {
let server = Self {
port,
connections: Arc::new(Mutex::new(HashMap::new())),
};

server
}

pub fn start(&mut self) {
let listener = TcpListener::bind(format!("0.0.0.0:{}", self.port))
.expect("Could not start the server");

let cl = self.connections.clone();
std::thread::spawn(move || {
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let addr = stream
.peer_addr()
.expect("could not find peer address")
.to_string();

cl.lock().insert(addr, stream);
}
Err(_) => log::error!("not able to accept client connection"),
}
}
});
}
}

pub struct Kanata {
pub kbd_in_path: PathBuf,
pub kbd_out: KbdOut,
Expand All @@ -81,13 +34,10 @@ pub struct Kanata {
pub prev_keys: Vec<KeyCode>,
pub layer_info: Vec<LayerInfo>,
pub prev_layer: usize,
pub server: NotificationServer,
last_tick: time::Instant,
}

use crate::cfg::LayerInfo;
use once_cell::sync::Lazy;
use serde::Serialize;

static MAPPED_KEYS: Lazy<Mutex<cfg::MappedKeys>> = Lazy::new(|| Mutex::new([false; 256]));

Expand All @@ -96,7 +46,7 @@ static PRESSED_KEYS: Lazy<Mutex<HashSet<OsCode>>> = Lazy::new(|| Mutex::new(Hash

impl Kanata {
/// Create a new configuration from a file.
pub fn new(args: ValidatedArgs) -> Result<Self> {
pub fn new(args: &ValidatedArgs) -> Result<Self> {
let cfg = cfg::Cfg::new_from_file(&args.path)?;

let kbd_out = match KbdOut::new() {
Expand All @@ -116,25 +66,22 @@ impl Kanata {
#[cfg(target_os = "windows")]
let kbd_in_path = "unused".into();

let mut server = NotificationServer::new(args.port);
server.start();
Ok(Self {
kbd_in_path,
kbd_out,
cfg_path: args.path,
cfg_path: args.path.clone(),
mapped_keys: cfg.mapped_keys,
key_outputs: cfg.key_outputs,
layout: cfg.layout,
layer_info: cfg.layer_info,
prev_keys: Vec::new(),
prev_layer: 0,
server,
last_tick: time::Instant::now(),
})
}

/// Create a new configuration from a file, wrapped in an Arc<Mutex<_>>
pub fn new_arc(args: ValidatedArgs) -> Result<Arc<Mutex<Self>>> {
pub fn new_arc(args: &ValidatedArgs) -> Result<Arc<Mutex<Self>>> {
Ok(Arc::new(Mutex::new(Self::new(args)?)))
}

Expand All @@ -151,7 +98,7 @@ impl Kanata {
}

/// Advance keyberon layout state and send events based on changes to its state.
fn handle_time_ticks(&mut self, tx: &Sender<EventNotification>) -> Result<()> {
fn handle_time_ticks(&mut self, tx: &Option<Sender<EventNotification>>) -> Result<()> {
let now = time::Instant::now();
let ms_elapsed = now.duration_since(self.last_tick).as_millis();

Expand Down Expand Up @@ -310,17 +257,19 @@ impl Kanata {
Ok(())
}

fn check_handle_layer_change(&mut self, tx: &Sender<EventNotification>) {
fn check_handle_layer_change(&mut self, tx: &Option<Sender<EventNotification>>) {
let cur_layer = self.layout.current_layer();
if cur_layer != self.prev_layer {
let new = self.layer_info[cur_layer].name.clone();
self.prev_layer = cur_layer;
self.print_layer(cur_layer);

match tx.try_send(EventNotification::LayerChange { new }) {
Ok(_) => {}
Err(error) => {
log::error!("could not sent event notification: {}", error);
if let Some(tx) = tx {
match tx.try_send(EventNotification::LayerChange { new }) {
Ok(_) => {}
Err(error) => {
log::error!("could not sent event notification: {}", error);
}
}
}
}
Expand All @@ -330,17 +279,18 @@ impl Kanata {
log::info!("Entered layer:\n{}", self.layer_info[layer].cfg_text);
}

pub fn start_notification_loop(kanata: Arc<Mutex<Self>>, rx: Receiver<EventNotification>) {
info!("Kanata: entering the event notification loop");
pub fn start_notification_loop(
rx: Receiver<EventNotification>,
clients: Arc<Mutex<HashMap<String, TcpStream>>>,
) {
info!("Kanata: listening for event notifications to relay to connected clients");
std::thread::spawn(move || {
loop {
match rx.recv() {
Err(_) => {
panic!("channel disconnected")
}
Ok(event) => {
let k = kanata.lock();

let notification = match event.as_bytes() {
Ok(serialized_notification) => serialized_notification,
Err(error) => {
Expand All @@ -352,7 +302,7 @@ impl Kanata {
}
};

let mut clients = k.server.connections.lock();
let mut clients = clients.lock();
let mut stale_clients = vec![];
for (id, client) in &mut *clients {
match client.write(&notification) {
Expand Down Expand Up @@ -380,7 +330,7 @@ impl Kanata {
pub fn start_processing_loop(
kanata: Arc<Mutex<Self>>,
rx: Receiver<KeyEvent>,
tx: Sender<EventNotification>,
tx: Option<Sender<EventNotification>>,
) {
info!("Kanata: entering the processing loop");
std::thread::spawn(move || {
Expand Down
27 changes: 21 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ mod kanata;
mod keys;
mod layers;
mod oskbd;
mod tcp_server;

use clap::Parser;
use kanata::Kanata;
use tcp_server::NotificationServer;

type CfgPath = PathBuf;

pub struct ValidatedArgs {
path: CfgPath,
port: i32,
port: Option<i32>,
}

#[derive(Parser, Debug)]
Expand All @@ -28,8 +30,8 @@ struct Args {
cfg: String,

/// Port to run the notification server on
#[clap(short, long, default_value = "35948")]
port: i32,
#[clap(short, long)]
port: Option<i32>,

/// Enable debug logging
#[clap(short, long)]
Expand Down Expand Up @@ -68,7 +70,7 @@ fn cli_init() -> Result<ValidatedArgs> {
}

fn main_impl(args: ValidatedArgs) -> Result<()> {
let kanata_arc = Kanata::new_arc(args)?;
let kanata_arc = Kanata::new_arc(&args)?;
info!("Kanata: config parsed");
info!("Sleeping for 2s. Please release all keys and don't press additional ones.");

Expand All @@ -77,10 +79,23 @@ fn main_impl(args: ValidatedArgs) -> Result<()> {
// The reason for two different event loops is that the "event loop" only listens for keyboard
// events, which it sends to the "processing loop". The processing loop handles keyboard events
// while also maintaining `tick()` calls to keyberon.

let (server, ntx, nrx) = if let Some(port) = args.port {
let mut server = NotificationServer::new(port);
server.start();
let (ntx, nrx) = crossbeam_channel::bounded(10);
(Some(server), Some(ntx), Some(nrx))
} else {
(None, None, None)
};

let (tx, rx) = crossbeam_channel::bounded(10);
let (ntx, nrx) = crossbeam_channel::bounded(10);
Kanata::start_processing_loop(kanata_arc.clone(), rx, ntx);
Kanata::start_notification_loop(kanata_arc.clone(), nrx);

if let (Some(server), Some(nrx)) = (server, nrx) {
Kanata::start_notification_loop(nrx, server.connections);
}

Kanata::event_loop(kanata_arc, tx)?;

Ok(())
Expand Down
53 changes: 53 additions & 0 deletions src/tcp_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use anyhow::Result;
use parking_lot::Mutex;
use serde::Serialize;
use std::collections::HashMap;
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;

#[derive(Debug, Serialize)]
pub enum EventNotification {
LayerChange { new: String },
}

impl EventNotification {
pub fn as_bytes(&self) -> Result<Vec<u8>> {
Ok(serde_json::to_string(self)?.as_bytes().to_vec())
}
}

pub struct NotificationServer {
pub port: i32,
pub connections: Arc<Mutex<HashMap<String, TcpStream>>>,
}

impl NotificationServer {
pub fn new(port: i32) -> Self {
Self {
port,
connections: Arc::new(Mutex::new(HashMap::new())),
}
}

pub fn start(&mut self) {
let listener = TcpListener::bind(format!("0.0.0.0:{}", self.port))
.expect("could not start the tcp server");

let cl = self.connections.clone();
std::thread::spawn(move || {
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let addr = stream
.peer_addr()
.expect("could not find peer address")
.to_string();

cl.lock().insert(addr, stream);
}
Err(_) => log::error!("not able to accept client connection"),
}
}
});
}
}

0 comments on commit ff87be4

Please sign in to comment.