Skip to content

Commit

Permalink
feat(tcp): add keep-alive, split client/server messages
Browse files Browse the repository at this point in the history
This commit sets a 30-second keep-alive on opened TcpStreams (using
net2::TcpStreamExt as keep-alive functionality is not currently in the
standard library) and splits the previous EventNotification enum into a
ServerMessage and a ClientMessage enum respectively.

I have changed the semantics between ServerMessage and ClientMessage
slightly so that ClientMessage variants are imperative requests that
align closer with the names of the handler functions, ie. ChangeLayer
calls fn change_layer().

Whenever a client's TcpStream cannot be written to, either because it
has notified the server of a disconnect or because it has failed the
keep-alive, it will be removed from the connections HashMap on the
TcpServer struct.

re jtroo#47
  • Loading branch information
LGUG2Z committed Jul 21, 2022
1 parent a8270fd commit 469ccd1
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 21 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ once_cell = "1"
kanata-keyberon = "0.2.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
net2 = "0.2"

[target.'cfg(target_os = "linux")'.dependencies]
evdev-rs = "0.5.0"
Expand Down
14 changes: 7 additions & 7 deletions src/kanata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::cfg::LayerInfo;
use crate::custom_action::*;
use crate::keys::*;
use crate::oskbd::*;
use crate::tcp_server::EventNotification;
use crate::tcp_server::ServerMessage;
use crate::{cfg, ValidatedArgs};

use kanata_keyberon::key_code::*;
Expand Down Expand Up @@ -98,7 +98,7 @@ impl Kanata {
}

/// Advance keyberon layout state and send events based on changes to its state.
fn handle_time_ticks(&mut self, tx: &Option<Sender<EventNotification>>) -> Result<()> {
fn handle_time_ticks(&mut self, tx: &Option<Sender<ServerMessage>>) -> Result<()> {
let now = time::Instant::now();
let ms_elapsed = now.duration_since(self.last_tick).as_millis();

Expand Down Expand Up @@ -266,15 +266,15 @@ impl Kanata {
}
}

fn check_handle_layer_change(&mut self, tx: &Option<Sender<EventNotification>>) {
fn check_handle_layer_change(&mut self, tx: &Option<Sender<ServerMessage>>) {
let cur_layer = self.layout.current_layer();
if cur_layer != self.prev_layer {
let new = self.layer_info[cur_layer].name.clone();
self.prev_layer = cur_layer;
self.print_layer(cur_layer);

if let Some(tx) = tx {
match tx.try_send(EventNotification::LayerChange { new }) {
match tx.try_send(ServerMessage::LayerChange { new }) {
Ok(_) => {}
Err(error) => {
log::error!("could not sent event notification: {}", error);
Expand All @@ -289,7 +289,7 @@ impl Kanata {
}

pub fn start_notification_loop(
rx: Receiver<EventNotification>,
rx: Receiver<ServerMessage>,
clients: Arc<Mutex<HashMap<String, TcpStream>>>,
) {
info!("Kanata: listening for event notifications to relay to connected clients");
Expand Down Expand Up @@ -321,12 +321,12 @@ impl Kanata {
Err(_) => {
// the client is no longer connected, let's remove them
stale_clients.push(id.clone());
log::debug!("removing disconnected notification client");
}
}
}

for id in &stale_clients {
log::warn!("removing disconnected tcp client: {id}");
clients.remove(id);
}
}
Expand All @@ -339,7 +339,7 @@ impl Kanata {
pub fn start_processing_loop(
kanata: Arc<Mutex<Self>>,
rx: Receiver<KeyEvent>,
tx: Option<Sender<EventNotification>>,
tx: Option<Sender<ServerMessage>>,
) {
info!("Kanata: entering the processing loop");
std::thread::spawn(move || {
Expand Down
39 changes: 25 additions & 14 deletions src/tcp_server.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
use crate::Kanata;
use anyhow::Result;
use net2::TcpStreamExt;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{BufReader, Read};
use std::io::Read;
use std::net::{TcpListener, TcpStream};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

#[derive(Debug, Serialize, Deserialize)]
pub enum EventNotification {
pub enum ServerMessage {
LayerChange { new: String },
}

impl EventNotification {
#[derive(Debug, Serialize, Deserialize)]
pub enum ClientMessage {
ChangeLayer { new: String },
}

impl ServerMessage {
pub fn as_bytes(&self) -> Result<Vec<u8>> {
Ok(serde_json::to_string(self)?.as_bytes().to_vec())
}
}

impl FromStr for EventNotification {
impl FromStr for ClientMessage {
type Err = serde_json::Error;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
Expand All @@ -44,37 +51,41 @@ impl TcpServer {
let listener = TcpListener::bind(format!("0.0.0.0:{}", self.port))
.expect("could not start the tcp server");

let cl = self.connections.clone();
let connections = self.connections.clone();

std::thread::spawn(move || {
for stream in listener.incoming() {
match stream {
Ok(stream) => {
stream
.set_keepalive(Some(Duration::from_secs(30)))
.expect("could not set tcp connection keepalive");

let addr = stream
.peer_addr()
.expect("could not find peer address")
.to_string();

{
cl.lock().insert(addr.clone(), stream);
connections.lock().insert(addr.clone(), stream);
}

if let Some(stream) = cl.lock().get(&addr) {
let stream = stream
if let Some(stream) = connections.lock().get(&addr) {
let mut stream = stream
.try_clone()
.expect("could not clone tcpstream to read incoming messages");

let k_cl = kanata.clone();
std::thread::spawn(move || {
log::info!("listening for incoming messages {}", &addr);
loop {
let mut buffer: [u8; 1024] = [0; 1024];
let mut reader = BufReader::new(&stream);
if let Ok(size) = reader.read(&mut buffer) {
if let Ok(event) = EventNotification::from_str(
&String::from_utf8_lossy(&buffer[..size]),
let mut buf = vec![0; 1024];
if let Ok(size) = stream.read(&mut buf) {
if let Ok(event) = ClientMessage::from_str(
&String::from_utf8_lossy(&buf[..size]),
) {
match event {
EventNotification::LayerChange { new } => {
ClientMessage::ChangeLayer { new } => {
k_cl.lock().change_layer(new);
}
}
Expand Down

0 comments on commit 469ccd1

Please sign in to comment.